feat(telemetry): Add metrics for query and compression schedulers/workers (fixes #2346).#2288
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
WalkthroughAdds OpenTelemetry metrics and lifecycle wiring across CLP services: a telemetry helper and dependencies; counters in compression tasks; observable gauges in compression and query schedulers; Celery worker signal handlers; deployment, compose, config, and test updates to expose telemetry environment variables. ChangesOpenTelemetry Metrics Integration
Sequence DiagramsequenceDiagram
participant WorkerProcess
participant telemetry_init as init_telemetry()
participant OTLPExporter
participant MeterProvider
participant MetricsAPI as metrics.get_meter(...)
WorkerProcess->>telemetry_init: call on startup (Celery/scheduler)
telemetry_init->>OTLPExporter: create OTLPMetricExporter
telemetry_init->>MeterProvider: create MeterProvider with reader
telemetry_init->>WorkerProcess: metrics.set_meter_provider(...)
WorkerProcess->>MetricsAPI: meter = metrics.get_meter("compression-worker")
WorkerProcess->>MetricsAPI: record counters/gauges (bytes/counts)
WorkerProcess->>telemetry_init: register shutdown (atexit / worker signal)
WorkerProcess->>telemetry_init: call shutdown_telemetry() on exit
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Suggested reviewers
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In
`@components/job-orchestration/job_orchestration/executor/compress/compression_task.py`:
- Line 34: Replace the non-public import "from opentelemetry.api.metrics import
get_meter" with the public API import "from opentelemetry.metrics import
get_meter" in compression_task.py; make the same change in telemetry.py and
query/utils.py where opentelemetry.api.metrics is used so all modules import
get_meter (or other metrics symbols) from opentelemetry.metrics to avoid runtime
failures.
In `@components/job-orchestration/job_orchestration/executor/query/utils.py`:
- Around line 175-180: The code interpolates table_name into SQL for
db_cursor.execute which can lead to SQL injection if dataset influences the
name; replace the string interpolation by validating/whitelisting the
dataset/table name and using proper SQL identifier composition (e.g.,
psycopg2.sql.SQL and psycopg2.sql.Identifier) when calling db_cursor.execute, or
ensure get_archives_table_name only returns a safe validated identifier; update
the call site that currently uses table_name and archive_id so the query uses an
escaped Identifier for the table and a parameterized value for id, and add
validation logic around get_archives_table_name/dataset to enforce allowed
characters or a whitelist.
- Line 14: The import line using get_meter from opentelemetry.api.metrics is
invalid and prevents worker startup; change the import to pull get_meter from
the correct public module opentelemetry.metrics (i.e., replace the import of
get_meter in the module where it's used, e.g., in query/utils.py) so the code
imports get_meter from opentelemetry.metrics instead of
opentelemetry.api.metrics.
In `@components/job-orchestration/job_orchestration/executor/telemetry.py`:
- Line 14: Replace the unsupported import path "from opentelemetry.api.metrics
import set_meter_provider" with the supported metrics package by importing
set_meter_provider from "opentelemetry.metrics" instead; update the import in
telemetry.py so any references to set_meter_provider continue to work (no other
code changes required).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 3148caed-aa04-4c16-84a2-6938fb44db0f
📒 Files selected for processing (9)
components/job-orchestration/job_orchestration/executor/compress/celery.pycomponents/job-orchestration/job_orchestration/executor/compress/celery_compress.pycomponents/job-orchestration/job_orchestration/executor/compress/compression_task.pycomponents/job-orchestration/job_orchestration/executor/query/celery.pycomponents/job-orchestration/job_orchestration/executor/query/extract_stream_task.pycomponents/job-orchestration/job_orchestration/executor/query/fs_search_task.pycomponents/job-orchestration/job_orchestration/executor/query/utils.pycomponents/job-orchestration/job_orchestration/executor/telemetry.pycomponents/job-orchestration/pyproject.toml
There was a problem hiding this comment.
Actionable comments posted: 4
♻️ Duplicate comments (1)
components/job-orchestration/job_orchestration/executor/compress/compression_task.py (1)
34-34:⚠️ Potential issue | 🔴 Critical | ⚡ Quick winUse the public OpenTelemetry metrics API path.
opentelemetry.api.metricsis not a valid public module; this will raiseModuleNotFoundErrorat import time. The public API isopentelemetry.metrics.🐛 Proposed fix
-from opentelemetry.api.metrics import get_meter +from opentelemetry.metrics import get_meterIn opentelemetry-python 1.20.0, is get_meter importable from opentelemetry.metrics or opentelemetry.api.metrics?🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@components/job-orchestration/job_orchestration/executor/compress/compression_task.py` at line 34, The import uses a non-public path "opentelemetry.api.metrics" which will raise ModuleNotFoundError; change the import to use the public API by importing get_meter from "opentelemetry.metrics" (update the import statement that currently references get_meter to use the public module name) so subsequent uses of get_meter in this file (e.g., within compression_task.py) work with the supported OpenTelemetry public API.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In
`@components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py`:
- Around line 26-27: The code imports the private symbol _bytes_output_counter
from utils which breaks encapsulation; add a public helper
emit_bytes_output(byte_count: int) to utils.py (matching the pattern of
emit_bytes_scanned) that increments/records bytes output and use that from
extract_stream_task.py instead of importing _bytes_output_counter directly;
update any references to _bytes_output_counter in functions like the extract
stream task to call utils.emit_bytes_output(...) and remove the
underscore-prefixed import.
- Around line 260-269: The telemetry (emit_bytes_scanned and
_bytes_output_counter.add) is emitted unconditionally after run_query_task which
records metrics even on failure; change the code to only emit metrics when
task_results.status == QueryTaskStatus.SUCCEEDED by wrapping the
emit_bytes_scanned(...) call and the
_bytes_output_counter.add(stdout_byte_count) call in an if-check against
QueryTaskStatus.SUCCEEDED (use the existing task_results.status), leaving other
failure handling unchanged so failed runs do not increment the bytes-scanned or
bytes-output counters.
In `@components/job-orchestration/job_orchestration/executor/telemetry.py`:
- Around line 65-66: The metrics_endpoint construction should respect
OTEL_EXPORTER_OTLP_METRICS_ENDPOINT and avoid double slashes: replace the
current metrics_endpoint = f"{endpoint}/v1/metrics" with logic that first reads
OTEL_EXPORTER_OTLP_METRICS_ENDPOINT (falling back to
OTEL_EXPORTER_OTLP_ENDPOINT/_DEFAULT_OTEL_ENDPOINT), and if building from the
generic endpoint, normalize by stripping any trailing slash from endpoint before
appending "/v1/metrics" (or use a safe URL join utility) so you never produce
"//v1/metrics"; update references to endpoint, metrics_endpoint and
_DEFAULT_OTEL_ENDPOINT accordingly.
In `@components/job-orchestration/pyproject.toml`:
- Around line 14-16: The opentelemetry exporter dependency
"opentelemetry-exporter-otlp-proto-http>=0.41b0" is using an incorrect/legacy
floor that doesn’t align with the API/SDK pins; update the dependency entry in
pyproject.toml to "opentelemetry-exporter-otlp-proto-http>=1.20.0" so it matches
"opentelemetry-api>=1.20.0" and "opentelemetry-sdk>=1.20.0" (or otherwise
document the intended mapping) to ensure consistent, real-version constraints.
---
Duplicate comments:
In
`@components/job-orchestration/job_orchestration/executor/compress/compression_task.py`:
- Line 34: The import uses a non-public path "opentelemetry.api.metrics" which
will raise ModuleNotFoundError; change the import to use the public API by
importing get_meter from "opentelemetry.metrics" (update the import statement
that currently references get_meter to use the public module name) so subsequent
uses of get_meter in this file (e.g., within compression_task.py) work with the
supported OpenTelemetry public API.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 311a0665-7cd1-46a7-ade5-b9487365c9ac
📒 Files selected for processing (9)
components/job-orchestration/job_orchestration/executor/compress/celery.pycomponents/job-orchestration/job_orchestration/executor/compress/celery_compress.pycomponents/job-orchestration/job_orchestration/executor/compress/compression_task.pycomponents/job-orchestration/job_orchestration/executor/query/celery.pycomponents/job-orchestration/job_orchestration/executor/query/extract_stream_task.pycomponents/job-orchestration/job_orchestration/executor/query/fs_search_task.pycomponents/job-orchestration/job_orchestration/executor/query/utils.pycomponents/job-orchestration/job_orchestration/executor/telemetry.pycomponents/job-orchestration/pyproject.toml
This reverts commit 2b4a4ef.
There was a problem hiding this comment.
Actionable comments posted: 6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
components/core/src/clp_s/search/Output.hpp (1)
41-47: 🧹 Nitpick | 🔵 Trivial | 💤 Low valueDocument the new out-parameter.
The method now takes
bytes_output, but the comment block only describes the return value. Add a@param bytes_outputline clarifying that it is accumulated (added to, not reset) with the total length of emitted messages.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@components/core/src/clp_s/search/Output.hpp` around lines 41 - 47, Update the comment for the method filter(uint64_t& bytes_output) to document the new out-parameter: add a `@param` bytes_output line stating that bytes_output is an output accumulator which the method adds the total length of emitted messages to (it is added to, not reset), and keep the existing `@return` description unchanged so readers understand both the side-effect and the boolean success value.components/core/src/clp/clo/clo.cpp (2)
606-613:⚠️ Potential issue | 🔴 Critical | ⚡ Quick winStray
}will break compilation.The
try/catchblock already closes at Line 609. The extra}on Line 610 prematurely closesmain(), leaving the subsequent initialization and argument-parsing code outside any function. This won't compile.🐛 Proposed fix
} catch (std::exception& e) { // NOTE: We can't log an exception if the logger couldn't be constructed return -1; } - } clp::Profiler::init(); clp::TimestampPattern::init(); clp::telemetry::init();🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@components/core/src/clp/clo/clo.cpp` around lines 606 - 613, There is a stray closing brace that prematurely ends main() after the catch block; remove the extra '}' that appears immediately after the catch (the one that closes main) so the subsequent initialization calls (clp::Profiler::init(), clp::TimestampPattern::init(), clp::telemetry::init()) remain inside main(), and verify the try/catch and main() brace balance is correct around the existing try { ... } catch (std::exception& e) { ... } and main() function.
631-648: 🧹 Nitpick | 🔵 Trivial | 💤 Low valueTelemetry records counters even for non-search commands.
bytes_scanned/bytes_outputremain0forExtractIr, yetrecord_query_metrics(...)still emitsclp.query.*counters afterward. Consider recording query metrics only when theSearchcommand ran to avoid emitting zero-valued query data points on the extraction path.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@components/core/src/clp/clo/clo.cpp` around lines 631 - 648, The telemetry call currently records bytes_scanned/bytes_output (which stay zero) for non-Search commands; restrict clp::telemetry::record_query_metrics(bytes_scanned, bytes_output) so it is only invoked when the Search path ran successfully (i.e. inside the Command::Search branch after a successful search() call), leaving clp::telemetry::shutdown() where it is called for all commands; update the logic around the search branch and the variables bytes_scanned/bytes_output to ensure metrics are emitted only for search operations and not for extract_ir().
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@components/core/CMakeLists.txt`:
- Around line 349-353: Replace the unconditional
FetchContent_Declare(opentelemetry-cpp ...) that uses GIT_TAG v1.14.2 with a
reproducible, shallow fetch: pin the GIT repository to the tag's commit SHA
instead of the tag name and add GIT_SHALLOW TRUE to the FetchContent_Declare
call; also gate the entire opentelemetry-cpp FetchContent block behind a
dedicated option (e.g., ENABLE_OPENTELEMETRY or BUILD_TELEMETRY) in addition to
CLP_BUILD_EXECUTABLES so the dependency is only fetched when telemetry is
explicitly enabled.
- Around line 342-355: The FetchContent block is forcing BUILD_TESTING into the
cache which pollutes global state and disables CLP tests; instead, save/restore
BUILD_TESTING around the opentelemetry-cpp FetchContent: before
FetchContent_Declare/FetchContent_MakeAvailable capture whether BUILD_TESTING is
defined (e.g., save to a temporary variable like _OLD_BUILD_TESTING), then set
BUILD_TESTING OFF locally (without CACHE/FORCE) for the duration of
FetchContent_MakeAvailable(opentelemetry-cpp), and finally restore the original
BUILD_TESTING value (or undefine it) after FetchContent_MakeAvailable so
CLP_ENABLE_TESTS / CLP_BUILD_TESTING behavior is unchanged.
In `@components/core/src/clp_s/clp-s.cpp`:
- Line 321: Telemetry is initialized unconditionally via clp::telemetry::init()
but clp::telemetry::shutdown() (and record_query_metrics) are only called on the
successful search path, causing exporter flush/leak issues; fix by introducing
an RAII guard (e.g., a TelemetryGuard class or scoped object) constructed
immediately after clp::telemetry::init() that calls clp::telemetry::shutdown()
from its destructor, and ensure record_query_metrics is invoked before shutdown
where appropriate (for search path) so that the destructor always runs on every
early return or exception; update main flow to create this guard (referencing
clp::telemetry::init, TelemetryGuard, record_query_metrics, and
clp::telemetry::shutdown) so shutdown is guaranteed on all exit paths.
In `@components/core/src/clp/telemetry/telemetry.cpp`:
- Around line 62-66: The pointer guard in record_query_metrics uses a negation
check; change the conditional from if (!provider) to an explicit comparison if
(nullptr == provider) to follow the repo's pointer-guard style; update the check
immediately after retrieving provider in record_query_metrics to use nullptr ==
provider while leaving the surrounding logic unchanged.
In
`@components/job-orchestration/job_orchestration/executor/compress/celery_compress.py`:
- Around line 13-15: The worker_process_init_handler in celery_compress.py calls
init_worker_telemetry but that symbol is not importable due to a broken export
in telemetry_utils; fix by either correcting telemetry_utils to properly export
and return init_worker_telemetry (or the correctly named function) or update the
import in this module to the correct symbol name, and add a safe guard
(try/except around the import or the call inside worker_process_init_handler) so
a missing telemetry import won't crash the worker process; reference the
init_worker_telemetry function and the worker_process_init_handler signal
handler when making the change.
In `@components/job-orchestration/job_orchestration/executor/telemetry_utils.py`:
- Line 5: Replace the faulty import of get_meter from opentelemetry.api.metrics
with the correct module opentelemetry.metrics in both locations: update the
import in telemetry_utils.py (where get_meter is currently imported) and the
import in compression_task.py (the line importing get_meter at or near the top
of the file); ensure the import reads "from opentelemetry.metrics import
get_meter" so worker startup no longer raises ModuleNotFoundError.
---
Outside diff comments:
In `@components/core/src/clp_s/search/Output.hpp`:
- Around line 41-47: Update the comment for the method filter(uint64_t&
bytes_output) to document the new out-parameter: add a `@param` bytes_output line
stating that bytes_output is an output accumulator which the method adds the
total length of emitted messages to (it is added to, not reset), and keep the
existing `@return` description unchanged so readers understand both the
side-effect and the boolean success value.
In `@components/core/src/clp/clo/clo.cpp`:
- Around line 606-613: There is a stray closing brace that prematurely ends
main() after the catch block; remove the extra '}' that appears immediately
after the catch (the one that closes main) so the subsequent initialization
calls (clp::Profiler::init(), clp::TimestampPattern::init(),
clp::telemetry::init()) remain inside main(), and verify the try/catch and
main() brace balance is correct around the existing try { ... } catch
(std::exception& e) { ... } and main() function.
- Around line 631-648: The telemetry call currently records
bytes_scanned/bytes_output (which stay zero) for non-Search commands; restrict
clp::telemetry::record_query_metrics(bytes_scanned, bytes_output) so it is only
invoked when the Search path ran successfully (i.e. inside the Command::Search
branch after a successful search() call), leaving clp::telemetry::shutdown()
where it is called for all commands; update the logic around the search branch
and the variables bytes_scanned/bytes_output to ensure metrics are emitted only
for search operations and not for extract_ir().
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: f2055520-266f-4e06-94d9-de2a2788cb68
📒 Files selected for processing (16)
components/clp-py-utils/clp_py_utils/telemetry.pycomponents/clp-py-utils/pyproject.tomlcomponents/core/CMakeLists.txtcomponents/core/src/clp/clo/CMakeLists.txtcomponents/core/src/clp/clo/clo.cppcomponents/core/src/clp/telemetry/telemetry.cppcomponents/core/src/clp/telemetry/telemetry.hppcomponents/core/src/clp_s/CMakeLists.txtcomponents/core/src/clp_s/clp-s.cppcomponents/core/src/clp_s/search/Output.cppcomponents/core/src/clp_s/search/Output.hppcomponents/job-orchestration/job_orchestration/executor/compress/celery_compress.pycomponents/job-orchestration/job_orchestration/executor/compress/compression_task.pycomponents/job-orchestration/job_orchestration/executor/compress/spider_compress.pycomponents/job-orchestration/job_orchestration/executor/query/celery.pycomponents/job-orchestration/job_orchestration/executor/telemetry_utils.py
💤 Files with no reviewable changes (1)
- components/job-orchestration/job_orchestration/executor/compress/compression_task.py
There was a problem hiding this comment.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
components/core/src/clp/clo/clo.cpp (1)
422-429:⚠️ Potential issue | 🟠 Major | ⚡ Quick winCount output bytes only after successful emission.
Line 422 increments
total_bytes_outputbeforeoutput_handler->add_result(...)succeeds. If result delivery fails, emitted-byte metrics are overstated.Proposed fix
- total_bytes_output += decompressed_message.length(); if (ErrorCode_Success != output_handler->add_result( compressed_file.get_orig_path(), compressed_file.get_orig_file_id_as_string(), encoded_message, decompressed_message )) { result = SearchFilesResult::ResultSendFailure; break; } + total_bytes_output += decompressed_message.length();🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@components/core/src/clp/clo/clo.cpp` around lines 422 - 429, The code increments total_bytes_output before verifying delivery; change the flow so you only add decompressed_message.length() to total_bytes_output after output_handler->add_result(...) returns ErrorCode_Success. Locate the block using total_bytes_output, output_handler->add_result(...), and compressed_file (methods get_orig_path/get_orig_file_id_as_string) and move or conditionally apply the increment so it occurs only on successful add_result.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@components/core/src/clp_s/search/Output.cpp`:
- Around line 146-147: Do not hardcode stats.bytes_scanned to zero in
Output.cpp; either populate json_msg["stats"]["bytes_scanned"] with the real
scan metric (e.g., use the existing bytes_scanned/scan_bytes variable or a
Stats/QueryStats object used when computing total_bytes_output) or omit the
field entirely if the metric is unavailable. Update the assignment that sets
json_msg["stats"]["bytes_scanned"] (paired with
json_msg["stats"]["bytes_output"]) so it reads the actual metric or is not
emitted, ensuring the query worker does not ingest a fabricated zero value.
In `@components/core/src/clp/clo/clo.cpp`:
- Line 459: The code increments total_bytes_scanned using
file_metadata_ix.get_num_uncompressed_bytes() before calling search_file(...),
which inflates stats.bytes_scanned when search_file returns
SearchFilesResult::OpenFailure; modify both occurrences (the additions at the
sites where total_bytes_scanned is incremented) so that you only add
file_metadata_ix.get_num_uncompressed_bytes() after search_file(...) completes
successfully (i.e., result != SearchFilesResult::OpenFailure) or guard the
increment with a check of the search result; update the logic that sets
stats.bytes_scanned to use total_bytes_scanned that only includes successfully
opened files.
In
`@components/job-orchestration/job_orchestration/executor/query/fs_search_task.py`:
- Around line 315-318: Guard against non-dict stats before accessing its keys:
when you extract stats = data["stats"] (in the block that currently checks
isinstance(data, dict) and "stats" in data), add a type check like
isinstance(stats, dict) and only call stats.get(...) if true; otherwise set
bytes_scanned and bytes_output to None (or a safe default) so accessing
bytes_scanned and bytes_output cannot raise when stats is not an object. Ensure
you update the code paths that use bytes_scanned/bytes_output accordingly.
---
Outside diff comments:
In `@components/core/src/clp/clo/clo.cpp`:
- Around line 422-429: The code increments total_bytes_output before verifying
delivery; change the flow so you only add decompressed_message.length() to
total_bytes_output after output_handler->add_result(...) returns
ErrorCode_Success. Locate the block using total_bytes_output,
output_handler->add_result(...), and compressed_file (methods
get_orig_path/get_orig_file_id_as_string) and move or conditionally apply the
increment so it occurs only on successful add_result.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: d846e0ce-b602-43fc-bf1f-11de43766da6
⛔ Files ignored due to path filters (5)
components/clp-mcp-server/uv.lockis excluded by!**/*.lockcomponents/clp-package-utils/uv.lockis excluded by!**/*.lockcomponents/clp-py-utils/uv.lockis excluded by!**/*.lockcomponents/job-orchestration/uv.lockis excluded by!**/*.lockintegration-tests/uv.lockis excluded by!**/*.lock
📒 Files selected for processing (10)
components/clp-py-utils/clp_py_utils/telemetry.pycomponents/core/src/clp/clo/clo.cppcomponents/core/src/clp_s/search/Output.cppcomponents/job-orchestration/job_orchestration/executor/compress/celery.pycomponents/job-orchestration/job_orchestration/executor/compress/compression_task.pycomponents/job-orchestration/job_orchestration/executor/query/celery.pycomponents/job-orchestration/job_orchestration/executor/query/fs_search_task.pytools/deployment/package-helm/templates/compression-worker-deployment.yamltools/deployment/package-helm/templates/query-worker-deployment.yamltools/deployment/package/docker-compose-all.yaml
fix lint Update Chart.yaml lint
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (1)
components/job-orchestration/job_orchestration/executor/query/fs_search_task.py (1)
312-315:⚠️ Potential issue | 🟠 Major | ⚡ Quick winGuard
statsshape before key access.At Line 313-Line 315,
stats.get(...)can raise ifdata["stats"]is not a dict, which can fail the task in telemetry parsing.Proposed fix
- if isinstance(data, dict) and "stats" in data: - stats = data["stats"] - bytes_scanned = stats.get("bytes_scanned", None) - bytes_output = stats.get("bytes_output", None) - break + if isinstance(data, dict) and "stats" in data: + stats = data["stats"] + if isinstance(stats, dict): + bytes_scanned = stats.get("bytes_scanned") + bytes_output = stats.get("bytes_output") + break🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@components/job-orchestration/job_orchestration/executor/query/fs_search_task.py` around lines 312 - 315, The current code assumes data["stats"] is a dict and calls stats.get(...), which will raise if stats is not a mapping; update the block that reads data and stats (the variables data, stats, bytes_scanned, bytes_output in fs_search_task.py) to first verify stats is a dict (e.g., if isinstance(stats, dict)) before calling stats.get, otherwise set bytes_scanned and bytes_output to None (or safe defaults); ensure the guard is applied where data is checked ("if isinstance(data, dict) and 'stats' in data") so you extract stats = data["stats"] then check isinstance(stats, dict) before accessing keys.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@integration-tests/tests/utils/fs_validation.py`:
- Around line 44-55: The current per-line JSON parsing in
is_json_file_structurally_equal (using json_fp and building lines[]) breaks for
pretty-printed/multi-line JSON; instead read the entire file content, try
json.loads(content) and on success append json.dumps(obj, separators=(",", ":"),
sort_keys=True) to lines for normalization; if full-file parse fails, fall back
to treating the file as JSONL by iterating non-empty lines and normalizing each
via json.loads as before (or keeping raw line on JSONDecodeError) so both single
JSON documents and JSONL files are handled.
---
Duplicate comments:
In
`@components/job-orchestration/job_orchestration/executor/query/fs_search_task.py`:
- Around line 312-315: The current code assumes data["stats"] is a dict and
calls stats.get(...), which will raise if stats is not a mapping; update the
block that reads data and stats (the variables data, stats, bytes_scanned,
bytes_output in fs_search_task.py) to first verify stats is a dict (e.g., if
isinstance(stats, dict)) before calling stats.get, otherwise set bytes_scanned
and bytes_output to None (or safe defaults); ensure the guard is applied where
data is checked ("if isinstance(data, dict) and 'stats' in data") so you extract
stats = data["stats"] then check isinstance(stats, dict) before accessing keys.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: f74fd339-f913-4b59-a949-3cda8a38cee8
📒 Files selected for processing (9)
components/core/CMakeLists.txtcomponents/job-orchestration/job_orchestration/executor/compress/compression_task.pycomponents/job-orchestration/job_orchestration/executor/query/fs_search_task.pyintegration-tests/tests/binary_tests/test_log_converter.pyintegration-tests/tests/package_tests/utils/modes.pyintegration-tests/tests/utils/classes.pyintegration-tests/tests/utils/fs_validation.pyintegration-tests/tests/utils/port_utils.pytools/deployment/package-helm/Chart.yaml
sitaowang1998
left a comment
There was a problem hiding this comment.
Hasn't gone into details, but I am concerned with the overall method we use to collect telemetry.
Changing the stdout is a significant breaking change, which probably breaks a lot of things. Since we do need IPC, how about creating separate pipes for metrics so we don't pollute the stdout? @junhaoliao What's your opinion on this?
I think I'd personally prefer just properly integrating open telemetry into the c++ code. Writing to stdout/stderr unconditionally definitely causes problems here (since we already have output to stdout/stderr), and even using a pipe is probably a temporary solution that we will probably want to change once we integrate with spider. |
|
Note Unit test generation is a public access feature. Expect some limitations and changes as we gather feedback and continue to improve it. Generating unit tests... This may take up to 20 minutes. |
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
components/clp-py-utils/clp_py_utils/telemetry.py (2)
19-20: 🧹 Nitpick | 🔵 Trivial | 💤 Low valueUpdate the docstring to reflect actual behaviour.
The docstring states "this function does nothing" when telemetry is disabled, but Line 25 actually sets a
NoOpMeterProvider. Consider revising to "this function installs a no-op meter provider" for accuracy.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@components/clp-py-utils/clp_py_utils/telemetry.py` around lines 19 - 20, Update the function docstring in telemetry.py to accurately describe behavior when telemetry is disabled: instead of saying "this function does nothing", state that the function installs a NoOpMeterProvider (i.e., a no-op meter provider) and returns/continues without active telemetry. Mention the NoOpMeterProvider install performed in the function where the provider is set (the block that assigns NoOpMeterProvider) so readers know the function still configures a no-op provider rather than doing nothing.
46-52: 🧹 Nitpick | 🔵 TrivialUse
timeout_millisexplicitly for OpenTelemetryforce_flush()/shutdown()inshutdown_telemetry()
MeterProvider.force_flush()andMeterProvider.shutdown()already supporttimeout_millis(defaults: 10s and 30s), so these calls shouldn’t block indefinitely. Passing explicittimeout_millisvalues would make the intended shutdown budget clear.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@components/clp-py-utils/clp_py_utils/telemetry.py` around lines 46 - 52, In shutdown_telemetry(), explicitly pass timeout_millis to the OpenTelemetry provider calls so they don't block indefinitely: call provider.force_flush(timeout_millis=10000) (or another chosen ms value) and provider.shutdown(timeout_millis=30000) (or chosen ms), and keep the existing try/except/logger.warning handling around those calls (referencing provider.force_flush(), provider.shutdown(), shutdown_telemetry(), and logger.warning).
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@integration-tests/tests/package_tests/utils/modes.py`:
- Line 40: Add an OTEL_COLLECTOR_COMPONENT_NAME constant and replace the literal
"otel-collector" with
_to_docker_compose_service_name(OTEL_COLLECTOR_COMPONENT_NAME) to match the
existing pattern; define the constant alongside other component constants (or
import it from clp_py_utils.clp_config) and update any reference in modes.py
where the literal is used so the service name is derived via the
_to_docker_compose_service_name function.
---
Outside diff comments:
In `@components/clp-py-utils/clp_py_utils/telemetry.py`:
- Around line 19-20: Update the function docstring in telemetry.py to accurately
describe behavior when telemetry is disabled: instead of saying "this function
does nothing", state that the function installs a NoOpMeterProvider (i.e., a
no-op meter provider) and returns/continues without active telemetry. Mention
the NoOpMeterProvider install performed in the function where the provider is
set (the block that assigns NoOpMeterProvider) so readers know the function
still configures a no-op provider rather than doing nothing.
- Around line 46-52: In shutdown_telemetry(), explicitly pass timeout_millis to
the OpenTelemetry provider calls so they don't block indefinitely: call
provider.force_flush(timeout_millis=10000) (or another chosen ms value) and
provider.shutdown(timeout_millis=30000) (or chosen ms), and keep the existing
try/except/logger.warning handling around those calls (referencing
provider.force_flush(), provider.shutdown(), shutdown_telemetry(), and
logger.warning).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 5af9bdf9-1462-4c21-bad6-9f0cd45469c5
📒 Files selected for processing (12)
components/clp-py-utils/clp_py_utils/telemetry.pycomponents/job-orchestration/job_orchestration/executor/compress/compression_task.pycomponents/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.pycomponents/job-orchestration/job_orchestration/scheduler/query/query_scheduler.pyintegration-tests/tests/package_tests/utils/modes.pyintegration-tests/tests/utils/classes.pyintegration-tests/tests/utils/port_utils.pytools/deployment/package-helm/Chart.yamltools/deployment/package-helm/templates/compression-scheduler-deployment.yamltools/deployment/package-helm/templates/compression-worker-deployment.yamltools/deployment/package-helm/templates/query-scheduler-deployment.yamltools/deployment/package/docker-compose-all.yaml
For what it's worth, I suspect that this was related to the issue fixed by #2318 where forked dispatch workers inherit some things from the parent process. If these gauges were inherited across a fork that had non-empty active jobs it might have caused the behaviour you see here. Since that fix has been merged into this branch now, could you try reproducing this issue again and see if it still happens? |
gibber9809
left a comment
There was a problem hiding this comment.
Partial review for compression scheduler +worker changes + query scheduler changes. One nit, but otherwise looks good to me.
I've re-ran the test and now the telemetry sent to the telemetry server is correct. However, running the script for reproducing still gives the same output. The query_tasks table still shows pending |
junhaoliao
left a comment
There was a problem hiding this comment.
good job. there are some data race and missing key issues. other ones are just nitpicking
-
Error-path return shape may cause exceptions
- Worker failure paths return only
error_message, but later code unconditionally readstotal_uncompressed_sizeandtotal_compressed_size. - Fix by returning zero values for both size fields on all error paths.
- Worker failure paths return only
-
Potential data races in metric callbacks
- Observable metric callbacks may run in a background thread while scheduler state dictionaries are being mutated.
- Fix by snapshotting
scheduled_jobs.values()/active_jobs.values()inside atry, catchingRuntimeError, and computing counts from the snapshot.
-
Out-of-scope integration-test fixes
- Port-check and timeout changes appear unrelated to the telemetry PR.
- Revert them and track the port occupancy issue separately.
-
Telemetry config duplication
- Telemetry-disable env-var logic is duplicated.
- Add a shared helper such as
clp_py_utils.telemetry_config.is_telemetry_disabled_by_env()and use it from both call sites.
-
Metric instrument semantics
- Active jobs and outstanding tasks are additive but not monotonically increasing.
- Use asynchronous up-down counters instead of gauges.
-
OpenTelemetry instrumentation scope
- Meters should use the module name as the instrumentation scope.
- Replace hardcoded names like
"compression-worker"/"compression-scheduler"/"query-scheduler"with__name__.
-
Metric units
- Workload metrics should declare units.
- Use
unit="{job}"for active jobs andunit="{task}"for outstanding tasks.
-
Unused callback parameters
- Callback parameters named
optionsare unused. - Rename them to
_options.
- Callback parameters named
-
Docs alignment
- Update docs to mention Python services using
OTEL_SERVICE_NAME, list supported telemetry components, and describe the workload metrics as up-down counters.
- Update docs to mention Python services using
-
Lint / formatting / config ordering
- Some comments, docstrings, and YAML examples need cleanup.
- Use a one-line docstring where requested and sort config keys consistently.
Co-authored-by: Junhao Liao <junhao@junhao.ca>
Apply suggestions from code review fix doc Co-Authored-By: Junhao Liao <junhao@junhao.ca>
Co-authored-by: Junhao Liao <junhao@junhao.ca>
Co-authored-by: Junhao Liao <junhao@junhao.ca>
junhaoliao
left a comment
There was a problem hiding this comment.
for the title, how about:
feat(telemetry): Add metrics for query and compression schedulers/workers (fixes #2346).
done. if everything is all good, I will go ahead and merge. |

Description
Adds OpenTelemetry metrics collection to the compression and query schedulers/workers, plus deployment config for the OTel collector.
Counters
clp.compression.bytes_input_total: Size of uncompressed data processed by compression, tagged by success/failure statusclp.compression.bytes_output_total: Size of compressed output generated by the compression, tagged by success/failure statusclp.compression.tasks.completed/failed: Task completion counts on the compression schedulerclp.query.tasks.completed/failed: Task completion counts on the query schedulerGauges (point-in-time snapshots)
clp.compression.active_jobs/outstanding_tasks: In-flight job and task levels on the compression schedulerclp.query.active_jobs/outstanding_tasks: In-flight job and task levels on the query schedulerHistograms
clp.compression.job.duration/task.duration: Execution durations for compression jobs and tasksclp.compression.input_rate/output_rate: Speed of uncompressed bytes processed and compressed bytes output per taskclp.query.job.duration/task.duration: Execution durations for query jobs and tasksAlso small fixes to fix integration test:
SO_REUSEADDRon port checks and increased default command timeout.Checklist
breaking change.
Validation performed
Integration test all passed:
End-to-end test:
Run this on the client
Run this on the telemetry server:
Everything works:
Summary by CodeRabbit
New Features
Chores
Tests
Documentation