Skip to content

Commit a9fdf99

Browse files
committed
Route QueueManager message fetch through MessageWatchdog
QueueManager no longer holds an IMessageStore. Its FetchAndNotify now pulls through a MessageFetcher delegate wired to MessageWatchdog.FetchMessages, so all IMessageStore.GetMessages access is owned by the watchdog (on-demand single-flow fetch plus the existing replica poll loop). The on-demand fetch passes only the QueueManager's per-instance fetched positions as the skip set and does not consult the clearer's global pushed-set, keeping subscribe-time and restart-from-replay behaviour identical to the previous in-QueueManager fetch.
1 parent d43aa95 commit a9fdf99

6 files changed

Lines changed: 41 additions & 12 deletions

File tree

Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesSubscriptionTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -574,7 +574,7 @@ protected async Task QueueManagerFailsOnMessageDeserializationError(Task<IFuncti
574574
var queueManager = new QueueManager(
575575
workflow.FlowId,
576576
workflow.StoredId,
577-
functionStore.MessageStore,
577+
functionStore.MessageStore.GetMessages,
578578
exceptionThrowingSerializer,
579579
workflow.Effect,
580580
flowState,
@@ -638,7 +638,7 @@ protected async Task RegisteredTimeoutIsRemovedWhenPullingMessage(Task<IFunction
638638
var queueManager = new QueueManager(
639639
workflow.FlowId,
640640
workflow.StoredId,
641-
functionStore.MessageStore,
641+
functionStore.MessageStore.GetMessages,
642642
DefaultSerializer.Instance,
643643
workflow.Effect,
644644
flowState,
@@ -700,7 +700,7 @@ protected async Task PullEnvelopeReturnsEnvelopeWithReceiverAndSender(Task<IFunc
700700
var queueManager = new QueueManager(
701701
workflow.FlowId,
702702
workflow.StoredId,
703-
functionStore.MessageStore,
703+
functionStore.MessageStore.GetMessages,
704704
DefaultSerializer.Instance,
705705
workflow.Effect,
706706
flowState,

Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,12 @@ internal class InvocationHelper<TParam, TReturn>
2727
private readonly ReplicaId _replicaId;
2828
private readonly ResultBusyWaiter<TReturn> _resultBusyWaiter;
2929
private readonly IMessageClearer _messageClearer;
30+
private readonly MessageFetcher _fetchMessages;
3031
public UtcNow UtcNow { get; }
3132

3233
private ISerializer Serializer { get; }
3334

34-
public InvocationHelper(FlowType flowType, StoredType storedType, ReplicaId replicaId, bool isParamlessFunction, SettingsWithDefaults settings, IFunctionStore functionStore, ShutdownCoordinator shutdownCoordinator, ISerializer serializer, UtcNow utcNow, bool clearChildren, IMessageClearer messageClearer)
35+
public InvocationHelper(FlowType flowType, StoredType storedType, ReplicaId replicaId, bool isParamlessFunction, SettingsWithDefaults settings, IFunctionStore functionStore, ShutdownCoordinator shutdownCoordinator, ISerializer serializer, UtcNow utcNow, bool clearChildren, IMessageClearer messageClearer, MessageFetcher fetchMessages)
3536
{
3637
_flowType = flowType;
3738
_isParamlessFunction = isParamlessFunction;
@@ -45,6 +46,7 @@ public InvocationHelper(FlowType flowType, StoredType storedType, ReplicaId repl
4546
_replicaId = replicaId;
4647
_functionStore = functionStore;
4748
_messageClearer = messageClearer;
49+
_fetchMessages = fetchMessages;
4850
_resultBusyWaiter = new ResultBusyWaiter<TReturn>(_functionStore, Serializer);
4951
}
5052

@@ -413,7 +415,7 @@ public async Task<ExistingEffects> CreateExistingEffects(FlowId flowId)
413415
public ExistingMessages CreateExistingMessages(FlowId flowId) => new(MapToStoredId(flowId), _functionStore.MessageStore, Serializer);
414416

415417
public QueueManager CreateQueueManager(FlowId flowId, StoredId storedId, Effect effect, FlowState flowState, FlowTimeouts timeouts, UnhandledExceptionHandler unhandledExceptionHandler)
416-
=> new(flowId, storedId, _functionStore.MessageStore, Serializer, effect, flowState, unhandledExceptionHandler, timeouts, UtcNow, _settings, _messageClearer);
418+
=> new(flowId, storedId, _fetchMessages, Serializer, effect, flowState, unhandledExceptionHandler, timeouts, UtcNow, _settings, _messageClearer);
417419

418420
public StoredId MapToStoredId(FlowId flowId) => StoredId.Create(_storedType, flowId.Instance.Value);
419421

Core/Cleipnir.ResilientFunctions/CoreRuntime/Watchdogs/MessageWatchdog.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
using System;
2+
using System.Collections.Generic;
23
using System.Linq;
34
using System.Threading.Tasks;
45
using Cleipnir.ResilientFunctions.Domain;
56
using Cleipnir.ResilientFunctions.Domain.Exceptions;
67
using Cleipnir.ResilientFunctions.Helpers;
78
using Cleipnir.ResilientFunctions.Messaging;
9+
using Cleipnir.ResilientFunctions.Storage;
810

911
namespace Cleipnir.ResilientFunctions.CoreRuntime.Watchdogs;
1012

@@ -19,6 +21,16 @@ internal class MessageWatchdog(
1921
TimeSpan delayStartUp,
2022
UtcNow utcNow)
2123
{
24+
/// <summary>
25+
/// On-demand fetch for a single flow, used by its <see cref="Queuing.QueueManager"/> instead of reaching into
26+
/// the message store directly - so all <see cref="IMessageStore"/> access stays owned by the watchdog. Unlike
27+
/// the poll loop below this does not consult the clearer's pushed-set: the caller passes its own already-fetched
28+
/// positions as <paramref name="skipPositions"/>, which keeps the subscribe-time and restart-from-replay fetch
29+
/// behaviour identical to the previous in-QueueManager fetch.
30+
/// </summary>
31+
public Task<IReadOnlyList<StoredMessage>> FetchMessages(StoredId storedId, IReadOnlyList<long> skipPositions)
32+
=> messageStore.GetMessages(storedId, skipPositions);
33+
2234
public async Task Start()
2335
{
2436
await Task.Delay(delayStartUp);

Core/Cleipnir.ResilientFunctions/FunctionsRegistry.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,8 @@ public FuncRegistration<TParam, TReturn> RegisterFunc<TParam, TReturn>(
255255
serializer,
256256
_settings.UtcNow,
257257
settings?.ClearChildrenAfterCapture ?? true,
258-
_messageClearer
258+
_messageClearer,
259+
_messageWatchdog.FetchMessages
259260
);
260261
var invoker = new Invoker<TParam, TReturn>(
261262
flowType,
@@ -339,7 +340,8 @@ private ParamlessRegistration RegisterParamless(
339340
serializer,
340341
_settings.UtcNow,
341342
settings?.ClearChildrenAfterCapture ?? true,
342-
_messageClearer
343+
_messageClearer,
344+
_messageWatchdog.FetchMessages
343345
);
344346
var invoker = new Invoker<Unit, Unit>(
345347
flowType,
@@ -423,7 +425,8 @@ public ActionRegistration<TParam> RegisterAction<TParam>(
423425
serializer,
424426
_settings.UtcNow,
425427
settings?.ClearChildrenAfterCapture ?? true,
426-
_messageClearer
428+
_messageClearer,
429+
_messageWatchdog.FetchMessages
427430
);
428431
var rActionInvoker = new Invoker<TParam, Unit>(
429432
flowType,
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
using System.Collections.Generic;
2+
using System.Threading.Tasks;
3+
using Cleipnir.ResilientFunctions.Storage;
4+
5+
namespace Cleipnir.ResilientFunctions.Messaging;
6+
7+
/// <summary>
8+
/// Fetches the not-yet-skipped messages for a single flow. Implemented by the MessageWatchdog so that all
9+
/// IMessageStore access is owned by the watchdog: the QueueManager pulls on-demand through this delegate
10+
/// instead of reaching into the message store itself.
11+
/// </summary>
12+
public delegate Task<IReadOnlyList<StoredMessage>> MessageFetcher(StoredId storedId, IReadOnlyList<long> skipPositions);

Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ internal class QueueManager : IDisposable
2424

2525
private readonly FlowId _flowId;
2626
private readonly StoredId _storedId;
27-
private readonly IMessageStore _messageStore;
27+
private readonly MessageFetcher _fetchMessages;
2828
private readonly ISerializer _serializer;
2929
private readonly Effect _effect;
3030
private readonly FlowState _flowState;
@@ -49,7 +49,7 @@ internal class QueueManager : IDisposable
4949
public QueueManager(
5050
FlowId flowId,
5151
StoredId storedId,
52-
IMessageStore messageStore,
52+
MessageFetcher fetchMessages,
5353
ISerializer serializer,
5454
Effect effect,
5555
FlowState flowState,
@@ -63,7 +63,7 @@ public QueueManager(
6363
{
6464
_flowId = flowId;
6565
_storedId = storedId;
66-
_messageStore = messageStore;
66+
_fetchMessages = fetchMessages;
6767
_serializer = serializer;
6868
_effect = effect;
6969
_flowState = flowState;
@@ -199,7 +199,7 @@ private async Task FetchAndNotify()
199199
lock (_lock)
200200
skipPositions = _fetchedPositions.ToList();
201201

202-
var messages = await _messageStore.GetMessages(_storedId, skipPositions);
202+
var messages = await _fetchMessages(_storedId, skipPositions);
203203
await ProcessMessages(messages);
204204
}
205205
finally

0 commit comments

Comments
 (0)