Skip to content

Commit 9cee66d

Browse files
mihowclaude
andcommitted
docs(jobs): address PR #1261 review feedback
Response to reviewer comments on the async_api concurrency tuning PR: - views.py: mark the three `ids_only=1` overrides (random ordering, paginator default_limit=1, heartbeat dispatch) as temporary — they exist only because list() is doubling as a claim-next-job endpoint. A dedicated `/next` action is tracked as #1265; comments carry a hard-coded removal target of 2026-04-24 to keep the expiration visible. - views.py: rewrite the `_mark_async_services_seen_for_pipelines` and `_mark_async_services_seen_for_project` docstrings so the contract is obvious — they are Redis-throttled wrappers around the matching celery task. No DB work in the wrapper. - tasks.py: add TODO on both `update_async_services_seen_*` tasks pointing at #1194 (client-ID auth), so the "one poller falsely marks its peers live" invariant-break is visible in source, not just in the PR body. - tasks.py: fix the stale comment in `_update_job_progress` — the previous wording claimed the narrowed save() avoids overwriting `updated_at`, but `updated_at` is in `update_fields` intentionally (Django skips auto_now when update_fields is provided). - config/settings/base.py: add a one-line plain-language purpose for `JOB_LOG_PERSIST_ENABLED` so the flag reads at-a-glance. No behavior changes. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 695f761 commit 9cee66d

3 files changed

Lines changed: 67 additions & 27 deletions

File tree

ami/jobs/tasks.py

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,11 @@ def update_async_services_seen_for_pipelines(pipeline_slugs: list[str]) -> None:
9191
pipelines across many projects), so scope the heartbeat by the pipelines it
9292
asked about. Marks every async ProcessingService linked to any of those
9393
pipelines as seen.
94+
95+
TODO: once #1194 (client-ID / application-token auth) lands, scope this
96+
update to the specific calling ProcessingService rather than every service
97+
matching the slugs. Currently one poller's heartbeat falsely marks its
98+
peers live.
9499
"""
95100
from ami.ml.models import ProcessingService # avoid circular import
96101

@@ -114,10 +119,14 @@ def update_async_services_seen_for_project(project_id: int) -> None:
114119
"""
115120
Heartbeat for idle worker polls on ``GET /api/v2/jobs/?ids_only=1``.
116121
117-
Unlike ``update_pipeline_pull_services_seen`` — which is pipeline-scoped and
118-
only fires when a worker hits /tasks/ or /result/ for an active job — this
119-
marks every async processing service attached to the polling project as
120-
seen. The list endpoint has no pipeline context, so scope is the project.
122+
Fallback path used only when the request carries ``?project_id=`` without
123+
``pipeline__slug__in`` — the ADC worker does not currently send this shape,
124+
so in practice the pipeline-slug task above is the one that fires.
125+
126+
TODO: once #1194 (client-ID / application-token auth) lands, scope this
127+
update to the specific calling ProcessingService rather than every async
128+
service attached to the project. Currently one poller's heartbeat falsely
129+
marks its peers live.
121130
"""
122131
from ami.ml.models import ProcessingService # avoid circular import
123132

@@ -629,11 +638,12 @@ def _update_job_progress(
629638
job.finished_at = datetime.datetime.now() # Use naive datetime in local time
630639
job.logger.info(f"Updated job {job_id} progress in stage '{stage}' to {progress_percentage*100}%")
631640
# Narrow the write to the fields we actually mutated. Without this, a full
632-
# save() would also overwrite `updated_at`, `logs`, and any other field on
633-
# the instance fetched at the top of this block — so a concurrent worker's
634-
# append to `progress.errors` (via `_reconcile_lost_images`) or log line
635-
# (via JobLogHandler) could be clobbered by a stale read-modify-write.
636-
# See PR #1261 review feedback.
641+
# save() would overwrite `logs` and any other field on the instance
642+
# fetched at the top of this block — so a concurrent worker's append to
643+
# `progress.errors` (via `_reconcile_lost_images`) or log line (via
644+
# JobLogHandler) could be clobbered by a stale read-modify-write.
645+
# `updated_at` is listed explicitly because Django skips `auto_now` bumps
646+
# when `update_fields` is provided. See PR #1261 review feedback.
637647
job.save(update_fields=["progress", "status", "finished_at", "updated_at"])
638648
try:
639649
_log_job_throughput(job, stage)

ami/jobs/views.py

Lines changed: 45 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,16 @@ def _actor_log_context(request) -> tuple[str, str | None]:
6060

6161
def _mark_async_services_seen_for_pipelines(pipeline_slugs: tuple[str, ...]) -> None:
6262
"""
63-
Heartbeat for idle worker polls that send ``pipeline__slug__in=...`` but no
64-
``project_id`` — the real ADC worker shape, where one worker may serve
65-
pipelines across many projects and has no single project to nominate.
66-
67-
Redis throttle keyed on the sorted slug set so concurrent pollers for the
68-
same pipelines share a single dispatch per window.
63+
Redis-throttled wrapper around the ``update_async_services_seen_for_pipelines``
64+
celery task. The wrapper does no DB work itself — it gates dispatch so at
65+
most one heartbeat is enqueued per sorted slug set per
66+
``HEARTBEAT_THROTTLE_SECONDS`` window (currently 30s), keeping the HTTP
67+
request path cheap under concurrent polling.
68+
69+
Called from the ``?ids_only=1`` branch of ``JobViewSet.list()`` — the real
70+
ADC worker shape, which sends ``pipeline__slug__in=<slugs>`` and no
71+
``project_id`` (one worker may serve pipelines across many projects and
72+
has no single project to nominate).
6973
"""
7074
if not pipeline_slugs:
7175
return
@@ -80,17 +84,18 @@ def _mark_async_services_seen_for_pipelines(pipeline_slugs: tuple[str, ...]) ->
8084

8185
def _mark_async_services_seen_for_project(project_id: int) -> None:
8286
"""
83-
Heartbeat for idle worker polls on ``GET /api/v2/jobs/?ids_only=1``.
84-
85-
The pipeline-scoped heartbeat in ``_mark_pipeline_pull_services_seen`` only
86-
fires when a worker hits /tasks/ or /result/ on an active job; workers idling
87-
on the list endpoint between jobs had no heartbeat path at all, so their
88-
``last_seen`` would age out of ``PROCESSING_SERVICE_LAST_SEEN_MAX`` and the
89-
UI would flip them to offline despite being actively online.
90-
91-
Scope: marks every async service attached to the polling project. The list
92-
endpoint has no pipeline context to narrow by. Once application-token auth
93-
lands (PR #1117), this should be scoped to the specific calling service.
87+
Redis-throttled wrapper around ``update_async_services_seen_for_project``.
88+
Same shape as ``_mark_async_services_seen_for_pipelines`` above — gates
89+
celery dispatch to at most one per-project enqueue per
90+
``HEARTBEAT_THROTTLE_SECONDS`` window — but keyed by project id for
91+
callers that send ``?project_id=`` without ``pipeline__slug__in``.
92+
93+
The ADC worker does not currently use this shape, so this is a fallback.
94+
Background on why idle-poll heartbeats exist at all: the other heartbeat
95+
(``_mark_pipeline_pull_services_seen``) only fires from ``/tasks/`` and
96+
``/result/`` — i.e., from workers with active work — so a worker sitting
97+
on ``GET /jobs/?ids_only=1`` between jobs would otherwise age past
98+
``PROCESSING_SERVICE_LAST_SEEN_MAX`` and flip to offline in the UI.
9499
"""
95100
cache_key = f"heartbeat:list:project:{project_id}"
96101
if not cache.add(cache_key, 1, timeout=HEARTBEAT_THROTTLE_SECONDS):
@@ -302,6 +307,7 @@ def get_queryset(self) -> QuerySet:
302307
status=JobState.failed_states(),
303308
updated_at__lt=cutoff_datetime,
304309
)
310+
# ⚠️ TEMPORARY HACK — remove by 2026-04-24.
305311
# Worker-polling call path (`ids_only=1`): randomize order so concurrent
306312
# pollers don't all converge on the same head-of-queue job. An
307313
# `updated_at`-based sort has a degenerate case at startup — freshly
@@ -310,18 +316,32 @@ def get_queryset(self) -> QuerySet:
310316
# ordering gives probabilistic disjoint assignment without writing a
311317
# poll-stamp column. Combined with `limit=1` below, each poll is an
312318
# independent "pick any unfinished job" draw.
319+
#
320+
# The whole `ids_only=1` branch (this ordering override, the paginator
321+
# override in `paginator` below, the heartbeat dispatch in `list()`)
322+
# exists because the ADC worker currently repurposes this list endpoint
323+
# as a claim-next-job call. Correct shape is a dedicated `/next` action
324+
# (tracked as #1265). Once `/next` ships
325+
# and ADC is migrated, delete this `order_by("?")` override along with
326+
# the paginator override and the list() heartbeat branch.
313327
if self.action == "list" and url_boolean_param(self.request, "ids_only", default=False):
314328
jobs = jobs.order_by("?")
315329
return jobs
316330

317331
@property
318332
def paginator(self):
333+
# ⚠️ TEMPORARY HACK — remove by 2026-04-24.
319334
# Treat `?ids_only=1` as a pop()-style handoff ("what job is next?")
320335
# rather than a list() dump: default to one job per response unless the
321336
# caller explicitly asks for a batch via ?limit=N or ?page_size=N.
322337
# Concurrent pollers drain a cached list serially and starve later jobs;
323-
# forcing a re-poll per job lets the `updated_at` fairness sort rotate
338+
# forcing a re-poll per job lets the random-shuffle fairness sort rotate
324339
# work across jobs every iteration. No ADC-side change required.
340+
#
341+
# This override exists only because `list(ids_only=True)` is being used
342+
# as a claim-next-job call. Replace with a dedicated `/next` action
343+
# (tracked as #1265); once ADC is migrated,
344+
# drop this override so the list endpoint goes back to normal pagination.
325345
if not hasattr(self, "_paginator"):
326346
if (
327347
self.action == "list"
@@ -344,11 +364,18 @@ def paginator(self):
344364
]
345365
)
346366
def list(self, request, *args, **kwargs):
367+
# ⚠️ TEMPORARY HACK — remove by 2026-04-24.
347368
# Worker-polling call path: record heartbeat for async processing services.
348369
# The real ADC worker request carries ``pipeline__slug__in=...`` and no
349370
# project_id, so prefer the pipeline-slug scope when those slugs are
350371
# present; fall back to project scope for callers that pass ?project_id=.
351372
# Throttled via Redis so concurrent pollers don't churn the DB/broker.
373+
#
374+
# This heartbeat branch lives on `list()` only because `list(ids_only=1)`
375+
# is doubling as the worker's claim-next-job endpoint. Once a dedicated
376+
# `/next` action ships (tracked as #1265)
377+
# and ADC is migrated to it, move the heartbeat to that action and
378+
# delete this branch — `list()` should go back to being a plain list.
352379
if url_boolean_param(request, "ids_only", default=False):
353380
pipeline_slugs_raw = request.query_params.get("pipeline__slug__in")
354381
if pipeline_slugs_raw:

config/settings/base.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -576,6 +576,9 @@ def _celery_result_backend_url(redis_url):
576576
DEFAULT_INCLUDE_TAXA = env.list("DEFAULT_INCLUDE_TAXA", default=[]) # type: ignore[no-untyped-call]
577577
DEFAULT_EXCLUDE_TAXA = env.list("DEFAULT_EXCLUDE_TAXA", default=[]) # type: ignore[no-untyped-call]
578578

579+
# Purpose: master switch for per-job logs in the database and UI.
580+
# Set to False to disable them as a contention escape hatch.
581+
#
579582
# When True, ``JobLogHandler.emit`` persists each log line to ``jobs_job.logs``
580583
# (JSONB column) so the per-job log feed in the UI stays populated. When False,
581584
# log lines go to the container stdout logger only — used as an escape hatch

0 commit comments

Comments
 (0)