Summary
Currently the run-level cache lock (should_lock_run) is implemented by monkey-patching Task.run in TaskOnKart.__init__:
    # gokart/task.py
    if self.should_lock_run:
        self._lock_at_dump = False
        task_lock_params = make_task_lock_params_for_run(task_self=self)
        self.run = wrap_run_with_lock(run_func=self.run, task_lock_params=task_lock_params)  # type: ignore
This proposal moves the run-level lock out of the Task and into the gokart Worker's task dispatch path, and abstracts the lock store behind a backend interface so non-Redis backends (e.g. GCS object preconditions) can be used.
This issue is only about the run lock. The cache I/O locks for dump() / load() / remove() (#265) solve a different problem (read/write races on the cache file) and stay at the target layer.
Problems with the current approach
- Mixed concerns: a cross-cutting coordination concern (distributed lock) is woven into Task construction. A Task should describe what to compute, not whether it may run here right now.
- Monkey-patching self.run: reassigning the instance method breaks introspection, makes wrapper ordering implicit (it stacks with task_complete_check_wrapper), and leaks infra config (redis_host asserts) into __init__.
- Wrong layer / poor collision behavior: deciding "may this task run here now?" is a dispatch decision, but it currently lives inside the thing being dispatched. On collision the wrapped run() raises TaskLockException from inside an already-forked process, and build.py retries the whole build with exponential backoff. It never "moves on to another task".
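To make the introspection point concrete, here is a minimal sketch of the same patching pattern on a toy class. `Task` and `wrap_with_lock` below are hypothetical stand-ins, not gokart code, and no real lock is taken.

```python
import inspect


class Task:
    def run(self):
        return "computed"


def wrap_with_lock(run_func):
    # Hypothetical stand-in for wrap_run_with_lock: no real lock is taken.
    def wrapper(*args, **kwargs):
        return run_func(*args, **kwargs)

    return wrapper


task = Task()
task.run = wrap_with_lock(task.run)  # same pattern as the patch in __init__

# The instance attribute now shadows the method defined on the class.
print("run" in vars(task))         # True
print(inspect.ismethod(task.run))  # False: a plain function, not a bound method
print(task.run.__name__)           # wrapper
print(task.run())                  # computed
```

The task still computes the same result, but anything that inspects `task.run` now sees the wrapper rather than the method the task author wrote.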
Proposed design
Acquire/release the run lock in the Worker dispatch path instead of wrapping Task.run:
    # gokart/worker.py (sketch)
    def _run_task(self, task_id):
        task = self._scheduled_tasks[task_id]
        lock = self._try_acquire_run_lock(task)  # backend lock, blocking=False
        if lock is None:  # held by another worker
            self._add_task(worker=self._id, task_id=task_id, status=PENDING, runnable=True)
            next(self._sleeper())  # other PENDING tasks go first; otherwise wait
            return
        self._run_locks[task_id] = lock  # held by parent worker + heartbeat
        task_process = self._create_task_process(task)
        self._running_tasks[task_id] = task_process
        ...

    def _handle_next_task(self):
        ...
        self._release_run_lock(task_id)  # release on completion

    def _purge_children(self):  # and __exit__
        ...
        self._release_run_lock(task_id)  # release on crash
Why this works (verified against luigi's Scheduler.get_work):
get_work marks the chosen task RUNNING / worker_running immediately, and on the next get_work it hands back any task that the worker has claimed but is not running. Simply ignoring a locked task therefore busy-loops. Re-pending it with add_task(status=PENDING, runnable=True) releases the claim so other PENDING tasks are scheduled first; when none remain, _sleeper() waits. This is exactly the desired "run another task first, otherwise wait" behavior.
- This matters mainly for the local_scheduler=True case (the gokart.build default), where each process has its own scheduler and cannot dedup runs across processes, so a side-channel lock is required. With a shared central scheduler, run dedup is mostly already handled by the scheduler.
- After the lock is released, the late worker's task_completion_check_at_run skips the now-completed task automatically.
- Holding the lock in the parent worker process (instead of inside the forked child as today) makes crash cleanup reliable via _purge_children.
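The intended ordering can be illustrated with a toy model. This is not luigi's scheduler: `simulate`, its arguments, and the event strings are all hypothetical, and the "other worker finishes during a sleep" rule is an assumption made only to keep the example deterministic.

```python
from collections import deque


def simulate(task_ids, held_elsewhere, release_after_sleeps=1):
    """Toy model of the proposed dispatch loop (not luigi's real scheduler)."""
    pending = deque(task_ids)
    held = set(held_elsewhere)   # run locks currently held by other workers
    completed_elsewhere = set()  # tasks another worker finished meanwhile
    events = []
    sleeps = 0
    repended_in_a_row = 0
    while pending:
        task_id = pending.popleft()
        if task_id in held:
            pending.append(task_id)  # re-pend: give the claim back
            events.append(f"re-pend {task_id}")
            repended_in_a_row += 1
            if repended_in_a_row >= len(pending):
                # Every remaining task is locked: wait instead of spinning.
                events.append("sleep")
                sleeps += 1
                repended_in_a_row = 0
                if sleeps >= release_after_sleeps:
                    # Assumption: the other workers finish and release here.
                    completed_elsewhere |= held
                    held.clear()
            continue
        repended_in_a_row = 0
        if task_id in completed_elsewhere:
            events.append(f"skip {task_id}")  # completion check at run time
        else:
            events.append(f"run {task_id}")
    return events


print(simulate(["a", "b", "c"], held_elsewhere={"a"}))
```

In this model the locked task "a" is re-pended, "b" and "c" run first, the worker sleeps only once nothing else is runnable, and "a" is skipped after the other worker has completed it.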
Pluggable lock backend
Extract the Redis-specific logic behind an interface so other stores can implement it:
    gokart/conflict_prevention_lock/
        backends/
            base.py           # AbstractTaskLockBackend: acquire() / release() / extend()
            redis_backend.py  # existing logic, moved
            gcs_backend.py    # atomic create via if_generation_match=0; stale takeover via expire_at; heartbeat rewrites expire_at
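One possible shape for the interface, as a sketch only. The proposal fixes just the three method names; the `key` / `owner` / `ttl` parameters, the boolean return values, and the `InMemoryTaskLockBackend` class are assumptions added here for illustration and testing.

```python
import threading
import time
from abc import ABC, abstractmethod
from typing import Dict, Tuple


class AbstractTaskLockBackend(ABC):
    """Hypothetical signatures; only the method names come from the proposal."""

    @abstractmethod
    def acquire(self, key: str, owner: str, ttl: float) -> bool: ...

    @abstractmethod
    def release(self, key: str, owner: str) -> None: ...

    @abstractmethod
    def extend(self, key: str, owner: str, ttl: float) -> bool: ...


class InMemoryTaskLockBackend(AbstractTaskLockBackend):
    """Single-process backend, useful only for tests and illustration."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._mutex = threading.Lock()
        self._locks: Dict[str, Tuple[str, float]] = {}  # key -> (owner, expire_at)

    def acquire(self, key, owner, ttl):
        with self._mutex:
            now = self._clock()
            held = self._locks.get(key)
            if held is not None and held[1] > now:
                return False  # live lock held by someone (non-blocking)
            self._locks[key] = (owner, now + ttl)  # fresh lock or stale takeover
            return True

    def release(self, key, owner):
        with self._mutex:
            held = self._locks.get(key)
            if held is not None and held[0] == owner:
                del self._locks[key]  # only the owner may release

    def extend(self, key, owner, ttl):
        with self._mutex:
            now = self._clock()
            held = self._locks.get(key)
            if held is None or held[0] != owner or held[1] <= now:
                return False  # lock lost or expired; the heartbeat should stop
            self._locks[key] = (owner, now + ttl)
            return True


now = [0.0]  # fake clock so the example is deterministic
backend = InMemoryTaskLockBackend(clock=lambda: now[0])
first = backend.acquire("task-1", owner="worker-a", ttl=10.0)
second = backend.acquire("task-1", owner="worker-b", ttl=10.0)
now[0] = 11.0  # worker-a never extended, so its lock is now stale
takeover = backend.acquire("task-1", owner="worker-b", ttl=10.0)
```

Passing an owner to release() and extend() is a design choice in this sketch: it keeps a worker whose lock was taken over from releasing or extending someone else's lock.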
GCS notes: GCS has no native per-object TTL, so a stale lock (dead worker) is handled by storing expire_at in the lock object and allowing takeover via generation-match once expired; the heartbeat rewrites expire_at. GCS is strongly consistent, but the same-object write rate (~1/s) should be respected by the heartbeat interval.
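The protocol in the GCS notes can be modeled without touching GCS. In the sketch below, `FakeObjectStore` and `PreconditionFailed` are in-memory stand-ins written for this example, and `try_acquire` / `heartbeat` are hypothetical helper names; a real backend would issue the same conditional writes through the storage client instead.

```python
import json


class PreconditionFailed(Exception):
    """Stand-in for the error a real store returns on a failed precondition."""


class FakeObjectStore:
    """In-memory stand-in for a bucket that supports generation preconditions."""

    def __init__(self):
        self._objects = {}  # name -> (generation, data)
        self._next_generation = 1

    def read(self, name):
        return self._objects.get(name)  # (generation, data) or None

    def write(self, name, data, if_generation_match):
        current = self._objects.get(name)
        current_generation = current[0] if current else 0  # 0 means "absent"
        if current_generation != if_generation_match:
            raise PreconditionFailed(name)
        generation = self._next_generation
        self._next_generation += 1
        self._objects[name] = (generation, data)
        return generation


def try_acquire(store, name, owner, ttl, now):
    """Return the lock object's generation on success, None otherwise."""
    payload = json.dumps({"owner": owner, "expire_at": now + ttl})
    try:
        return store.write(name, payload, if_generation_match=0)  # atomic create
    except PreconditionFailed:
        pass
    current = store.read(name)
    if current is None:
        return None  # released in between; the caller may simply retry
    generation, data = current
    if json.loads(data)["expire_at"] > now:
        return None  # live lock held by another worker
    try:
        # Stale takeover: succeeds only if nobody rewrote the object meanwhile.
        return store.write(name, payload, if_generation_match=generation)
    except PreconditionFailed:
        return None


def heartbeat(store, name, owner, ttl, now, generation):
    """Rewrite expire_at; returns the new generation, or None if the lock was lost."""
    payload = json.dumps({"owner": owner, "expire_at": now + ttl})
    try:
        return store.write(name, payload, if_generation_match=generation)
    except PreconditionFailed:
        return None


store = FakeObjectStore()
g1 = try_acquire(store, "lock", "worker-a", ttl=10, now=0)
blocked = try_acquire(store, "lock", "worker-b", ttl=10, now=5)
g2 = heartbeat(store, "lock", "worker-a", ttl=10, now=8, generation=g1)
g3 = try_acquire(store, "lock", "worker-b", ttl=10, now=20)  # expire_at was 18
lost = heartbeat(store, "lock", "worker-a", ttl=10, now=21, generation=g2)
```

Because every heartbeat is a write to the same object, the heartbeat interval in a real backend would need to stay well above the roughly one write per second limit mentioned above.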
Scope / non-goals
- The dump() / load() / remove() cache I/O locks (#265) at the target layer: out of scope.
- should_lock_run + redis_host / redis_port should keep working (Redis backend as default), with the Task.run wrapping removed.
Open questions
- Backend selection API: reuse the existing redis_* parameters, or introduce a more general task_lock_backend parameter?
- Re-pend strategy on collision: is add_task(PENDING) + _sleeper() sufficient, or do we want a short per-task backoff to avoid thrashing when many workers contend for the same task?
- Should the worker-held run lock fully replace wrap_run_with_lock, or coexist behind a flag during a deprecation period?