Skip to content

Commit 090d2c0

Browse files
committed
Extract MessageClearer and add a positions-only DeleteMessages overload
Add IMessageStore.DeleteMessages(IEnumerable<long> positions) - a position-only delete (positions are globally unique identity values) so handled messages across many flows can be removed in one query. Implemented inline in every store (InMemory, PostgreSQL, MariaDB, SqlServer); the existing per-StoredId overload is untouched. Extract the handled-message lifecycle out of MessageWatchdog into a dedicated MessageClearer: it owns the not-yet-cleared ignore-set (MarkPushed / NonClearedPositions) and the coalescing, retry-until-success delete pipeline (Clear). The watchdog becomes a pure poll/push loop with no shared mutable state. QueueManager now depends on the narrow IMessageClearer instead of IMessageWatchdog and calls Clear(positions) (the dead storedId argument is gone). A failed delete is retried until it lands - callers are told a message is gone only once it truly is - notifying the unhandled-exception handler on each failure. Tests: MessageClearerTests covers coalescing, retry-on-failure, concurrent load, and ignore-set trimming against a bare MessageClearer; the hand-rolled QueueManager tests use a one-line IMessageClearer stub.
1 parent a82290a commit 090d2c0

14 files changed

Lines changed: 429 additions & 110 deletions

File tree

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using Cleipnir.ResilientFunctions.CoreRuntime;
7+
using Cleipnir.ResilientFunctions.CoreRuntime.Watchdogs;
8+
using Cleipnir.ResilientFunctions.Domain;
9+
using Cleipnir.ResilientFunctions.Domain.Exceptions;
10+
using Cleipnir.ResilientFunctions.Messaging;
11+
using Cleipnir.ResilientFunctions.Storage;
12+
using Microsoft.VisualStudio.TestTools.UnitTesting;
13+
using Shouldly;
14+
15+
namespace Cleipnir.ResilientFunctions.Tests.Messaging.InMemoryTests;
16+
17+
[TestClass]
18+
public class MessageClearerTests
19+
{
20+
private static readonly TimeSpan MaxWait = TimeSpan.FromSeconds(10);
21+
22+
[TestMethod]
23+
public async Task ClearCoalescesCallsArrivingWhileADeleteIsInFlight()
24+
{
25+
var firstDeleteReached = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
26+
var releaseFirstDelete = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
27+
var callCount = 0;
28+
29+
var store = new ControllableMessageStore(async _ =>
30+
{
31+
if (Interlocked.Increment(ref callCount) == 1)
32+
{
33+
firstDeleteReached.SetResult();
34+
await releaseFirstDelete.Task;
35+
}
36+
});
37+
var clearer = CreateClearer(store);
38+
39+
// First call starts the drain and blocks inside DeleteMessages.
40+
var first = clearer.Clear([1]);
41+
await firstDeleteReached.Task.WaitAsync(MaxWait);
42+
43+
// These arrive mid-flight, so they should be batched into a single follow-up delete.
44+
var second = clearer.Clear([2]);
45+
var third = clearer.Clear([3]);
46+
47+
releaseFirstDelete.SetResult();
48+
await Task.WhenAll(first, second, third).WaitAsync(MaxWait);
49+
50+
store.DeletedBatches.Count.ShouldBe(2);
51+
store.DeletedBatches[0].ShouldBe(new long[] { 1 });
52+
store.DeletedBatches[1].OrderBy(p => p).ShouldBe(new long[] { 2, 3 });
53+
}
54+
55+
[TestMethod]
56+
public async Task ClearRetriesUntilDeleteSucceedsAndNotifiesEachFailure()
57+
{
58+
var unhandledLock = new Lock();
59+
var unhandled = new List<FrameworkException>();
60+
var failuresRemaining = 3;
61+
62+
var store = new ControllableMessageStore(_ =>
63+
Interlocked.Decrement(ref failuresRemaining) >= 0
64+
? throw new InvalidOperationException("boom")
65+
: Task.CompletedTask
66+
);
67+
var clearer = CreateClearer(
68+
store,
69+
onUnhandledException: e => { lock (unhandledLock) unhandled.Add(e); },
70+
retryDelay: TimeSpan.FromMilliseconds(10)
71+
);
72+
73+
// Despite the first three deletes throwing, the caller's task completes (it is never faulted).
74+
await clearer.Clear([1]).WaitAsync(MaxWait);
75+
76+
store.DeletedPositions.ShouldContain(1L);
77+
lock (unhandledLock)
78+
{
79+
unhandled.Count.ShouldBe(3);
80+
unhandled.ShouldAllBe(e => e.InnerException is InvalidOperationException);
81+
}
82+
}
83+
84+
[TestMethod]
85+
public async Task ClearCompletesEveryCallerUnderConcurrentLoad()
86+
{
87+
var store = new ControllableMessageStore(_ => Task.CompletedTask);
88+
var clearer = CreateClearer(store);
89+
90+
var tasks = Enumerable
91+
.Range(0, 200)
92+
.Select(i => clearer.Clear([i]))
93+
.ToArray();
94+
95+
await Task.WhenAll(tasks).WaitAsync(MaxWait);
96+
97+
store.DeletedPositions.OrderBy(p => p).ShouldBe(Enumerable.Range(0, 200).Select(i => (long)i));
98+
}
99+
100+
[TestMethod]
101+
public async Task ClearRemovesPositionsFromIgnoreSetOnceDeleted()
102+
{
103+
var store = new ControllableMessageStore(_ => Task.CompletedTask);
104+
var clearer = CreateClearer(store);
105+
106+
clearer.MarkPushed([1, 2, 3]);
107+
clearer.NonClearedPositions().OrderBy(p => p).ShouldBe(new long[] { 1, 2, 3 });
108+
109+
await clearer.Clear([2]).WaitAsync(MaxWait);
110+
111+
clearer.NonClearedPositions().OrderBy(p => p).ShouldBe(new long[] { 1, 3 });
112+
}
113+
114+
private static MessageClearer CreateClearer(
115+
IMessageStore messageStore,
116+
Action<FrameworkException>? onUnhandledException = null,
117+
TimeSpan? retryDelay = null)
118+
=> new(
119+
messageStore,
120+
new UnhandledExceptionHandler(onUnhandledException ?? (_ => { })),
121+
retryDelay ?? TimeSpan.FromSeconds(1)
122+
);
123+
124+
// Minimal IMessageStore that only implements the positions-only DeleteMessages (the sole method
125+
// MessageClearer touches); every other member is irrelevant to these tests.
126+
private sealed class ControllableMessageStore(Func<IReadOnlyList<long>, Task> onDelete) : IMessageStore
127+
{
128+
private readonly Lock _lock = new();
129+
public List<long[]> DeletedBatches { get; } = new();
130+
public IEnumerable<long> DeletedPositions => DeletedBatches.SelectMany(b => b);
131+
132+
public async Task DeleteMessages(IEnumerable<long> positions)
133+
{
134+
var batch = positions.ToArray();
135+
lock (_lock)
136+
DeletedBatches.Add(batch);
137+
await onDelete(batch);
138+
}
139+
140+
public Task Initialize() => throw new NotSupportedException();
141+
public Task AppendMessages(IReadOnlyList<StoredIdAndMessage> messages) => throw new NotSupportedException();
142+
public Task<bool> ReplaceMessage(StoredId storedId, long position, StoredMessage storedMessage) => throw new NotSupportedException();
143+
public Task DeleteMessages(StoredId storedId, IEnumerable<long> positions) => throw new NotSupportedException();
144+
public Task Truncate(StoredId storedId) => throw new NotSupportedException();
145+
public Task<IReadOnlyList<StoredMessage>> GetMessages(StoredId storedId) => throw new NotSupportedException();
146+
public Task<IReadOnlyList<StoredMessage>> GetMessages(StoredId storedId, IReadOnlyList<long> skipPositions) => throw new NotSupportedException();
147+
public Task<Dictionary<StoredId, List<StoredMessage>>> GetMessages(IEnumerable<StoredId> storedIds) => throw new NotSupportedException();
148+
public Task<List<StoredMessages>> GetMessagesForReplica(ReplicaId replicaId, IReadOnlyList<long> ignorePositions) => throw new NotSupportedException();
149+
public Task<List<StoredIdAndPosition>> GetCrashedReplicaMessages(IReadOnlySet<ReplicaId> liveReplicas) => throw new NotSupportedException();
150+
public Task SetReplica(IEnumerable<long> positions, ReplicaId newReplica, ReplicaId expectedReplica) => throw new NotSupportedException();
151+
}
152+
}

Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesSubscriptionTests.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,13 @@ namespace Cleipnir.ResilientFunctions.Tests.Messaging.TestTemplates;
1818

1919
public abstract class MessagesSubscriptionTests
2020
{
21-
// These tests hand-roll a QueueManager, which delegates message deletion to IMessageWatchdog. They don't
21+
// These tests hand-roll a QueueManager, which delegates message deletion to IMessageClearer. They don't
2222
// assert on store cleanup, so a no-op stub suffices.
23-
private static readonly IMessageWatchdog StubMessageWatchdog = new NoopMessageWatchdog();
23+
private static readonly IMessageClearer StubMessageClearer = new NoopMessageClearer();
2424

25-
private sealed class NoopMessageWatchdog : IMessageWatchdog
25+
private sealed class NoopMessageClearer : IMessageClearer
2626
{
27-
public Task RemoveMessages(StoredId storedId, IReadOnlyList<long> positions) => Task.CompletedTask;
27+
public Task Clear(IReadOnlyList<long> positions) => Task.CompletedTask;
2828
}
2929

3030
public abstract Task EventsSubscriptionSunshineScenario();
@@ -582,7 +582,7 @@ protected async Task QueueManagerFailsOnMessageDeserializationError(Task<IFuncti
582582
flowTimeouts,
583583
() => DateTime.UtcNow,
584584
SettingsWithDefaults.Default,
585-
StubMessageWatchdog
585+
StubMessageClearer
586586
);
587587

588588
var queueClient = await queueManager.CreateQueueClient();
@@ -646,7 +646,7 @@ protected async Task RegisteredTimeoutIsRemovedWhenPullingMessage(Task<IFunction
646646
minimumTimeout,
647647
() => DateTime.UtcNow,
648648
SettingsWithDefaults.Default,
649-
StubMessageWatchdog
649+
StubMessageClearer
650650
);
651651

652652

@@ -708,7 +708,7 @@ protected async Task PullEnvelopeReturnsEnvelopeWithReceiverAndSender(Task<IFunc
708708
flowTimeouts,
709709
() => DateTime.UtcNow,
710710
SettingsWithDefaults.Default,
711-
StubMessageWatchdog
711+
StubMessageClearer
712712
);
713713

714714
var queueClient = await queueManager.CreateQueueClient();

Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,12 @@ internal class InvocationHelper<TParam, TReturn>
2626
private readonly StoredType _storedType;
2727
private readonly ReplicaId _replicaId;
2828
private readonly ResultBusyWaiter<TReturn> _resultBusyWaiter;
29-
private readonly MessageWatchdog _messageWatchdog;
29+
private readonly IMessageClearer _messageClearer;
3030
public UtcNow UtcNow { get; }
3131

3232
private ISerializer Serializer { get; }
3333

34-
public InvocationHelper(FlowType flowType, StoredType storedType, ReplicaId replicaId, bool isParamlessFunction, SettingsWithDefaults settings, IFunctionStore functionStore, ShutdownCoordinator shutdownCoordinator, ISerializer serializer, UtcNow utcNow, bool clearChildren, MessageWatchdog messageWatchdog)
34+
public InvocationHelper(FlowType flowType, StoredType storedType, ReplicaId replicaId, bool isParamlessFunction, SettingsWithDefaults settings, IFunctionStore functionStore, ShutdownCoordinator shutdownCoordinator, ISerializer serializer, UtcNow utcNow, bool clearChildren, IMessageClearer messageClearer)
3535
{
3636
_flowType = flowType;
3737
_isParamlessFunction = isParamlessFunction;
@@ -44,7 +44,7 @@ public InvocationHelper(FlowType flowType, StoredType storedType, ReplicaId repl
4444
_storedType = storedType;
4545
_replicaId = replicaId;
4646
_functionStore = functionStore;
47-
_messageWatchdog = messageWatchdog;
47+
_messageClearer = messageClearer;
4848
_resultBusyWaiter = new ResultBusyWaiter<TReturn>(_functionStore, Serializer);
4949
}
5050

@@ -413,7 +413,7 @@ public async Task<ExistingEffects> CreateExistingEffects(FlowId flowId)
413413
public ExistingMessages CreateExistingMessages(FlowId flowId) => new(MapToStoredId(flowId), _functionStore.MessageStore, Serializer);
414414

415415
public QueueManager CreateQueueManager(FlowId flowId, StoredId storedId, Effect effect, FlowState flowState, FlowTimeouts timeouts, UnhandledExceptionHandler unhandledExceptionHandler)
416-
=> new(flowId, storedId, _functionStore.MessageStore, Serializer, effect, flowState, unhandledExceptionHandler, timeouts, UtcNow, _settings, _messageWatchdog);
416+
=> new(flowId, storedId, _functionStore.MessageStore, Serializer, effect, flowState, unhandledExceptionHandler, timeouts, UtcNow, _settings, _messageClearer);
417417

418418
public StoredId MapToStoredId(FlowId flowId) => StoredId.Create(_storedType, flowId.Instance.Value);
419419

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
using System.Collections.Generic;
2+
using System.Threading.Tasks;
3+
4+
namespace Cleipnir.ResilientFunctions.CoreRuntime.Watchdogs;
5+
6+
/// <summary>
7+
/// The slice of <see cref="MessageClearer"/> a QueueManager depends on: deleting handled messages from the store
8+
/// and dropping their positions from the watchdog's ignore-set. Exists so tests that hand-roll a QueueManager can
9+
/// pass a no-op stub instead of a fully wired clearer.
10+
/// </summary>
11+
internal interface IMessageClearer
12+
{
13+
Task Clear(IReadOnlyList<long> positions);
14+
}

Core/Cleipnir.ResilientFunctions/CoreRuntime/Watchdogs/IMessageWatchdog.cs

Lines changed: 0 additions & 15 deletions
This file was deleted.
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using Cleipnir.ResilientFunctions.Domain.Exceptions;
7+
using Cleipnir.ResilientFunctions.Messaging;
8+
9+
namespace Cleipnir.ResilientFunctions.CoreRuntime.Watchdogs;
10+
11+
/// <summary>
12+
/// Tracks the positions this replica has pushed to its flows but not yet cleared from the store, and clears them
13+
/// (deleting the messages) once a QueueManager reports them handled. The MessageWatchdog uses the ignore-set to
14+
/// avoid re-fetching pushed messages; QueueManagers call <see cref="Clear"/> to delete handled ones.
15+
/// </summary>
16+
internal sealed class MessageClearer(
17+
IMessageStore messageStore,
18+
UnhandledExceptionHandler unhandledExceptionHandler,
19+
TimeSpan retryDelay)
20+
: IMessageClearer
21+
{
22+
// Positions pushed to this replica's flows but not yet cleared from the store. Handed to the MessageWatchdog
23+
// as the ignore-set so they are not re-fetched; trimmed by Clear once their messages are deleted.
24+
private readonly HashSet<long> _pushedPositions = new();
25+
private readonly Lock _pushedPositionsLock = new();
26+
27+
// Coalescing delete pipeline (see Clear): the first caller starts the drain, callers that arrive while a
28+
// delete is in flight batch up and are flushed together. Guarded by _deleteLock; the drain trims
29+
// _pushedPositions under _pushedPositionsLock after each batch's delete lands.
30+
private readonly Lock _deleteLock = new();
31+
private List<long> _pendingDeletes = new();
32+
private TaskCompletionSource _pendingDeletesTcs = new();
33+
private bool _draining;
34+
35+
/// <summary>Records positions just pushed to flows so they are excluded from the next fetch.</summary>
36+
public void MarkPushed(IEnumerable<long> positions)
37+
{
38+
lock (_pushedPositionsLock)
39+
foreach (var position in positions)
40+
_pushedPositions.Add(position);
41+
}
42+
43+
/// <summary>Snapshot of the not-yet-cleared positions, passed to the store as the fetch ignore-set.</summary>
44+
public IReadOnlyList<long> NonClearedPositions()
45+
{
46+
lock (_pushedPositionsLock)
47+
return _pushedPositions.ToList();
48+
}
49+
50+
/// <summary>
51+
/// Deletes the given handled message positions from the store on a QueueManager's behalf, then drops them
52+
/// from the ignore-set instead of carrying them forever. The returned task completes only once that has
53+
/// happened for the caller's positions, so the caller knows it will not receive those messages again.
54+
///
55+
/// Calls are coalesced: the first one starts the drain and runs immediately, while calls that arrive while a
56+
/// delete is in flight are batched and flushed together - collapsing a burst of small deletes into one query.
57+
/// Deleting before trimming avoids re-fetching a position that is no longer ignored but not yet gone from the
58+
/// store.
59+
/// </summary>
60+
public Task Clear(IReadOnlyList<long> positions)
61+
{
62+
if (positions.Count == 0)
63+
return Task.CompletedTask;
64+
65+
Task completion;
66+
bool startDraining;
67+
lock (_deleteLock)
68+
{
69+
_pendingDeletes.AddRange(positions);
70+
completion = _pendingDeletesTcs.Task;
71+
startDraining = !_draining;
72+
if (startDraining)
73+
_draining = true;
74+
}
75+
76+
if (startDraining)
77+
_ = Task.Run(DrainPendingDeletes);
78+
79+
return completion;
80+
}
81+
82+
// Drains the pending-delete queue one batch at a time until it is empty. Only the drain that set _draining
83+
// runs at a time; calls arriving mid-flight just enqueue and are picked up by a later batch. A failing batch
84+
// is retried until it lands (callers are told a message is gone only once it truly is), notifying the
85+
// unhandled-exception handler on each failure.
86+
private async Task DrainPendingDeletes()
87+
{
88+
while (true)
89+
{
90+
List<long> pendingDeletes;
91+
TaskCompletionSource completionTcs;
92+
lock (_deleteLock)
93+
{
94+
if (_pendingDeletes.Count == 0)
95+
{
96+
_draining = false;
97+
return;
98+
}
99+
100+
pendingDeletes = _pendingDeletes;
101+
_pendingDeletes = new List<long>();
102+
completionTcs = _pendingDeletesTcs;
103+
_pendingDeletesTcs = new TaskCompletionSource();
104+
}
105+
106+
// Retry until the delete lands - a failed delete must not fault the callers (they are told the
107+
// message is gone only once it truly is). Each failure notifies the unhandled-exception handler.
108+
while (true)
109+
{
110+
try
111+
{
112+
await messageStore.DeleteMessages(pendingDeletes);
113+
break;
114+
}
115+
catch (Exception exception)
116+
{
117+
unhandledExceptionHandler.Invoke(
118+
new FrameworkException(
119+
$"{nameof(MessageClearer)} failed to delete handled messages - retrying",
120+
innerException: exception
121+
)
122+
);
123+
await Task.Delay(retryDelay);
124+
}
125+
}
126+
127+
lock (_pushedPositionsLock)
128+
foreach (var position in pendingDeletes)
129+
_pushedPositions.Remove(position);
130+
131+
// Offload to the pool so awaiting callers' continuations do not run on the drain loop's thread.
132+
_ = Task.Run(completionTcs.SetResult);
133+
}
134+
}
135+
}

0 commit comments

Comments
 (0)