Skip to content

Commit bbab948

Browse files
committed
change metric
1 parent 7bd17a3 commit bbab948

11 files changed

Lines changed: 57 additions & 141 deletions

File tree

components/core/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ endif()
2222

2323
if (CMAKE_BUILD_TYPE MATCHES "Release")
2424
include(CheckIPOSupported)
25-
check_ipo_supported(RESULT IPO_SUPPORTED)
25+
check_ipo_supported(RESULT IPO_SUPPORTED OUTPUT)
2626
if(IPO_SUPPORTED)
2727
message(STATUS "Link-time optimization enabled.")
2828
set(CMAKE_INTERPROCEDURAL_OPTIMIZATION ON)

components/core/src/clp/clo/clo.cpp

Lines changed: 7 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,7 @@ static SearchFilesResult search_file(
8282
Query& query,
8383
Archive& archive,
8484
MetadataDB::FileIterator& file_metadata_ix,
85-
std::unique_ptr<OutputHandler>& output_handler,
86-
size_t& total_bytes_output
85+
std::unique_ptr<OutputHandler>& output_handler
8786
);
8887
/**
8988
* Searches all files referenced by a given database cursor
@@ -98,9 +97,7 @@ static void search_files(
9897
Archive& archive,
9998
MetadataDB::FileIterator& file_metadata_ix,
10099
std::unique_ptr<OutputHandler>& output_handler,
101-
std::set<clp::segment_id_t> const& segments_to_search,
102-
size_t& total_bytes_scanned,
103-
size_t& total_bytes_output
100+
std::set<clp::segment_id_t> const& segments_to_search
104101
);
105102
/**
106103
* Searches an archive with the given path
@@ -390,8 +387,7 @@ static SearchFilesResult search_file(
390387
Query& query,
391388
Archive& archive,
392389
MetadataDB::FileIterator& file_metadata_ix,
393-
std::unique_ptr<OutputHandler>& output_handler,
394-
size_t& total_bytes_output
390+
std::unique_ptr<OutputHandler>& output_handler
395391
) {
396392
File compressed_file;
397393
Message encoded_message;
@@ -419,7 +415,6 @@ static SearchFilesResult search_file(
419415
decompressed_message
420416
))
421417
{
422-
total_bytes_output += decompressed_message.length();
423418
if (ErrorCode_Success
424419
!= output_handler->add_result(
425420
compressed_file.get_orig_path(),
@@ -442,9 +437,7 @@ void search_files(
442437
Archive& archive,
443438
MetadataDB::FileIterator& file_metadata_ix,
444439
std::unique_ptr<OutputHandler>& output_handler,
445-
std::set<clp::segment_id_t> const& segments_to_search,
446-
size_t& total_bytes_scanned,
447-
size_t& total_bytes_output
440+
std::set<clp::segment_id_t> const& segments_to_search
448441
) {
449442
if (query.contains_sub_queries()) {
450443
for (; file_metadata_ix.has_next(); file_metadata_ix.next()) {
@@ -456,15 +449,7 @@ void search_files(
456449
continue;
457450
}
458451

459-
total_bytes_scanned += file_metadata_ix.get_num_uncompressed_bytes();
460-
461-
auto result = search_file(
462-
query,
463-
archive,
464-
file_metadata_ix,
465-
output_handler,
466-
total_bytes_output
467-
);
452+
auto result = search_file(query, archive, file_metadata_ix, output_handler);
468453
if (SearchFilesResult::OpenFailure == result) {
469454
continue;
470455
}
@@ -478,15 +463,7 @@ void search_files(
478463
continue;
479464
}
480465

481-
total_bytes_scanned += file_metadata_ix.get_num_uncompressed_bytes();
482-
483-
auto result = search_file(
484-
query,
485-
archive,
486-
file_metadata_ix,
487-
output_handler,
488-
total_bytes_output
489-
);
466+
auto result = search_file(query, archive, file_metadata_ix, output_handler);
490467
if (SearchFilesResult::OpenFailure == result) {
491468
continue;
492469
}
@@ -572,16 +549,12 @@ static bool search_archive(
572549
true
573550
);
574551
auto& file_metadata_ix = *file_metadata_ix_ptr;
575-
size_t total_bytes_scanned = 0;
576-
size_t total_bytes_output = 0;
577552
search_files(
578553
query,
579554
archive_reader,
580555
file_metadata_ix,
581556
output_handler,
582-
ids_of_segments_to_search,
583-
total_bytes_scanned,
584-
total_bytes_output
557+
ids_of_segments_to_search
585558
);
586559
file_metadata_ix_ptr.reset(nullptr);
587560

@@ -595,12 +568,6 @@ static bool search_archive(
595568
);
596569
return false;
597570
}
598-
599-
nlohmann::json json_msg;
600-
json_msg["stats"]["bytes_scanned"] = total_bytes_scanned;
601-
json_msg["stats"]["bytes_output"] = total_bytes_output;
602-
std::cout << json_msg.dump(-1, ' ', true, nlohmann::json::error_handler_t::ignore) << std::endl;
603-
604571
return true;
605572
}
606573

components/core/src/clp_s/search/Output.cpp

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
#include "Output.hpp"
22

3-
#include <iostream>
43
#include <memory>
54
#include <vector>
65

7-
#include <nlohmann/json.hpp>
86
#include <spdlog/spdlog.h>
97

108
#include "../../clp/type_utils.hpp"
@@ -91,7 +89,6 @@ bool Output::filter() {
9189
m_archive_reader->open_packed_streams();
9290

9391
std::string message;
94-
size_t total_bytes_output = 0;
9592
auto const archive_id = m_archive_reader->get_archive_id();
9693
for (int32_t schema_id : matched_schemas) {
9794
if (EvaluatedValue::False == m_query_runner.schema_init(schema_id)) {
@@ -115,12 +112,10 @@ bool Output::filter() {
115112
m_query_runner
116113
))
117114
{
118-
total_bytes_output += message.length();
119115
m_output_handler->write(message, timestamp, archive_id, log_event_idx);
120116
}
121117
} else {
122118
while (reader.get_next_message(message, m_query_runner)) {
123-
total_bytes_output += message.length();
124119
m_output_handler->write(message);
125120
}
126121
}
@@ -141,12 +136,6 @@ bool Output::filter() {
141136
);
142137
return false;
143138
}
144-
145-
nlohmann::json json_msg;
146-
json_msg["stats"] = nlohmann::json::object();
147-
json_msg["stats"]["bytes_output"] = total_bytes_output;
148-
std::cout << json_msg.dump(-1, ' ', true, nlohmann::json::error_handler_t::ignore) << std::endl;
149-
150139
return true;
151140
}
152141
} // namespace clp_s::search
Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,9 @@
11
from celery import Celery
2-
from celery.signals import worker_process_init, worker_process_shutdown
3-
from clp_py_utils.telemetry import init_telemetry, shutdown_telemetry
42

53
from job_orchestration.executor.query import celeryconfig
64

75
app = Celery("query")
86
app.config_from_object(celeryconfig)
97

10-
11-
@worker_process_init.connect
12-
def setup_telemetry(**kwargs) -> None:
13-
init_telemetry()
14-
15-
16-
@worker_process_shutdown.connect
17-
def teardown_telemetry(**kwargs) -> None:
18-
shutdown_telemetry()
19-
20-
218
if "__main__" == __name__:
229
app.start()

components/job-orchestration/job_orchestration/executor/query/fs_search_task.py

Lines changed: 1 addition & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import datetime
2-
import json
32
import os
43
from pathlib import Path
54
from typing import Any
@@ -8,19 +7,6 @@
87
from celery.app.task import Task
98
from celery.exceptions import SoftTimeLimitExceeded
109
from celery.utils.log import get_task_logger
11-
from opentelemetry import metrics
12-
13-
meter = metrics.get_meter("clp_py_utils")
14-
bytes_scanned_counter = meter.create_counter(
15-
"clp.query.bytes_scanned_total",
16-
unit="By",
17-
description="Total uncompressed bytes scanned by the query",
18-
)
19-
bytes_output_counter = meter.create_counter(
20-
"clp.query.bytes_output_total", unit="By", description="Total bytes output by the query"
21-
)
22-
23-
2410
from clp_py_utils.clp_config import (
2511
Database,
2612
StorageEngine,
@@ -275,7 +261,7 @@ def search_entry_point(
275261
start_time=start_time,
276262
)
277263

278-
task_results, stdout_data = run_query_task(
264+
task_results, _ = run_query_task(
279265
sql_adapter=sql_adapter,
280266
logger=logger,
281267
clp_logs_dir=clp_logs_dir,
@@ -298,48 +284,6 @@ def search_entry_point(
298284
dest_path = f"{job_id}/{archive_id}"
299285
upload_results_to_s3(task_results, s3_config, src_file, dest_path)
300286

301-
# Telemetry
302-
bytes_scanned = None
303-
bytes_output = None
304-
305-
if stdout_data:
306-
for line in reversed(stdout_data.rsplit("\n", 10)):
307-
if '"stats"' not in line:
308-
continue
309-
try:
310-
data = json.loads(line)
311-
if isinstance(data, dict) and "stats" in data:
312-
stats = data["stats"]
313-
bytes_scanned = stats.get("bytes_scanned", None)
314-
bytes_output = stats.get("bytes_output", None)
315-
break
316-
except json.JSONDecodeError:
317-
pass
318-
319-
status_str = "success" if QueryTaskStatus.SUCCEEDED == task_results.status else "failure"
320-
321-
storage_engine_str = (
322-
"clp_s" if worker_config.package.storage_engine == StorageEngine.CLP_S else "clp"
323-
)
324-
if search_config.aggregation_config is not None:
325-
output_type = "reducer"
326-
elif search_config.network_address is not None:
327-
output_type = "network"
328-
elif search_config.write_to_file:
329-
output_type = "file"
330-
else:
331-
output_type = "results_cache"
332-
333-
attributes = {
334-
"status": status_str,
335-
"clp.storage.engine": storage_engine_str,
336-
"clp.query.output_type": output_type,
337-
}
338-
if bytes_scanned is not None:
339-
bytes_scanned_counter.add(bytes_scanned, attributes)
340-
if bytes_output is not None:
341-
bytes_output_counter.add(bytes_output, attributes)
342-
343287
return task_results.model_dump()
344288

345289

components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@
4444
from clp_py_utils.core import read_yaml_config_file
4545
from clp_py_utils.decorators import exception_default_value
4646
from clp_py_utils.sql_adapter import ConnectionPoolWrapper, SqlAdapter
47+
from clp_py_utils.telemetry import init_telemetry, shutdown_telemetry
48+
from opentelemetry import metrics
49+
from opentelemetry.metrics import Observation
4750
from pydantic import ValidationError
4851

4952
from job_orchestration.executor.query.celery import app
@@ -94,6 +97,37 @@
9497

9598
reducer_connection_queue: asyncio.Queue | None = None
9699

100+
# OpenTelemetry metrics
101+
meter = metrics.get_meter("clp_py_utils")
102+
103+
104+
def _observe_active_jobs(options) -> list[Observation]:
105+
return [Observation(len(active_jobs), {})]
106+
107+
108+
def _observe_outstanding_tasks(options) -> list[Observation]:
109+
total = 0
110+
for job in active_jobs.values():
111+
if isinstance(job, SearchJob):
112+
total += job.num_archives_to_search - job.num_archives_searched
113+
else:
114+
total += 1
115+
return [Observation(total, {})]
116+
117+
118+
active_jobs_gauge = meter.create_observable_gauge(
119+
"clp.query.active_jobs",
120+
callbacks=[_observe_active_jobs],
121+
unit="1",
122+
description="Number of active query jobs",
123+
)
124+
outstanding_tasks_gauge = meter.create_observable_gauge(
125+
"clp.query.outstanding_tasks",
126+
callbacks=[_observe_outstanding_tasks],
127+
unit="1",
128+
description="Total number of outstanding tasks across all active query jobs",
129+
)
130+
97131

98132
class DispatchExecutor:
99133
# Globals for dispatch executor pool
@@ -1137,6 +1171,8 @@ async def handle_jobs(
11371171
async def main(argv: list[str]) -> int:
11381172
global reducer_connection_queue
11391173

1174+
init_telemetry()
1175+
11401176
args_parser = argparse.ArgumentParser(description="Wait for and run query jobs.")
11411177
args_parser.add_argument("--config", "-c", required=True, help="CLP configuration file.")
11421178

@@ -1229,6 +1265,7 @@ async def main(argv: list[str]) -> int:
12291265
except Exception:
12301266
logger.exception("Uncaught exception in job handling loop.")
12311267

1268+
shutdown_telemetry()
12321269
return 0
12331270

12341271

integration-tests/tests/binary_tests/test_log_converter.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,6 @@ def _convert_and_compress(
8989
search_result = search_action.verify_returncode()
9090
assert search_result, search_result.failure_message
9191
lines = search_action.completed_proc.stdout.splitlines()
92-
# clp-s now outputs a stats object at the end of its stdout for telemetry
93-
lines = [line for line in lines if not line.strip().startswith('{"stats":')]
9492
if len(lines) != test_paths.num_log_events:
9593
pytest.fail(
9694
f"Expected {test_paths.num_log_events} log events after conversion, "

integration-tests/tests/utils/fs_validation.py

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
"""File structure validators."""
22

3-
import json
43
from pathlib import Path
54
from tempfile import NamedTemporaryFile
65
from typing import IO
@@ -41,20 +40,15 @@ def _sort_json_keys_and_rows(json_fp: Path) -> IO[str]:
4140
4241
:param json_fp:
4342
:return: A named temporary file (delete on close) that contains the sorted JSON content.
43+
:raise: RuntimeError if jq is missing or fails due to execution errors.
4444
"""
45-
lines = []
46-
with json_fp.open("r") as f:
47-
for line in f:
48-
if not line.strip():
49-
continue
50-
try:
51-
obj = json.loads(line)
52-
lines.append(json.dumps(obj, separators=(",", ":"), sort_keys=True))
53-
except json.JSONDecodeError:
54-
lines.append(line.strip())
45+
jq_action = NonClpAction(
46+
cmd=[get_binary_path("jq"), "--sort-keys", "--compact-output", ".", str(json_fp)],
47+
)
48+
jq_action.check_returncode()
5549

5650
sorted_fp = NamedTemporaryFile(mode="w+") # noqa: SIM115
57-
sorted_lines = sorted(lines)
51+
sorted_lines = sorted(jq_action.completed_proc.stdout.splitlines())
5852
for line in sorted_lines:
5953
sorted_fp.write(f"{line}\n")
6054
sorted_fp.flush()

0 commit comments

Comments
 (0)