Skip to content

Commit 2b4a4ef

Browse files
committed
test
1 parent a96f975 commit 2b4a4ef

9 files changed

Lines changed: 207 additions & 5 deletions

File tree

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
from celery import Celery
22

33
from job_orchestration.executor.compress import celeryconfig
4+
from job_orchestration.executor.telemetry import init_telemetry
45

56
app = Celery("compress")
67
app.config_from_object(celeryconfig)
78

9+
init_telemetry()
10+
811
if "__main__" == __name__:
912
app.start()

components/job-orchestration/job_orchestration/executor/compress/celery_compress.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
from job_orchestration.executor.compress.celery import app
66
from job_orchestration.executor.compress.compression_task import compression_entry_point
7+
from job_orchestration.executor.telemetry import shutdown_telemetry
78

89
# Setup logging
910
logger = get_task_logger(__name__)
@@ -12,6 +13,7 @@
1213
@signals.worker_shutdown.connect
def worker_shutdown_handler(signal=None, sender=None, **kwargs):
    """Handle Celery's ``worker_shutdown`` signal for the compression worker.

    Logs that shutdown was requested, then shuts down telemetry. The ``signal``,
    ``sender`` and extra keyword arguments supplied by Celery are accepted but
    not used.
    """
    logger.info("Shutdown signal received.")
    shutdown_telemetry()
1517

1618

1719
@app.task(bind=True)

components/job-orchestration/job_orchestration/executor/compress/compression_task.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
s3_put,
3232
)
3333
from clp_py_utils.sql_adapter import SqlAdapter
34+
# The OpenTelemetry metrics API is exposed as ``opentelemetry.metrics``.
from opentelemetry.metrics import get_meter
3435

3536
from job_orchestration.scheduler.constants import CompressionTaskStatus
3637
from job_orchestration.scheduler.job_config import (
@@ -43,6 +44,21 @@
4344
from job_orchestration.scheduler.task_result import CompressionTaskResult
4445
from job_orchestration.scheduler.utils import is_s3_based_input
4546

47+
# OpenTelemetry counters for compression metrics.
48+
# Created at module-import time; when telemetry is disabled the meter is a
49+
# no-op so these counters silently accept ``add()`` calls.
50+
_compression_meter = get_meter("compression-worker")
51+
_bytes_input_counter = _compression_meter.create_counter(
52+
"clp.compression.bytes_input_total",
53+
description="Total bytes of uncompressed log data input for compression",
54+
unit="By",
55+
)
56+
_bytes_output_counter = _compression_meter.create_counter(
57+
"clp.compression.bytes_output_total",
58+
description="Total bytes of compressed archive data output from compression",
59+
unit="By",
60+
)
61+
4662

4763
def update_compression_task_metadata(db_cursor, task_id, kv):
4864
if not len(kv):
@@ -628,6 +644,11 @@ def compression_entry_point(
628644
duration = (datetime.datetime.now() - start_time).total_seconds()
629645
logger.info(f"[job_id={job_id} task_id={task_id}] COMPRESSION COMPLETED.")
630646

647+
# Emit telemetry counters for bytes input/output.
648+
# Even on partial failure these represent actual work done.
649+
_bytes_input_counter.add(worker_output["total_uncompressed_size"])
650+
_bytes_output_counter.add(worker_output["total_compressed_size"])
651+
631652
with (
632653
closing(sql_adapter.create_connection(True)) as db_conn,
633654
closing(db_conn.cursor(dictionary=True)) as db_cursor,
Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,18 @@
1-
from celery import Celery
1+
from celery import Celery, signals
22

33
from job_orchestration.executor.query import celeryconfig
4+
from job_orchestration.executor.telemetry import init_telemetry, shutdown_telemetry
45

56
app = Celery("query")
67
app.config_from_object(celeryconfig)
78

9+
init_telemetry()
10+
11+
12+
@signals.worker_shutdown.connect
def worker_shutdown_handler(**kwargs):
    """Shut down telemetry when Celery emits the ``worker_shutdown`` signal.

    The keyword arguments supplied by Celery are accepted but not used.
    """
    shutdown_telemetry()
15+
16+
817
if "__main__" == __name__:
918
app.start()

components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323

2424
from job_orchestration.executor.query.celery import app
2525
from job_orchestration.executor.query.utils import (
26+
_bytes_output_counter,
27+
emit_bytes_scanned,
2628
report_task_failure,
2729
run_query_task,
2830
)
@@ -243,7 +245,7 @@ def extract_stream(
243245
start_time=start_time,
244246
)
245247

246-
task_results, task_stdout_str = run_query_task(
248+
task_results, task_stdout_str, stdout_byte_count = run_query_task(
247249
sql_adapter=sql_adapter,
248250
logger=logger,
249251
clp_logs_dir=clp_logs_dir,
@@ -255,6 +257,17 @@ def extract_stream(
255257
start_time=start_time,
256258
)
257259

260+
# Emit telemetry counters.
261+
# bytes_scanned: the uncompressed size of the archive that was extracted.
262+
emit_bytes_scanned(
263+
sql_adapter=sql_adapter,
264+
clp_metadata_db_conn_params=clp_metadata_db_conn_params,
265+
archive_id=archive_id,
266+
dataset=dataset,
267+
logger=logger,
268+
)
269+
_bytes_output_counter.add(stdout_byte_count)
270+
258271
if enable_s3_upload and QueryTaskStatus.SUCCEEDED == task_results.status:
259272
logger.info("Uploading streams to S3...")
260273

components/job-orchestration/job_orchestration/executor/query/fs_search_task.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222

2323
from job_orchestration.executor.query.celery import app
2424
from job_orchestration.executor.query.utils import (
25+
_bytes_output_counter,
26+
emit_bytes_scanned,
2527
report_task_failure,
2628
run_query_task,
2729
)
@@ -262,7 +264,7 @@ def search(
262264
start_time=start_time,
263265
)
264266

265-
task_results, _ = run_query_task(
267+
task_results, _, stdout_byte_count = run_query_task(
266268
sql_adapter=sql_adapter,
267269
logger=logger,
268270
clp_logs_dir=clp_logs_dir,
@@ -274,6 +276,17 @@ def search(
274276
start_time=start_time,
275277
)
276278

279+
# Emit telemetry counters.
280+
# bytes_scanned: the uncompressed size of the archive that was queried.
281+
emit_bytes_scanned(
282+
sql_adapter=sql_adapter,
283+
clp_metadata_db_conn_params=clp_metadata_db_conn_params,
284+
archive_id=archive_id,
285+
dataset=dataset,
286+
logger=logger,
287+
)
288+
_bytes_output_counter.add(stdout_byte_count)
289+
277290
storage_config = worker_config.stream_output.storage
278291
if (
279292
StorageType.S3 == storage_config.type

components/job-orchestration/job_orchestration/executor/query/utils.py

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,27 @@
99
from typing import Any
1010

1111
from clp_py_utils.clp_config import QUERY_TASKS_TABLE_NAME
12+
from clp_py_utils.clp_metadata_db_utils import get_archives_table_name
1213
from clp_py_utils.sql_adapter import SqlAdapter
14+
# The OpenTelemetry metrics API is exposed as ``opentelemetry.metrics``.
from opentelemetry.metrics import get_meter
1315

1416
from job_orchestration.scheduler.scheduler_data import QueryTaskResult, QueryTaskStatus
1517

18+
# OpenTelemetry counters for query metrics.
19+
# Created at module-import time; when telemetry is disabled the meter is a
20+
# no-op so these counters silently accept ``add()`` calls.
21+
_query_meter = get_meter("query-worker")
22+
_bytes_scanned_counter = _query_meter.create_counter(
23+
"clp.query.bytes_scanned_total",
24+
description="Total bytes of uncompressed log data scanned during queries",
25+
unit="By",
26+
)
27+
_bytes_output_counter = _query_meter.create_counter(
28+
"clp.query.bytes_output_total",
29+
description="Total bytes of query results output",
30+
unit="By",
31+
)
32+
1633

1734
def get_task_log_file_path(clp_logs_dir: Path, job_id: str, task_id: int) -> Path:
1835
worker_logs_dir = clp_logs_dir / job_id
@@ -49,7 +66,11 @@ def run_query_task(
4966
job_id: str,
5067
task_id: int,
5168
start_time: datetime.datetime,
52-
) -> tuple[QueryTaskResult, str]:
69+
) -> tuple[QueryTaskResult, str, int]:
70+
"""Run a query subprocess and return the result, stdout string, and stdout byte count.
71+
72+
:return: Tuple of (task_result, stdout_str, stdout_byte_count).
73+
"""
5374
clo_log_path = get_task_log_file_path(clp_logs_dir, job_id, task_id)
5475
clo_log_file = open(clo_log_path, "w")
5576

@@ -113,7 +134,7 @@ def sigterm_handler(_signo, _stack_frame):
113134
if QueryTaskStatus.FAILED == task_status:
114135
task_result.error_log_path = str(clo_log_path)
115136

116-
return task_result, stdout_data.decode("utf-8")
137+
return task_result, stdout_data.decode("utf-8"), len(stdout_data)
117138

118139

119140
def update_query_task_metadata(
@@ -134,3 +155,32 @@ def update_query_task_metadata(
134155
WHERE id = {task_id}
135156
"""
136157
db_cursor.execute(query)
158+
159+
160+
def emit_bytes_scanned(
    sql_adapter: SqlAdapter,
    clp_metadata_db_conn_params: dict,
    archive_id: str,
    dataset: str | None,
    logger: Logger,
) -> None:
    """Add an archive's uncompressed size to ``clp.query.bytes_scanned_total``.

    The size is read from the ``uncompressed_size`` column of the archive's row
    in the archives table of the metadata database. Nothing is recorded when no
    row matches ``archive_id``. Emission is best-effort: any exception raised
    while connecting, querying, or recording is logged through ``logger`` and
    suppressed.

    :param sql_adapter: Adapter used to open the metadata database connection.
    :param clp_metadata_db_conn_params: Connection parameters; only the
        ``table_prefix`` entry is read here.
    :param archive_id: ID of the archive whose size should be recorded.
    :param dataset: Dataset passed to ``get_archives_table_name`` to resolve the
        archives table.
    :param logger: Logger that receives the failure report.
    """
    try:
        with (
            closing(sql_adapter.create_connection(True)) as conn,
            closing(conn.cursor(dictionary=True)) as cursor,
        ):
            table_prefix = clp_metadata_db_conn_params["table_prefix"]
            table_name = get_archives_table_name(table_prefix, dataset)
            select_size_query = f"SELECT uncompressed_size FROM {table_name} WHERE id = %s"
            cursor.execute(select_size_query, (archive_id,))

            archive_row = cursor.fetchone()
            if archive_row is None:
                # No matching archive row, so there is nothing to count.
                return
            _bytes_scanned_counter.add(archive_row["uncompressed_size"])
    except Exception:
        logger.exception("Failed to emit bytes_scanned_total telemetry")
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
"""OpenTelemetry metrics initialization for CLP workers.
2+
3+
This module mirrors the Rust implementation in clp-rust-utils/src/telemetry.rs,
4+
providing equivalent telemetry initialization, shutdown, and consent-checking
5+
behaviour for Python Celery workers.
6+
7+
NOTE: TELEMETRY_DISABLE_VALUES must be kept consistent with the Rust
8+
implementation in ``clp-rust-utils/src/telemetry.rs`` and the Python
9+
implementation in ``clp_package_utils/scripts/start_clp.py``.
10+
"""
11+
12+
import os
13+
14+
# The OpenTelemetry metrics API is exposed as ``opentelemetry.metrics``.
from opentelemetry.metrics import set_meter_provider
15+
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
16+
from opentelemetry.sdk.metrics import MeterProvider
17+
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
18+
from opentelemetry.sdk.resources import Resource
19+
20+
# Values accepted by CLP_DISABLE_TELEMETRY and DO_NOT_TRACK to disable telemetry.
21+
TELEMETRY_DISABLE_VALUES = ("1", "true", "yes", "y")
22+
23+
# Default OTLP HTTP endpoint for the OpenTelemetry Collector.
24+
_DEFAULT_OTEL_ENDPOINT = "http://localhost:4318"
25+
26+
_meter_provider: MeterProvider | None = None
27+
28+
29+
def is_telemetry_disabled() -> bool:
    """Report whether the environment opts out of telemetry.

    :return: True when ``CLP_DISABLE_TELEMETRY`` or ``DO_NOT_TRACK`` is set to a
        value that, once lower-cased, appears in
        :data:`TELEMETRY_DISABLE_VALUES`; False otherwise.
    """
    for env_var_name in ("CLP_DISABLE_TELEMETRY", "DO_NOT_TRACK"):
        env_value = os.getenv(env_var_name)
        if env_value is not None and env_value.lower() in TELEMETRY_DISABLE_VALUES:
            return True

    return False
44+
45+
46+
def init_telemetry() -> None:
    """Initialize OpenTelemetry metrics collection for this process.

    Installs a global :class:`MeterProvider` that exports over OTLP/HTTP, so
    that meters obtained from :func:`opentelemetry.metrics.get_meter` record
    real measurements. The function does nothing when telemetry is disabled
    (see :func:`is_telemetry_disabled`) or when an earlier call already
    installed a provider; in the disabled case, counters created from the
    default no-op meter silently accept measurements.

    The collector base URL is read from ``OTEL_EXPORTER_OTLP_ENDPOINT``, falling
    back to ``_DEFAULT_OTEL_ENDPOINT`` when the variable is unset or empty, and
    ``/v1/metrics`` is appended to it. ``OTEL_SERVICE_NAME`` and
    ``OTEL_RESOURCE_ATTRIBUTES`` are picked up by :meth:`Resource.create`.
    """
    global _meter_provider

    if is_telemetry_disabled():
        return

    # The OpenTelemetry API only accepts the first global provider. Building a
    # second one would start another export reader and repoint
    # ``_meter_provider`` at a provider that is not the global one, so
    # ``shutdown_telemetry`` would no longer flush the provider in use.
    if _meter_provider is not None:
        return

    base_endpoint = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT") or _DEFAULT_OTEL_ENDPOINT
    # Drop trailing slashes so "http://host:4318/" does not become
    # "http://host:4318//v1/metrics".
    base_endpoint = base_endpoint.rstrip("/")
    metrics_endpoint = f"{base_endpoint}/v1/metrics"

    exporter = OTLPMetricExporter(endpoint=metrics_endpoint)
    reader = PeriodicExportingMetricReader(exporter)
    provider = MeterProvider(
        resource=Resource.create(),
        metric_readers=[reader],
    )

    set_meter_provider(provider)
    _meter_provider = provider
77+
78+
79+
def shutdown_telemetry() -> None:
    """Shut down OpenTelemetry metrics, flushing any pending metric exports.

    Safe to call when telemetry was never initialized, was disabled, or has
    already been shut down; in those cases the function does nothing.

    The module-level provider reference is cleared before ``shutdown()`` is
    attempted, so a failing shutdown cannot leave a stale provider behind for a
    later call to shut down a second time. Any exception raised by the
    provider's ``shutdown()`` still propagates to the caller.
    """
    global _meter_provider

    # Take ownership of the provider and clear the global in one step.
    provider, _meter_provider = _meter_provider, None
    if provider is not None:
        provider.shutdown()

components/job-orchestration/pyproject.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ dependencies = [
1111
"mariadb>=1.1.14",
1212
"msgpack>=1.1.2",
1313
"mysql-connector-python>=9.5.0",
14+
"opentelemetry-api>=1.20.0",
15+
"opentelemetry-exporter-otlp-proto-http>=0.41b0",
16+
"opentelemetry-sdk>=1.20.0",
1417
"pika>=1.3.2",
1518
"pydantic>=2.12.5",
1619
"pymongo>=4.16.0",

0 commit comments

Comments
 (0)