Skip to content

Give QueueManager a MessageWatchdog reference to trim handled messages#171

Merged
stidsborg merged 2 commits into
mainfrom
queuemanager-messagewatchdog-ref
Jun 20, 2026
Merged

Give QueueManager a MessageWatchdog reference to trim handled messages#171
stidsborg merged 2 commits into
mainfrom
queuemanager-messagewatchdog-ref

Conversation

@stidsborg

Copy link
Copy Markdown
Owner

Summary

Gives each QueueManager a reference to the registry's MessageWatchdog so it can ask the watchdog to drop message positions once they've been deleted from the store — keeping the watchdog's ignore-set bounded.

Why constructor injection (not a setter)

No cyclic dependency exists, so constructor injection is used (avoids the half-initialized window a public setter introduces). Creation order:

  • FunctionsRegistry builds _messageWatchdog once, before any registration.
  • InvocationHelper is built later (during registration) → the watchdog already exists.
  • QueueManager is built per-invocation, later still.

MessageWatchdog only reaches a QueueManager at runtime (FlowsManagers.Push → FlowState.Push → QueueManager.Push), never at construction — so QueueManager → MessageWatchdog is acyclic.

What changed

  • MessageWatchdog — new RemoveMessages(IReadOnlyList<long> positions) that drops positions from _pushedPositions. Since it's now called from flow threads concurrently with the watchdog loop, _pushedPositions is guarded by a Lock. Realizes the existing TODO on that field.
  • QueueManager — new MessageWatchdog ctor param + field; calls RemoveMessages(...) right after each DeleteMessages (in AfterFlush, the Initialize replay, and the idempotency-duplicate path).
  • InvocationHelper / FunctionsRegistry — plumbing to thread the reference through.
  • MessagesSubscriptionTests — the three hand-rolled QueueManager tests get a shared CreateMessageWatchdog helper. The watchdog is never started, so RemoveMessages no-ops against its empty ignore-set. Production keeps a strict (non-null, required) ctor param.

Testing

  • Full solution builds clean (0 warnings / 0 errors).
  • The 3 directly-affected tests and the full 59-test in-memory messaging suite pass.

Thread the registry's MessageWatchdog through InvocationHelper into each
QueueManager via constructor injection (no cyclic dependency: the watchdog is
built once before any InvocationHelper/QueueManager and only reaches a
QueueManager at runtime via FlowsManagers -> FlowState -> Push).

QueueManager now calls MessageWatchdog.RemoveMessages once it deletes messages
from the store (AfterFlush, the Initialize replay, and the idempotency-duplicate
path) so the watchdog drops those positions from its ignore-set instead of
carrying them forever - realizing the existing TODO on _pushedPositions.
RemoveMessages runs on flow threads, so _pushedPositions is now guarded by a lock.

The hand-rolled QueueManager tests in MessagesSubscriptionTests get a shared
CreateMessageWatchdog helper (the watchdog is never started, so RemoveMessages
no-ops against its empty ignore-set).
Extract an IMessageWatchdog interface (just RemoveMessages) that MessageWatchdog
implements, and have QueueManager depend on it. The hand-rolled QueueManager
tests now pass a one-line no-op stub instead of constructing a fully wired
MessageWatchdog. Production still injects the real watchdog (non-null).
@stidsborg stidsborg merged commit 599a24e into main Jun 20, 2026
8 checks passed
@stidsborg stidsborg deleted the queuemanager-messagewatchdog-ref branch June 20, 2026 07:31
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant