Skip to content

Commit d5fe748

Browse files
committed
Fix defer time to be relative to now, not original enqueue time
Refactor score computation to use current timestamp (now_ms) for _defer_by calculations and expiry, while preserving the original enqueue_time_ms only in the serialized job data.
1 parent 5c30f2e commit d5fe748

2 files changed

Lines changed: 14 additions & 12 deletions

File tree

arq/connections.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -167,29 +167,29 @@ async def enqueue_job(
167167
job_exists = await pipe.exists(job_key)
168168
result_exists = await pipe.exists(result_key_prefix + job_id)
169169
in_progress = await pipe.exists(in_progress_key_prefix + job_id) if _debounce else False
170+
can_debounce = _debounce and job_exists and not result_exists and not in_progress
170171

171-
if (job_exists or result_exists) and not (
172-
_debounce and job_exists and not result_exists and not in_progress
173-
):
172+
if (job_exists or result_exists) and not can_debounce:
174173
await pipe.reset()
175174
return None
176175

177-
if _debounce and job_exists:
176+
now_ms = timestamp_ms()
177+
if can_debounce:
178178
existing_job_data = await pipe.get(job_key)
179179
_, _, _, _, enqueue_time_ms = deserialize_job_raw(existing_job_data, deserializer=self.job_deserializer)
180-
if debounce_max_ms is not None and timestamp_ms() - enqueue_time_ms >= debounce_max_ms:
180+
if debounce_max_ms is not None and now_ms - enqueue_time_ms >= debounce_max_ms:
181181
await pipe.reset()
182182
return None
183183
else:
184-
enqueue_time_ms = timestamp_ms()
184+
enqueue_time_ms = now_ms
185185
if _defer_until is not None:
186186
score = to_unix_ms(_defer_until)
187187
elif defer_by_ms:
188-
score = enqueue_time_ms + defer_by_ms
188+
score = now_ms + defer_by_ms
189189
else:
190-
score = enqueue_time_ms
190+
score = now_ms
191191

192-
expires_ms = expires_ms or score - enqueue_time_ms + self.expires_extra_ms
192+
expires_ms = expires_ms or score - now_ms + self.expires_extra_ms
193193

194194
job = serialize_job(function, args, kwargs, _job_try, enqueue_time_ms, serializer=self.job_serializer)
195195
pipe.multi()

tests/test_main.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -295,10 +295,12 @@ async def test_debounce_updates_defer_time(arq_redis: ArqRedis):
295295
assert isinstance(j1, Job)
296296
score1 = await arq_redis.zscore(default_queue_name, 'debounce_id')
297297

298-
# when: we enqueue the same job with debounce and a new defer
299-
j2 = await arq_redis.enqueue_job('foobar', _job_id='debounce_id', _debounce=True, _defer_by=10)
298+
await asyncio.sleep(0.05)
299+
300+
# when: we enqueue the same job with debounce and the same defer
301+
j2 = await arq_redis.enqueue_job('foobar', _job_id='debounce_id', _debounce=True, _defer_by=5)
300302

301-
# then: the job is returned (not None) and the score is updated
303+
# then: the job is returned (not None) and the score is updated (deferred from now)
302304
assert isinstance(j2, Job)
303305
score2 = await arq_redis.zscore(default_queue_name, 'debounce_id')
304306
assert score2 > score1

0 commit comments

Comments
 (0)