Skip to content

Commit aed0908

Browse files
mihowclaude
andcommitted
refactor(minimal-worker): address PR review — typed schemas, env file, simpler loop
Apply feedback from PR #1252 review: - Consolidate schemas into `processing_services/minimal/api/schemas.py`. v1 push and v2 worker now share a single source of truth; the separate `worker/schemas.py` mirror is removed. Client/runner/register/loop use real Pydantic types in signatures (PipelineProcessingTask, PipelineTaskResult, ProcessingServiceClientInfo, AsyncPipelineRegistrationRequest). - Move defaults into `processing_services/minimal/.env.dev`. Strip the hard-coded `os.environ.get(..., "<default>")` fallbacks in register.py and worker_main.py; replace with `os.environ[...]`. The inline environment block in docker-compose.yml becomes `env_file:`. - Replace the loop's job→slug reverse-lookup with per-slug iteration: the outer loop variable IS the slug, so `_slug_for_job` is gone. - Fix register.py docstring: the PS is identified by the Authorization header's user (main) or the API key (#1194), not by `processing_service_name`. That field just labels the DB row `get_or_create(name=...)` lands on. - Import pipelines directly in register.py instead of HTTP-GETting /info from the co-located FastAPI. Removes the FastAPI readiness wait entirely, and means register.py works in MODE=worker (where FastAPI isn't running at all). Co-Authored-By: Claude <noreply@anthropic.com>
1 parent c06fc62 commit aed0908

10 files changed

Lines changed: 325 additions & 280 deletions

File tree

processing_services/docker-compose.yml

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,11 @@ services:
1010
- minio:host-gateway
1111
networks:
1212
- antenna_network
13-
# MODE=api+worker runs both the v1 FastAPI service and the v2 worker
14-
# poll loop in one container. start.sh handles sequencing: FastAPI first
15-
# (for register.py to read /info), then register.py, then worker loop.
16-
# For pure v1 testing, set MODE=api. For pure v2, set MODE=worker.
17-
environment:
18-
MODE: api+worker
19-
ANTENNA_API_URL: http://django:8000
20-
ANTENNA_DEFAULT_PROJECT_NAME: Default Project
21-
ANTENNA_USER: antenna@insectai.org
22-
ANTENNA_PASSWORD: localadmin
23-
ANTENNA_SERVICE_NAME: minimal-worker-dev
13+
# start.sh dispatches on MODE: api | worker | api+worker. For api+worker,
14+
# start.sh runs FastAPI first (for register.py to read /info), then
15+
# register.py, then the worker poll loop.
16+
env_file:
17+
- ./minimal/.env.dev
2418

2519
ml_backend_example:
2620
build:
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# Dev defaults for the minimal processing service container.
2+
#
3+
# Loaded via `env_file:` in processing_services/docker-compose.yml when you run
4+
# the stub locally against a fresh Antenna stack. These are the same creds
5+
# seeded by ami/main/management/commands/ensure_default_project.py and
6+
# .envs/.local/.django — intentionally not secret.
7+
#
8+
# Copy to .env.minimal-worker and edit to point at a different Antenna
9+
# deployment. CI loads this same file.
10+
11+
# ── Container mode ──────────────────────────────────────────────────────────
12+
# api FastAPI only (default in Dockerfile; used by docker-compose.ci.yml)
13+
# worker poll loop only
14+
# api+worker both, plus register.py on boot — used by local dev compose below
15+
MODE=api+worker
16+
17+
# ── Antenna target ─────────────────────────────────────────────────────────
18+
ANTENNA_API_URL=http://django:8000
19+
ANTENNA_DEFAULT_PROJECT_NAME=Default Project
20+
ANTENNA_SERVICE_NAME=minimal-worker-dev
21+
22+
# ── Auth (user/password fallback; overridden by ANTENNA_API_KEY / _AUTH_TOKEN if set) ──
23+
ANTENNA_USER=antenna@insectai.org
24+
ANTENNA_PASSWORD=localadmin
25+
26+
# ── Worker tuning ──────────────────────────────────────────────────────────
27+
WORKER_POLL_INTERVAL_SECONDS=2.0
28+
WORKER_BATCH_SIZE=4
29+
WORKER_REQUEST_TIMEOUT_SECONDS=30

processing_services/minimal/api/schemas.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,3 +291,67 @@ class ProcessingServiceInfoResponse(pydantic.BaseModel):
291291
# default=list,
292292
# examples=[RANDOM_BINARY_CLASSIFIER],
293293
# )
294+
295+
296+
# -----------------------------------------------------------------------------
297+
# v2 async worker schemas
298+
#
299+
# Mirror of the relevant classes in ami/ml/schemas.py. Kept in this package so
300+
# both the v1 FastAPI side (push / `/process`) and the v2 worker side
301+
# (pull / polling) share a single source of truth. When Antenna evolves the
302+
# canonical schemas, keep these in sync — field-for-field parity matters for
303+
# correct JSON round-trips.
304+
# -----------------------------------------------------------------------------
305+
306+
307+
class PipelineResultsError(pydantic.BaseModel):
308+
"""Error result when pipeline processing fails for a single task."""
309+
310+
error: str
311+
image_id: str | None = None
312+
313+
314+
class PipelineProcessingTask(pydantic.BaseModel):
315+
"""A single image task reserved from the async job queue.
316+
317+
`reply_subject` is the NATS subject Antenna ACKs on when the result comes
318+
back — the worker must round-trip it verbatim in the matching
319+
PipelineTaskResult.
320+
"""
321+
322+
id: str
323+
image_id: str
324+
image_url: str
325+
reply_subject: str | None = None
326+
327+
328+
class TasksResponse(pydantic.BaseModel):
329+
"""Response body of `POST /api/v2/jobs/{id}/tasks/`."""
330+
331+
tasks: list[PipelineProcessingTask] = []
332+
333+
334+
class PipelineTaskResult(pydantic.BaseModel):
335+
"""Result of processing a single PipelineProcessingTask."""
336+
337+
reply_subject: str
338+
result: PipelineResultsResponse | PipelineResultsError
339+
340+
341+
class ProcessingServiceClientInfo(pydantic.BaseModel):
342+
"""Identity metadata sent by a processing service worker.
343+
344+
A ProcessingService in the DB may have multiple physical workers running
345+
simultaneously; this lets the server distinguish them. Fields are
346+
intentionally open — processing services can send any useful key/value
347+
pairs (hostname, software version, pod name, etc).
348+
"""
349+
350+
model_config = pydantic.ConfigDict(extra="allow")
351+
352+
353+
class AsyncPipelineRegistrationRequest(pydantic.BaseModel):
354+
"""Body for `POST /api/v2/projects/{id}/pipelines/` from an async processing service."""
355+
356+
processing_service_name: str
357+
pipelines: list[PipelineConfigResponse] = []

processing_services/minimal/register.py

Lines changed: 78 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -1,70 +1,72 @@
11
"""
22
Self-register this processing service's pipelines with Antenna.
33
4-
Modeled on the PR #1194 version (which targets API-key auth once merged). For
5-
now this targets main, which still uses user-token auth and requires
6-
`processing_service_name` in the registration body.
7-
8-
Auth priority:
9-
1. ANTENNA_API_KEY set → use `Api-Key <key>` (TODO: activate once PR #1194 merges)
10-
2. ANTENNA_API_AUTH_TOKEN set → use `Token <token>`
11-
3. ANTENNA_USER / ANTENNA_PASSWORD → log in, use the returned token.
12-
13-
Self-provisioning (option 3) matches the local-dev and CI defaults baked into
14-
.envs/.local/.django (antenna@insectai.org / localadmin).
15-
16-
Environment:
17-
ANTENNA_API_URL Base URL (e.g. http://django:8000)
18-
ANTENNA_PROJECT_ID Project PK OR ANTENNA_DEFAULT_PROJECT_NAME to resolve to one.
19-
ANTENNA_DEFAULT_PROJECT_NAME Fallback lookup by name (default: "Default Project")
20-
ANTENNA_SERVICE_NAME ProcessingService name (default: minimal-worker-<hostname>)
21-
ANTENNA_API_KEY Optional (future, PR #1194 path)
22-
ANTENNA_API_AUTH_TOKEN Optional static token (skips login)
23-
ANTENNA_USER Fallback login email (default: antenna@insectai.org)
24-
ANTENNA_PASSWORD Fallback login password (default: localadmin)
4+
What this does, in order:
5+
1. Resolve an Authorization header (env var or fallback login).
6+
2. Resolve the target project id (env var, else look up by name).
7+
3. Fetch our own /info to get the list of pipelines this container serves.
8+
4. POST that list to `/api/v2/projects/{id}/pipelines/` so Antenna knows which
9+
async pipelines this ProcessingService can handle.
10+
11+
About identity: on main, the server looks up / creates a `ProcessingService`
12+
record by the `processing_service_name` field in the request body, and grants
13+
write access based on the Authorization header's user. PR #1194 changes that
14+
to use API keys — the PS record is derived from the key itself and
15+
`processing_service_name` is no longer sent. We tolerate both by sending
16+
`processing_service_name` now; #1194-enabled Antenna will ignore the field and
17+
pick the PS from the key.
18+
19+
Env vars are read via `os.environ[...]` without fallbacks — the .env file is
20+
expected to provide them. See `processing_services/.env.example`.
2521
"""
2622

2723
import logging
2824
import os
29-
import platform
30-
import socket
3125
import sys
3226
import time
3327

3428
import requests
29+
from api.api import pipelines as pipeline_classes # type: ignore[import-not-found]
30+
from api.schemas import ( # type: ignore[import-not-found]
31+
AsyncPipelineRegistrationRequest,
32+
PipelineConfigResponse,
33+
ProcessingServiceClientInfo,
34+
)
3535

3636
logger = logging.getLogger(__name__)
3737

3838
MAX_RETRIES = 20
3939
RETRY_DELAY = 3 # seconds
4040

41-
DEFAULT_USER = "antenna@insectai.org"
42-
DEFAULT_PASSWORD = "localadmin"
43-
DEFAULT_PROJECT_NAME = "Default Project"
44-
LOCAL_INFO_URL = "http://localhost:2000/info"
45-
LOCAL_LIVEZ_URL = "http://localhost:2000/livez"
41+
CACHED_AUTH_HEADER_PATH = "/tmp/antenna_auth_header"
4642

4743

48-
def get_client_info() -> dict:
49-
"""Identity metadata sent to Antenna.
44+
def get_client_info() -> ProcessingServiceClientInfo:
45+
"""Identity metadata sent to Antenna in the registration body.
5046
51-
Extra keys are allowed by the ProcessingServiceClientInfo schema (Config.extra = "allow"),
52-
so it's fine to add more here; main's registration endpoint currently ignores this field
53-
and PR #1194 reads it.
47+
`ProcessingServiceClientInfo` has `extra="allow"`, so any keys here are
48+
forwarded verbatim. On main the registration serializer ignores unknown
49+
fields; PR #1194 consumes this field.
5450
"""
55-
return {
56-
"hostname": socket.gethostname(),
57-
"software": "antenna-minimal-worker",
58-
"version": "0.1.0",
59-
"platform": platform.platform(),
60-
}
51+
import platform
52+
import socket
53+
54+
return ProcessingServiceClientInfo.model_validate(
55+
{
56+
"hostname": socket.gethostname(),
57+
"software": "antenna-minimal-worker",
58+
"version": "0.1.0",
59+
"platform": platform.platform(),
60+
}
61+
)
6162

6263

6364
def auth_header() -> dict[str, str] | None:
6465
"""Pick an auth header based on what env vars are set, or None to trigger login flow."""
6566
api_key = os.environ.get("ANTENNA_API_KEY")
6667
if api_key:
67-
# TODO(PR #1194): Api-Key auth is enabled on Antenna once #1194 merges.
68+
# PR #1194 path. Harmless on main — main ignores unknown auth schemes
69+
# and falls through to the next header, which we don't send.
6870
return {"Authorization": f"Api-Key {api_key}"}
6971

7072
token = os.environ.get("ANTENNA_API_AUTH_TOKEN")
@@ -93,7 +95,7 @@ def resolve_project_id(api_url: str, headers: dict[str, str]) -> str:
9395
if explicit:
9496
return explicit
9597

96-
name = os.environ.get("ANTENNA_DEFAULT_PROJECT_NAME", DEFAULT_PROJECT_NAME)
98+
name = os.environ["ANTENNA_DEFAULT_PROJECT_NAME"]
9799
resp = requests.get(f"{api_url}/api/v2/projects/", headers=headers, timeout=10)
98100
resp.raise_for_status()
99101
for project in resp.json().get("results", []):
@@ -104,57 +106,54 @@ def resolve_project_id(api_url: str, headers: dict[str, str]) -> str:
104106
raise RuntimeError(f"No project found with name '{name}' — ensure_default_project should have created it")
105107

106108

107-
def fetch_own_pipelines() -> list[dict]:
108-
resp = requests.get(LOCAL_INFO_URL, timeout=5)
109-
resp.raise_for_status()
110-
return resp.json().get("pipelines", [])
111-
109+
def fetch_own_pipelines() -> list[PipelineConfigResponse]:
110+
"""Return the pipeline configs this container serves.
112111
113-
def wait_for_local_server() -> None:
114-
for attempt in range(MAX_RETRIES):
115-
try:
116-
r = requests.get(LOCAL_LIVEZ_URL, timeout=2)
117-
if r.status_code == 200:
118-
return
119-
except (requests.ConnectionError, requests.Timeout):
120-
pass
121-
logger.info("Waiting for local FastAPI server (%d/%d)", attempt + 1, MAX_RETRIES)
122-
time.sleep(RETRY_DELAY)
123-
raise RuntimeError("Local FastAPI server did not come up in time")
112+
Imported directly from the api module rather than fetched over HTTP from
113+
the co-located FastAPI service — register.py runs in the same container,
114+
and importing avoids having to wait for FastAPI to be up (which it isn't
115+
in MODE=worker).
116+
"""
117+
return [p.config for p in pipeline_classes]
124118

125119

126-
def register(api_url: str, project_id: str, headers: dict[str, str], pipelines: list[dict]) -> None:
120+
def register(
121+
api_url: str,
122+
project_id: str,
123+
headers: dict[str, str],
124+
pipelines: list[PipelineConfigResponse],
125+
) -> None:
127126
"""POST pipelines to the project registration endpoint.
128127
129-
Body shape for main:
130-
{"processing_service_name": str, "pipelines": [...], "client_info": {...}}
131-
PR #1194 drops `processing_service_name`. Sending both now is safe because
132-
main's serializer ignores unknown fields and #1194's serializer ignores
133-
`processing_service_name`.
128+
Sends the schema-defined `AsyncPipelineRegistrationRequest` body. We also
129+
attach a `client_info` field, which is ignored on main (unknown field) and
130+
read by PR #1194.
134131
"""
135-
service_name = os.environ.get("ANTENNA_SERVICE_NAME", f"minimal-worker-{socket.gethostname()}")
136-
payload = {
137-
"processing_service_name": service_name,
138-
"pipelines": pipelines,
139-
"client_info": get_client_info(),
140-
}
132+
service_name = os.environ["ANTENNA_SERVICE_NAME"]
133+
body = AsyncPipelineRegistrationRequest(
134+
processing_service_name=service_name,
135+
pipelines=pipelines,
136+
).model_dump(mode="json")
137+
body["client_info"] = get_client_info().model_dump(mode="json")
138+
141139
url = f"{api_url}/api/v2/projects/{project_id}/pipelines/"
142-
resp = requests.post(url, json=payload, headers=headers, timeout=30)
140+
resp = requests.post(url, json=body, headers=headers, timeout=30)
143141
if resp.status_code in (200, 201):
144-
logger.info("Registered %d pipelines as '%s' (project=%s)", len(pipelines), service_name, project_id)
142+
logger.info(
143+
"Registered %d pipelines as '%s' (project=%s)",
144+
len(pipelines),
145+
service_name,
146+
project_id,
147+
)
145148
return
146149
raise RuntimeError(f"Registration failed: {resp.status_code} {resp.text}")
147150

148151

149152
def main() -> int:
150153
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
151154

152-
api_url = os.environ.get("ANTENNA_API_URL")
153-
if not api_url:
154-
logger.error("ANTENNA_API_URL not set; skipping registration")
155-
return 0
155+
api_url = os.environ["ANTENNA_API_URL"]
156156

157-
wait_for_local_server()
158157
pipelines = fetch_own_pipelines()
159158
if not pipelines:
160159
logger.warning("No pipelines found from local /info; nothing to register")
@@ -163,8 +162,8 @@ def main() -> int:
163162
# Auth: use explicit header if provided, else log in.
164163
headers = auth_header()
165164
if headers is None:
166-
email = os.environ.get("ANTENNA_USER", DEFAULT_USER)
167-
password = os.environ.get("ANTENNA_PASSWORD", DEFAULT_PASSWORD)
165+
email = os.environ["ANTENNA_USER"]
166+
password = os.environ["ANTENNA_PASSWORD"]
168167
# Retry login so we tolerate "Django not up yet".
169168
for attempt in range(MAX_RETRIES):
170169
try:
@@ -193,7 +192,7 @@ def main() -> int:
193192

194193
# Cache the resolved id and auth header for worker_main.py to reuse.
195194
os.environ["ANTENNA_PROJECT_ID"] = project_id
196-
with open("/tmp/antenna_auth_header", "w") as f:
195+
with open(CACHED_AUTH_HEADER_PATH, "w") as f:
197196
f.write(next(iter(headers.values())))
198197

199198
for attempt in range(MAX_RETRIES):

processing_services/minimal/start.sh

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,11 @@ start_api() {
2828
}
2929

3030
start_register() {
31-
# register.py self-provisions a ProcessingService and registers this
32-
# container's pipelines with Antenna. It has its own retry loop for
33-
# "Antenna not up yet", so we don't need to poll for readiness here.
31+
# register.py imports pipelines from the api module directly (no HTTP to
32+
# self), self-provisions a ProcessingService, and registers pipelines
33+
# with Antenna. Its retry loop handles "Antenna not up yet".
3434
# Skip if registration env vars aren't set — the container still works
3535
# as a pure v1 push service without them.
36-
# Need ANTENNA_API_URL and either an explicit ANTENNA_PROJECT_ID or a
37-
# project name (ANTENNA_DEFAULT_PROJECT_NAME) that register.py can look up.
3836
if [ -z "${ANTENNA_API_URL:-}" ]; then
3937
echo "[start.sh] Skipping registration (ANTENNA_API_URL not set)"
4038
return
@@ -62,10 +60,11 @@ case "$MODE" in
6260
start_worker
6361
;;
6462
api+worker)
63+
# FastAPI first so /process is available even while register.py is
64+
# still retrying against a not-yet-ready Antenna. register.py doesn't
65+
# depend on FastAPI (imports pipeline configs directly), so there's
66+
# no startup race here.
6567
start_api
66-
# Give the FastAPI side a beat to bind its port before register.py
67-
# tries to GET /info from it. Cheap; register.py also retries.
68-
sleep 2
6968
start_register
7069
start_worker
7170
;;

0 commit comments

Comments
 (0)