Move run-level task lock from Task.run wrapping to the gokart Worker (with pluggable lock backend, e.g. GCS) #524

@kitagry

Summary

Currently the run-level cache lock (should_lock_run) is implemented by monkey-patching Task.run in TaskOnKart.__init__:

# gokart/task.py
if self.should_lock_run:
    self._lock_at_dump = False
    task_lock_params = make_task_lock_params_for_run(task_self=self)
    self.run = wrap_run_with_lock(run_func=self.run, task_lock_params=task_lock_params)  # type: ignore

This proposal moves the run-level lock out of the Task and into the gokart Worker's task dispatch path, and abstracts the lock store behind a backend interface so non-Redis backends (e.g. GCS object preconditions) can be used.

This issue is only about the run lock. The cache I/O locks for dump() / load() / remove() (#265) solve a different problem (read/write races on the cache file) and stay at the target layer.

Problems with the current approach

  1. Mixed concerns: a cross-cutting coordination concern (the distributed lock) is woven into Task construction. A Task should describe what to compute, not whether it may run here right now.
  2. Monkey-patching self.run: reassigning the instance method breaks introspection, makes wrapper ordering implicit (it stacks with task_complete_check_wrapper), and leaks infra config (redis_host asserts) into __init__.
  3. Wrong layer / poor collision behavior: deciding "may this task run here now?" is a dispatch decision, but it currently lives inside the thing being dispatched. On collision the wrapped run() raises TaskLockException from inside an already-forked process, and build.py retries the whole build with exponential backoff. The worker never "moves on to another task".
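
To make point 2 concrete, here is a minimal sketch of what instance-level patching does to a task object. `Task` and `wrap_with_lock` are hypothetical stand-ins, not gokart's actual classes, and the wrapper does no real locking; the only thing shown is the structural change to `run`.

```python
import functools
import inspect


def wrap_with_lock(run_func):
    """Hypothetical stand-in for a lock wrapper; it performs no locking."""
    @functools.wraps(run_func)
    def wrapped():
        # A real wrapper would acquire the lock, call run_func, then release.
        return run_func()
    return wrapped


class Task:
    def __init__(self, should_lock_run=False):
        if should_lock_run:
            # The instance attribute now shadows the class-level method.
            self.run = wrap_with_lock(self.run)

    def run(self):
        return "done"


plain = Task()
locked = Task(should_lock_run=True)

print(inspect.ismethod(plain.run))   # True: ordinary bound method
print(inspect.ismethod(locked.run))  # False: plain function stored on the instance
print("run" in vars(locked))         # True: the patch lives in the instance __dict__
print(locked.run())                  # done
```

The return value is unchanged, but anything that relies on `run` being a bound method of the class (introspection, further wrappers, subclass overrides applied later) now sees a different kind of object.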

Proposed design

Acquire/release the run lock in the Worker dispatch path instead of wrapping Task.run:

# gokart/worker.py (sketch)
def _run_task(self, task_id):
    task = self._scheduled_tasks[task_id]
    lock = self._try_acquire_run_lock(task)        # backend lock, blocking=False
    if lock is None:                                # held by another worker
        self._add_task(worker=self._id, task_id=task_id, status=PENDING, runnable=True)
        next(self._sleeper())                       # other PENDING tasks go first; otherwise wait
        return
    self._run_locks[task_id] = lock                 # held by parent worker + heartbeat
    task_process = self._create_task_process(task)
    self._running_tasks[task_id] = task_process
    ...

def _handle_next_task(self):
    ...
    self._release_run_lock(task_id)                 # release on completion

def _purge_children(self):  # and __exit__
    ...
    self._release_run_lock(task_id)                 # release on crash

Why this works (verified against luigi's Scheduler.get_work):

  • get_work marks the chosen task RUNNING / worker_running immediately. If the worker has claimed a task but reports that it is not running it, the next get_work hands the same task back, so simply ignoring a locked task busy-loops. Re-pending it with add_task(status=PENDING, runnable=True) releases the claim so that other PENDING tasks are scheduled first; when none remain, _sleeper() waits. This is exactly the desired "run another task first, otherwise wait" behavior.
  • This matters mainly for the local_scheduler=True case (the gokart.build default), where each process has its own scheduler and cannot deduplicate runs across processes, so a side-channel lock is required. With a shared central scheduler, run dedup is mostly already handled by the scheduler.
  • After the lock is released, the late worker's task_completion_check_at_run skips the now-completed task automatically.
  • Holding the lock in the parent worker process (instead of inside the forked child as today) makes crash cleanup reliable via _purge_children.
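
The intended ordering can be illustrated with a toy model. This is not luigi's scheduler or worker API: `simulate`, `held_ticks` and the tick-based clock are assumptions made up for the illustration. A task whose lock is held elsewhere is re-pended, other tasks run first, and the worker only waits when every remaining task is locked. Once the other worker finishes, the task is skipped by the completion check.

```python
from collections import deque


def _tick(held):
    """Advance the toy clock: every foreign lock gets one tick closer to release."""
    for task_id in held:
        held[task_id] = max(0, held[task_id] - 1)


def simulate(pending, held_ticks):
    """Toy model of the re-pend loop.

    pending: task ids in scheduling order.
    held_ticks: task_id -> ticks until another worker finishes that task.
    Returns a list of (action, task_id) events.
    """
    queue = deque(pending)
    held = dict(held_ticks)
    events = []
    while queue:
        task_id = queue.popleft()
        if held.get(task_id, 0) > 0:
            queue.append(task_id)  # re-pend: lock is held by another worker
            if all(held.get(t, 0) > 0 for t in queue):
                events.append(("wait", task_id))  # nothing else runnable, so sleep
                _tick(held)
            continue
        if task_id in held:
            # The other worker completed it; the completion check skips the run.
            events.append(("skip", task_id))
        else:
            events.append(("run", task_id))
            _tick(held)
    return events


print(simulate(["a", "b", "c"], {"a": 2}))
print(simulate(["a"], {"a": 2}))
```

In the first call, `a` is locked for two ticks, so `b` and `c` run first and `a` is then skipped. In the second call there is nothing else to do, so the worker waits twice before skipping `a`.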

Pluggable lock backend

Extract the Redis-specific logic behind an interface so other stores can implement it:

gokart/conflict_prevention_lock/
  backends/
    base.py          # AbstractTaskLockBackend: acquire() / release() / extend()
    redis_backend.py # existing logic, moved
    gcs_backend.py   # atomic create via if_generation_match=0; stale takeover via expire_at; heartbeat rewrites expire_at
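
A possible shape for the interface, as a sketch only. The method names acquire / release / extend come from the layout above; the signatures (key, owner, ttl), the boolean return values and `InMemoryLockBackend` are assumptions added for illustration. The in-memory backend is only meaningful within a single process (for example, in tests).

```python
import abc
import threading
import time


class AbstractTaskLockBackend(abc.ABC):
    """Hypothetical signatures; only the method names appear in the proposal."""

    @abc.abstractmethod
    def acquire(self, key: str, owner: str, ttl: float) -> bool:
        """Non-blocking: return True if the lock was taken, False if held."""

    @abc.abstractmethod
    def release(self, key: str, owner: str) -> None:
        """Release the lock if (and only if) `owner` holds it."""

    @abc.abstractmethod
    def extend(self, key: str, owner: str, ttl: float) -> bool:
        """Heartbeat: push the expiry forward; False if the lock was lost."""


class InMemoryLockBackend(AbstractTaskLockBackend):
    """Single-process stand-in, useful for exercising worker logic in tests."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._mutex = threading.Lock()
        self._locks = {}  # key -> (owner, expire_at)

    def acquire(self, key, owner, ttl):
        with self._mutex:
            now = self._clock()
            holder = self._locks.get(key)
            if holder is not None and holder[1] > now:
                return False  # a live lock exists
            self._locks[key] = (owner, now + ttl)  # free or stale: take it
            return True

    def release(self, key, owner):
        with self._mutex:
            holder = self._locks.get(key)
            if holder is not None and holder[0] == owner:
                del self._locks[key]

    def extend(self, key, owner, ttl):
        with self._mutex:
            now = self._clock()
            holder = self._locks.get(key)
            if holder is None or holder[0] != owner or holder[1] <= now:
                return False
            self._locks[key] = (owner, now + ttl)
            return True
```

Passing the clock in makes expiry and takeover testable without sleeping.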

GCS notes: GCS has no native per-object TTL, so a stale lock (left by a dead worker) is handled by storing expire_at in the lock object and allowing takeover via a generation-match write once it has expired; the heartbeat rewrites expire_at. GCS is strongly consistent, but it limits writes to the same object name to roughly one per second, so the heartbeat interval should stay comfortably above that.
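
The create / takeover / heartbeat protocol described in the GCS notes can be sketched against an in-memory stand-in that mimics generation preconditions. `FakeObjectStore`, `try_acquire`, `heartbeat` and the JSON payload layout are all hypothetical. With the real google-cloud-storage client, the conditional write would presumably be something like `blob.upload_from_string(..., if_generation_match=...)`, with a failed precondition surfacing as `google.api_core.exceptions.PreconditionFailed`, but that wiring is not shown here.

```python
import json


class PreconditionFailed(Exception):
    """Raised by the fake store when a generation precondition does not hold."""


class FakeObjectStore:
    """In-memory stand-in: each successful write gets a new generation number."""

    def __init__(self):
        self._objects = {}  # name -> (generation, data)
        self._next_generation = 1

    def read(self, name):
        return self._objects.get(name)  # (generation, data) or None

    def write(self, name, data, if_generation_match):
        current = self._objects.get(name)
        current_generation = current[0] if current else 0  # 0 means "absent"
        if current_generation != if_generation_match:
            raise PreconditionFailed(name)
        generation = self._next_generation
        self._next_generation += 1
        self._objects[name] = (generation, data)
        return generation


def _payload(owner, ttl, now):
    return json.dumps({"owner": owner, "expire_at": now + ttl})


def try_acquire(store, name, owner, ttl, now):
    """Return the lock generation on success, or None if the lock is held."""
    try:
        # Atomic create: succeeds only if the object does not exist yet.
        return store.write(name, _payload(owner, ttl, now), if_generation_match=0)
    except PreconditionFailed:
        pass
    current = store.read(name)
    if current is None:
        return None  # released in between; the caller may simply retry later
    generation, data = current
    if json.loads(data)["expire_at"] > now:
        return None  # live lock held by another worker
    try:
        # Stale takeover: overwrite only the exact generation we inspected.
        return store.write(name, _payload(owner, ttl, now), if_generation_match=generation)
    except PreconditionFailed:
        return None  # another worker took over first


def heartbeat(store, name, owner, generation, ttl, now):
    """Rewrite expire_at; return the new generation, or None if the lock was lost."""
    try:
        return store.write(name, _payload(owner, ttl, now), if_generation_match=generation)
    except PreconditionFailed:
        return None
```

Because every heartbeat is a write to the same object, the holder has to track the latest generation it wrote. A heartbeat that returns None means another worker has taken over and the task should not assume it still owns the lock.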

Scope / non-goals

Open questions

  1. Backend selection API — reuse the existing redis_* parameters, or introduce a more general task_lock_backend parameter?
  2. Re-pend strategy on collision — is add_task(PENDING) + _sleeper() sufficient, or do we want a short per-task backoff to avoid thrashing when many workers contend the same task?
  3. Should the worker-held run lock fully replace wrap_run_with_lock, or coexist behind a flag during a deprecation period?
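
For question 2, one option is a capped exponential backoff with jitter, tracked per task. The sketch below is only an illustration of that idea; `PerTaskBackoff` and its parameters are hypothetical and not part of gokart or luigi.

```python
import random


class PerTaskBackoff:
    """Capped exponential backoff with full jitter, tracked per task id."""

    def __init__(self, base=0.5, cap=30.0, rng=None):
        self._base = base
        self._cap = cap
        self._rng = rng or random.Random()
        self._attempts = {}  # task_id -> number of collisions seen so far

    def next_delay(self, task_id):
        attempt = self._attempts.get(task_id, 0)
        self._attempts[task_id] = attempt + 1
        ceiling = min(self._cap, self._base * 2 ** attempt)
        # Full jitter spreads contending workers across the whole interval.
        return self._rng.uniform(0, ceiling)

    def reset(self, task_id):
        """Call once the lock is acquired or the task is found complete."""
        self._attempts.pop(task_id, None)
```

The worker would consult `next_delay(task_id)` after a failed acquire and avoid retrying that task until the delay has passed, while still picking up other PENDING tasks in the meantime.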
