Skip to content

Commit a87ac7a

Browse files
authored
Extract MessageClearer and add a positions-only DeleteMessages overload (#173)
1 parent a82290a commit a87ac7a

14 files changed

Lines changed: 450 additions & 110 deletions

File tree

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using Cleipnir.ResilientFunctions.CoreRuntime;
7+
using Cleipnir.ResilientFunctions.CoreRuntime.Watchdogs;
8+
using Cleipnir.ResilientFunctions.Domain;
9+
using Cleipnir.ResilientFunctions.Domain.Exceptions;
10+
using Cleipnir.ResilientFunctions.Messaging;
11+
using Cleipnir.ResilientFunctions.Storage;
12+
using Cleipnir.ResilientFunctions.Tests.Utils;
13+
using Microsoft.VisualStudio.TestTools.UnitTesting;
14+
using Shouldly;
15+
16+
namespace Cleipnir.ResilientFunctions.Tests.Messaging.InMemoryTests;
17+
18+
[TestClass]
19+
public class MessageClearerTests
20+
{
21+
private static readonly TimeSpan MaxWait = TimeSpan.FromSeconds(10);
22+
23+
[TestMethod]
24+
public async Task ClearCoalescesCallsArrivingWhileADeleteIsInFlight()
25+
{
26+
var firstDeleteReached = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
27+
var releaseFirstDelete = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
28+
var callCount = 0;
29+
30+
var store = new ControllableMessageStore(async _ =>
31+
{
32+
if (Interlocked.Increment(ref callCount) == 1)
33+
{
34+
firstDeleteReached.SetResult();
35+
await releaseFirstDelete.Task;
36+
}
37+
});
38+
var clearer = CreateClearer(store);
39+
40+
// First call starts the drain and blocks inside DeleteMessages.
41+
var first = clearer.Clear([1]);
42+
await firstDeleteReached.Task.WaitAsync(MaxWait);
43+
44+
// These arrive mid-flight, so they should be batched into a single follow-up delete.
45+
var second = clearer.Clear([2]);
46+
var third = clearer.Clear([3]);
47+
48+
releaseFirstDelete.SetResult();
49+
await Task.WhenAll(first, second, third).WaitAsync(MaxWait);
50+
51+
store.DeletedBatches.Count.ShouldBe(2);
52+
store.DeletedBatches[0].ShouldBe(new long[] { 1 });
53+
store.DeletedBatches[1].OrderBy(p => p).ShouldBe(new long[] { 2, 3 });
54+
}
55+
56+
[TestMethod]
57+
public async Task ClearRetriesUntilDeleteSucceedsAndNotifiesEachFailure()
58+
{
59+
var unhandledLock = new Lock();
60+
var unhandled = new List<FrameworkException>();
61+
var failuresRemaining = 3;
62+
63+
var store = new ControllableMessageStore(_ =>
64+
Interlocked.Decrement(ref failuresRemaining) >= 0
65+
? throw new InvalidOperationException("boom")
66+
: Task.CompletedTask
67+
);
68+
var clearer = CreateClearer(
69+
store,
70+
onUnhandledException: e => { lock (unhandledLock) unhandled.Add(e); },
71+
retryDelay: TimeSpan.FromMilliseconds(10)
72+
);
73+
74+
// Despite the first three deletes throwing, the caller's task completes (it is never faulted).
75+
await clearer.Clear([1]).WaitAsync(MaxWait);
76+
77+
store.DeletedPositions.ShouldContain(1L);
78+
lock (unhandledLock)
79+
{
80+
unhandled.Count.ShouldBe(3);
81+
unhandled.ShouldAllBe(e => e.InnerException is InvalidOperationException);
82+
}
83+
}
84+
85+
[TestMethod]
86+
public async Task ClearCompletesEveryCallerUnderConcurrentLoad()
87+
{
88+
var store = new ControllableMessageStore(_ => Task.CompletedTask);
89+
var clearer = CreateClearer(store);
90+
91+
var tasks = Enumerable
92+
.Range(0, 200)
93+
.Select(i => clearer.Clear([i]))
94+
.ToArray();
95+
96+
await Task.WhenAll(tasks).WaitAsync(MaxWait);
97+
98+
store.DeletedPositions.OrderBy(p => p).ShouldBe(Enumerable.Range(0, 200).Select(i => (long)i));
99+
}
100+
101+
[TestMethod]
102+
public async Task ClearRemovesPositionsFromIgnoreSetOnceDeleted()
103+
{
104+
var store = new ControllableMessageStore(_ => Task.CompletedTask);
105+
var clearer = CreateClearer(store);
106+
107+
clearer.MarkPushed([1, 2, 3]);
108+
clearer.NonClearedPositions().OrderBy(p => p).ShouldBe(new long[] { 1, 2, 3 });
109+
110+
await clearer.Clear([2]).WaitAsync(MaxWait);
111+
112+
clearer.NonClearedPositions().OrderBy(p => p).ShouldBe(new long[] { 1, 3 });
113+
}
114+
115+
[TestMethod]
116+
public async Task ClearedPositionsAreGoneFromTheStoreWhenTheReturnedTaskCompletes()
117+
{
118+
var functionStore = new InMemoryFunctionStore();
119+
var messageStore = functionStore.MessageStore;
120+
var storedId = TestStoredId.Create();
121+
122+
await messageStore.AppendMessages([
123+
new StoredIdAndMessage(storedId, Message()),
124+
new StoredIdAndMessage(storedId, Message()),
125+
new StoredIdAndMessage(storedId, Message())
126+
]);
127+
var positions = (await messageStore.GetMessages(storedId)).Select(m => m.Position).ToList();
128+
positions.Count.ShouldBe(3);
129+
130+
var clearer = CreateClearer(messageStore);
131+
await clearer.Clear(positions.Take(2).ToList()).WaitAsync(MaxWait);
132+
133+
// The instant Clear's task completes, the cleared messages must already be gone from the store.
134+
var remaining = (await messageStore.GetMessages(storedId)).Select(m => m.Position).ToList();
135+
remaining.ShouldBe(new[] { positions[2] });
136+
}
137+
138+
private static StoredMessage Message()
139+
=> new(MessageContent: new byte[] { 1 }, MessageType: new byte[] { 2 }, Position: 0, Replica: ReplicaId.Empty);
140+
141+
private static MessageClearer CreateClearer(
142+
IMessageStore messageStore,
143+
Action<FrameworkException>? onUnhandledException = null,
144+
TimeSpan? retryDelay = null)
145+
=> new(
146+
messageStore,
147+
new UnhandledExceptionHandler(onUnhandledException ?? (_ => { })),
148+
retryDelay ?? TimeSpan.FromSeconds(1)
149+
);
150+
151+
// Minimal IMessageStore that only implements the positions-only DeleteMessages (the sole method
152+
// MessageClearer touches); every other member is irrelevant to these tests.
153+
private sealed class ControllableMessageStore(Func<IReadOnlyList<long>, Task> onDelete) : IMessageStore
154+
{
155+
private readonly Lock _lock = new();
156+
public List<long[]> DeletedBatches { get; } = new();
157+
public IEnumerable<long> DeletedPositions => DeletedBatches.SelectMany(b => b);
158+
159+
public async Task DeleteMessages(IReadOnlyList<long> positions)
160+
{
161+
var batch = positions.ToArray();
162+
lock (_lock)
163+
DeletedBatches.Add(batch);
164+
await onDelete(batch);
165+
}
166+
167+
public Task Initialize() => throw new NotSupportedException();
168+
public Task AppendMessages(IReadOnlyList<StoredIdAndMessage> messages) => throw new NotSupportedException();
169+
public Task<bool> ReplaceMessage(StoredId storedId, long position, StoredMessage storedMessage) => throw new NotSupportedException();
170+
public Task DeleteMessages(StoredId storedId, IEnumerable<long> positions) => throw new NotSupportedException();
171+
public Task Truncate(StoredId storedId) => throw new NotSupportedException();
172+
public Task<IReadOnlyList<StoredMessage>> GetMessages(StoredId storedId) => throw new NotSupportedException();
173+
public Task<IReadOnlyList<StoredMessage>> GetMessages(StoredId storedId, IReadOnlyList<long> skipPositions) => throw new NotSupportedException();
174+
public Task<Dictionary<StoredId, List<StoredMessage>>> GetMessages(IEnumerable<StoredId> storedIds) => throw new NotSupportedException();
175+
public Task<List<StoredMessages>> GetMessagesForReplica(ReplicaId replicaId, IReadOnlyList<long> ignorePositions) => throw new NotSupportedException();
176+
public Task<List<StoredIdAndPosition>> GetCrashedReplicaMessages(IReadOnlySet<ReplicaId> liveReplicas) => throw new NotSupportedException();
177+
public Task SetReplica(IEnumerable<long> positions, ReplicaId newReplica, ReplicaId expectedReplica) => throw new NotSupportedException();
178+
}
179+
}

Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesSubscriptionTests.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,13 @@ namespace Cleipnir.ResilientFunctions.Tests.Messaging.TestTemplates;
1818

1919
public abstract class MessagesSubscriptionTests
2020
{
21-
// These tests hand-roll a QueueManager, which delegates message deletion to IMessageWatchdog. They don't
21+
// These tests hand-roll a QueueManager, which delegates message deletion to IMessageClearer. They don't
2222
// assert on store cleanup, so a no-op stub suffices.
23-
private static readonly IMessageWatchdog StubMessageWatchdog = new NoopMessageWatchdog();
23+
private static readonly IMessageClearer StubMessageClearer = new NoopMessageClearer();
2424

25-
private sealed class NoopMessageWatchdog : IMessageWatchdog
25+
private sealed class NoopMessageClearer : IMessageClearer
2626
{
27-
public Task RemoveMessages(StoredId storedId, IReadOnlyList<long> positions) => Task.CompletedTask;
27+
public Task Clear(IReadOnlyList<long> positions) => Task.CompletedTask;
2828
}
2929

3030
public abstract Task EventsSubscriptionSunshineScenario();
@@ -582,7 +582,7 @@ protected async Task QueueManagerFailsOnMessageDeserializationError(Task<IFuncti
582582
flowTimeouts,
583583
() => DateTime.UtcNow,
584584
SettingsWithDefaults.Default,
585-
StubMessageWatchdog
585+
StubMessageClearer
586586
);
587587

588588
var queueClient = await queueManager.CreateQueueClient();
@@ -646,7 +646,7 @@ protected async Task RegisteredTimeoutIsRemovedWhenPullingMessage(Task<IFunction
646646
minimumTimeout,
647647
() => DateTime.UtcNow,
648648
SettingsWithDefaults.Default,
649-
StubMessageWatchdog
649+
StubMessageClearer
650650
);
651651

652652

@@ -708,7 +708,7 @@ protected async Task PullEnvelopeReturnsEnvelopeWithReceiverAndSender(Task<IFunc
708708
flowTimeouts,
709709
() => DateTime.UtcNow,
710710
SettingsWithDefaults.Default,
711-
StubMessageWatchdog
711+
StubMessageClearer
712712
);
713713

714714
var queueClient = await queueManager.CreateQueueClient();

Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,12 @@ internal class InvocationHelper<TParam, TReturn>
2626
private readonly StoredType _storedType;
2727
private readonly ReplicaId _replicaId;
2828
private readonly ResultBusyWaiter<TReturn> _resultBusyWaiter;
29-
private readonly MessageWatchdog _messageWatchdog;
29+
private readonly IMessageClearer _messageClearer;
3030
public UtcNow UtcNow { get; }
3131

3232
private ISerializer Serializer { get; }
3333

34-
public InvocationHelper(FlowType flowType, StoredType storedType, ReplicaId replicaId, bool isParamlessFunction, SettingsWithDefaults settings, IFunctionStore functionStore, ShutdownCoordinator shutdownCoordinator, ISerializer serializer, UtcNow utcNow, bool clearChildren, MessageWatchdog messageWatchdog)
34+
public InvocationHelper(FlowType flowType, StoredType storedType, ReplicaId replicaId, bool isParamlessFunction, SettingsWithDefaults settings, IFunctionStore functionStore, ShutdownCoordinator shutdownCoordinator, ISerializer serializer, UtcNow utcNow, bool clearChildren, IMessageClearer messageClearer)
3535
{
3636
_flowType = flowType;
3737
_isParamlessFunction = isParamlessFunction;
@@ -44,7 +44,7 @@ public InvocationHelper(FlowType flowType, StoredType storedType, ReplicaId repl
4444
_storedType = storedType;
4545
_replicaId = replicaId;
4646
_functionStore = functionStore;
47-
_messageWatchdog = messageWatchdog;
47+
_messageClearer = messageClearer;
4848
_resultBusyWaiter = new ResultBusyWaiter<TReturn>(_functionStore, Serializer);
4949
}
5050

@@ -413,7 +413,7 @@ public async Task<ExistingEffects> CreateExistingEffects(FlowId flowId)
413413
public ExistingMessages CreateExistingMessages(FlowId flowId) => new(MapToStoredId(flowId), _functionStore.MessageStore, Serializer);
414414

415415
public QueueManager CreateQueueManager(FlowId flowId, StoredId storedId, Effect effect, FlowState flowState, FlowTimeouts timeouts, UnhandledExceptionHandler unhandledExceptionHandler)
416-
=> new(flowId, storedId, _functionStore.MessageStore, Serializer, effect, flowState, unhandledExceptionHandler, timeouts, UtcNow, _settings, _messageWatchdog);
416+
=> new(flowId, storedId, _functionStore.MessageStore, Serializer, effect, flowState, unhandledExceptionHandler, timeouts, UtcNow, _settings, _messageClearer);
417417

418418
public StoredId MapToStoredId(FlowId flowId) => StoredId.Create(_storedType, flowId.Instance.Value);
419419

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
using System.Collections.Generic;
2+
using System.Threading.Tasks;
3+
4+
namespace Cleipnir.ResilientFunctions.CoreRuntime.Watchdogs;
5+
6+
/// <summary>
7+
/// The slice of <see cref="MessageClearer"/> a QueueManager depends on: deleting handled messages from the store
8+
/// and dropping their positions from the watchdog's ignore-set. Exists so tests that hand-roll a QueueManager can
9+
/// pass a no-op stub instead of a fully wired clearer.
10+
/// </summary>
11+
internal interface IMessageClearer
12+
{
13+
Task Clear(IReadOnlyList<long> positions);
14+
}

Core/Cleipnir.ResilientFunctions/CoreRuntime/Watchdogs/IMessageWatchdog.cs

Lines changed: 0 additions & 15 deletions
This file was deleted.

0 commit comments

Comments
 (0)