
feat(telemetry): Add metrics for query and compression schedulers/workers (fixes #2346). #2288

Merged
Nathan903 merged 55 commits into y-scope:main from Nathan903:pr4test
Jun 24, 2026

Conversation

@Nathan903 Nathan903 commented May 19, 2026

Contributor

Description

Adds OpenTelemetry metrics collection to the compression and query schedulers/workers, plus deployment config for the OTel collector.

Counters

  • clp.compression.bytes_input_total: Size of uncompressed data processed by compression, tagged by success/failure status
  • clp.compression.bytes_output_total: Size of compressed output generated by compression, tagged by success/failure status
  • clp.compression.tasks.completed / failed: Task completion counts on the compression scheduler
  • clp.query.tasks.completed / failed: Task completion counts on the query scheduler

Gauges (point-in-time snapshots)

  • clp.compression.active_jobs / outstanding_tasks: In-flight job and task levels on the compression scheduler
  • clp.query.active_jobs / outstanding_tasks: In-flight job and task levels on the query scheduler

Histograms

  • clp.compression.job.duration / task.duration: Execution durations for compression jobs and tasks
  • clp.compression.input_rate / output_rate: Per-task rate at which uncompressed bytes are processed and compressed bytes are output
  • clp.query.job.duration / task.duration: Execution durations for query jobs and tasks
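
As a rough illustration of how the rate histograms relate to the byte counters and task durations, the sketch below assumes a rate is simply bytes divided by the task's wall-clock duration. The function name and the non-positive-duration guard are hypothetical and not taken from this PR. The validation output later in this description is consistent with that reading: an input rate of 104898841.17 multiplied by a task duration of 22.437082 s gives about 2353623901 bytes, the value recorded for clp.compression.bytes_input_total.

```python
from typing import Optional


def bytes_per_second(num_bytes: int, duration_s: float) -> Optional[float]:
    """Return throughput in bytes/second, or None when the duration is not positive.

    Hypothetical helper: the PR's actual rate computation is not shown.
    """
    if duration_s <= 0:
        # Avoid dividing by zero for tasks that report no measurable duration.
        return None
    return num_bytes / duration_s


# Sample values copied from the validation output in this description.
input_rate = bytes_per_second(2_353_623_901, 22.437082)
output_rate = bytes_per_second(69_020_331, 22.437082)
print(f"input: {input_rate:.2f} B/s, output: {output_rate:.2f} B/s")
```

Returning None rather than raising keeps a zero-length task from breaking metric recording; a caller would simply skip the histogram sample in that case.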

Also includes small fixes for the integration tests: the port checks now set SO_REUSEADDR, and the default command timeout is increased.
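
For context on the SO_REUSEADDR change, here is a minimal sketch of a bind-based port probe. The function name `is_port_available` and the default host are assumptions for illustration; the actual code in port_utils.py is not shown in this PR description. Setting SO_REUSEADDR before bind() lets the probe succeed when the port only has lingering TIME_WAIT connections from a previous run, while a port with an active listener is still reported as busy on Linux.

```python
import socket


def is_port_available(port: int, host: str = "127.0.0.1") -> bool:
    """Return True if a TCP socket can currently be bound to (host, port).

    Hypothetical helper, not the project's implementation.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # Allow binding even if old connections on this port are in TIME_WAIT.
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            sock.bind((host, port))
        except OSError:
            # Typically EADDRINUSE: something is actively bound to the port.
            return False
        return True
```

A probe like this is inherently racy (another process can take the port between the check and the real bind), so it is best treated as a hint rather than a reservation.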

Checklist

  • The PR satisfies the contribution guidelines.
  • This is a breaking change and that has been indicated in the PR title, OR this isn't a
    breaking change.
  • Necessary docs have been updated, OR no docs need to be updated.

Validation performed

All integration tests passed:

task tests:integration

End-to-end test:
Run this on the client:

sbin/start-clp.sh
sbin/compress.sh ~/hive-24hr-ts/hive-24hr-ts.jsonl
sbin/search.sh "ERROR"

Run this on the telemetry server:

docker exec clp-telemetry-server-clickhouse-1 clickhouse-client --user default --password clickhouse --query "SELECT * FROM clp_telemetry.otel_metrics_gauge ORDER BY TimeUnix DESC LIMIT 5 FORMAT Vertical"
docker exec clp-telemetry-server-clickhouse-1 clickhouse-client --user default --password clickhouse --query "SELECT * FROM clp_telemetry.otel_metrics_sum ORDER BY TimeUnix DESC LIMIT 5 FORMAT Vertical"
docker exec clp-telemetry-server-clickhouse-1 clickhouse-client --user default --password clickhouse --query "SELECT MetricName, Value, TimeUnix FROM clp_telemetry.otel_metrics_gauge WHERE ServiceName = 'compression-scheduler' ORDER BY TimeUnix DESC LIMIT 10 FORMAT Vertical"

Everything works:

nathanhung@baker18:/home/nathanhung/clp/build/clp-package$ sbin/compress.sh ~/hive-24hr-ts/hive-24hr-ts.jsonl
Container clp-package-clp-runtime-run-36fe1dd72209 Creating
Container clp-package-clp-runtime-run-36fe1dd72209 Created
2026-06-08T20:25:56.123 WARNING [compress] `--timestamp-key` not specified. Events will not have assigned timestamps and can only be searched from the command line without a timestamp filter.
2026-06-08T20:25:58.656 INFO [compress] Compression job 1 submitted.
2026-06-08T20:26:15.203 INFO [compress] Compressed 751.38MB into 24.35MB (30.86x). Speed: 46.82MB/s.
2026-06-08T20:26:21.719 INFO [compress] Compression finished.
2026-06-08T20:26:21.719 INFO [compress] Compressed 2.19GB into 65.82MB (34.10x). Speed: 101.02MB/s.
nathanhung@baker18:/home/nathanhung/clp/build/clp-package$ sbin/search.sh "ERROR"
Container clp-package-clp-runtime-run-d342868c114c Creating
Container clp-package-clp-runtime-run-d342868c114c Created
ubuntu@ip-172-26-13-138:~/clp-telemetry-server$  docker exec clp-telemetry-server-clickhouse-1 clickhouse-client --user default --password clickhouse --query "SELECT * FROM clp_telemetry.otel_metrics_gauge ORDER BY TimeUnix DESC LIMIT 5 FORMAT Vertical"
Row 1:
──────
ResourceAttributes:           {'clp.deployment.id':'38af27ab-6e8a-441a-b78a-d90702fc15ba','clp.deployment.method':'docker-compose','clp.storage.engine':'clp-s','host.arch':'amd64','host.cpu.family':'6','host.cpu.model.id':'63','host.cpu.vendor.id':'GenuineIntel','os.type':'linux','service.name':'compression-scheduler','service.version':'0.12.1-dev','telemetry.sdk.language':'python','telemetry.sdk.name':'opentelemetry','telemetry.sdk.version':'1.42.1'}
ResourceSchemaUrl:            https://opentelemetry.io/schemas/1.40.0
ScopeName:                    compression-scheduler
ScopeVersion:
ScopeAttributes:              {}
ScopeDroppedAttrCount:        0
ScopeSchemaUrl:
ServiceName:                  compression-scheduler
MetricName:                   clp.compression.active_jobs
MetricDescription:            Number of active compression jobs
MetricUnit:
Attributes:                   {}
StartTimeUnix:                1970-01-01 00:00:00.000000000
TimeUnix:                     2026-06-08 20:27:17.674228146
Value:                        0
Flags:                        0
Exemplars.FilteredAttributes: []
Exemplars.TimeUnix:           []
Exemplars.Value:              []
Exemplars.SpanId:             []
Exemplars.TraceId:            []

Row 2:
──────
ResourceAttributes:           {'clp.deployment.id':'38af27ab-6e8a-441a-b78a-d90702fc15ba','clp.deployment.method':'docker-compose','clp.storage.engine':'clp-s','host.arch':'amd64','host.cpu.family':'6','host.cpu.model.id':'63','host.cpu.vendor.id':'GenuineIntel','os.type':'linux','service.name':'compression-scheduler','service.version':'0.12.1-dev','telemetry.sdk.language':'python','telemetry.sdk.name':'opentelemetry','telemetry.sdk.version':'1.42.1'}
ResourceSchemaUrl:            https://opentelemetry.io/schemas/1.40.0
ScopeName:                    compression-scheduler
ScopeVersion:
ScopeAttributes:              {}
ScopeDroppedAttrCount:        0
ScopeSchemaUrl:
ServiceName:                  compression-scheduler
MetricName:                   clp.compression.outstanding_tasks
MetricDescription:            Total number of outstanding compression tasks
MetricUnit:
Attributes:                   {}
StartTimeUnix:                1970-01-01 00:00:00.000000000
TimeUnix:                     2026-06-08 20:27:17.674228146
Value:                        0
Flags:                        0
Exemplars.FilteredAttributes: []
Exemplars.TimeUnix:           []
Exemplars.Value:              []
Exemplars.SpanId:             []
Exemplars.TraceId:            []

Row 3:
──────
ResourceAttributes:           {'clp.deployment.id':'38af27ab-6e8a-441a-b78a-d90702fc15ba','clp.deployment.method':'docker-compose','clp.storage.engine':'clp-s','host.arch':'amd64','host.cpu.family':'6','host.cpu.model.id':'63','host.cpu.vendor.id':'GenuineIntel','os.type':'linux','service.name':'compression-scheduler','service.version':'0.12.1-dev','telemetry.sdk.language':'python','telemetry.sdk.name':'opentelemetry','telemetry.sdk.version':'1.42.1'}
ResourceSchemaUrl:            https://opentelemetry.io/schemas/1.40.0
ScopeName:                    compression-scheduler
ScopeVersion:
ScopeAttributes:              {}
ScopeDroppedAttrCount:        0
ScopeSchemaUrl:
ServiceName:                  compression-scheduler
MetricName:                   clp.compression.outstanding_tasks
MetricDescription:            Total number of outstanding compression tasks
MetricUnit:
Attributes:                   {}
StartTimeUnix:                1970-01-01 00:00:00.000000000
TimeUnix:                     2026-06-08 20:27:17.672156077
Value:                        0
Flags:                        0
Exemplars.FilteredAttributes: []
Exemplars.TimeUnix:           []
Exemplars.Value:              []
Exemplars.SpanId:             []
Exemplars.TraceId:            []

Row 4:
──────
ResourceAttributes:           {'clp.deployment.id':'38af27ab-6e8a-441a-b78a-d90702fc15ba','clp.deployment.method':'docker-compose','clp.storage.engine':'clp-s','host.arch':'amd64','host.cpu.family':'6','host.cpu.model.id':'63','host.cpu.vendor.id':'GenuineIntel','os.type':'linux','service.name':'compression-scheduler','service.version':'0.12.1-dev','telemetry.sdk.language':'python','telemetry.sdk.name':'opentelemetry','telemetry.sdk.version':'1.42.1'}
ResourceSchemaUrl:            https://opentelemetry.io/schemas/1.40.0
ScopeName:                    compression-scheduler
ScopeVersion:
ScopeAttributes:              {}
ScopeDroppedAttrCount:        0
ScopeSchemaUrl:
ServiceName:                  compression-scheduler
MetricName:                   clp.compression.active_jobs
MetricDescription:            Number of active compression jobs
MetricUnit:
Attributes:                   {}
StartTimeUnix:                1970-01-01 00:00:00.000000000
TimeUnix:                     2026-06-08 20:27:17.672156077
Value:                        0
Flags:                        0
Exemplars.FilteredAttributes: []
Exemplars.TimeUnix:           []
Exemplars.Value:              []
Exemplars.SpanId:             []
Exemplars.TraceId:            []

Row 5:
──────
ResourceAttributes:           {'clp.deployment.id':'38af27ab-6e8a-441a-b78a-d90702fc15ba','clp.deployment.method':'docker-compose','clp.storage.engine':'clp-s','host.arch':'amd64','host.cpu.family':'6','host.cpu.model.id':'63','host.cpu.vendor.id':'GenuineIntel','os.type':'linux','service.name':'query-scheduler','service.version':'0.12.1-dev','telemetry.sdk.language':'python','telemetry.sdk.name':'opentelemetry','telemetry.sdk.version':'1.42.1'}
ResourceSchemaUrl:            https://opentelemetry.io/schemas/1.40.0
ScopeName:                    query-scheduler
ScopeVersion:
ScopeAttributes:              {}
ScopeDroppedAttrCount:        0
ScopeSchemaUrl:
ServiceName:                  query-scheduler
MetricName:                   clp.query.outstanding_tasks
MetricDescription:            Total number of outstanding tasks across all active query jobs
MetricUnit:
Attributes:                   {}
StartTimeUnix:                1970-01-01 00:00:00.000000000
TimeUnix:                     2026-06-08 20:27:15.978237752
Value:                        0
Flags:                        0
Exemplars.FilteredAttributes: []
Exemplars.TimeUnix:           []
Exemplars.Value:              []
Exemplars.SpanId:             []
Exemplars.TraceId:            []
ubuntu@ip-172-26-13-138:~/clp-telemetry-server$  docker exec clp-telemetry-server-clickhouse-1 clickhouse-client --user default --password clickhouse --query "SELECT * FROM clp_telemetry.otel_metrics_sum ORDER BY TimeUnix DESC LIMIT 5 FORMAT Vertical"
Row 1:
──────
ResourceAttributes:           {'clp.deployment.id':'38af27ab-6e8a-441a-b78a-d90702fc15ba','clp.deployment.method':'docker-compose','clp.storage.engine':'clp-s','host.arch':'amd64','host.cpu.family':'6','host.cpu.model.id':'63','host.cpu.vendor.id':'GenuineIntel','os.type':'linux','service.name':'compression-worker','service.version':'0.12.1-dev','telemetry.sdk.language':'python','telemetry.sdk.name':'opentelemetry','telemetry.sdk.version':'1.42.1'}
ResourceSchemaUrl:            https://opentelemetry.io/schemas/1.40.0
ScopeName:                    compression-worker
ScopeVersion:
ScopeAttributes:              {}
ScopeDroppedAttrCount:        0
ScopeSchemaUrl:
ServiceName:                  compression-worker
MetricName:                   clp.compression.bytes_output_total
MetricDescription:            Total compressed bytes output by compression
MetricUnit:                   By
Attributes:                   {'status':'success'}
StartTimeUnix:                2026-06-08 20:26:21.371128317
TimeUnix:                     2026-06-08 20:26:30.825562333
Value:                        69020331
Flags:                        0
Exemplars.FilteredAttributes: []
Exemplars.TimeUnix:           []
Exemplars.Value:              []
Exemplars.SpanId:             []
Exemplars.TraceId:            []
AggregationTemporality:       2
IsMonotonic:                  true

Row 2:
──────
ResourceAttributes:           {'clp.deployment.id':'38af27ab-6e8a-441a-b78a-d90702fc15ba','clp.deployment.method':'docker-compose','clp.storage.engine':'clp-s','host.arch':'amd64','host.cpu.family':'6','host.cpu.model.id':'63','host.cpu.vendor.id':'GenuineIntel','os.type':'linux','service.name':'compression-worker','service.version':'0.12.1-dev','telemetry.sdk.language':'python','telemetry.sdk.name':'opentelemetry','telemetry.sdk.version':'1.42.1'}
ResourceSchemaUrl:            https://opentelemetry.io/schemas/1.40.0
ScopeName:                    compression-worker
ScopeVersion:
ScopeAttributes:              {}
ScopeDroppedAttrCount:        0
ScopeSchemaUrl:
ServiceName:                  compression-worker
MetricName:                   clp.compression.bytes_input_total
MetricDescription:            Total uncompressed bytes processed by compression
MetricUnit:                   By
Attributes:                   {'status':'success'}
StartTimeUnix:                2026-06-08 20:26:21.371089242
TimeUnix:                     2026-06-08 20:26:30.825562333
Value:                        2353623901
Flags:                        0
Exemplars.FilteredAttributes: []
Exemplars.TimeUnix:           []
Exemplars.Value:              []
Exemplars.SpanId:             []
Exemplars.TraceId:            []
AggregationTemporality:       2
IsMonotonic:                  true

ubuntu@ip-172-26-13-138:~/clp-telemetry-server$  docker exec clp-telemetry-server-clickhouse-1 clickhouse-client --user default --password clickhouse --query "SELECT MetricName, Value, TimeUnix FROM clp_telemetry.otel_metrics_gauge WHERE ServiceName = 'compression-scheduler' ORDER BY TimeUnix DESC LIMIT 10 FORMAT Vertical"
Row 1:
──────
MetricName: clp.compression.active_jobs
Value:      0
TimeUnix:   2026-06-08 20:27:17.674228146

Row 2:
──────
MetricName: clp.compression.outstanding_tasks
Value:      0
TimeUnix:   2026-06-08 20:27:17.674228146

Row 3:
──────
MetricName: clp.compression.active_jobs
Value:      0
TimeUnix:   2026-06-08 20:27:17.672156077

Row 4:
──────
MetricName: clp.compression.outstanding_tasks
Value:      0
TimeUnix:   2026-06-08 20:27:17.672156077

Row 5:
──────
MetricName: clp.compression.active_jobs
Value:      0
TimeUnix:   2026-06-08 20:27:15.800721994

Row 6:
──────
MetricName: clp.compression.outstanding_tasks
Value:      0
TimeUnix:   2026-06-08 20:27:15.800721994

Row 7:
──────
MetricName: clp.compression.active_jobs
Value:      1
TimeUnix:   2026-06-08 20:26:15.795841709

Row 8:
──────
MetricName: clp.compression.outstanding_tasks
Value:      1
TimeUnix:   2026-06-08 20:26:15.795841709

Row 9:
───────
MetricName: clp.compression.active_jobs
Value:      0
TimeUnix:   2026-06-08 20:20:10.212991913

Row 10:
───────
MetricName: clp.compression.outstanding_tasks
Value:      0
TimeUnix:   2026-06-08 20:20:10.212991913
ubuntu@ip-172-26-13-138:~/clp-telemetry-server$ docker exec clp-telemetry-server-clickhouse-1 clickhouse-client --user default --password clickhouse --query "SELECT MetricName, Count, Sum, Min, Max, TimeUnix FROM clp_telemetry.otel_metrics_histogram ORDER BY TimeUnix DESC LIMIT 10 FORMAT Vertical"
Row 1:
──────
MetricName: clp.compression.input_rate
Count:      1
Sum:        104898841.16838366 -- 104.90 million
Min:        104898841.16838366 -- 104.90 million
Max:        104898841.16838366 -- 104.90 million
TimeUnix:   2026-06-15 18:50:32.790569515

Row 2:
──────
MetricName: clp.compression.output_rate
Count:      1
Sum:        3076172.7839654014 -- 3.08 million
Min:        3076172.7839654014 -- 3.08 million
Max:        3076172.7839654014 -- 3.08 million
TimeUnix:   2026-06-15 18:50:32.790569515

Row 3:
──────
MetricName: clp.compression.job.duration
Count:      1
Sum:        23.191334
Min:        23.191334
Max:        23.191334
TimeUnix:   2026-06-15 18:50:19.304071802

Row 4:
──────
MetricName: clp.compression.task.duration
Count:      1
Sum:        22.437082
Min:        22.437082
Max:        22.437082
TimeUnix:   2026-06-15 18:50:19.304071802
ubuntu@ip-172-26-13-138:~/clp-telemetry-server$ docker exec clp-telemetry-server-clickhouse-1 clickhouse-client --user default --password clickhouse --query "SELECT MetricName, Count, Sum, Min, Max, TimeUnix FROM clp_telemetry.otel_metrics_histogram WHERE ServiceName = 'compression-scheduler' ORDER BY TimeUnix DESC LIMIT 10 FORMAT Vertical"

Row 1:
──────
MetricName: clp.compression.job.duration
Count:      1
Sum:        23.191334
Min:        23.191334
Max:        23.191334
TimeUnix:   2026-06-15 18:50:19.304071802

Row 2:
──────
MetricName: clp.compression.task.duration
Count:      1
Sum:        22.437082
Min:        22.437082
Max:        22.437082
TimeUnix:   2026-06-15 18:50:19.304071802
ubuntu@ip-172-26-13-138:~/clp-telemetry-server$
ubuntu@ip-172-26-13-138:~/clp-telemetry-server$ docker exec clp-telemetry-server-clickhouse-1 clickhouse-client --user default --password clickhouse --query "SELECT MetricName, Value, TimeUnix FROM clp_telemetry.otel_metrics_sum WHERE MetricName LIKE '%tasks.completed' OR MetricName LIKE '%tasks.failed' ORDER BY TimeUnix DESC LIMIT 5 FORMAT Vertical"
Row 1:
──────
MetricName: clp.compression.tasks.completed
Value:      1
TimeUnix:   2026-06-15 18:50:19.304071802

Summary by CodeRabbit

  • New Features

    • Added OpenTelemetry metrics across compression and query schedulers and workers (bytes in/out counters, active jobs, outstanding tasks).
  • Chores

    • Wired telemetry init/shutdown into services; added OTEL service names and metric export interval settings; updated deployment manifests and Helm chart version.
    • Added OpenTelemetry support in Python utilities and dependencies.
  • Tests

    • Increased integration test command timeout and improved port probing reliability.
  • Documentation

    • Added telemetry configuration docs and examples.

@Nathan903 Nathan903 requested a review from a team as a code owner May 19, 2026 21:33
@coderabbitai

coderabbitai Bot commented May 19, 2026

Contributor

Review Change Stack

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

Adds OpenTelemetry metrics and lifecycle wiring across CLP services: a telemetry helper and dependencies; counters in compression tasks; observable gauges in compression and query schedulers; Celery worker signal handlers; deployment, compose, config, and test updates to expose telemetry environment variables.

Changes

OpenTelemetry Metrics Integration

Layer / File(s) Summary
Telemetry Foundation Module and Dependencies
components/clp-py-utils/clp_py_utils/telemetry.py, components/clp-py-utils/pyproject.toml
telemetry.py implements init_telemetry() and shutdown_telemetry() with environment gating and OTLP metric exporter setup; OpenTelemetry API, SDK, and OTLP HTTP exporter pinned to v1.42.1.
Compression Worker Metrics and Lifecycle Wiring
components/job-orchestration/job_orchestration/executor/compress/celery.py, components/job-orchestration/job_orchestration/executor/compress/compression_task.py, tools/deployment/package-helm/templates/compression-worker-deployment.yaml, components/clp-package-utils/clp_package_utils/controller.py, tools/deployment/package/docker-compose-all.yaml
Celery worker process signal handlers call telemetry init/shutdown. Compression task defines a compression-worker meter and two counters for total input/output bytes and records them (with status attribute) after task completion. Helm/compose templates and controller mappings include telemetry env and OTEL service name for the worker.
Compression Scheduler Observable Metrics and Lifecycle
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py, tools/deployment/package-helm/templates/compression-scheduler-deployment.yaml, components/clp-package-utils/clp_package_utils/controller.py, tools/deployment/package/docker-compose-all.yaml
Compression scheduler registers observable gauges: clp.compression.active_jobs (current scheduled job count) and clp.compression.outstanding_tasks (sum of remaining tasks); initializes telemetry at startup and registers shutdown via atexit. Deployment templates and controller mappings include telemetry env and OTEL service name.
Query Scheduler Observable Metrics and Lifecycle
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py, tools/deployment/package-helm/templates/query-scheduler-deployment.yaml, components/clp-package-utils/clp_package_utils/controller.py, tools/deployment/package/docker-compose-all.yaml
Query scheduler registers observable gauges for active jobs and outstanding tasks, and wires telemetry init/shutdown at process startup/exit. Deployment templates and controller mappings include telemetry env and OTEL service name.
Service Config and Deployment Propagation
components/clp-py-utils/clp_py_utils/clp_config.py, tools/deployment/package-helm/values.yaml, tools/deployment/package-helm/Chart.yaml, tools/deployment/package-helm/templates/api-server-deployment.yaml, components/clp-package-utils/clp_package_utils/controller.py, tools/deployment/package/docker-compose-all.yaml
Adds telemetry_update_interval_ms fields to component config models, maps them into package env vars, adds OTEL_METRIC_EXPORT_INTERVAL/OTEL_SERVICE_NAME to Helm templates and Docker Compose, and bumps Helm chart version.
Integration Test Infrastructure and Docs
integration-tests/tests/utils/classes.py, integration-tests/tests/utils/port_utils.py, integration-tests/tests/package_tests/utils/modes.py, docs/src/user-docs/reference-telemetry.md
Default subprocess timeout increased to 300s; port probe socket sets SO_REUSEADDR; package tests include OTEL collector service; docs add a configuration section for telemetry_update_interval_ms.

Sequence Diagram

sequenceDiagram
  participant WorkerProcess
  participant telemetry_init as init_telemetry()
  participant OTLPExporter
  participant MeterProvider
  participant MetricsAPI as metrics.get_meter(...)
  WorkerProcess->>telemetry_init: call on startup (Celery/scheduler)
  telemetry_init->>OTLPExporter: create OTLPMetricExporter
  telemetry_init->>MeterProvider: create MeterProvider with reader
  telemetry_init->>WorkerProcess: metrics.set_meter_provider(...)
  WorkerProcess->>MetricsAPI: meter = metrics.get_meter("compression-worker")
  WorkerProcess->>MetricsAPI: record counters/gauges (bytes/counts)
  WorkerProcess->>telemetry_init: register shutdown (atexit / worker signal)
  WorkerProcess->>telemetry_init: call shutdown_telemetry() on exit

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Suggested reviewers

  • junhaoliao
🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name | Status | Explanation | Resolution
Docstring Coverage | ⚠️ Warning | Docstring coverage is 30.95% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name | Status | Explanation
Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request.
Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled.
Title check | ✅ Passed | The title clearly matches the main telemetry metrics addition across query and compression schedulers/workers.


@coderabbitai coderabbitai Bot left a comment

Contributor

Actionable comments posted: 4

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In
`@components/job-orchestration/job_orchestration/executor/compress/compression_task.py`:
- Line 34: Replace the non-public import "from opentelemetry.api.metrics import
get_meter" with the public API import "from opentelemetry.metrics import
get_meter" in compression_task.py; make the same change in telemetry.py and
query/utils.py where opentelemetry.api.metrics is used so all modules import
get_meter (or other metrics symbols) from opentelemetry.metrics to avoid runtime
failures.

In `@components/job-orchestration/job_orchestration/executor/query/utils.py`:
- Around line 175-180: The code interpolates table_name into SQL for
db_cursor.execute which can lead to SQL injection if dataset influences the
name; replace the string interpolation by validating/whitelisting the
dataset/table name and using proper SQL identifier composition (e.g.,
psycopg2.sql.SQL and psycopg2.sql.Identifier) when calling db_cursor.execute, or
ensure get_archives_table_name only returns a safe validated identifier; update
the call site that currently uses table_name and archive_id so the query uses an
escaped Identifier for the table and a parameterized value for id, and add
validation logic around get_archives_table_name/dataset to enforce allowed
characters or a whitelist.
- Line 14: The import line using get_meter from opentelemetry.api.metrics is
invalid and prevents worker startup; change the import to pull get_meter from
the correct public module opentelemetry.metrics (i.e., replace the import of
get_meter in the module where it's used, e.g., in query/utils.py) so the code
imports get_meter from opentelemetry.metrics instead of
opentelemetry.api.metrics.
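
To make the table-name finding for query/utils.py more concrete, here is a minimal sketch of the allowlist-plus-parameter approach. It uses the standard-library sqlite3 module purely as a stand-in for the project's database driver, so the `?` placeholder style, the column names, and the helper names `validated_table_name` and `fetch_archive` are all assumptions rather than code from this PR.

```python
import re
import sqlite3

# Only plain identifiers are accepted: a letter or underscore followed by
# letters, digits, or underscores.
_IDENTIFIER_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def validated_table_name(name: str) -> str:
    """Return name unchanged if it is a safe SQL identifier, else raise."""
    if _IDENTIFIER_RE.fullmatch(name) is None:
        raise ValueError(f"Unsafe table name: {name!r}")
    return name


def fetch_archive(conn: sqlite3.Connection, table_name: str, archive_id: str):
    """Look up one archive row (hypothetical schema: id, size)."""
    table = validated_table_name(table_name)
    # The identifier is validated and quoted; the value is a bound parameter.
    cursor = conn.execute(
        f'SELECT id, size FROM "{table}" WHERE id = ?', (archive_id,)
    )
    return cursor.fetchone()
```

The table name cannot be passed as a bound parameter, which is why it is validated separately, while the archive ID never touches string formatting.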

In `@components/job-orchestration/job_orchestration/executor/telemetry.py`:
- Line 14: Replace the unsupported import path "from opentelemetry.api.metrics
import set_meter_provider" with the supported metrics package by importing
set_meter_provider from "opentelemetry.metrics" instead; update the import in
telemetry.py so any references to set_meter_provider continue to work (no other
code changes required).

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 3148caed-aa04-4c16-84a2-6938fb44db0f

📥 Commits

Reviewing files that changed from the base of the PR and between 137af2c and 19b4726.

📒 Files selected for processing (9)
  • components/job-orchestration/job_orchestration/executor/compress/celery.py
  • components/job-orchestration/job_orchestration/executor/compress/celery_compress.py
  • components/job-orchestration/job_orchestration/executor/compress/compression_task.py
  • components/job-orchestration/job_orchestration/executor/query/celery.py
  • components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py
  • components/job-orchestration/job_orchestration/executor/query/fs_search_task.py
  • components/job-orchestration/job_orchestration/executor/query/utils.py
  • components/job-orchestration/job_orchestration/executor/telemetry.py
  • components/job-orchestration/pyproject.toml


@coderabbitai coderabbitai Bot left a comment

Contributor

Actionable comments posted: 4

♻️ Duplicate comments (1)
components/job-orchestration/job_orchestration/executor/compress/compression_task.py (1)

34-34: ⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Use the public OpenTelemetry metrics API path.

opentelemetry.api.metrics is not a valid public module; this will raise ModuleNotFoundError at import time. The public API is opentelemetry.metrics.

🐛 Proposed fix
-from opentelemetry.api.metrics import get_meter
+from opentelemetry.metrics import get_meter
In opentelemetry-python 1.20.0, is get_meter importable from opentelemetry.metrics or opentelemetry.api.metrics?
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@components/job-orchestration/job_orchestration/executor/compress/compression_task.py`
at line 34, The import uses a non-public path "opentelemetry.api.metrics" which
will raise ModuleNotFoundError; change the import to use the public API by
importing get_meter from "opentelemetry.metrics" (update the import statement
that currently references get_meter to use the public module name) so subsequent
uses of get_meter in this file (e.g., within compression_task.py) work with the
supported OpenTelemetry public API.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In
`@components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py`:
- Around line 26-27: The code imports the private symbol _bytes_output_counter
from utils which breaks encapsulation; add a public helper
emit_bytes_output(byte_count: int) to utils.py (matching the pattern of
emit_bytes_scanned) that increments/records bytes output and use that from
extract_stream_task.py instead of importing _bytes_output_counter directly;
update any references to _bytes_output_counter in functions like the extract
stream task to call utils.emit_bytes_output(...) and remove the
underscore-prefixed import.
- Around line 260-269: The telemetry (emit_bytes_scanned and
_bytes_output_counter.add) is emitted unconditionally after run_query_task which
records metrics even on failure; change the code to only emit metrics when
task_results.status == QueryTaskStatus.SUCCEEDED by wrapping the
emit_bytes_scanned(...) call and the
_bytes_output_counter.add(stdout_byte_count) call in an if-check against
QueryTaskStatus.SUCCEEDED (use the existing task_results.status), leaving other
failure handling unchanged so failed runs do not increment the bytes-scanned or
bytes-output counters.

In `@components/job-orchestration/job_orchestration/executor/telemetry.py`:
- Around line 65-66: The metrics_endpoint construction should respect
OTEL_EXPORTER_OTLP_METRICS_ENDPOINT and avoid double slashes: replace the
current metrics_endpoint = f"{endpoint}/v1/metrics" with logic that first reads
OTEL_EXPORTER_OTLP_METRICS_ENDPOINT (falling back to
OTEL_EXPORTER_OTLP_ENDPOINT/_DEFAULT_OTEL_ENDPOINT), and if building from the
generic endpoint, normalize by stripping any trailing slash from endpoint before
appending "/v1/metrics" (or use a safe URL join utility) so you never produce
"//v1/metrics"; update references to endpoint, metrics_endpoint and
_DEFAULT_OTEL_ENDPOINT accordingly.
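
A minimal sketch of the endpoint resolution described above, assuming the signal-specific variable is used as-is and the generic endpoint gets "/v1/metrics" appended after trailing slashes are stripped. The function name `resolve_metrics_endpoint` is hypothetical, and the default value shown here is an assumption (4318 is the conventional OTLP/HTTP port); the PR's actual _DEFAULT_OTEL_ENDPOINT value is not visible in this conversation.

```python
from typing import Mapping

# Assumed default for illustration only; the real constant may differ.
_DEFAULT_OTEL_ENDPOINT = "http://localhost:4318"


def resolve_metrics_endpoint(env: Mapping[str, str]) -> str:
    """Pick the OTLP metrics URL from an environment-like mapping."""
    # A metrics-specific endpoint takes precedence and is used unchanged.
    specific = env.get("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT")
    if specific:
        return specific
    # Otherwise build the URL from the generic endpoint, stripping trailing
    # slashes so the result never contains "//v1/metrics".
    base = env.get("OTEL_EXPORTER_OTLP_ENDPOINT") or _DEFAULT_OTEL_ENDPOINT
    return base.rstrip("/") + "/v1/metrics"
```

In a service this would be called as `resolve_metrics_endpoint(os.environ)`; taking a mapping instead of reading the environment directly keeps the function easy to test.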

In `@components/job-orchestration/pyproject.toml`:
- Around line 14-16: The opentelemetry exporter dependency
"opentelemetry-exporter-otlp-proto-http>=0.41b0" is using an incorrect/legacy
floor that doesn’t align with the API/SDK pins; update the dependency entry in
pyproject.toml to "opentelemetry-exporter-otlp-proto-http>=1.20.0" so it matches
"opentelemetry-api>=1.20.0" and "opentelemetry-sdk>=1.20.0" (or otherwise
document the intended mapping) to ensure consistent, real-version constraints.

---

Duplicate comments:
In
`@components/job-orchestration/job_orchestration/executor/compress/compression_task.py`:
- Line 34: The import uses a non-public path "opentelemetry.api.metrics" which
will raise ModuleNotFoundError; change the import to use the public API by
importing get_meter from "opentelemetry.metrics" (update the import statement
that currently references get_meter to use the public module name) so subsequent
uses of get_meter in this file (e.g., within compression_task.py) work with the
supported OpenTelemetry public API.
ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 311a0665-7cd1-46a7-ade5-b9487365c9ac

📥 Commits

Reviewing files that changed from the base of the PR and between 19b4726 and 2b4a4ef.

📒 Files selected for processing (9)
  • components/job-orchestration/job_orchestration/executor/compress/celery.py
  • components/job-orchestration/job_orchestration/executor/compress/celery_compress.py
  • components/job-orchestration/job_orchestration/executor/compress/compression_task.py
  • components/job-orchestration/job_orchestration/executor/query/celery.py
  • components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py
  • components/job-orchestration/job_orchestration/executor/query/fs_search_task.py
  • components/job-orchestration/job_orchestration/executor/query/utils.py
  • components/job-orchestration/job_orchestration/executor/telemetry.py
  • components/job-orchestration/pyproject.toml

Comment thread components/job-orchestration/job_orchestration/executor/telemetry.py Outdated
Comment thread components/job-orchestration/pyproject.toml Outdated
Nathan903 added 2 commits June 1, 2026 15:44
This reverts commit 2b4a4ef.
@Nathan903 Nathan903 requested a review from gibber9809 as a code owner June 1, 2026 20:30

@coderabbitai coderabbitai Bot left a comment

Contributor

Actionable comments posted: 6

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
components/core/src/clp_s/search/Output.hpp (1)

41-47: 🧹 Nitpick | 🔵 Trivial | 💤 Low value

Document the new out-parameter.

The method now takes bytes_output, but the comment block only describes the return value. Add a @param bytes_output line clarifying that it is accumulated (added to, not reset) with the total length of emitted messages.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@components/core/src/clp_s/search/Output.hpp` around lines 41 - 47, Update the
comment for the method filter(uint64_t& bytes_output) to document the new
out-parameter: add a `@param` bytes_output line stating that bytes_output is an
output accumulator which the method adds the total length of emitted messages to
(it is added to, not reset), and keep the existing `@return` description unchanged
so readers understand both the side-effect and the boolean success value.
components/core/src/clp/clo/clo.cpp (2)

606-613: ⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Stray } will break compilation.

The try/catch block already closes at Line 609. The extra } on Line 610 prematurely closes main(), leaving the subsequent initialization and argument-parsing code outside any function. This won't compile.

🐛 Proposed fix
     } catch (std::exception& e) {
         // NOTE: We can't log an exception if the logger couldn't be constructed
         return -1;
     }
-    }
     clp::Profiler::init();
     clp::TimestampPattern::init();
     clp::telemetry::init();
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@components/core/src/clp/clo/clo.cpp` around lines 606 - 613, There is a stray
closing brace that prematurely ends main() after the catch block; remove the
extra '}' that appears immediately after the catch (the one that closes main) so
the subsequent initialization calls (clp::Profiler::init(),
clp::TimestampPattern::init(), clp::telemetry::init()) remain inside main(), and
verify the try/catch and main() brace balance is correct around the existing try
{ ... } catch (std::exception& e) { ... } and main() function.

631-648: 🧹 Nitpick | 🔵 Trivial | 💤 Low value

Telemetry records counters even for non-search commands.

bytes_scanned/bytes_output remain 0 for ExtractIr, yet record_query_metrics(...) still emits clp.query.* counters afterward. Consider recording query metrics only when the Search command ran to avoid emitting zero-valued query data points on the extraction path.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@components/core/src/clp/clo/clo.cpp` around lines 631 - 648, The telemetry
call currently records bytes_scanned/bytes_output (which stay zero) for
non-Search commands; restrict
clp::telemetry::record_query_metrics(bytes_scanned, bytes_output) so it is only
invoked when the Search path ran successfully (i.e. inside the Command::Search
branch after a successful search() call), leaving clp::telemetry::shutdown()
where it is called for all commands; update the logic around the search branch
and the variables bytes_scanned/bytes_output to ensure metrics are emitted only
for search operations and not for extract_ir().
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@components/core/CMakeLists.txt`:
- Around line 349-353: Replace the unconditional
FetchContent_Declare(opentelemetry-cpp ...) that uses GIT_TAG v1.14.2 with a
reproducible, shallow fetch: pin the GIT repository to the tag's commit SHA
instead of the tag name and add GIT_SHALLOW TRUE to the FetchContent_Declare
call; also gate the entire opentelemetry-cpp FetchContent block behind a
dedicated option (e.g., ENABLE_OPENTELEMETRY or BUILD_TELEMETRY) in addition to
CLP_BUILD_EXECUTABLES so the dependency is only fetched when telemetry is
explicitly enabled.
- Around line 342-355: The FetchContent block is forcing BUILD_TESTING into the
cache which pollutes global state and disables CLP tests; instead, save/restore
BUILD_TESTING around the opentelemetry-cpp FetchContent: before
FetchContent_Declare/FetchContent_MakeAvailable capture whether BUILD_TESTING is
defined (e.g., save to a temporary variable like _OLD_BUILD_TESTING), then set
BUILD_TESTING OFF locally (without CACHE/FORCE) for the duration of
FetchContent_MakeAvailable(opentelemetry-cpp), and finally restore the original
BUILD_TESTING value (or undefine it) after FetchContent_MakeAvailable so
CLP_ENABLE_TESTS / CLP_BUILD_TESTING behavior is unchanged.

In `@components/core/src/clp_s/clp-s.cpp`:
- Line 321: Telemetry is initialized unconditionally via clp::telemetry::init()
but clp::telemetry::shutdown() (and record_query_metrics) are only called on the
successful search path, causing exporter flush/leak issues; fix by introducing
an RAII guard (e.g., a TelemetryGuard class or scoped object) constructed
immediately after clp::telemetry::init() that calls clp::telemetry::shutdown()
from its destructor, and ensure record_query_metrics is invoked before shutdown
where appropriate (for search path) so that the destructor always runs on every
early return or exception; update main flow to create this guard (referencing
clp::telemetry::init, TelemetryGuard, record_query_metrics, and
clp::telemetry::shutdown) so shutdown is guaranteed on all exit paths.

In `@components/core/src/clp/telemetry/telemetry.cpp`:
- Around line 62-66: The pointer guard in record_query_metrics uses a negation
check; change the conditional from if (!provider) to an explicit comparison if
(nullptr == provider) to follow the repo's pointer-guard style; update the check
immediately after retrieving provider in record_query_metrics to use nullptr ==
provider while leaving the surrounding logic unchanged.

In
`@components/job-orchestration/job_orchestration/executor/compress/celery_compress.py`:
- Around line 13-15: The worker_process_init_handler in celery_compress.py calls
init_worker_telemetry but that symbol is not importable due to a broken export
in telemetry_utils; fix by either correcting telemetry_utils to properly export
and return init_worker_telemetry (or the correctly named function) or update the
import in this module to the correct symbol name, and add a safe guard
(try/except around the import or the call inside worker_process_init_handler) so
a missing telemetry import won't crash the worker process; reference the
init_worker_telemetry function and the worker_process_init_handler signal
handler when making the change.

In `@components/job-orchestration/job_orchestration/executor/telemetry_utils.py`:
- Line 5: Replace the faulty import of get_meter from opentelemetry.api.metrics
with the correct module opentelemetry.metrics in both locations: update the
import in telemetry_utils.py (where get_meter is currently imported) and the
import in compression_task.py (the line importing get_meter at or near the top
of the file); ensure the import reads "from opentelemetry.metrics import
get_meter" so worker startup no longer raises ModuleNotFoundError.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: f2055520-266f-4e06-94d9-de2a2788cb68

📥 Commits

Reviewing files that changed from the base of the PR and between 2b4a4ef and 9c4df4d.

📒 Files selected for processing (16)
  • components/clp-py-utils/clp_py_utils/telemetry.py
  • components/clp-py-utils/pyproject.toml
  • components/core/CMakeLists.txt
  • components/core/src/clp/clo/CMakeLists.txt
  • components/core/src/clp/clo/clo.cpp
  • components/core/src/clp/telemetry/telemetry.cpp
  • components/core/src/clp/telemetry/telemetry.hpp
  • components/core/src/clp_s/CMakeLists.txt
  • components/core/src/clp_s/clp-s.cpp
  • components/core/src/clp_s/search/Output.cpp
  • components/core/src/clp_s/search/Output.hpp
  • components/job-orchestration/job_orchestration/executor/compress/celery_compress.py
  • components/job-orchestration/job_orchestration/executor/compress/compression_task.py
  • components/job-orchestration/job_orchestration/executor/compress/spider_compress.py
  • components/job-orchestration/job_orchestration/executor/query/celery.py
  • components/job-orchestration/job_orchestration/executor/telemetry_utils.py
💤 Files with no reviewable changes (1)
  • components/job-orchestration/job_orchestration/executor/compress/compression_task.py

Comment thread components/core/CMakeLists.txt Outdated
Comment thread components/core/CMakeLists.txt Outdated
Comment thread components/core/src/clp_s/clp-s.cpp Outdated
Comment thread components/core/src/clp/telemetry/telemetry.cpp Outdated
Comment thread components/job-orchestration/job_orchestration/executor/telemetry_utils.py Outdated
Nathan903 added 2 commits June 2, 2026 16:15
* test

* add units

* inject envvars

* correct meter names

* touch cpp

* add cardinality

* py lint

* cpp lint

* fix

* scan for stats

* remove fallback

* remove unused deps

* add descriptions

* forgot break

Revert "try"

This reverts commit 9c4df4d.
@Nathan903 Nathan903 changed the title from "test" to "feat(telemetry): Add OpenTelemetry query and compression worker stats" Jun 5, 2026

@coderabbitai coderabbitai Bot left a comment

Contributor

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
components/core/src/clp/clo/clo.cpp (1)

422-429: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Count output bytes only after successful emission.

Line 422 increments total_bytes_output before output_handler->add_result(...) succeeds. If result delivery fails, emitted-byte metrics are overstated.

Proposed fix
-        total_bytes_output += decompressed_message.length();
         if (ErrorCode_Success
             != output_handler->add_result(
                     compressed_file.get_orig_path(),
                     compressed_file.get_orig_file_id_as_string(),
                     encoded_message,
                     decompressed_message
             ))
         {
             result = SearchFilesResult::ResultSendFailure;
             break;
         }
+        total_bytes_output += decompressed_message.length();
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@components/core/src/clp/clo/clo.cpp` around lines 422 - 429, The code
increments total_bytes_output before verifying delivery; change the flow so you
only add decompressed_message.length() to total_bytes_output after
output_handler->add_result(...) returns ErrorCode_Success. Locate the block
using total_bytes_output, output_handler->add_result(...), and compressed_file
(methods get_orig_path/get_orig_file_id_as_string) and move or conditionally
apply the increment so it occurs only on successful add_result.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@components/core/src/clp_s/search/Output.cpp`:
- Around line 146-147: Do not hardcode stats.bytes_scanned to zero in
Output.cpp; either populate json_msg["stats"]["bytes_scanned"] with the real
scan metric (e.g., use the existing bytes_scanned/scan_bytes variable or a
Stats/QueryStats object used when computing total_bytes_output) or omit the
field entirely if the metric is unavailable. Update the assignment that sets
json_msg["stats"]["bytes_scanned"] (paired with
json_msg["stats"]["bytes_output"]) so it reads the actual metric or is not
emitted, ensuring the query worker does not ingest a fabricated zero value.

In `@components/core/src/clp/clo/clo.cpp`:
- Line 459: The code increments total_bytes_scanned using
file_metadata_ix.get_num_uncompressed_bytes() before calling search_file(...),
which inflates stats.bytes_scanned when search_file returns
SearchFilesResult::OpenFailure; modify both occurrences (the additions at the
sites where total_bytes_scanned is incremented) so that you only add
file_metadata_ix.get_num_uncompressed_bytes() after search_file(...) completes
successfully (i.e., result != SearchFilesResult::OpenFailure) or guard the
increment with a check of the search result; update the logic that sets
stats.bytes_scanned to use total_bytes_scanned that only includes successfully
opened files.

In
`@components/job-orchestration/job_orchestration/executor/query/fs_search_task.py`:
- Around line 315-318: Guard against non-dict stats before accessing its keys:
when you extract stats = data["stats"] (in the block that currently checks
isinstance(data, dict) and "stats" in data), add a type check like
isinstance(stats, dict) and only call stats.get(...) if true; otherwise set
bytes_scanned and bytes_output to None (or a safe default) so accessing
bytes_scanned and bytes_output cannot raise when stats is not an object. Ensure
you update the code paths that use bytes_scanned/bytes_output accordingly.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: d846e0ce-b602-43fc-bf1f-11de43766da6

📥 Commits

Reviewing files that changed from the base of the PR and between 9c4df4d and 92974ae.

⛔ Files ignored due to path filters (5)
  • components/clp-mcp-server/uv.lock is excluded by !**/*.lock
  • components/clp-package-utils/uv.lock is excluded by !**/*.lock
  • components/clp-py-utils/uv.lock is excluded by !**/*.lock
  • components/job-orchestration/uv.lock is excluded by !**/*.lock
  • integration-tests/uv.lock is excluded by !**/*.lock
📒 Files selected for processing (10)
  • components/clp-py-utils/clp_py_utils/telemetry.py
  • components/core/src/clp/clo/clo.cpp
  • components/core/src/clp_s/search/Output.cpp
  • components/job-orchestration/job_orchestration/executor/compress/celery.py
  • components/job-orchestration/job_orchestration/executor/compress/compression_task.py
  • components/job-orchestration/job_orchestration/executor/query/celery.py
  • components/job-orchestration/job_orchestration/executor/query/fs_search_task.py
  • tools/deployment/package-helm/templates/compression-worker-deployment.yaml
  • tools/deployment/package-helm/templates/query-worker-deployment.yaml
  • tools/deployment/package/docker-compose-all.yaml

Comment thread components/core/src/clp_s/search/Output.cpp Outdated
Comment thread components/core/src/clp/clo/clo.cpp Outdated
Comment thread components/job-orchestration/job_orchestration/executor/query/fs_search_task.py Outdated
fix

lint

Update Chart.yaml

lint

@coderabbitai coderabbitai Bot left a comment

Contributor

Actionable comments posted: 1

♻️ Duplicate comments (1)
components/job-orchestration/job_orchestration/executor/query/fs_search_task.py (1)

312-315: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Guard stats shape before key access.

At Line 313-Line 315, stats.get(...) can raise if data["stats"] is not a dict, which can fail the task in telemetry parsing.

Proposed fix
-                if isinstance(data, dict) and "stats" in data:
-                    stats = data["stats"]
-                    bytes_scanned = stats.get("bytes_scanned", None)
-                    bytes_output = stats.get("bytes_output", None)
-                    break
+                if isinstance(data, dict) and "stats" in data:
+                    stats = data["stats"]
+                    if isinstance(stats, dict):
+                        bytes_scanned = stats.get("bytes_scanned")
+                        bytes_output = stats.get("bytes_output")
+                        break
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@components/job-orchestration/job_orchestration/executor/query/fs_search_task.py`
around lines 312 - 315, The current code assumes data["stats"] is a dict and
calls stats.get(...), which will raise if stats is not a mapping; update the
block that reads data and stats (the variables data, stats, bytes_scanned,
bytes_output in fs_search_task.py) to first verify stats is a dict (e.g., if
isinstance(stats, dict)) before calling stats.get, otherwise set bytes_scanned
and bytes_output to None (or safe defaults); ensure the guard is applied where
data is checked ("if isinstance(data, dict) and 'stats' in data") so you extract
stats = data["stats"] then check isinstance(stats, dict) before accessing keys.
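
The guard described above could look like the following runnable sketch. The function name `extract_stats` and the assumption that the task scans output line by line for a JSON object carrying a `stats` key are illustrative; only the key names `stats`, `bytes_scanned`, and `bytes_output` come from the review.

```python
import json
from typing import Iterable, Optional, Tuple


def extract_stats(lines: Iterable[str]) -> Tuple[Optional[int], Optional[int]]:
    """Return (bytes_scanned, bytes_output) from the first well-formed stats line."""
    for line in lines:
        try:
            data = json.loads(line)
        except json.JSONDecodeError:
            # Ordinary, non-JSON output lines are skipped.
            continue
        if not isinstance(data, dict):
            continue
        stats = data.get("stats")
        # Only call .get() once we know stats is a mapping.
        if isinstance(stats, dict):
            return stats.get("bytes_scanned"), stats.get("bytes_output")
    # No usable stats found: report "unknown" rather than raising.
    return None, None


good = extract_stats(
    [
        "plain log line",
        '{"stats": "oops"}',
        '{"stats": {"bytes_scanned": 10, "bytes_output": 3}}',
    ]
)
malformed_only = extract_stats(['{"stats": [1, 2]}'])
```

Returning `None` for both values lets the caller decide whether to skip emitting the metric, so a malformed stats payload cannot fail the task.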
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@integration-tests/tests/utils/fs_validation.py`:
- Around line 44-55: The current per-line JSON parsing in
is_json_file_structurally_equal (using json_fp and building lines[]) breaks for
pretty-printed/multi-line JSON; instead read the entire file content, try
json.loads(content) and on success append json.dumps(obj, separators=(",", ":"),
sort_keys=True) to lines for normalization; if full-file parse fails, fall back
to treating the file as JSONL by iterating non-empty lines and normalizing each
via json.loads as before (or keeping raw line on JSONDecodeError) so both single
JSON documents and JSONL files are handled.
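
A minimal sketch of the whole-file-then-JSONL fallback suggested above. The helper name `load_normalized_json_lines` is hypothetical, and the sketch covers only the normalization step, not the comparison done by `is_json_file_structurally_equal`.

```python
import json
import tempfile
from pathlib import Path
from typing import List


def _normalize(obj) -> str:
    """Serialize with sorted keys and no extra whitespace for stable comparison."""
    return json.dumps(obj, separators=(",", ":"), sort_keys=True)


def load_normalized_json_lines(path: Path) -> List[str]:
    content = path.read_text(encoding="utf-8")

    # First try to treat the file as a single (possibly pretty-printed) document.
    try:
        return [_normalize(json.loads(content))]
    except json.JSONDecodeError:
        pass

    # Fall back to JSONL: normalize each non-empty line, keeping raw text
    # for lines that are not valid JSON.
    lines = []
    for raw in content.splitlines():
        raw = raw.strip()
        if not raw:
            continue
        try:
            lines.append(_normalize(json.loads(raw)))
        except json.JSONDecodeError:
            lines.append(raw)
    return lines


with tempfile.TemporaryDirectory() as tmp:
    pretty = Path(tmp) / "pretty.json"
    pretty.write_text('{\n  "b": 1,\n  "a": 2\n}\n', encoding="utf-8")
    jsonl = Path(tmp) / "records.jsonl"
    jsonl.write_text('{"b": 1, "a": 2}\n\n{"c": 3}\n', encoding="utf-8")

    pretty_lines = load_normalized_json_lines(pretty)
    jsonl_lines = load_normalized_json_lines(jsonl)
```

A single-line JSONL file parses successfully in the first branch and yields the same one-element result the per-line path would produce, so the two branches agree where they overlap.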

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: f74fd339-f913-4b59-a949-3cda8a38cee8

📥 Commits

Reviewing files that changed from the base of the PR and between 92974ae and f8189fb.

📒 Files selected for processing (9)
  • components/core/CMakeLists.txt
  • components/job-orchestration/job_orchestration/executor/compress/compression_task.py
  • components/job-orchestration/job_orchestration/executor/query/fs_search_task.py
  • integration-tests/tests/binary_tests/test_log_converter.py
  • integration-tests/tests/package_tests/utils/modes.py
  • integration-tests/tests/utils/classes.py
  • integration-tests/tests/utils/fs_validation.py
  • integration-tests/tests/utils/port_utils.py
  • tools/deployment/package-helm/Chart.yaml

Comment thread integration-tests/tests/utils/fs_validation.py Outdated

@sitaowang1998 sitaowang1998 left a comment

Contributor

I haven't gone into the details, but I am concerned about the overall method we use to collect telemetry.
Changing stdout is a significant breaking change that will probably break a lot of things. Since we do need IPC, how about creating separate pipes for metrics so we don't pollute stdout? @junhaoliao What's your opinion on this?

@gibber9809

Contributor

I haven't gone into the details, but I am concerned about the overall method we use to collect telemetry. Changing stdout is a significant breaking change that will probably break a lot of things. Since we do need IPC, how about creating separate pipes for metrics so we don't pollute stdout? @junhaoliao What's your opinion on this?

I think I'd personally prefer just properly integrating OpenTelemetry into the C++ code. Writing to stdout/stderr unconditionally definitely causes problems here (since we already have output to stdout/stderr), and even using a pipe is probably a temporary solution that we will want to change once we integrate with spider.
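
For reference, the separate-pipe idea raised in this thread can be sketched as below. This is not the approach the PR adopted: the child script, the JSON payload shape, and the function name `run_with_metrics_pipe` are all hypothetical, and the sketch is POSIX-only because it relies on `pass_fds`.

```python
import json
import os
import subprocess
import sys

# Hypothetical child process: normal results go to stdout, metrics go to the
# file descriptor whose number is passed as the first argument.
CHILD_SOURCE = r"""
import json, os, sys
fd = int(sys.argv[1])
print("search result line")
with os.fdopen(fd, "w") as metrics_out:
    json.dump({"bytes_scanned": 1024, "bytes_output": 17}, metrics_out)
"""


def run_with_metrics_pipe():
    read_fd, write_fd = os.pipe()
    try:
        proc = subprocess.Popen(
            [sys.executable, "-c", CHILD_SOURCE, str(write_fd)],
            stdout=subprocess.PIPE,
            pass_fds=(write_fd,),  # keep only this extra fd open in the child
            text=True,
        )
    except BaseException:
        os.close(read_fd)
        raise
    finally:
        # The parent must close its copy of the write end, or the read side
        # would never see EOF.
        os.close(write_fd)

    with os.fdopen(read_fd) as metrics_stream:
        # Fine for small payloads; a large metrics payload would need to be
        # drained concurrently with stdout to avoid filling the pipe buffer.
        stdout_text, _ = proc.communicate()
        metrics = json.loads(metrics_stream.read())
    return stdout_text, metrics


stdout_text, metrics = run_with_metrics_pipe()
```

With this layout stdout carries exactly what it carried before, so existing consumers of the search output are unaffected by the metrics channel.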

@coderabbitai

coderabbitai Bot commented Jun 8, 2026

Contributor

Note

Unit test generation is a public access feature. Expect some limitations and changes as we gather feedback and continue to improve it.


Generating unit tests... This may take up to 20 minutes.

@Nathan903 Nathan903 changed the title from "feat(telemetry): Add OpenTelemetry query and compression worker stats" to "feat(telemetry)!: Add OpenTelemetry query and compression scheduler/workers metrics." Jun 8, 2026

@coderabbitai coderabbitai Bot left a comment

Contributor

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
components/clp-py-utils/clp_py_utils/telemetry.py (2)

19-20: 🧹 Nitpick | 🔵 Trivial | 💤 Low value

Update the docstring to reflect actual behaviour.

The docstring states "this function does nothing" when telemetry is disabled, but Line 25 actually sets a NoOpMeterProvider. Consider revising to "this function installs a no-op meter provider" for accuracy.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@components/clp-py-utils/clp_py_utils/telemetry.py` around lines 19 - 20,
Update the function docstring in telemetry.py to accurately describe behavior
when telemetry is disabled: instead of saying "this function does nothing",
state that the function installs a NoOpMeterProvider (i.e., a no-op meter
provider) and returns/continues without active telemetry. Mention the
NoOpMeterProvider install performed in the function where the provider is set
(the block that assigns NoOpMeterProvider) so readers know the function still
configures a no-op provider rather than doing nothing.

46-52: 🧹 Nitpick | 🔵 Trivial

Use timeout_millis explicitly for OpenTelemetry force_flush()/shutdown() in shutdown_telemetry()

MeterProvider.force_flush() and MeterProvider.shutdown() already support timeout_millis (defaults: 10s and 30s), so these calls shouldn’t block indefinitely. Passing explicit timeout_millis values would make the intended shutdown budget clear.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@components/clp-py-utils/clp_py_utils/telemetry.py` around lines 46 - 52, In
shutdown_telemetry(), explicitly pass timeout_millis to the OpenTelemetry
provider calls so they don't block indefinitely: call
provider.force_flush(timeout_millis=10000) (or another chosen ms value) and
provider.shutdown(timeout_millis=30000) (or chosen ms), and keep the existing
try/except/logger.warning handling around those calls (referencing
provider.force_flush(), provider.shutdown(), shutdown_telemetry(), and
logger.warning).
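
The explicit-timeout suggestion might look like the sketch below. To stay runnable without the OpenTelemetry SDK installed, it takes the provider as a parameter and exercises it with a hypothetical `_RecordingProvider`; the real `shutdown_telemetry()` presumably obtains its provider differently, and the constant names and warning text are assumptions.

```python
import logging

logger = logging.getLogger(__name__)

# Explicit budgets, chosen here to match the SDK defaults cited in the review.
_FORCE_FLUSH_TIMEOUT_MILLIS = 10_000
_SHUTDOWN_TIMEOUT_MILLIS = 30_000


def shutdown_telemetry(provider) -> None:
    """Flush and shut down a meter provider with an explicit time budget."""
    try:
        provider.force_flush(timeout_millis=_FORCE_FLUSH_TIMEOUT_MILLIS)
        provider.shutdown(timeout_millis=_SHUTDOWN_TIMEOUT_MILLIS)
    except Exception:
        # Telemetry failures should never take the worker down with them.
        logger.warning("Failed to flush or shut down telemetry.", exc_info=True)


class _RecordingProvider:
    """Hypothetical stand-in that mimics the two MeterProvider methods used above."""

    def __init__(self) -> None:
        self.calls = []

    def force_flush(self, timeout_millis: float = 10_000) -> bool:
        self.calls.append(("force_flush", timeout_millis))
        return True

    def shutdown(self, timeout_millis: float = 30_000) -> None:
        self.calls.append(("shutdown", timeout_millis))


provider = _RecordingProvider()
shutdown_telemetry(provider)
```

Naming the budgets as module constants makes the intended shutdown time visible at a glance and easy to tune per deployment.
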
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@integration-tests/tests/package_tests/utils/modes.py`:
- Line 40: Add an OTEL_COLLECTOR_COMPONENT_NAME constant and replace the literal
"otel-collector" with
_to_docker_compose_service_name(OTEL_COLLECTOR_COMPONENT_NAME) to match the
existing pattern; define the constant alongside other component constants (or
import it from clp_py_utils.clp_config) and update any reference in modes.py
where the literal is used so the service name is derived via the
_to_docker_compose_service_name function.
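
A rough sketch of the pattern the comment asks for. Both the constant's value and the body of `_to_docker_compose_service_name` are assumptions made so the example runs on its own; the real helper in the integration tests may behave differently.

```python
# Assumed value: chosen so that the derived service name matches the
# "otel-collector" literal mentioned in the review.
OTEL_COLLECTOR_COMPONENT_NAME = "otel_collector"


def _to_docker_compose_service_name(component_name: str) -> str:
    """Hypothetical stand-in: assume service names use hyphens, not underscores."""
    return component_name.replace("_", "-")


# Derive the service name from the constant instead of hardcoding the literal.
otel_service = _to_docker_compose_service_name(OTEL_COLLECTOR_COMPONENT_NAME)
```

Deriving the name through the shared helper keeps the collector consistent with the other components if the naming rule ever changes.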

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 5af9bdf9-1462-4c21-bad6-9f0cd45469c5

📥 Commits

Reviewing files that changed from the base of the PR and between 92974ae and b0d6046.

📒 Files selected for processing (12)
  • components/clp-py-utils/clp_py_utils/telemetry.py
  • components/job-orchestration/job_orchestration/executor/compress/compression_task.py
  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
  • components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
  • integration-tests/tests/package_tests/utils/modes.py
  • integration-tests/tests/utils/classes.py
  • integration-tests/tests/utils/port_utils.py
  • tools/deployment/package-helm/Chart.yaml
  • tools/deployment/package-helm/templates/compression-scheduler-deployment.yaml
  • tools/deployment/package-helm/templates/compression-worker-deployment.yaml
  • tools/deployment/package-helm/templates/query-scheduler-deployment.yaml
  • tools/deployment/package/docker-compose-all.yaml

Comment thread integration-tests/tests/package_tests/utils/modes.py Outdated
@Nathan903 Nathan903 requested a review from junhaoliao June 15, 2026 19:10
@gibber9809

Contributor

I've re-run the validations and everything works.

I found a possible bug:

running

sbin/start-clp.sh
sbin/compress.sh ~/hive-24hr-ts/hive-24hr-ts.jsonl
sbin/search.sh "ERROR"

gives 3 outstanding tasks forever.

To reproduce it, run:
nathanhung@baker18:/home/nathanhung/clp/build/clp-package$ CLP_HOME=$PWD docker compose -f docker-compose.runtime.yaml run --rm clp-runtime python3 -c '
import contextlib
from clp_py_utils.clp_config import CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH
from clp_package_utils.general import get_clp_home, load_config_file, validate_and_load_db_credentials_file
from clp_py_utils.sql_adapter import SqlAdapter
home = get_clp_home()
cfg = load_config_file(home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH)
validate_and_load_db_credentials_file(cfg, home, False)
with contextlib.closing(SqlAdapter(cfg.database).create_connection(True)) as db, contextlib.closing(db.cursor()) as cursor:
    cursor.execute("SELECT job_id, status FROM query_tasks")
    print("Database Tasks:", cursor.fetchall())
'
Container clp-package-clp-runtime-run-de5eb84fc00b Creating
Container clp-package-clp-runtime-run-de5eb84fc00b Created
Database Tasks: [(1, 0), (1, 0), (1, 0)]
nathanhung@baker18:/home/nathanhung/clp/build/clp-package$ cat var/log/query_scheduler/query_scheduler.log

2026-06-13 06:27:49,954 search-job-handler [INFO] Connected to archive database database:3306.
2026-06-13 06:27:49,954 search-job-handler [INFO] query_scheduler started.
2026-06-13 06:34:52,072 search-job-handler [INFO] Dispatched job 1 with 3 archives to search.
2026-06-13 06:34:52,584 search-job-handler [INFO] Search task job-1-task-1 succeeded in 0.065003 second(s).
2026-06-13 06:34:52,584 search-job-handler [INFO] Search task job-1-task-2 succeeded in 0.064874 second(s).
2026-06-13 06:34:52,584 search-job-handler [INFO] Search task job-1-task-3 succeeded in 0.532274 second(s).
2026-06-13 06:34:52,627 search-job-handler [INFO] Completed job 1.

For what it's worth, I suspect that this was related to the issue fixed by #2318 where forked dispatch workers inherit some things from the parent process. If these gauges were inherited across a fork that had non-empty active jobs it might have caused the behaviour you see here. Since that fix has been merged into this branch now, could you try reproducing this issue again and see if it still happens?

@gibber9809 gibber9809 left a comment

Contributor

Partial review of the compression scheduler and worker changes plus the query scheduler changes. One nit, but otherwise looks good to me.

Comment thread components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py Outdated
@Nathan903

Contributor Author

For what it's worth, I suspect that this was related to the issue fixed by #2318 where forked dispatch workers inherit some things from the parent process. If these gauges were inherited across a fork that had non-empty active jobs it might have caused the behaviour you see here. Since that fix has been merged into this branch now, could you try reproducing this issue again and see if it still happens?

I've re-run the test, and the telemetry sent to the telemetry server is now correct. However, running the reproduction script still gives the same output: the query_tasks table still shows status = 0 (pending) even after a success. Functionally everything is working, so this might not be a problem.

@gibber9809

@Nathan903 Nathan903 requested a review from gibber9809 June 22, 2026 16:00
@junhaoliao junhaoliao changed the title feat(telemetry): Add OpenTelemetry query and compression scheduler/workers metrics. feat(telemetry): Add OpenTelemetry query and compression scheduler/workers metrics (fixes #2346). Jun 23, 2026

@junhaoliao junhaoliao left a comment

Member

Good job. There are some data-race and missing-key issues; the other ones are just nitpicking.

  • Error-path return shape may cause exceptions

    • Worker failure paths return only error_message, but later code unconditionally reads total_uncompressed_size and total_compressed_size.
    • Fix by returning zero values for both size fields on all error paths.
  • Potential data races in metric callbacks

    • Observable metric callbacks may run in a background thread while scheduler state dictionaries are being mutated.
    • Fix by snapshotting scheduled_jobs.values() / active_jobs.values() inside a try, catching RuntimeError, and computing counts from the snapshot.
  • Out-of-scope integration-test fixes

    • Port-check and timeout changes appear unrelated to the telemetry PR.
    • Revert them and track the port occupancy issue separately.
  • Telemetry config duplication

    • Telemetry-disable env-var logic is duplicated.
    • Add a shared helper such as clp_py_utils.telemetry_config.is_telemetry_disabled_by_env() and use it from both call sites.
  • Metric instrument semantics

    • Active jobs and outstanding tasks are additive but not monotonically increasing.
    • Use asynchronous up-down counters instead of gauges.
  • OpenTelemetry instrumentation scope

    • Meters should use the module name as the instrumentation scope.
    • Replace hardcoded names like "compression-worker" / "compression-scheduler" / "query-scheduler" with __name__.
  • Metric units

    • Workload metrics should declare units.
    • Use unit="{job}" for active jobs and unit="{task}" for outstanding tasks.
  • Unused callback parameters

    • Callback parameters named options are unused.
    • Rename them to _options.
  • Docs alignment

    • Update docs to mention Python services using OTEL_SERVICE_NAME, list supported telemetry components, and describe the workload metrics as up-down counters.
  • Lint / formatting / config ordering

    • Some comments, docstrings, and YAML examples need cleanup.
    • Use a one-line docstring where requested and sort config keys consistently.
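
The snapshotting fix suggested for the metric callbacks can be sketched roughly as follows. This is an illustration under assumptions, not the PR's implementation: `Job`, `num_outstanding_tasks`, and `snapshot_workload` are hypothetical names, and reporting zeros when the snapshot fails is just one possible fallback.

```python
from typing import Dict, List, Tuple


class Job:
    """Hypothetical stand-in for a scheduler job that tracks its pending tasks."""

    def __init__(self, num_outstanding_tasks: int):
        self.num_outstanding_tasks = num_outstanding_tasks


def snapshot_workload(active_jobs: Dict[str, Job]) -> Tuple[int, int]:
    """Return (active job count, outstanding task count) computed from a snapshot.

    list() copies the values so the counting runs on a private list. If the
    dict is resized by another thread while the copy is being made, Python may
    raise RuntimeError; this sketch then reports zeros for the current
    collection cycle instead of letting the callback fail.
    """
    try:
        jobs: List[Job] = list(active_jobs.values())
    except RuntimeError:
        return 0, 0
    return len(jobs), sum(job.num_outstanding_tasks for job in jobs)


# Example scheduler state: two active jobs, three tasks still outstanding.
active_jobs = {"job-1": Job(3), "job-2": Job(0)}
counts = snapshot_workload(active_jobs)
```

In an observable-instrument callback, the two numbers would be wrapped in observations and returned. Taking the snapshot first keeps the time spent reading shared scheduler state as short as possible.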

Comment thread components/clp-py-utils/clp_py_utils/telemetry.py Outdated
Comment thread components/clp-py-utils/clp_py_utils/telemetry.py Outdated
Comment thread components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py Outdated
Comment thread components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py Outdated
Comment thread tools/deployment/package-helm/values.yaml Outdated
Comment thread tools/deployment/package-helm/values.yaml Outdated
Comment thread tools/deployment/package-helm/values.yaml Outdated
Comment thread tools/deployment/package-helm/values.yaml Outdated
Nathan903 and others added 2 commits June 24, 2026 02:37
Co-authored-by: Junhao Liao <junhao@junhao.ca>
Apply suggestions from code review

fix doc

Co-Authored-By: Junhao Liao <junhao@junhao.ca>
Nathan903 and others added 6 commits June 24, 2026 03:01
Co-authored-by: Junhao Liao <junhao@junhao.ca>
Co-authored-by: Junhao Liao <junhao@junhao.ca>
Co-authored-by: Junhao Liao <junhao@junhao.ca>
@Nathan903 Nathan903 requested a review from junhaoliao June 24, 2026 07:45

@junhaoliao junhaoliao left a comment

Member

for the title, how about:

feat(telemetry): Add metrics for query and compression schedulers/workers (fixes #2346).

@Nathan903 Nathan903 changed the title feat(telemetry): Add OpenTelemetry query and compression scheduler/workers metrics (fixes #2346). feat(telemetry): Add metrics for query and compression schedulers/workers (fixes #2346). Jun 24, 2026
@Nathan903

Contributor Author

for the title, how about:

feat(telemetry): Add metrics for query and compression schedulers/workers (fixes #2346).

done. if everything is all good, I will go ahead and merge.

@Nathan903 Nathan903 merged commit 63fa527 into y-scope:main Jun 24, 2026
31 checks passed
4 participants