fix(utils): guard onProgress cycles in Job dispatch #3485
Conversation
When two jobs share progress callbacks that form a cycle, a single progress event would recurse synchronously through the recipients forEach loop until V8 hit `Maximum call stack size exceeded` and the process crashed. The cycle most often appears in DialQueue, where two parallel dials of the same peer share a job via `join()` and propagate progress events back to each other. Add a per-Job `dispatchingProgress` flag so a synchronous re-entry into the same job's synthesised onProgress short-circuits instead of recursing. Non-cyclic dispatches behave identically; only the cycle is broken. Includes a regression test that reproduces the recursion — without the guard it fails with `RangeError: Maximum call stack size exceeded`. Fixes libp2p#3484
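The failure mode is easy to model outside libp2p. The sketch below is a minimal stand-in, not the actual source: `makeUnguardedDispatcher` and the recipient arrays are hypothetical stand-ins for a Job's synthesised `onProgress` and its recipients list.

```typescript
// Standalone model of the bug (no libp2p imports): each "job" fans a
// progress event out to its recipients. Wiring two jobs' recipients at
// each other recurses synchronously until V8 exhausts the stack.
type Listener = (evt: string) => void

function makeUnguardedDispatcher (recipients: Listener[]): Listener {
  return (evt) => { recipients.forEach(fn => fn(evt)) }
}

const recipientsA: Listener[] = []
const recipientsB: Listener[] = []
const dispatchA = makeUnguardedDispatcher(recipientsA)
const dispatchB = makeUnguardedDispatcher(recipientsB)

// wire the cycle: A's recipient feeds B, B's recipient feeds A
recipientsA.push(evt => dispatchB(evt))
recipientsB.push(evt => dispatchA(evt))

let error: Error | undefined
try {
  dispatchA('progress')
} catch (err) {
  error = err as Error
}
console.log(error?.name) // RangeError: the cycle exhausted the stack
```

With the per-Job flag described above, the same kick terminates on the first synchronous re-entry instead of throwing.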
Hi @Bojan131, thanks for the fix. We hit this same recursion too. A few extra cases that might be worth adding to the suite:

```ts
// (assumes the imports already at the top of queue.spec.ts:
// Queue, pDefer, CustomProgressEvent, AbortOptions, ProgressOptions)
it('should not recurse infinitely on a 3-job triangle (A -> B -> C -> A)', async () => {
  // Beyond the 2-job mutual cycle, deeper kad-dht peer-routing chains
  // can form longer cycles. Each Job's flag guards its own dispatcher,
  // so the recursion terminates the first time it loops back to a Job
  // that is already dispatching, regardless of cycle length.
  interface ProgressJobOptions extends AbortOptions, ProgressOptions {}

  const queueA = new Queue<string, ProgressJobOptions>({ concurrency: 1 })
  const queueB = new Queue<string, ProgressJobOptions>({ concurrency: 1 })
  const queueC = new Queue<string, ProgressJobOptions>({ concurrency: 1 })

  let aOP: ((evt: any) => void) | undefined
  let bOP: ((evt: any) => void) | undefined
  let cOP: ((evt: any) => void) | undefined

  const aReady = pDefer<void>()
  const bReady = pDefer<void>()
  const cReady = pDefer<void>()
  const hold = pDefer<void>()

  const eventsA: any[] = []
  const eventsB: any[] = []
  const eventsC: any[] = []

  const pA = queueA.add(async (options) => {
    aOP = options.onProgress
    aReady.resolve()
    await hold.promise
    return 'a'
  }, {
    onProgress: (evt) => { eventsA.push(evt); bOP?.(evt) }
  })

  const pB = queueB.add(async (options) => {
    bOP = options.onProgress
    bReady.resolve()
    await hold.promise
    return 'b'
  }, {
    onProgress: (evt) => { eventsB.push(evt); cOP?.(evt) }
  })

  const pC = queueC.add(async (options) => {
    cOP = options.onProgress
    cReady.resolve()
    await hold.promise
    return 'c'
  }, {
    onProgress: (evt) => { eventsC.push(evt); aOP?.(evt) }
  })

  await Promise.all([aReady.promise, bReady.promise, cReady.promise])

  expect(() => { aOP?.(new CustomProgressEvent('kick')) }).to.not.throw()

  expect(eventsA).to.have.lengthOf(1)
  expect(eventsB).to.have.lengthOf(1)
  expect(eventsC).to.have.lengthOf(1)

  hold.resolve()
  await Promise.all([pA, pB, pC])
})
```
```ts
it('should keep dispatching after a previous cycle completes', async () => {
  // Verifies the per-job flag is actually reset (try/finally) so a
  // second progress event fires normally after the first one was
  // short-circuited by the cycle guard.
  interface ProgressJobOptions extends AbortOptions, ProgressOptions {}

  const queueA = new Queue<string, ProgressJobOptions>({ concurrency: 1 })
  const queueB = new Queue<string, ProgressJobOptions>({ concurrency: 1 })

  let aOP: ((evt: any) => void) | undefined
  let bOP: ((evt: any) => void) | undefined

  const aReady = pDefer<void>()
  const bReady = pDefer<void>()
  const hold = pDefer<void>()

  const eventsA: any[] = []
  const eventsB: any[] = []

  const pA = queueA.add(async (options) => {
    aOP = options.onProgress
    aReady.resolve()
    await hold.promise
    return 'a'
  }, {
    onProgress: (evt) => { eventsA.push(evt); bOP?.(evt) }
  })

  const pB = queueB.add(async (options) => {
    bOP = options.onProgress
    bReady.resolve()
    await hold.promise
    return 'b'
  }, {
    onProgress: (evt) => { eventsB.push(evt); aOP?.(evt) }
  })

  await Promise.all([aReady.promise, bReady.promise])

  aOP?.(new CustomProgressEvent('first'))
  aOP?.(new CustomProgressEvent('second'))

  expect(eventsA.map(e => e.type)).to.deep.equal(['first', 'second'])
  expect(eventsB.map(e => e.type)).to.deep.equal(['first', 'second'])

  hold.resolve()
  await Promise.all([pA, pB])
})
```
```ts
it('resets the dispatch flag even if a recipient throws', async () => {
  // The try/finally must reset dispatchingProgress on the error path,
  // otherwise one bad event poisons every subsequent dispatch on this
  // Job for its lifetime.
  interface ProgressJobOptions extends AbortOptions, ProgressOptions {}

  const queue = new Queue<string, ProgressJobOptions>({ concurrency: 1 })

  let synthOP: ((evt: any) => void) | undefined
  const ready = pDefer<void>()
  const hold = pDefer<void>()
  const seen: any[] = []
  let throwOnce = true

  const p = queue.add(async (options) => {
    synthOP = options.onProgress
    ready.resolve()
    await hold.promise
    return 'done'
  }, {
    onProgress: (evt) => {
      seen.push(evt)
      if (throwOnce) {
        throwOnce = false
        throw new Error('boom')
      }
    }
  })

  await ready.promise

  // First dispatch throws inside the recipient. The forEach surfaces it,
  // but the finally clause must still reset the flag.
  expect(() => synthOP?.(new CustomProgressEvent('first'))).to.throw('boom')

  // Second dispatch must NOT be blocked by a stuck flag.
  expect(() => synthOP?.(new CustomProgressEvent('second'))).to.not.throw()

  expect(seen.map(e => e.type)).to.deep.equal(['first', 'second'])

  hold.resolve()
  await p
})
```

If any of these duplicate something already in the suite that I missed, ignore them. The triangle one is the one we found most useful in tracking down the production crash, because the kad-dht peer-routing path naturally produces cycles of three and more jobs, not just the 2-job mutual case. Independent reproduction context: we observed this on a Bitcoin L1 indexer node. Happy to squash these into a follow-up commit on this branch if it helps.
Adds a 3-job triangle test, a flag-resets-after-cycle test, and a recipient-throws test for the per-Job re-entry guard, and tidies the comments around the existing 2-job test.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
thanks :)
What

Fixes #3484. `Job` in `@libp2p/utils` crashes the process with `RangeError: Maximum call stack size exceeded` when two jobs in different queues end up with progress callbacks that point at each other.

The crash was first reported by users running OriginTrail/dkg, but the bug is general: any consumer that has two `Queue` instances whose recipients can transitively call back into each other's synthesised `onProgress` will hit it. We see it in practice on the dial path, when two parallel dials of the same peer share a job via `join()`.

How
Add a per-`Job` `dispatchingProgress` flag. The synthesised `onProgress` checks the flag and returns early if the job is already dispatching; otherwise it sets the flag, runs the recipient `forEach`, and clears the flag in a `finally`.

This is the smallest change that fixes the cycle without altering any non-cyclic behaviour: a normal dispatch (one entry, never re-entered before returning) sees the flag flip on and then off again with no observable difference. Only a synchronous re-entry into the same job's dispatcher is short-circuited.
Why this approach
Two alternatives were considered in the issue and rejected:

- `queueMicrotask` to defer dispatch: this changes the synchronous contract of `onProgress`. The existing test `'should consume synchronous progress events'` (`queue.spec.ts`) explicitly asserts that synchronous events are observed before the awaited `delay`; deferring would break that.
- A per-`Queue` flag: the guard is per-`Job` precisely so that two unrelated jobs in the same queue can dispatch concurrently without interference.

Tests
Added `'should not recurse infinitely when two jobs progress-feed each other (issue #3484)'` to `packages/utils/test/queue.spec.ts`. The test sets up two queues whose recipients reference each other's synthesised `onProgress`, kicks a single event into the cycle, and asserts that dispatch terminates with each recipient observing the event exactly once.

I verified the test is load-bearing by temporarily removing the guard and re-running it: without the fix it fails with the original
`RangeError: Maximum call stack size exceeded`.

The full `@libp2p/utils` node test suite (217 tests) passes with the change, and `aegir lint`, `aegir doc-check`, `aegir build`, and `aegir dep-check` are all clean.