Give QueueManager a MessageWatchdog reference to trim handled messages#171
Merged
Conversation
Thread the registry's MessageWatchdog through InvocationHelper into each QueueManager via constructor injection (no cyclic dependency: the watchdog is built once before any InvocationHelper/QueueManager and only reaches a QueueManager at runtime via FlowsManagers -> FlowState -> Push). QueueManager now calls MessageWatchdog.RemoveMessages once it deletes messages from the store (AfterFlush, the Initialize replay, and the idempotency-duplicate path) so the watchdog drops those positions from its ignore-set instead of carrying them forever - realizing the existing TODO on _pushedPositions. RemoveMessages runs on flow threads, so _pushedPositions is now guarded by a lock. The hand-rolled QueueManager tests in MessagesSubscriptionTests get a shared CreateMessageWatchdog helper (the watchdog is never started, so RemoveMessages no-ops against its empty ignore-set).
Extract an IMessageWatchdog interface (just RemoveMessages) that MessageWatchdog implements, and have QueueManager depend on it. The hand-rolled QueueManager tests now pass a one-line no-op stub instead of constructing a fully wired MessageWatchdog. Production still injects the real watchdog (non-null).
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Gives each
QueueManagera reference to the registry'sMessageWatchdogso it can ask the watchdog to drop message positions once they've been deleted from the store — keeping the watchdog's ignore-set bounded.Why constructor injection (not a setter)
No cyclic dependency exists, so constructor injection is used (avoids the half-initialized window a public setter introduces). Creation order:
FunctionsRegistrybuilds_messageWatchdogonce, before any registration.InvocationHelperis built later (during registration) → the watchdog already exists.QueueManageris built per-invocation, later still.MessageWatchdogonly reaches aQueueManagerat runtime (FlowsManagers.Push → FlowState.Push → QueueManager.Push), never at construction — soQueueManager → MessageWatchdogis acyclic.What changed
MessageWatchdog— newRemoveMessages(IReadOnlyList<long> positions)that drops positions from_pushedPositions. Since it's now called from flow threads concurrently with the watchdog loop,_pushedPositionsis guarded by aLock. Realizes the existing TODO on that field.QueueManager— newMessageWatchdogctor param + field; callsRemoveMessages(...)right after eachDeleteMessages(inAfterFlush, theInitializereplay, and the idempotency-duplicate path).InvocationHelper/FunctionsRegistry— plumbing to thread the reference through.MessagesSubscriptionTests— the three hand-rolledQueueManagertests get a sharedCreateMessageWatchdoghelper. The watchdog is never started, soRemoveMessagesno-ops against its empty ignore-set. Production keeps a strict (non-null, required) ctor param.Testing