
fix(utils): guard onProgress cycles in Job dispatch #3485

Merged
dozyio merged 3 commits into libp2p:main from Bojan131:fix/job-onprogress-reentry-guard on May 9, 2026

Conversation

@Bojan131
Contributor

What

Fixes #3484: Job in @libp2p/utils crashes the process with RangeError: Maximum call stack size exceeded when two jobs in different queues end up with progress callbacks that point at each other.

The crash was first reported by users running OriginTrail/dkg, but the bug is general: any consumer with two Queue instances whose recipients can transitively call back into each other's synthesised onProgress will hit it. We see it in practice on the dial path when two parallel dials of the same peer share a job via join().

How

Add a per-Job dispatchingProgress flag. The synthesised onProgress checks the flag and returns early if a dispatch is already in progress; otherwise it sets the flag, dispatches to each recipient via the recipients forEach, and clears the flag in a finally block.

onProgress: (evt: any): void => {
  if (this.dispatchingProgress) {
    return
  }
  this.dispatchingProgress = true
  try {
    this.recipients.forEach(recipient => {
      recipient.onProgress?.(evt)
    })
  } finally {
    this.dispatchingProgress = false
  }
}

This is the smallest change that fixes the cycle without altering any non-cyclic behaviour: a normal dispatch (one entry, never re-enters before returning) sees the flag flip on then off again with no observable difference. Only a synchronous re-entry into the same job's dispatcher is short-circuited.

Why this approach

Two alternatives were considered in the issue and rejected:

  • Per-event visited set (sketched below) — works, but every progress event carries new allocation/cleanup overhead, even on the hot path where there is no cycle. The bug is a self-recursion guard, so a per-instance flag is enough.
  • queueMicrotask to defer dispatch — changes the synchronous contract of onProgress. The existing test 'should consume synchronous progress events' (queue.spec.ts) explicitly asserts that synchronous events are observed before the awaited delay. Deferring would break that.
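
For context, a hypothetical sketch of the rejected visited-set variant; this is not code from the issue or the codebase, and the VISITED symbol and dispatcher shape are illustrative only:

// Hypothetical sketch of the rejected alternative (names illustrative):
// stash a Set of Jobs on the event itself so the marker travels through
// recipient callbacks. This breaks cycles of any length, but it allocates
// a Set for every progress event, even on the hot path where no cycle
// exists. Assumes a module-level `const VISITED = Symbol('visited-jobs')`.
onProgress: (evt: any): void => {
  const visited: Set<unknown> = (evt[VISITED] ??= new Set())
  if (visited.has(this)) {
    return
  }
  visited.add(this)
  this.recipients.forEach(recipient => {
    recipient.onProgress?.(evt)
  })
}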

The flag is per-Job, not per-Queue, so two unrelated jobs in the same queue can dispatch concurrently without interference.
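
For concreteness, a minimal sketch of where the guard lives, assuming only that Job is a class with one instance per queued job (the real class in @libp2p/utils has more members than shown here):

// Sketch only, not the actual class from @libp2p/utils: the guard is an
// instance field on Job, so each Job protects only its own dispatcher.
// Two unrelated jobs in the same queue never see each other's flag.
class Job<JobOptions, JobReturnType> {
  private dispatchingProgress: boolean = false
  // ...existing members (recipients, join(), the synthesised onProgress)
  //    are unchanged
}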

Tests

Added 'should not recurse infinitely when two jobs progress-feed each other (issue #3484)' to packages/utils/test/queue.spec.ts. The test (sketched below) sets up two queues whose recipients reference each other's synthesised onProgress, kicks a single event into the cycle, and asserts:

  1. The dispatch does not throw.
  2. Each recipient observes the event exactly once (the second hop sees the flag set and short-circuits).
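
A rough sketch of the shape of that test follows; helper usage mirrors the other tests in queue.spec.ts, and the merged version may differ in details:

it('should not recurse infinitely when two jobs progress-feed each other (issue #3484)', async () => {
  // Two queues; each job's user-supplied recipient records the event and
  // forwards it to the *other* job's synthesised dispatcher, forming the
  // 2-job cycle described above.
  interface ProgressJobOptions extends AbortOptions, ProgressOptions {}

  const queueA = new Queue<string, ProgressJobOptions>({ concurrency: 1 })
  const queueB = new Queue<string, ProgressJobOptions>({ concurrency: 1 })

  let aOP: ((evt: any) => void) | undefined
  let bOP: ((evt: any) => void) | undefined

  const aReady = pDefer<void>()
  const bReady = pDefer<void>()
  const hold = pDefer<void>()

  const eventsA: any[] = []
  const eventsB: any[] = []

  const pA = queueA.add(async (options) => {
    aOP = options.onProgress
    aReady.resolve()
    await hold.promise
    return 'a'
  }, {
    onProgress: (evt) => { eventsA.push(evt); bOP?.(evt) }
  })

  const pB = queueB.add(async (options) => {
    bOP = options.onProgress
    bReady.resolve()
    await hold.promise
    return 'b'
  }, {
    onProgress: (evt) => { eventsB.push(evt); aOP?.(evt) }
  })

  await Promise.all([aReady.promise, bReady.promise])

  // 1. the dispatch does not throw
  expect(() => { aOP?.(new CustomProgressEvent('kick')) }).to.not.throw()

  // 2. each recipient observes the event exactly once
  expect(eventsA).to.have.lengthOf(1)
  expect(eventsB).to.have.lengthOf(1)

  hold.resolve()
  await Promise.all([pA, pB])
})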

I verified the test is load-bearing by temporarily removing the guard and re-running it — without the fix it fails with the original RangeError: Maximum call stack size exceeded.

The full @libp2p/utils node test suite (217 tests) passes with the change, and aegir lint, aegir doc-check, aegir build, and aegir dep-check are all clean.

When two jobs share progress callbacks that form a cycle, a single
progress event would recurse synchronously through the recipients
forEach loop until V8 hit `Maximum call stack size exceeded` and the
process crashed. The cycle most often appears in DialQueue, where two
parallel dials of the same peer share a job via `join()` and propagate
progress events back to each other.

Add a per-Job `dispatchingProgress` flag so a synchronous re-entry into
the same job's synthesised onProgress short-circuits instead of
recursing. Non-cyclic dispatches behave identically; only the cycle is
broken.

Includes a regression test that reproduces the recursion — without the
guard it fails with `RangeError: Maximum call stack size exceeded`.

Fixes libp2p#3484
@BlobMaster41

BlobMaster41 commented May 7, 2026

Hi @Bojan131, thanks for the fix. We hit this same recursion in libp2p@3.2.4 with kad-dht enabled, manifesting as repeated RangeError crashes of the P2P worker every time a re-entrant dialQueue.dial(peerX) from kad-dht's findPeer(X) reached existingDial.join(options) with options carrying the running job's own synthesised onProgress. We verified your patch against a 7-case suite we wrote independently while tracking the bug down; all pass. Independent confirmation that this approach is correct.

A few extra cases that might be worth adding to queue.spec.ts to round out the coverage of the existing test:

it('should not recurse infinitely on a 3-job triangle (A -> B -> C -> A)', async () => {
  // Beyond the 2-job mutual cycle, deeper kad-dht peer-routing chains
  // can form longer cycles. Each Job's flag guards its own dispatcher,
  // so the recursion terminates the first time it loops back to a Job
  // that is already dispatching, regardless of cycle length.
  interface ProgressJobOptions extends AbortOptions, ProgressOptions {}

  const queueA = new Queue<string, ProgressJobOptions>({ concurrency: 1 })
  const queueB = new Queue<string, ProgressJobOptions>({ concurrency: 1 })
  const queueC = new Queue<string, ProgressJobOptions>({ concurrency: 1 })

  let aOP: ((evt: any) => void) | undefined
  let bOP: ((evt: any) => void) | undefined
  let cOP: ((evt: any) => void) | undefined

  const aReady = pDefer<void>()
  const bReady = pDefer<void>()
  const cReady = pDefer<void>()
  const hold = pDefer<void>()

  const eventsA: any[] = []
  const eventsB: any[] = []
  const eventsC: any[] = []

  const pA = queueA.add(async (options) => {
    aOP = options.onProgress
    aReady.resolve()
    await hold.promise
    return 'a'
  }, {
    onProgress: (evt) => { eventsA.push(evt); bOP?.(evt) }
  })

  const pB = queueB.add(async (options) => {
    bOP = options.onProgress
    bReady.resolve()
    await hold.promise
    return 'b'
  }, {
    onProgress: (evt) => { eventsB.push(evt); cOP?.(evt) }
  })

  const pC = queueC.add(async (options) => {
    cOP = options.onProgress
    cReady.resolve()
    await hold.promise
    return 'c'
  }, {
    onProgress: (evt) => { eventsC.push(evt); aOP?.(evt) }
  })

  await Promise.all([aReady.promise, bReady.promise, cReady.promise])

  expect(() => { aOP?.(new CustomProgressEvent('kick')) }).to.not.throw()

  expect(eventsA).to.have.lengthOf(1)
  expect(eventsB).to.have.lengthOf(1)
  expect(eventsC).to.have.lengthOf(1)

  hold.resolve()
  await Promise.all([pA, pB, pC])
})

it('should keep dispatching after a previous cycle completes', async () => {
  // Verifies the per-job flag is actually reset (try/finally) so a
  // second progress event fires normally after the first one was
  // short-circuited by the cycle guard.
  interface ProgressJobOptions extends AbortOptions, ProgressOptions {}

  const queueA = new Queue<string, ProgressJobOptions>({ concurrency: 1 })
  const queueB = new Queue<string, ProgressJobOptions>({ concurrency: 1 })

  let aOP: ((evt: any) => void) | undefined
  let bOP: ((evt: any) => void) | undefined

  const aReady = pDefer<void>()
  const bReady = pDefer<void>()
  const hold = pDefer<void>()

  const eventsA: any[] = []
  const eventsB: any[] = []

  const pA = queueA.add(async (options) => {
    aOP = options.onProgress
    aReady.resolve()
    await hold.promise
    return 'a'
  }, {
    onProgress: (evt) => { eventsA.push(evt); bOP?.(evt) }
  })

  const pB = queueB.add(async (options) => {
    bOP = options.onProgress
    bReady.resolve()
    await hold.promise
    return 'b'
  }, {
    onProgress: (evt) => { eventsB.push(evt); aOP?.(evt) }
  })

  await Promise.all([aReady.promise, bReady.promise])

  aOP?.(new CustomProgressEvent('first'))
  aOP?.(new CustomProgressEvent('second'))

  expect(eventsA.map(e => e.type)).to.deep.equal(['first', 'second'])
  expect(eventsB.map(e => e.type)).to.deep.equal(['first', 'second'])

  hold.resolve()
  await Promise.all([pA, pB])
})

it('resets the dispatch flag even if a recipient throws', async () => {
  // The try/finally must reset dispatchingProgress on the error path,
  // otherwise one bad event poisons every subsequent dispatch on this
  // Job for its lifetime.
  interface ProgressJobOptions extends AbortOptions, ProgressOptions {}

  const queue = new Queue<string, ProgressJobOptions>({ concurrency: 1 })

  let synthOP: ((evt: any) => void) | undefined

  const ready = pDefer<void>()
  const hold = pDefer<void>()

  const seen: any[] = []
  let throwOnce = true

  const p = queue.add(async (options) => {
    synthOP = options.onProgress
    ready.resolve()
    await hold.promise
    return 'done'
  }, {
    onProgress: (evt) => {
      seen.push(evt)
      if (throwOnce) {
        throwOnce = false
        throw new Error('boom')
      }
    }
  })

  await ready.promise

  // First dispatch throws inside the recipient. The forEach surfaces it,
  // but the finally clause must still reset the flag.
  expect(() => synthOP?.(new CustomProgressEvent('first'))).to.throw('boom')

  // Second dispatch must NOT be blocked by a stuck flag.
  expect(() => synthOP?.(new CustomProgressEvent('second'))).to.not.throw()

  expect(seen.map(e => e.type)).to.deep.equal(['first', 'second'])

  hold.resolve()
  await p
})

If any of these duplicate something already in the suite that I missed, feel free to ignore them. The triangle test is the one we found most useful in tracking down the production crash, because the kad-dht peer-routing path naturally produces cycles of three or more jobs, not just the 2-job mutual case.

Independent reproduction context: we observed this on a Bitcoin L1 indexer node running libp2p@3.2.4, manifesting as the P2P worker thread aborting under steady-state load when findPeer(X) issued sub-dials that re-entered the running dial(X) via kad-dht. The stack was identical to the one in #3484. We pinned down that @libp2p/utils@7.0.14 is unaffected and 7.1.0+ is affected, which matches the regression range.

Happy to squash these into a follow-up commit on this branch if it helps.

Adds a 3-job triangle, a flag-resets-after-cycle, and a
recipient-throws test for the per-Job re-entry guard, and tidies
the comments around the existing 2-job test.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@tabcat changed the title from "fix(utils): guard against re-entrant progress dispatch in Job" to "fix(utils): guard onProgress cycles in Job dispatch" on May 7, 2026
@tabcat requested a review from dozyio on May 8, 2026 at 11:38
@dozyio merged commit 40a6220 into libp2p:main on May 9, 2026
73 of 77 checks passed
@tabcat
Member

tabcat commented May 9, 2026

thanks :)

