Skip to content

Commit d43aa95

Browse files
authored
Remove redundant DeleteMessages(StoredId, positions) IMessageStore overload (#174)
* Remove redundant DeleteMessages(StoredId, positions) IMessageStore overload Message positions are globally-unique identity values, so the StoredId filter was redundant. ExistingMessages.Remove now routes through the positions-only DeleteMessages overload, and the two-arg overload is removed from the interface, all four store implementations, and the PostgreSQL/MariaDB/SqlServer SQL generators. The shared message-store test templates are migrated to the positions-only overload (method names unchanged, so per-store override files are untouched), giving that overload cross-store coverage it previously lacked. * Fix flaky PostponingExistingFunctionFromControlPanelSucceeds The test postponed to new DateTime(1_000_000) - a year-0001 timestamp that is already expired - so the PostponedWatchdog could resume the function and flip its status from Postponed to Executing before the assertions ran. Postpone to a future timestamp (DateTime.UtcNow.AddDays(1)) instead, matching the sibling PostponingExistingActionFromControlPanelSucceeds test, which closes the race.
1 parent a87ac7a commit d43aa95

12 files changed

Lines changed: 22 additions & 101 deletions

File tree

Core/Cleipnir.ResilientFunctions.Tests/Messaging/InMemoryTests/MessageClearerTests.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,6 @@ public async Task DeleteMessages(IReadOnlyList<long> positions)
167167
public Task Initialize() => throw new NotSupportedException();
168168
public Task AppendMessages(IReadOnlyList<StoredIdAndMessage> messages) => throw new NotSupportedException();
169169
public Task<bool> ReplaceMessage(StoredId storedId, long position, StoredMessage storedMessage) => throw new NotSupportedException();
170-
public Task DeleteMessages(StoredId storedId, IEnumerable<long> positions) => throw new NotSupportedException();
171170
public Task Truncate(StoredId storedId) => throw new NotSupportedException();
172171
public Task<IReadOnlyList<StoredMessage>> GetMessages(StoredId storedId) => throw new NotSupportedException();
173172
public Task<IReadOnlyList<StoredMessage>> GetMessages(StoredId storedId, IReadOnlyList<long> skipPositions) => throw new NotSupportedException();

Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessageStoreTests.cs

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -845,7 +845,7 @@ await functionStore.CreateFunction(
845845
messages.Count.ShouldBe(4);
846846

847847
// Delete the first and third messages
848-
await messageStore.DeleteMessages(functionId, new[] { messages[0].Position, messages[2].Position });
848+
await messageStore.DeleteMessages(new[] { messages[0].Position, messages[2].Position });
849849

850850
var remainingMessages = (await messageStore.GetMessages(functionId)).ToList();
851851
remainingMessages.Count.ShouldBe(2);
@@ -878,7 +878,7 @@ await functionStore.CreateFunction(
878878
var messages = (await messageStore.GetMessages(functionId)).ToList();
879879
messages.Count.ShouldBe(2);
880880

881-
await messageStore.DeleteMessages(functionId, new[] { messages[0].Position });
881+
await messageStore.DeleteMessages(new[] { messages[0].Position });
882882

883883
var remainingMessages = (await messageStore.GetMessages(functionId)).ToList();
884884
remainingMessages.Count.ShouldBe(1);
@@ -902,17 +902,25 @@ await functionStore.CreateFunction(
902902
var messageStore = functionStore.MessageStore;
903903

904904
const string msg1 = "message1";
905+
const string msg2 = "message2";
905906

906907
await messageStore.AppendMessage(functionId, new StoredMessage(msg1.ToJson().ToUtf8Bytes(), typeof(string).SimpleQualifiedName().ToUtf8Bytes(), Replica: ReplicaId.Empty, Position: 0));
908+
await messageStore.AppendMessage(functionId, new StoredMessage(msg2.ToJson().ToUtf8Bytes(), typeof(string).SimpleQualifiedName().ToUtf8Bytes(), Replica: ReplicaId.Empty, Position: 0));
907909

908910
var messages = (await messageStore.GetMessages(functionId)).ToList();
909-
messages.Count.ShouldBe(1);
911+
messages.Count.ShouldBe(2);
910912

911-
// Try to delete messages at non-existent positions (should not throw)
912-
await messageStore.DeleteMessages(functionId, new[] { 999L, 1000L });
913+
// Delete the first message, then delete its now-vacant position again - deleting a position
914+
// that no longer exists should be a no-op rather than throw. A real (already-deleted) position
915+
// is used rather than an arbitrary number, since positions are globally unique and a hardcoded
916+
// value could collide with another flow's message in a shared store.
917+
var deletedPosition = messages[0].Position;
918+
await messageStore.DeleteMessages(new[] { deletedPosition });
919+
await messageStore.DeleteMessages(new[] { deletedPosition });
913920

914921
var remainingMessages = (await messageStore.GetMessages(functionId)).ToList();
915922
remainingMessages.Count.ShouldBe(1);
923+
remainingMessages[0].DefaultDeserialize().ShouldBe(msg2);
916924
}
917925

918926
public abstract Task DeleteMessagesWithEmptyPositionsDoesNotThrow();
@@ -932,7 +940,7 @@ await functionStore.CreateFunction(
932940
var messageStore = functionStore.MessageStore;
933941

934942
// Should not throw when deleting with empty positions list
935-
await messageStore.DeleteMessages(functionId, Array.Empty<long>());
943+
await messageStore.DeleteMessages(Array.Empty<long>());
936944
}
937945

938946
public abstract Task DeleteMessagesOnlyAffectsSpecifiedStoredId();
@@ -974,8 +982,9 @@ await functionStore.CreateFunction(
974982
var messages1 = (await messageStore.GetMessages(id1)).ToList();
975983
messages1.Count.ShouldBe(2);
976984

977-
// Delete first message from id1 only
978-
await messageStore.DeleteMessages(id1, new[] { messages1[0].Position });
985+
// Delete first message from id1 only - the position is globally unique to that message,
986+
// so deleting it must not affect id2's messages.
987+
await messageStore.DeleteMessages(new[] { messages1[0].Position });
979988

980989
// Verify id1 has only one message left
981990
var remainingMessages1 = (await messageStore.GetMessages(id1)).ToList();

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/ControlPanelTests.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -174,17 +174,18 @@ protected async Task PostponingExistingFunctionFromControlPanelSucceeds(Task<IFu
174174
controlPanel.Status.ShouldBe(Status.Failed);
175175
controlPanel.FatalWorkflowException.ShouldNotBeNull();
176176

177-
await controlPanel.Postpone(new DateTime(1_000_000));
177+
var postponeUntil = DateTime.UtcNow.AddDays(1);
178+
await controlPanel.Postpone(postponeUntil);
178179

179180
await controlPanel.Refresh();
180181
controlPanel.Status.ShouldBe(Status.Postponed);
181182
controlPanel.PostponedUntil.ShouldNotBeNull();
182-
controlPanel.PostponedUntil.Value.Ticks.ShouldBe(1_000_000);
183+
controlPanel.PostponedUntil.Value.Ticks.ShouldBe(postponeUntil.Ticks);
183184

184185
var sf = await store.GetFunction(rFunc.MapToStoredId(functionId.Instance));
185186
sf.ShouldNotBeNull();
186187
sf.Status.ShouldBe(Status.Postponed);
187-
sf.Expires.ShouldBe(1_000_000);
188+
sf.Expires.ShouldBe(postponeUntil.Ticks);
188189

189190
var fwe = (FatalWorkflowException) unhandledExceptionCatcher.ThrownExceptions.Single().InnerException!;
190191
fwe.ErrorType.ShouldBe(typeof(InvalidOperationException));

Core/Cleipnir.ResilientFunctions/Domain/ExistingMessages.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public async Task Replace<T>(int position, T message, string? idempotencyKey = n
8585
/// <param name="position">Message position</param>
8686
public async Task Remove(long position)
8787
{
88-
await _messageStore.DeleteMessages(_storedId, positions: [position]);
88+
await _messageStore.DeleteMessages(positions: [position]);
8989

9090
// Invalidate cache so it will be re-fetched with correct data
9191
_receivedMessages = null;

Core/Cleipnir.ResilientFunctions/Messaging/IMessageStore.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ public interface IMessageStore
1919
Task AppendMessages(IReadOnlyList<StoredIdAndMessage> messages);
2020

2121
Task<bool> ReplaceMessage(StoredId storedId, long position, StoredMessage storedMessage);
22-
Task DeleteMessages(StoredId storedId, IEnumerable<long> positions);
2322

2423
/// <summary>
2524
/// Deletes the messages at the given positions regardless of which flow they belong to. Positions are

Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -613,21 +613,6 @@ public Task<bool> ReplaceMessage(StoredId storedId, long position, StoredMessage
613613
}
614614
}
615615

616-
public Task DeleteMessages(StoredId storedId, IEnumerable<long> positions)
617-
{
618-
lock (_sync)
619-
{
620-
if (!_messages.ContainsKey(storedId))
621-
return Task.CompletedTask;
622-
623-
var messages = _messages[storedId];
624-
foreach (var position in positions)
625-
messages.Remove(position);
626-
627-
return Task.CompletedTask;
628-
}
629-
}
630-
631616
public Task DeleteMessages(IReadOnlyList<long> positions)
632617
{
633618
lock (_sync)

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbMessageStore.cs

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -103,17 +103,6 @@ public async Task<bool> ReplaceMessage(StoredId storedId, long position, StoredM
103103
return affectedRows == 1;
104104
}
105105

106-
public async Task DeleteMessages(StoredId storedId, IEnumerable<long> positions)
107-
{
108-
var positionsList = positions.ToList();
109-
if (positionsList.Count == 0)
110-
return;
111-
112-
await using var conn = await DatabaseHelper.CreateOpenConnection(_connectionString);
113-
await using var command = _sqlGenerator.DeleteMessages(storedId, positionsList).ToSqlCommand(conn);
114-
await command.ExecuteNonQueryAsync();
115-
}
116-
117106
public async Task DeleteMessages(IReadOnlyList<long> positions)
118107
{
119108
if (positions.Count == 0)

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/SqlGenerator.cs

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -749,19 +749,4 @@ WHERE id IN ({storedIds.Select(id => $"'{id.AsGuid:N}'").StringJoin(", ")})
749749
.ToList());
750750
}
751751

752-
public StoreCommand DeleteMessages(StoredId storedId, IEnumerable<long> positions)
753-
{
754-
var positionsList = positions.ToList();
755-
756-
var sql = @$"
757-
DELETE FROM {tablePrefix}_messages
758-
WHERE id = ? AND position IN ({string.Join(", ", positionsList.Select(_ => "?"))})";
759-
760-
var command = StoreCommand.Create(sql);
761-
command.AddParameter(storedId.AsGuid.ToString("N"));
762-
foreach (var position in positionsList)
763-
command.AddParameter(position);
764-
765-
return command;
766-
}
767752
}

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlMessageStore.cs

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -107,17 +107,6 @@ public async Task<bool> ReplaceMessage(StoredId storedId, long position, StoredM
107107
return affectedRows == 1;
108108
}
109109

110-
public async Task DeleteMessages(StoredId storedId, IEnumerable<long> positions)
111-
{
112-
var positionsArray = positions.ToArray();
113-
if (positionsArray.Length == 0)
114-
return;
115-
116-
await using var conn = await CreateConnection();
117-
await using var command = sqlGenerator.DeleteMessages(storedId, positionsArray).ToNpgsqlCommand(conn);
118-
await command.ExecuteNonQueryAsync();
119-
}
120-
121110
public async Task DeleteMessages(IReadOnlyList<long> positions)
122111
{
123112
if (positions.Count == 0)

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/SqlGenerator.cs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -694,16 +694,6 @@ public async Task<Dictionary<StoredId, IReadOnlyList<StoredMessage>>> ReadMessag
694694
.ToList());
695695
}
696696

697-
public StoreCommand DeleteMessages(StoredId storedId, IEnumerable<long> positions)
698-
{
699-
var positionsArray = positions.ToArray();
700-
var sql = @$"
701-
DELETE FROM {tablePrefix}_messages
702-
WHERE id = $1 AND position = ANY($2)";
703-
704-
return StoreCommand.Create(sql, values: [ storedId.AsGuid, positionsArray ]);
705-
}
706-
707697
private string? _setParametersSql;
708698
private string? _setParametersSqlWithoutReplica;
709699
public StoreCommand SetParameters(

0 commit comments

Comments
 (0)