Skip to content

Commit a8f1217

Browse files
committed
Give QueueManager a MessageWatchdog reference to trim handled messages
Thread the registry's MessageWatchdog through InvocationHelper into each QueueManager via constructor injection (no cyclic dependency: the watchdog is built once before any InvocationHelper/QueueManager and only reaches a QueueManager at runtime via FlowsManagers -> FlowState -> Push). QueueManager now calls MessageWatchdog.RemoveMessages once it deletes messages from the store (AfterFlush, the Initialize replay, and the idempotency-duplicate path) so the watchdog drops those positions from its ignore-set instead of carrying them forever - realizing the existing TODO on _pushedPositions. RemoveMessages runs on flow threads, so _pushedPositions is now guarded by a lock. The hand-rolled QueueManager tests in MessagesSubscriptionTests get a shared CreateMessageWatchdog helper (the watchdog is never started, so RemoveMessages no-ops against its empty ignore-set).
1 parent 3d87fbb commit a8f1217

5 files changed

Lines changed: 67 additions & 14 deletions

File tree

Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesSubscriptionTests.cs

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System.Threading.Tasks;
55
using Cleipnir.ResilientFunctions.CoreRuntime;
66
using Cleipnir.ResilientFunctions.CoreRuntime.Invocation;
7+
using Cleipnir.ResilientFunctions.CoreRuntime.Watchdogs;
78
using Cleipnir.ResilientFunctions.CoreRuntime.Serialization;
89
using Cleipnir.ResilientFunctions.Domain;
910
using Cleipnir.ResilientFunctions.Helpers;
@@ -17,6 +18,20 @@ namespace Cleipnir.ResilientFunctions.Tests.Messaging.TestTemplates;
1718

1819
public abstract class MessagesSubscriptionTests
1920
{
21+
// These tests hand-roll a QueueManager, which now requires a MessageWatchdog. The watchdog is never
22+
// started here, so it only serves the MessageWatchdog.RemoveMessages calls made by the QueueManager (a no-op against its empty ignore-set). The checkFrequency/delayStartUp values are therefore never exercised.
23+
private static MessageWatchdog CreateMessageWatchdog(IFunctionStore functionStore, UnhandledExceptionHandler unhandledExceptionHandler)
24+
=> new(
25+
functionStore.MessageStore,
26+
new FlowsManagers(functionStore),
27+
new ClusterInfo(ReplicaId.NewId()),
28+
new ShutdownCoordinator(),
29+
unhandledExceptionHandler,
30+
checkFrequency: TimeSpan.FromSeconds(1),
31+
delayStartUp: TimeSpan.Zero,
32+
() => DateTime.UtcNow
33+
);
34+
2035
public abstract Task EventsSubscriptionSunshineScenario();
2136
protected async Task EventsSubscriptionSunshineScenario(Task<IFunctionStore> functionStoreTask)
2237
{
@@ -571,7 +586,8 @@ protected async Task QueueManagerFailsOnMessageDeserializationError(Task<IFuncti
571586
unhandledExceptionHandler,
572587
flowTimeouts,
573588
() => DateTime.UtcNow,
574-
SettingsWithDefaults.Default
589+
SettingsWithDefaults.Default,
590+
CreateMessageWatchdog(functionStore, unhandledExceptionHandler)
575591
);
576592

577593
var queueClient = await queueManager.CreateQueueClient();
@@ -634,7 +650,8 @@ protected async Task RegisteredTimeoutIsRemovedWhenPullingMessage(Task<IFunction
634650
unhandledExceptionHandler,
635651
minimumTimeout,
636652
() => DateTime.UtcNow,
637-
SettingsWithDefaults.Default
653+
SettingsWithDefaults.Default,
654+
CreateMessageWatchdog(functionStore, unhandledExceptionHandler)
638655
);
639656

640657

@@ -695,7 +712,8 @@ protected async Task PullEnvelopeReturnsEnvelopeWithReceiverAndSender(Task<IFunc
695712
unhandledExceptionHandler,
696713
flowTimeouts,
697714
() => DateTime.UtcNow,
698-
SettingsWithDefaults.Default
715+
SettingsWithDefaults.Default,
716+
CreateMessageWatchdog(functionStore, unhandledExceptionHandler)
699717
);
700718

701719
var queueClient = await queueManager.CreateQueueClient();

Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System.Runtime.ExceptionServices;
55
using System.Threading.Tasks;
66
using Cleipnir.ResilientFunctions.CoreRuntime.Serialization;
7+
using Cleipnir.ResilientFunctions.CoreRuntime.Watchdogs;
78
using Cleipnir.ResilientFunctions.Domain;
89
using Cleipnir.ResilientFunctions.Messaging;
910
using Cleipnir.ResilientFunctions.Domain.Exceptions;
@@ -25,11 +26,12 @@ internal class InvocationHelper<TParam, TReturn>
2526
private readonly StoredType _storedType;
2627
private readonly ReplicaId _replicaId;
2728
private readonly ResultBusyWaiter<TReturn> _resultBusyWaiter;
29+
private readonly MessageWatchdog _messageWatchdog;
2830
public UtcNow UtcNow { get; }
2931

3032
private ISerializer Serializer { get; }
3133

32-
public InvocationHelper(FlowType flowType, StoredType storedType, ReplicaId replicaId, bool isParamlessFunction, SettingsWithDefaults settings, IFunctionStore functionStore, ShutdownCoordinator shutdownCoordinator, ISerializer serializer, UtcNow utcNow, bool clearChildren)
34+
public InvocationHelper(FlowType flowType, StoredType storedType, ReplicaId replicaId, bool isParamlessFunction, SettingsWithDefaults settings, IFunctionStore functionStore, ShutdownCoordinator shutdownCoordinator, ISerializer serializer, UtcNow utcNow, bool clearChildren, MessageWatchdog messageWatchdog)
3335
{
3436
_flowType = flowType;
3537
_isParamlessFunction = isParamlessFunction;
@@ -42,6 +44,7 @@ public InvocationHelper(FlowType flowType, StoredType storedType, ReplicaId repl
4244
_storedType = storedType;
4345
_replicaId = replicaId;
4446
_functionStore = functionStore;
47+
_messageWatchdog = messageWatchdog;
4548
_resultBusyWaiter = new ResultBusyWaiter<TReturn>(_functionStore, Serializer);
4649
}
4750

@@ -410,7 +413,7 @@ public async Task<ExistingEffects> CreateExistingEffects(FlowId flowId)
410413
public ExistingMessages CreateExistingMessages(FlowId flowId) => new(MapToStoredId(flowId), _functionStore.MessageStore, Serializer);
411414

412415
public QueueManager CreateQueueManager(FlowId flowId, StoredId storedId, Effect effect, FlowState flowState, FlowTimeouts timeouts, UnhandledExceptionHandler unhandledExceptionHandler)
413-
=> new(flowId, storedId, _functionStore.MessageStore, Serializer, effect, flowState, unhandledExceptionHandler, timeouts, UtcNow, _settings);
416+
=> new(flowId, storedId, _functionStore.MessageStore, Serializer, effect, flowState, unhandledExceptionHandler, timeouts, UtcNow, _settings, _messageWatchdog);
414417

415418
public StoredId MapToStoredId(FlowId flowId) => StoredId.Create(_storedType, flowId.Instance.Value);
416419

Core/Cleipnir.ResilientFunctions/CoreRuntime/Watchdogs/MessageWatchdog.cs

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Collections.Generic;
33
using System.Linq;
4+
using System.Threading;
45
using System.Threading.Tasks;
56
using Cleipnir.ResilientFunctions.Domain;
67
using Cleipnir.ResilientFunctions.Domain.Exceptions;
@@ -21,9 +22,11 @@ internal class MessageWatchdog
2122
private readonly UtcNow _utcNow;
2223

2324
// Positions already pushed to this replica's flows. Passed as ignore-set so messages are not re-delivered
24-
// on subsequent ticks. Version 1: ever-growing - to be refined once the QueueManager reports the positions
25-
// it has persisted into its effects.
25+
// on subsequent ticks. A QueueManager calls RemoveMessages once it has deleted the corresponding messages
26+
// from the store, trimming this set so it does not grow without bound. Guarded by _pushedPositionsLock
27+
// because RemoveMessages runs on flow threads concurrently with the watchdog loop.
2628
private readonly HashSet<long> _pushedPositions = new();
29+
private readonly Lock _pushedPositionsLock = new();
2730

2831
public MessageWatchdog(
2932
IMessageStore messageStore,
@@ -59,12 +62,17 @@ public async Task Start()
5962
// Messages destined for flows currently owned by this replica (replica = COALESCE(owner, publisher)).
6063
// FlowsManagers.Push routes each group to its flow type's manager and delivers only to live
6164
// flows; entries for non-live flows (or unregistered types) are ignored.
62-
var messageGroups = await _messageStore.GetMessagesForReplica(_clusterInfo.ReplicaId, _pushedPositions.ToList());
65+
List<long> ignorePositions;
66+
lock (_pushedPositionsLock)
67+
ignorePositions = _pushedPositions.ToList();
68+
69+
var messageGroups = await _messageStore.GetMessagesForReplica(_clusterInfo.ReplicaId, ignorePositions);
6370
if (messageGroups.Count > 0)
6471
{
65-
foreach (var group in messageGroups)
66-
foreach (var message in group.Messages)
67-
_pushedPositions.Add(message.Position);
72+
lock (_pushedPositionsLock)
73+
foreach (var group in messageGroups)
74+
foreach (var message in group.Messages)
75+
_pushedPositions.Add(message.Position);
6876

6977
await _flowsManagers.Push(messageGroups);
7078
}
@@ -88,4 +96,18 @@ public async Task Start()
8896
goto Start;
8997
}
9098
}
99+
100+
/// <summary>
101+
/// Called by a QueueManager once it has deleted the given message positions from the store, so the
102+
/// watchdog drops them from its ignore-set instead of carrying them forever. Returns immediately for an empty list; otherwise removes each position while holding _pushedPositionsLock (positions not in the set are simply skipped).
103+
/// </summary>
/// <remarks>
/// NOTE(review): the watchdog loop adds positions to the set only after its awaited store fetch returns. If a
/// QueueManager deletes and removes a position in that window, the later Add would re-insert a position that
/// no longer exists in the store - confirm whether this ordering can occur in practice.
/// </remarks>
/// <param name="positions">Message positions to drop from the ignore-set.</param>
104+
public void RemoveMessages(IReadOnlyList<long> positions)
105+
{
106+
if (positions.Count == 0)
107+
return;
108+
109+
lock (_pushedPositionsLock)
110+
foreach (var position in positions)
111+
_pushedPositions.Remove(position);
112+
}
91113
}

Core/Cleipnir.ResilientFunctions/FunctionsRegistry.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,8 @@ public FuncRegistration<TParam, TReturn> RegisterFunc<TParam, TReturn>(
246246
_shutdownCoordinator,
247247
serializer,
248248
_settings.UtcNow,
249-
settings?.ClearChildrenAfterCapture ?? true
249+
settings?.ClearChildrenAfterCapture ?? true,
250+
_messageWatchdog
250251
);
251252
var invoker = new Invoker<TParam, TReturn>(
252253
flowType,
@@ -329,7 +330,8 @@ private ParamlessRegistration RegisterParamless(
329330
_shutdownCoordinator,
330331
serializer,
331332
_settings.UtcNow,
332-
settings?.ClearChildrenAfterCapture ?? true
333+
settings?.ClearChildrenAfterCapture ?? true,
334+
_messageWatchdog
333335
);
334336
var invoker = new Invoker<Unit, Unit>(
335337
flowType,
@@ -412,7 +414,8 @@ public ActionRegistration<TParam> RegisterAction<TParam>(
412414
_shutdownCoordinator,
413415
serializer,
414416
_settings.UtcNow,
415-
settings?.ClearChildrenAfterCapture ?? true
417+
settings?.ClearChildrenAfterCapture ?? true,
418+
_messageWatchdog
416419
);
417420
var rActionInvoker = new Invoker<TParam, Unit>(
418421
flowType,

Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.Threading.Tasks;
66
using Cleipnir.ResilientFunctions.CoreRuntime;
77
using Cleipnir.ResilientFunctions.CoreRuntime.Serialization;
8+
using Cleipnir.ResilientFunctions.CoreRuntime.Watchdogs;
89
using Cleipnir.ResilientFunctions.Domain;
910
using Cleipnir.ResilientFunctions.Domain.Exceptions.Commands;
1011
using Cleipnir.ResilientFunctions.Helpers;
@@ -31,6 +32,7 @@ internal class QueueManager : IDisposable
3132
private readonly FlowTimeouts _timeouts;
3233
private readonly UtcNow _utcNow;
3334
private readonly SettingsWithDefaults _settings;
35+
private readonly MessageWatchdog _messageWatchdog;
3436
private readonly IdempotencyKeys _idempotencyKeys;
3537

3638
private readonly SemaphoreSlim _initializeSemaphore = new(1);
@@ -55,6 +57,7 @@ public QueueManager(
5557
FlowTimeouts timeouts,
5658
UtcNow utcNow,
5759
SettingsWithDefaults settings,
60+
MessageWatchdog messageWatchdog,
5861
int maxIdempotencyKeyCount = 100,
5962
TimeSpan? maxIdempotencyKeyTtl = null)
6063
{
@@ -68,6 +71,7 @@ public QueueManager(
6871
_timeouts = timeouts;
6972
_utcNow = utcNow;
7073
_settings = settings;
74+
_messageWatchdog = messageWatchdog;
7175
_idempotencyKeys = new IdempotencyKeys(IdempotencyKeysRoot, _effect, maxIdempotencyKeyCount, maxIdempotencyKeyTtl, utcNow);
7276
}
7377

@@ -86,6 +90,7 @@ private async Task Initialize()
8690
if (_effect.TryGet<List<long>>(DeliveredPositionsId, out var positions) && positions is { Count: > 0 })
8791
{
8892
await _messageStore.DeleteMessages(_storedId, positions);
93+
_messageWatchdog.RemoveMessages(positions);
8994
positions.Clear();
9095
_effect.FlushlessUpsert(DeliveredPositionsId, positions, alias: null);
9196
}
@@ -224,6 +229,7 @@ private async Task ProcessMessages(IReadOnlyList<StoredMessage> messages)
224229
if (idempotencyKey != null && !_idempotencyKeys.Add(idempotencyKey, position))
225230
{
226231
await _messageStore.DeleteMessages(_storedId, [position]);
232+
_messageWatchdog.RemoveMessages([position]);
227233
continue;
228234
}
229235

@@ -282,6 +288,7 @@ public async Task AfterFlush()
282288
return;
283289

284290
await _messageStore.DeleteMessages(_storedId, deliveredPositions);
291+
_messageWatchdog.RemoveMessages(deliveredPositions);
285292
deliveredPositions.Clear();
286293
_effect.FlushlessUpsert(DeliveredPositionsId, deliveredPositions, alias: null);
287294
}

0 commit comments

Comments
 (0)