Extract MessageClearer and add a positions-only DeleteMessages overload#173
Merged
Conversation
Add IMessageStore.DeleteMessages(IEnumerable<long> positions) - a position-only delete (positions are globally unique identity values) so handled messages across many flows can be removed in one query. Implemented inline in every store (InMemory, PostgreSQL, MariaDB, SqlServer); the existing per-StoredId overload is untouched. Extract the handled-message lifecycle out of MessageWatchdog into a dedicated MessageClearer: it owns the not-yet-cleared ignore-set (MarkPushed / NonClearedPositions) and the coalescing, retry-until-success delete pipeline (Clear). The watchdog becomes a pure poll/push loop with no shared mutable state. QueueManager now depends on the narrow IMessageClearer instead of IMessageWatchdog and calls Clear(positions) (the dead storedId argument is gone). A failed delete is retried until it lands - callers are told a message is gone only once it truly is - notifying the unhandled-exception handler on each failure. Tests: MessageClearerTests covers coalescing, retry-on-failure, concurrent load, and ignore-set trimming against a bare MessageClearer; the hand-rolled QueueManager tests use a one-line IMessageClearer stub.
Add an end-to-end test against the real InMemoryFunctionStore: append messages, await Clear for a subset, then assert GetMessages no longer returns them the instant the task completes. Also gives the new positions-only DeleteMessages its first real-store coverage.
Replace the hand-expanded @position{i} IN clause with the codebase's STRING_SPLIT idiom (same shape as SetReplica), passing the positions as one comma-joined @positions parameter.
Replace the hand-expanded IN (?, ?, ...) clause with FIND_IN_SET(position, ?), matching the existing GetMessagesForReplica idiom - one comma-joined parameter instead of one placeholder per position.
Callers already hold a List<long> (MessageClearer) and the type mirrors IMessageClearer.Clear, so IReadOnlyList flows through without conversions. Drop the now-redundant .ToList() materializations in the implementations.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Two cohesive changes: a new position-only message delete, and the extraction of the handled-message lifecycle out of
MessageWatchdoginto a dedicatedMessageClearer.1. Positions-only
DeleteMessagesIMessageStore.DeleteMessages(IEnumerable<long> positions)deletes messages by position regardless of flow. Positions are globally unique identity/auto-increment PKs, so noStoredIdis needed — letting handled messages across many flows be removed in a single query. Implemented inline in every store (InMemory, PostgreSQL, MariaDB, SqlServer); the existing per-StoredIdoverload is untouched.2.
MessageClearerextractionMessageWatchdogpreviously owned both the poll/push delivery loop and a coalescing delete pipeline + the_pushedPositionsignore-set. That second responsibility now lives inMessageClearer:MarkPushed(positions)/NonClearedPositions()(the not-yet-cleared positions the watchdog passes toGetMessagesForReplica).Clear(positions): the first caller starts a single drain; calls arriving mid-flight batch up and flush together (one query per burst). A failed delete is retried until it lands (callers are told a message is gone only once it truly is), notifying the unhandled-exception handler each failure.MessageWatchdogis now a pure poll/push loop with no shared mutable state.QueueManagerdepends on the narrowIMessageClearerinstead ofIMessageWatchdogand callsClear(positions)— the previously-deadstoredIdargument is gone.FunctionsRegistryconstructs oneMessageClearerand shares it between the watchdog and the invocation path.Tests
MessageClearerTests— coalescing, retry-on-failure (+ handler notified), concurrent load, and ignore-set trimming, all against a bareMessageClearer(justIMessageStore+ handler + retry delay — no watchdog plumbing).QueueManagertests use a one-lineIMessageClearerstub.Verification