Skip to content

Commit a82290a

Browse files
authored
Move message deletion from QueueManager into MessageWatchdog.RemoveMessages (#172)
QueueManager no longer calls IMessageStore.DeleteMessages directly. Instead RemoveMessages now owns deletion: it deletes the handled positions from the store and then trims them from the watchdog's ignore-set (delete-before-trim so a no-longer-ignored position can't be re-fetched before it is gone from the store). The three QueueManager delete sites (Initialize replay, idempotency duplicate, AfterFlush) collapse to a single awaited RemoveMessages call. IMessageWatchdog.RemoveMessages gains the StoredId and becomes async; the hand-rolled QueueManager tests keep a one-line no-op stub.
1 parent 599a24e commit a82290a

4 files changed

Lines changed: 21 additions & 18 deletions

File tree

Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesSubscriptionTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,13 @@ namespace Cleipnir.ResilientFunctions.Tests.Messaging.TestTemplates;
1818

1919
public abstract class MessagesSubscriptionTests
2020
{
21-
// These tests hand-roll a QueueManager, which depends on IMessageWatchdog only to report deleted message
22-
// positions. They don't exercise that path, so a no-op stub suffices.
21+
// These tests hand-roll a QueueManager, which delegates message deletion to IMessageWatchdog. They don't
22+
// assert on store cleanup, so a no-op stub suffices.
2323
private static readonly IMessageWatchdog StubMessageWatchdog = new NoopMessageWatchdog();
2424

2525
private sealed class NoopMessageWatchdog : IMessageWatchdog
2626
{
27-
public void RemoveMessages(IReadOnlyList<long> positions) { }
27+
public Task RemoveMessages(StoredId storedId, IReadOnlyList<long> positions) => Task.CompletedTask;
2828
}
2929

3030
public abstract Task EventsSubscriptionSunshineScenario();
Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
using System.Collections.Generic;
2+
using System.Threading.Tasks;
3+
using Cleipnir.ResilientFunctions.Storage;
24

35
namespace Cleipnir.ResilientFunctions.CoreRuntime.Watchdogs;
46

57
/// <summary>
6-
/// The slice of <see cref="MessageWatchdog"/> a QueueManager depends on: reporting message positions it has
7-
/// deleted from the store so the watchdog can drop them from its ignore-set. Exists so tests that hand-roll a
8+
/// The slice of <see cref="MessageWatchdog"/> a QueueManager depends on: deleting handled messages from the
9+
/// store and dropping their positions from the watchdog's ignore-set. Exists so tests that hand-roll a
810
/// QueueManager can pass a no-op stub instead of a fully wired watchdog.
911
/// </summary>
1012
internal interface IMessageWatchdog
1113
{
12-
void RemoveMessages(IReadOnlyList<long> positions);
14+
Task RemoveMessages(StoredId storedId, IReadOnlyList<long> positions);
1315
}

Core/Cleipnir.ResilientFunctions/CoreRuntime/Watchdogs/MessageWatchdog.cs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using Cleipnir.ResilientFunctions.Domain.Exceptions;
88
using Cleipnir.ResilientFunctions.Helpers;
99
using Cleipnir.ResilientFunctions.Messaging;
10+
using Cleipnir.ResilientFunctions.Storage;
1011

1112
namespace Cleipnir.ResilientFunctions.CoreRuntime.Watchdogs;
1213

@@ -22,9 +23,9 @@ internal class MessageWatchdog : IMessageWatchdog
2223
private readonly UtcNow _utcNow;
2324

2425
// Positions already pushed to this replica's flows. Passed as ignore-set so messages are not re-delivered
25-
// on subsequent ticks. A QueueManager calls RemoveMessages once it has deleted the corresponding messages
26-
// from the store, trimming this set so it does not grow without bound. Guarded by _pushedPositionsLock
27-
// because RemoveMessages runs on flow threads concurrently with the watchdog loop.
26+
// on subsequent ticks. RemoveMessages (called by a QueueManager to delete handled messages) trims this set
27+
// so it does not grow without bound. Guarded by _pushedPositionsLock because RemoveMessages runs on flow
28+
// threads concurrently with the watchdog loop.
2829
private readonly HashSet<long> _pushedPositions = new();
2930
private readonly Lock _pushedPositionsLock = new();
3031

@@ -98,14 +99,17 @@ public async Task Start()
9899
}
99100

100101
/// <summary>
101-
/// Called by a QueueManager once it has deleted the given message positions from the store, so the
102-
/// watchdog drops them from its ignore-set instead of carrying them forever.
102+
/// Deletes the given handled message positions from the store on a QueueManager's behalf, then drops them
103+
/// from the ignore-set instead of carrying them forever. Deleting before trimming avoids re-fetching a
104+
/// position that is no longer ignored but not yet gone from the store.
103105
/// </summary>
104-
public void RemoveMessages(IReadOnlyList<long> positions)
106+
public async Task RemoveMessages(StoredId storedId, IReadOnlyList<long> positions)
105107
{
106108
if (positions.Count == 0)
107109
return;
108110

111+
await _messageStore.DeleteMessages(storedId, positions);
112+
109113
lock (_pushedPositionsLock)
110114
foreach (var position in positions)
111115
_pushedPositions.Remove(position);

Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,7 @@ private async Task Initialize()
8989

9090
if (_effect.TryGet<List<long>>(DeliveredPositionsId, out var positions) && positions is { Count: > 0 })
9191
{
92-
await _messageStore.DeleteMessages(_storedId, positions);
93-
_messageWatchdog.RemoveMessages(positions);
92+
await _messageWatchdog.RemoveMessages(_storedId, positions);
9493
positions.Clear();
9594
_effect.FlushlessUpsert(DeliveredPositionsId, positions, alias: null);
9695
}
@@ -228,8 +227,7 @@ private async Task ProcessMessages(IReadOnlyList<StoredMessage> messages)
228227

229228
if (idempotencyKey != null && !_idempotencyKeys.Add(idempotencyKey, position))
230229
{
231-
await _messageStore.DeleteMessages(_storedId, [position]);
232-
_messageWatchdog.RemoveMessages([position]);
230+
await _messageWatchdog.RemoveMessages(_storedId, [position]);
233231
continue;
234232
}
235233

@@ -287,8 +285,7 @@ public async Task AfterFlush()
287285
if (deliveredPositions.Count == 0 || _effect.IsDirty(DeliveredPositionsId))
288286
return;
289287

290-
await _messageStore.DeleteMessages(_storedId, deliveredPositions);
291-
_messageWatchdog.RemoveMessages(deliveredPositions);
288+
await _messageWatchdog.RemoveMessages(_storedId, deliveredPositions);
292289
deliveredPositions.Clear();
293290
_effect.FlushlessUpsert(DeliveredPositionsId, deliveredPositions, alias: null);
294291
}

0 commit comments

Comments
 (0)