Skip to content

Commit d58afff

Browse files
committed
Single guid id refactor for Postgres
1 parent 3462c7a commit d58afff

8 files changed

Lines changed: 152 additions & 103 deletions

File tree

Core/Cleipnir.ResilientFunctions.Tests/Utils/EnumerableLinq.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,4 @@ namespace Cleipnir.ResilientFunctions.Tests.Utils;
55
public static class EnumerableLinq
66
{
77
public static IEnumerable<T> AsEnumerable<T>(this T t) => [t];
8-
public static string JoinStrings(this IEnumerable<string> strings, string separator) => string.Join(separator, strings);
98
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
using System.Collections.Generic;
2+
3+
namespace Cleipnir.ResilientFunctions.Helpers;
4+
5+
public static class StringExtensions
6+
{
7+
public static string JoinStrings(this IEnumerable<string> strings, string separator)
8+
=> string.Join(separator, strings);
9+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
using System.Collections.Generic;
2+
using System.Threading.Tasks;
3+
using Npgsql;
4+
5+
namespace Cleipnir.ResilientFunctions.PostgreSQL;
6+
7+
public static class DapperLight
8+
{
9+
public static async Task<List<T>> Query<T>(this NpgsqlConnection conn, string sql, params object[] parameters)
10+
{
11+
await using var cmd = new NpgsqlCommand(sql, conn);
12+
foreach (var parameter in parameters)
13+
cmd.Parameters.AddWithValue(parameter);
14+
15+
await using var reader = await cmd.ExecuteReaderAsync();
16+
var rows = new List<T>();
17+
while (await reader.ReadAsync())
18+
rows.Add((T) reader.GetValue(0));
19+
20+
return rows;
21+
}
22+
23+
public static async Task<List<Row<T1, T2>>> Query<T1, T2>(this NpgsqlConnection conn, string sql, params object[] parameters)
24+
{
25+
await using var cmd = new NpgsqlCommand(sql, conn);
26+
foreach (var parameter in parameters)
27+
cmd.Parameters.AddWithValue(parameter);
28+
29+
await using var reader = await cmd.ExecuteReaderAsync();
30+
var rows = new List<Row<T1, T2>>();
31+
while (await reader.ReadAsync())
32+
{
33+
var t1 = (T1) reader.GetValue(0);
34+
var t2 = (T2) reader.GetValue(1);
35+
rows.Add(new Row<T1, T2>(t1, t2));
36+
}
37+
38+
return rows;
39+
}
40+
41+
public static async Task<List<Row<T1, T2, T3>>> Query<T1, T2, T3>(this NpgsqlConnection conn, string sql, params object[] parameters)
42+
{
43+
await using var cmd = new NpgsqlCommand(sql, conn);
44+
foreach (var parameter in parameters)
45+
cmd.Parameters.AddWithValue(parameter);
46+
47+
await using var reader = await cmd.ExecuteReaderAsync();
48+
var rows = new List<Row<T1, T2, T3>>();
49+
while (await reader.ReadAsync())
50+
{
51+
var t1 = (T1) reader.GetValue(0);
52+
var t2 = (T2) reader.GetValue(1);
53+
var t3 = (T3) reader.GetValue(2);
54+
rows.Add(new Row<T1, T2, T3>(t1, t2, t3));
55+
}
56+
57+
return rows;
58+
}
59+
60+
public static async Task<int> Execute(this NpgsqlConnection conn, string sql, params object[] parameters)
61+
{
62+
await using var cmd = new NpgsqlCommand(sql, conn);
63+
foreach (var parameter in parameters)
64+
cmd.Parameters.AddWithValue(parameter);
65+
66+
return await cmd.ExecuteNonQueryAsync();
67+
}
68+
}
69+
70+
public record Row<T1, T2>(T1 Value1, T2 Value2);
71+
public record Row<T1, T2, T3>(T1 Value1, T2 Value2, T3 Value3);

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlCorrelationStore.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
using System.Collections.Generic;
22
using System.Threading.Tasks;
3-
using Cleipnir.ResilientFunctions.Domain;
43
using Cleipnir.ResilientFunctions.Storage;
54
using Npgsql;
65

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlEffectsStore.cs

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
using System.Collections.Generic;
33
using System.Linq;
44
using System.Threading.Tasks;
5-
using Cleipnir.ResilientFunctions.Domain;
65
using Cleipnir.ResilientFunctions.Helpers;
76
using Cleipnir.ResilientFunctions.Storage;
87
using Cleipnir.ResilientFunctions.Storage.Utils;
@@ -18,14 +17,13 @@ public async Task Initialize()
1817
await using var conn = await CreateConnection();
1918
_initializeSql ??= @$"
2019
CREATE TABLE IF NOT EXISTS {tablePrefix}_effects (
21-
type INT,
22-
instance UUID,
20+
id UUID,
2321
id_hash UUID,
2422
status INT NOT NULL,
2523
result BYTEA NULL,
2624
exception TEXT NULL,
2725
effect_id TEXT NOT NULL,
28-
PRIMARY KEY (type, instance, id_hash)
26+
PRIMARY KEY (id, id_hash)
2927
);";
3028
var command = new NpgsqlCommand(_initializeSql, conn);
3129
await command.ExecuteNonQueryAsync();
@@ -46,19 +44,18 @@ public async Task SetEffectResult(StoredId storedId, StoredEffect storedEffect)
4644
await using var conn = await CreateConnection();
4745
_setEffectResultSql ??= $@"
4846
INSERT INTO {tablePrefix}_effects
49-
(type, instance, id_hash, status, result, exception, effect_id)
47+
(id, id_hash, status, result, exception, effect_id)
5048
VALUES
51-
($1, $2, $3, $4, $5, $6, $7)
52-
ON CONFLICT (type, instance, id_hash)
49+
($1, $2, $3, $4, $5, $6)
50+
ON CONFLICT (id, id_hash)
5351
DO
5452
UPDATE SET status = EXCLUDED.status, result = EXCLUDED.result, exception = EXCLUDED.exception";
5553

5654
await using var command = new NpgsqlCommand(_setEffectResultSql, conn)
5755
{
5856
Parameters =
5957
{
60-
new() {Value = storedId.Type.Value},
61-
new() {Value = storedId.Instance.Value},
58+
new() {Value = storedId.ToGuid()},
6259
new() {Value = storedEffect.StoredEffectId.Value},
6360
new() {Value = (int) storedEffect.WorkStatus},
6461
new() {Value = storedEffect.Result ?? (object) DBNull.Value},
@@ -96,14 +93,13 @@ public async Task<IReadOnlyList<StoredEffect>> GetEffectResults(StoredId storedI
9693
public async Task DeleteEffectResult(StoredId storedId, StoredEffectId effectId)
9794
{
9895
await using var conn = await CreateConnection();
99-
_deleteEffectResultSql ??= $"DELETE FROM {tablePrefix}_effects WHERE type = $1 AND instance = $2 AND id_hash = $3";
96+
_deleteEffectResultSql ??= $"DELETE FROM {tablePrefix}_effects WHERE id = $1 AND id_hash = $2";
10097

10198
await using var command = new NpgsqlCommand(_deleteEffectResultSql, conn)
10299
{
103100
Parameters =
104101
{
105-
new() {Value = storedId.Type.Value },
106-
new() {Value = storedId.Instance.Value },
102+
new() {Value = storedId.ToGuid() },
107103
new() {Value = effectId.Value },
108104
}
109105
};
@@ -116,8 +112,7 @@ public async Task DeleteEffectResults(StoredId storedId, IReadOnlyList<StoredEff
116112
await using var conn = await CreateConnection();
117113
var sql = @$"
118114
DELETE FROM {tablePrefix}_effects
119-
WHERE type = {storedId.Type.Value} AND
120-
instance = '{storedId.Instance.Value}' AND
115+
WHERE id = {storedId.ToGuid()} AND
121116
id_hash IN ({effectIds.Select(id => $"'{id.Value}'").StringJoin(", ")})";
122117

123118
await using var command = new NpgsqlCommand(sql, conn);
@@ -128,14 +123,13 @@ DELETE FROM {tablePrefix}_effects
128123
public async Task Remove(StoredId storedId)
129124
{
130125
await using var conn = await CreateConnection();
131-
_removeSql ??= $"DELETE FROM {tablePrefix}_effects WHERE type = $1 AND instance = $2";
126+
_removeSql ??= $"DELETE FROM {tablePrefix}_effects WHERE id = $1";
132127

133128
await using var command = new NpgsqlCommand(_removeSql, conn)
134129
{
135130
Parameters =
136131
{
137-
new() {Value = storedId.Type.Value },
138-
new() {Value = storedId.Instance.Value },
132+
new() {Value = storedId.ToGuid() }
139133
}
140134
};
141135

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlFunctionStore.cs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ public async Task Initialize()
8787
await using var conn = await CreateConnection();
8888
_initializeSql ??= $@"
8989
CREATE TABLE IF NOT EXISTS {_tableName} (
90+
id UUID NOT NULL PRIMARY KEY,
9091
type INT NOT NULL,
9192
instance UUID NOT NULL,
9293
epoch INT NOT NULL DEFAULT 0,
@@ -99,20 +100,19 @@ public async Task Initialize()
99100
timestamp BIGINT NOT NULL,
100101
human_instance_id TEXT NOT NULL,
101102
parent TEXT NULL,
102-
owner UUID NULL,
103-
PRIMARY KEY (type, instance)
103+
owner UUID NULL
104104
);
105105
CREATE INDEX IF NOT EXISTS idx_{_tableName}_expires
106-
ON {_tableName}(expires, type, instance)
106+
ON {_tableName}(expires, id)
107107
INCLUDE (epoch)
108108
WHERE status = {(int) Status.Executing} OR status = {(int) Status.Postponed};
109109
110110
CREATE INDEX IF NOT EXISTS idx_{_tableName}_succeeded
111-
ON {_tableName}(type, instance)
111+
ON {_tableName}(id)
112112
WHERE status = {(int) Status.Succeeded};
113113
114114
CREATE INDEX IF NOT EXISTS idx_{_tableName}_owners
115-
ON {_tableName}(owner, type, instance)
115+
ON {_tableName}(owner)
116116
WHERE status = {(int) Status.Executing};
117117
";
118118

@@ -216,9 +216,9 @@ public async Task BulkScheduleFunctions(IEnumerable<IdWithParam> functionsWithPa
216216
{
217217
_bulkScheduleFunctionsSql ??= @$"
218218
INSERT INTO {_tableName}
219-
(type, instance, status, param_json, expires, timestamp, human_instance_id, parent)
219+
(id, type, instance, status, param_json, expires, timestamp, human_instance_id, parent)
220220
VALUES
221-
($1, $2, {(int) Status.Postponed}, $3, 0, 0, $4, $5)
221+
($1, $2, $3, {(int) Status.Postponed}, $4, 0, 0, $5, $6)
222222
ON CONFLICT DO NOTHING;";
223223

224224
await using var conn = await CreateConnection();
@@ -232,6 +232,7 @@ INSERT INTO {_tableName}
232232
{
233233
Parameters =
234234
{
235+
new() { Value = idWithParam.StoredId.ToGuid() },
235236
new() { Value = idWithParam.StoredId.Type.Value },
236237
new() { Value = idWithParam.StoredId.Instance.Value },
237238
new() { Value = idWithParam.Param == null ? DBNull.Value : idWithParam.Param },

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlMessageStore.cs

Lines changed: 24 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,12 @@ public async Task Initialize()
2828
await using var conn = await CreateConnection();
2929
_initializeSql ??= @$"
3030
CREATE TABLE IF NOT EXISTS {_tablePrefix}_messages (
31-
type INT,
32-
instance UUID,
31+
id UUID,
3332
position INT NOT NULL,
3433
message_json BYTEA NOT NULL,
3534
message_type BYTEA NOT NULL,
3635
idempotency_key VARCHAR(255),
37-
PRIMARY KEY (type, instance, position)
36+
PRIMARY KEY (id, position)
3837
);";
3938

4039
var command = new NpgsqlCommand(_initializeSql, conn);
@@ -61,18 +60,17 @@ public async Task TruncateTable()
6160
{ //append Message to message stream sql
6261
_appendMessageSql ??= @$"
6362
INSERT INTO {_tablePrefix}_messages
64-
(type, instance, position, message_json, message_type, idempotency_key)
63+
(id, position, message_json, message_type, idempotency_key)
6564
VALUES (
66-
$1, $2,
67-
(SELECT COALESCE(MAX(position), -1) + 1 FROM {_tablePrefix}_messages WHERE type = $1 AND instance = $2),
68-
$3, $4, $5
65+
$1,
66+
(SELECT COALESCE(MAX(position), -1) + 1 FROM {_tablePrefix}_messages WHERE id = $1),
67+
$2, $3, $4
6968
);";
7069
var command = new NpgsqlBatchCommand(_appendMessageSql)
7170
{
7271
Parameters =
7372
{
74-
new() {Value = storedId.Type.Value},
75-
new() {Value = storedId.Instance.Value},
73+
new() {Value = storedId.ToGuid()},
7674
new() {Value = messageJson},
7775
new() {Value = messageType},
7876
new() {Value = idempotencyKey ?? (object) DBNull.Value}
@@ -85,13 +83,12 @@ INSERT INTO {_tablePrefix}_messages
8583
_getFunctionStatusInAppendMessageSql ??= @$"
8684
SELECT epoch, status
8785
FROM {_tablePrefix}
88-
WHERE type = $1 AND instance = $2;";
86+
WHERE id = $1;";
8987

9088
var command = new NpgsqlBatchCommand(_getFunctionStatusInAppendMessageSql)
9189
{
9290
Parameters = {
93-
new() {Value = storedId.Type.Value},
94-
new() {Value = storedId.Instance.Value}
91+
new() {Value = storedId.ToGuid()}
9592
}
9693
};
9794
batch.BatchCommands.Add(command);
@@ -173,7 +170,7 @@ public async Task<bool> ReplaceMessage(StoredId storedId, int position, StoredMe
173170
_replaceMessageSql ??= @$"
174171
UPDATE {_tablePrefix}_messages
175172
SET message_json = $1, message_type = $2, idempotency_key = $3
176-
WHERE type = $4 AND instance = $5 AND position = $6";
173+
WHERE id = $4 AND position = $5";
177174

178175
var (messageJson, messageType, idempotencyKey) = storedMessage;
179176
var command = new NpgsqlCommand(_replaceMessageSql, conn)
@@ -183,8 +180,7 @@ public async Task<bool> ReplaceMessage(StoredId storedId, int position, StoredMe
183180
new() {Value = messageJson},
184181
new() {Value = messageType},
185182
new() {Value = idempotencyKey ?? (object) DBNull.Value},
186-
new() {Value = storedId.Type.Value},
187-
new() {Value = storedId.Instance.Value},
183+
new() {Value = storedId.ToGuid()},
188184
new() {Value = position},
189185
}
190186
};
@@ -199,13 +195,12 @@ public async Task Truncate(StoredId storedId)
199195
await using var conn = await CreateConnection();
200196
_truncateFunctionSql ??= @$"
201197
DELETE FROM {_tablePrefix}_messages
202-
WHERE type = $1 AND instance = $2;";
198+
WHERE id = $1;";
203199
await using var command = new NpgsqlCommand(_truncateFunctionSql, conn)
204200
{
205201
Parameters =
206202
{
207-
new() {Value = storedId.Type.Value},
208-
new() {Value = storedId.Instance.Value}
203+
new() {Value = storedId.ToGuid()}
209204
}
210205
};
211206
await command.ExecuteNonQueryAsync();
@@ -225,16 +220,17 @@ public async Task<IDictionary<StoredId, int>> GetMaxPositions(IReadOnlyList<Stor
225220
{
226221
if (storedIds.Count == 0)
227222
return new Dictionary<StoredId, int>();
228-
229-
var predicates = storedIds
230-
.GroupBy(id => id.Type.Value, id => id.Instance.Value)
231-
.Select(g => $"type = {g.Key} AND instance IN ({g.Select(instance => $"'{instance}'").StringJoin(", ")})")
232-
.StringJoin(" OR " + Environment.NewLine);
223+
224+
var idMap = storedIds.ToDictionary(id => id.ToGuid(), id => id);
225+
226+
var inClause = storedIds
227+
.Select(id => $"'{id.ToGuid()}'")
228+
.StringJoin(", ");
233229

234230
var sql = @$"
235-
SELECT type, instance, position
231+
SELECT id, position
236232
FROM {tablePrefix}_messages
237-
WHERE {predicates};";
233+
WHERE id IN ({inClause});";
238234

239235
await using var conn = await CreateConnection();
240236
await using var command = new NpgsqlCommand(sql, conn);
@@ -246,11 +242,9 @@ public async Task<IDictionary<StoredId, int>> GetMaxPositions(IReadOnlyList<Stor
246242
await using var reader = await command.ExecuteReaderAsync();
247243
while (await reader.ReadAsync())
248244
{
249-
var type = reader.GetInt32(0).ToStoredType();
250-
var instance = reader.GetGuid(1).ToStoredInstance();
251-
var storedId = new StoredId(type, instance);
252-
var position = reader.GetInt32(2);
253-
positions[storedId] = position;
245+
var id = reader.GetGuid(0);
246+
var position = reader.GetInt32(1);
247+
positions[idMap[id]] = position;
254248
}
255249

256250
return positions;

0 commit comments

Comments
 (0)