diff --git a/libraries/src/AWS.Lambda.Powertools.Idempotency/Idempotency.cs b/libraries/src/AWS.Lambda.Powertools.Idempotency/Idempotency.cs index 196767b5c..e35551577 100644 --- a/libraries/src/AWS.Lambda.Powertools.Idempotency/Idempotency.cs +++ b/libraries/src/AWS.Lambda.Powertools.Idempotency/Idempotency.cs @@ -1,5 +1,6 @@ using System; using System.Text.Json.Serialization; +using System.Threading; using Amazon.Lambda.Core; using AWS.Lambda.Powertools.Common; using AWS.Lambda.Powertools.Idempotency.Internal.Serializers; @@ -16,6 +17,12 @@ namespace AWS.Lambda.Powertools.Idempotency; /// public sealed class Idempotency { + /// + /// AsyncLocal storage for per-invocation LambdaContext. + /// This ensures each concurrent Lambda invocation has its own isolated context. + /// + private static readonly AsyncLocal _lambdaContext = new(); + /// /// The general configurations for the idempotency /// @@ -66,18 +73,25 @@ public static void Configure(Action configurationAction) } /// - /// Holds ILambdaContext + /// Holds ILambdaContext using AsyncLocal for per-invocation isolation. + /// Each concurrent Lambda invocation will have its own isolated context. /// - public ILambdaContext LambdaContext { get; private set; } + public ILambdaContext LambdaContext + { + get => _lambdaContext.Value; + private set => _lambdaContext.Value = value; + } /// /// Can be used in a method which is not the handler to capture the Lambda context, /// to calculate the remaining time before the invocation times out. + /// This method is thread-safe and stores the context in AsyncLocal storage, + /// ensuring isolation between concurrent Lambda invocations. /// - /// + /// The Lambda context for the current invocation public static void RegisterLambdaContext(ILambdaContext context) { - Instance.LambdaContext = context; + _lambdaContext.Value = context; } /// diff --git a/libraries/src/AWS.Lambda.Powertools.Idempotency/InternalsVisibleTo.cs b/libraries/src/AWS.Lambda.Powertools.Idempotency/InternalsVisibleTo.cs index 3d73602cf..e0ccd4b0a 100644 --- a/libraries/src/AWS.Lambda.Powertools.Idempotency/InternalsVisibleTo.cs +++ b/libraries/src/AWS.Lambda.Powertools.Idempotency/InternalsVisibleTo.cs @@ -15,4 +15,5 @@ using System.Runtime.CompilerServices; -[assembly: InternalsVisibleTo("AWS.Lambda.Powertools.Idempotency.Tests")] \ No newline at end of file +[assembly: InternalsVisibleTo("AWS.Lambda.Powertools.Idempotency.Tests")] +[assembly: InternalsVisibleTo("AWS.Lambda.Powertools.ConcurrencyTests")] \ No newline at end of file diff --git a/libraries/src/AWS.Lambda.Powertools.Idempotency/Persistence/BasePersistenceStore.cs b/libraries/src/AWS.Lambda.Powertools.Idempotency/Persistence/BasePersistenceStore.cs index 603f981e9..14918c767 100644 --- a/libraries/src/AWS.Lambda.Powertools.Idempotency/Persistence/BasePersistenceStore.cs +++ b/libraries/src/AWS.Lambda.Powertools.Idempotency/Persistence/BasePersistenceStore.cs @@ -18,6 +18,16 @@ namespace AWS.Lambda.Powertools.Idempotency.Persistence; /// public abstract class BasePersistenceStore : IPersistenceStore { + /// + /// Lock object for thread-safe configuration + /// + private readonly object _configureLock = new object(); + + /// + /// Flag indicating whether the store has been configured + /// + private volatile bool _isConfigured; + /// /// Idempotency Options /// @@ -39,50 +49,95 @@ public abstract class BasePersistenceStore : IPersistenceStore private LRUCache _cache = null!; /// - /// Initialize the base persistence layer from the configuration settings + /// Initialize the base persistence layer from the configuration settings. + /// This method is thread-safe and idempotent - multiple calls with the same parameters are safe. /// /// Idempotency configuration settings /// The name of the function being decorated /// public void Configure(IdempotencyOptions idempotencyOptions, string functionName, string keyPrefix) { - if (!string.IsNullOrEmpty(keyPrefix)) + // Fast path - already configured + if (_isConfigured) return; + + lock (_configureLock) { - _functionName = keyPrefix; - } - else - { - var funcEnv = Environment.GetEnvironmentVariable(Constants.LambdaFunctionNameEnv); - - _functionName = funcEnv ?? "testFunction"; - if (!string.IsNullOrWhiteSpace(functionName)) + // Double-check pattern + if (_isConfigured) return; + + if (!string.IsNullOrEmpty(keyPrefix)) { - _functionName += "." + functionName; + _functionName = keyPrefix; } - } + else + { + var funcEnv = Environment.GetEnvironmentVariable(Constants.LambdaFunctionNameEnv); - _idempotencyOptions = idempotencyOptions; + _functionName = funcEnv ?? "testFunction"; + if (!string.IsNullOrWhiteSpace(functionName)) + { + _functionName += "." + functionName; + } + } - if (!string.IsNullOrWhiteSpace(_idempotencyOptions.PayloadValidationJmesPath)) - { - PayloadValidationEnabled = true; - } + _idempotencyOptions = idempotencyOptions; - var useLocalCache = _idempotencyOptions.UseLocalCache; - if (useLocalCache) - { - _cache = new LRUCache(_idempotencyOptions.LocalCacheMaxItems); + if (!string.IsNullOrWhiteSpace(_idempotencyOptions.PayloadValidationJmesPath)) + { + PayloadValidationEnabled = true; + } + + var useLocalCache = _idempotencyOptions.UseLocalCache; + if (useLocalCache) + { + _cache = new LRUCache(_idempotencyOptions.LocalCacheMaxItems); + } + + _isConfigured = true; } } /// - /// For test purpose only (adding a cache to mock) + /// For test purpose only (adding a cache to mock). + /// This method is thread-safe and idempotent. /// internal void Configure(IdempotencyOptions options, string functionName, string keyPrefix, LRUCache cache) { - Configure(options, functionName, keyPrefix); - _cache = cache; + // Fast path - already configured + if (_isConfigured) return; + + lock (_configureLock) + { + // Double-check pattern + if (_isConfigured) return; + + if (!string.IsNullOrEmpty(keyPrefix)) + { + _functionName = keyPrefix; + } + else + { + var funcEnv = Environment.GetEnvironmentVariable(Constants.LambdaFunctionNameEnv); + + _functionName = funcEnv ?? "testFunction"; + if (!string.IsNullOrWhiteSpace(functionName)) + { + _functionName += "." + functionName; + } + } + + _idempotencyOptions = options; + + if (!string.IsNullOrWhiteSpace(_idempotencyOptions.PayloadValidationJmesPath)) + { + PayloadValidationEnabled = true; + } + + _cache = cache; + + _isConfigured = true; + } } /// diff --git a/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/AWS.Lambda.Powertools.ConcurrencyTests.csproj b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/AWS.Lambda.Powertools.ConcurrencyTests.csproj index 50d191a83..63fc33c03 100644 --- a/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/AWS.Lambda.Powertools.ConcurrencyTests.csproj +++ b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/AWS.Lambda.Powertools.ConcurrencyTests.csproj @@ -21,9 +21,11 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive + + diff --git a/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/ConfigurationThreadSafetyTests.cs b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/ConfigurationThreadSafetyTests.cs new file mode 100644 index 000000000..0fc92cc54 --- /dev/null +++ b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/ConfigurationThreadSafetyTests.cs @@ -0,0 +1,257 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +using Xunit; + +namespace AWS.Lambda.Powertools.ConcurrencyTests.Idempotency; + +/// +/// Tests for validating thread-safe configuration in BasePersistenceStore +/// under concurrent execution scenarios. +/// +/// These tests verify that when multiple threads attempt to configure +/// the persistence store simultaneously, the configuration completes +/// without exceptions and the resulting state is consistent. +/// +[Collection("Idempotency Configuration Tests")] +public class ConfigurationThreadSafetyTests +{ + #region Helper Classes + + private class ConfigurationResult + { + public int ThreadIndex { get; set; } + public bool Success { get; set; } + public bool ExceptionThrown { get; set; } + public string? ExceptionMessage { get; set; } + public string? ExceptionType { get; set; } + } + + #endregion + + #region Property 1: Configuration Thread Safety + + /// + /// **Feature: idempotency-thread-safety, Property 1: Configuration Thread Safety** + /// *For any* number of concurrent invocations calling Configure() simultaneously, + /// the configuration should complete without exceptions and the resulting configuration + /// should be consistent (not corrupted by interleaved operations). + /// **Validates: Requirements 1.1** + /// + [Theory] + [InlineData(2)] + [InlineData(5)] + [InlineData(8)] + [InlineData(10)] + public void ConfigurationThreadSafety_ConcurrentConfigure_ShouldCompleteWithoutExceptions(int concurrencyLevel) + { + // Create a fresh persistence store for each test run + var store = new ThreadSafeInMemoryPersistenceStore(); + var results = new ConfigurationResult[concurrencyLevel]; + var barrier = new Barrier(concurrencyLevel); + var tasks = new Task[concurrencyLevel]; + + // Create idempotency options + var options = new AWS.Lambda.Powertools.Idempotency.IdempotencyOptionsBuilder() + .WithEventKeyJmesPath("id") + .WithExpiration(TimeSpan.FromMinutes(5)) + .Build(); + + for (int i = 0; i < concurrencyLevel; i++) + { + int threadIndex = i; + tasks[i] = Task.Run(() => + { + var result = new ConfigurationResult { ThreadIndex = threadIndex }; + + try + { + // Wait for all threads to be ready + barrier.SignalAndWait(); + + // All threads attempt to configure simultaneously + store.Configure(options, $"Function_{threadIndex}", null); + + result.Success = true; + } + catch (Exception ex) + { + result.ExceptionThrown = true; + result.ExceptionType = ex.GetType().Name; + result.ExceptionMessage = ex.Message; + } + + results[threadIndex] = result; + }); + } + + Task.WaitAll(tasks); + + // All threads should complete without exceptions + Assert.All(results, r => Assert.True(r.Success, + $"Thread {r.ThreadIndex} failed: {r.ExceptionType}: {r.ExceptionMessage}")); + Assert.All(results, r => Assert.False(r.ExceptionThrown, + $"Thread {r.ThreadIndex} threw exception: {r.ExceptionType}: {r.ExceptionMessage}")); + } + + /// + /// **Feature: idempotency-thread-safety, Property 1b: Configuration Idempotency** + /// *For any* persistence store, calling Configure() multiple times with the same + /// parameters should be safe and idempotent (subsequent calls are no-ops). + /// **Validates: Requirements 1.1** + /// + [Theory] + [InlineData(2)] + [InlineData(3)] + [InlineData(5)] + public void ConfigurationIdempotency_MultipleCalls_ShouldBeNoOp(int numberOfCalls) + { + var store = new ThreadSafeInMemoryPersistenceStore(); + var options = new AWS.Lambda.Powertools.Idempotency.IdempotencyOptionsBuilder() + .WithEventKeyJmesPath("id") + .Build(); + + var exceptions = new List(); + + // Call Configure multiple times sequentially + for (int i = 0; i < numberOfCalls; i++) + { + try + { + store.Configure(options, "TestFunction", null); + } + catch (Exception ex) + { + exceptions.Add(ex); + } + } + + Assert.Empty(exceptions); + } + + /// + /// **Feature: idempotency-thread-safety, Property 1c: Configuration Thread Safety with Different Parameters** + /// *For any* set of concurrent threads calling Configure() with different function names, + /// the first configuration should win and subsequent calls should be no-ops. + /// **Validates: Requirements 1.1, 1.3** + /// + [Theory] + [InlineData(2)] + [InlineData(5)] + [InlineData(8)] + public void ConfigurationThreadSafety_DifferentParameters_FirstConfigurationWins(int concurrencyLevel) + { + var store = new ThreadSafeInMemoryPersistenceStore(); + var results = new ConfigurationResult[concurrencyLevel]; + var barrier = new Barrier(concurrencyLevel); + var tasks = new Task[concurrencyLevel]; + + for (int i = 0; i < concurrencyLevel; i++) + { + int threadIndex = i; + var options = new AWS.Lambda.Powertools.Idempotency.IdempotencyOptionsBuilder() + .WithEventKeyJmesPath($"id_{threadIndex}") + .Build(); + + tasks[i] = Task.Run(() => + { + var result = new ConfigurationResult { ThreadIndex = threadIndex }; + + try + { + barrier.SignalAndWait(); + + // Each thread tries to configure with different parameters + store.Configure(options, $"Function_{threadIndex}", null); + + result.Success = true; + } + catch (Exception ex) + { + result.ExceptionThrown = true; + result.ExceptionType = ex.GetType().Name; + result.ExceptionMessage = ex.Message; + } + + results[threadIndex] = result; + }); + } + + Task.WaitAll(tasks); + + // All threads should complete without exceptions (even if their config was ignored) + Assert.All(results, r => Assert.True(r.Success, + $"Thread {r.ThreadIndex} failed: {r.ExceptionType}: {r.ExceptionMessage}")); + Assert.All(results, r => Assert.False(r.ExceptionThrown, + $"Thread {r.ThreadIndex} threw exception: {r.ExceptionType}: {r.ExceptionMessage}")); + } + + #endregion + + #region Additional Concurrency Tests + + /// + /// Stress test for configuration thread safety with high concurrency. + /// This test uses a higher number of threads to stress test the locking mechanism. + /// + [Theory] + [InlineData(20)] + [InlineData(30)] + public void ConfigurationThreadSafety_HighConcurrency_ShouldCompleteWithoutExceptions(int concurrencyLevel) + { + var store = new ThreadSafeInMemoryPersistenceStore(); + var results = new ConfigurationResult[concurrencyLevel]; + var barrier = new Barrier(concurrencyLevel); + var tasks = new Task[concurrencyLevel]; + + var options = new AWS.Lambda.Powertools.Idempotency.IdempotencyOptionsBuilder() + .WithEventKeyJmesPath("id") + .Build(); + + for (int i = 0; i < concurrencyLevel; i++) + { + int threadIndex = i; + tasks[i] = Task.Run(() => + { + var result = new ConfigurationResult { ThreadIndex = threadIndex }; + + try + { + barrier.SignalAndWait(); + store.Configure(options, $"Function_{threadIndex}", null); + result.Success = true; + } + catch (Exception ex) + { + result.ExceptionThrown = true; + result.ExceptionType = ex.GetType().Name; + result.ExceptionMessage = ex.Message; + } + + results[threadIndex] = result; + }); + } + + Task.WaitAll(tasks); + + // Verify all threads completed without exceptions + Assert.All(results, r => Assert.True(r.Success, + $"Thread {r.ThreadIndex} failed: {r.ExceptionType}: {r.ExceptionMessage}")); + Assert.All(results, r => Assert.False(r.ExceptionThrown, + $"Thread {r.ThreadIndex} threw exception: {r.ExceptionType}: {r.ExceptionMessage}")); + } + + #endregion +} diff --git a/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/IdempotencyAsyncContextTests.cs b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/IdempotencyAsyncContextTests.cs new file mode 100644 index 000000000..3dc60685d --- /dev/null +++ b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/IdempotencyAsyncContextTests.cs @@ -0,0 +1,476 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +using System.Collections.Concurrent; +using System.Text.Json; +using Amazon.Lambda.TestUtilities; +using AWS.Lambda.Powertools.Idempotency.Internal; +using Xunit; +using IdempotencyLib = AWS.Lambda.Powertools.Idempotency; + +namespace AWS.Lambda.Powertools.ConcurrencyTests.Idempotency; + +/// +/// Tests for validating async context preservation in Powertools Idempotency. +/// **Feature: idempotency-thread-safety, Property 7: Async Context Preservation** +/// **Validates: Requirements 6.1, 6.2, 6.3** +/// +[Collection("Idempotency Async Context Tests")] +public class IdempotencyAsyncContextTests : IDisposable +{ + private readonly ThreadSafeInMemoryPersistenceStore _store; + + public IdempotencyAsyncContextTests() + { + _store = new ThreadSafeInMemoryPersistenceStore(); + IdempotencyLib.Idempotency.Configure(builder => builder + .WithPersistenceStore(_store) + .WithOptions(opt => opt + .WithEventKeyJmesPath("id") + .WithExpiration(TimeSpan.FromMinutes(5)))); + } + + public void Dispose() + { + _store.Clear(); + _store.ResetCounters(); + } + + private class AsyncContextResult + { + public int InvocationIndex { get; set; } + public string InvocationId { get; set; } = string.Empty; + public string ExpectedFunctionName { get; set; } = string.Empty; + public List FunctionNamesAtAwaitPoints { get; set; } = new(); + public bool ContextPreservedAcrossAllAwaits { get; set; } + public bool ExceptionThrown { get; set; } + public string? ExceptionType { get; set; } + public string? ExceptionMessage { get; set; } + } + + private class HandlerAsyncResult + { + public int InvocationIndex { get; set; } + public string InvocationId { get; set; } = string.Empty; + public string ExpectedResult { get; set; } = string.Empty; + public string? ActualResult { get; set; } + public bool ResultMatched { get; set; } + public List ContextsAtAwaitPoints { get; set; } = new(); + public bool ContextPreserved { get; set; } + public bool ExceptionThrown { get; set; } + public string? ExceptionType { get; set; } + public string? ExceptionMessage { get; set; } + } + + private class TestRequest + { + [System.Text.Json.Serialization.JsonPropertyName("id")] + public string Id { get; set; } = string.Empty; + [System.Text.Json.Serialization.JsonPropertyName("data")] + public string Data { get; set; } = string.Empty; + } + + private static JsonDocument CreatePayload(string id, string data) + { + var request = new TestRequest { Id = id, Data = data }; + return JsonDocument.Parse(JsonSerializer.Serialize(request)); + } + + private static TestLambdaContext CreateTestContext(string functionName, string requestId) + { + return new TestLambdaContext + { + FunctionName = functionName, + AwsRequestId = requestId, + RemainingTime = TimeSpan.FromMinutes(5) + }; + } + + /// + /// **Feature: idempotency-thread-safety, Property 7: Async Context Preservation** + /// **Validates: Requirements 6.1, 6.2, 6.3** + /// + [Theory] + [InlineData(1)] + [InlineData(3)] + [InlineData(5)] + public void AsyncContextPreservation_AcrossAwaitPoints_ShouldPreserveContext(int awaitCount) + { + var invocationId = Guid.NewGuid().ToString("N"); + var expectedFunctionName = $"AsyncFunction_{invocationId}"; + var functionsAtAwaitPoints = new List(); + + var context = CreateTestContext(expectedFunctionName, invocationId); + IdempotencyLib.Idempotency.RegisterLambdaContext(context); + + var contextBefore = IdempotencyLib.Idempotency.Instance.LambdaContext; + Assert.Equal(expectedFunctionName, contextBefore?.FunctionName); + + var task = Task.Run(async () => + { + for (int i = 0; i < awaitCount; i++) + { + await Task.Delay(Random.Shared.Next(1, 10)); + var currentContext = IdempotencyLib.Idempotency.Instance.LambdaContext; + lock (functionsAtAwaitPoints) + { + functionsAtAwaitPoints.Add(currentContext?.FunctionName ?? "null"); + } + } + }); + + task.Wait(); + + Assert.All(functionsAtAwaitPoints, fn => Assert.Equal(expectedFunctionName, fn)); + } + + [Theory] + [InlineData(2)] + [InlineData(5)] + [InlineData(10)] + public void AsyncContextPreservation_ConcurrentAsyncInvocations_ShouldMaintainIsolation(int concurrencyLevel) + { + var results = new ConcurrentBag(); + var barrier = new Barrier(concurrencyLevel); + var tasks = new Task[concurrencyLevel]; + + for (int i = 0; i < concurrencyLevel; i++) + { + int invocationIndex = i; + tasks[i] = Task.Run(async () => + { + var invocationId = Guid.NewGuid().ToString("N"); + var expectedFunctionName = $"ConcurrentAsync_{invocationIndex}_{invocationId}"; + + var result = new AsyncContextResult + { + InvocationIndex = invocationIndex, + InvocationId = invocationId, + ExpectedFunctionName = expectedFunctionName + }; + + try + { + var context = CreateTestContext(expectedFunctionName, invocationId); + + barrier.SignalAndWait(); + + IdempotencyLib.Idempotency.RegisterLambdaContext(context); + + for (int awaitPoint = 0; awaitPoint < 3; awaitPoint++) + { + await Task.Delay(Random.Shared.Next(5, 20)); + var currentContext = IdempotencyLib.Idempotency.Instance.LambdaContext; + result.FunctionNamesAtAwaitPoints.Add(currentContext?.FunctionName ?? "null"); + } + + result.ContextPreservedAcrossAllAwaits = + result.FunctionNamesAtAwaitPoints.All(fn => fn == expectedFunctionName); + } + catch (Exception ex) + { + result.ExceptionThrown = true; + result.ExceptionType = ex.GetType().Name; + result.ExceptionMessage = ex.Message; + } + + results.Add(result); + }); + } + + Task.WaitAll(tasks); + + Assert.All(results, r => Assert.False(r.ExceptionThrown, r.ExceptionMessage)); + Assert.All(results, r => Assert.True(r.ContextPreservedAcrossAllAwaits, + $"Invocation {r.InvocationIndex} did not preserve context. Expected '{r.ExpectedFunctionName}', got: [{string.Join(", ", r.FunctionNamesAtAwaitPoints)}]")); + } + + [Theory] + [InlineData(2)] + [InlineData(5)] + [InlineData(8)] + public void AsyncContextPreservation_InHandler_ShouldPreserveContext(int concurrencyLevel) + { + var results = new ConcurrentBag(); + var barrier = new Barrier(concurrencyLevel); + var tasks = new Task[concurrencyLevel]; + + for (int i = 0; i < concurrencyLevel; i++) + { + int invocationIndex = i; + tasks[i] = Task.Run(async () => + { + var invocationId = Guid.NewGuid().ToString("N"); + var expectedFunctionName = $"HandlerAsync_{invocationIndex}_{invocationId}"; + var expectedResult = $"result_{invocationId}"; + + var result = new HandlerAsyncResult + { + InvocationIndex = invocationIndex, + InvocationId = invocationId, + ExpectedResult = expectedResult + }; + + try + { + var payload = CreatePayload(invocationId, $"async_data_{invocationIndex}"); + var context = CreateTestContext(expectedFunctionName, invocationId); + + IdempotencyLib.Idempotency.RegisterLambdaContext(context); + + Func> targetFunc = async () => + { + var ctx1 = IdempotencyLib.Idempotency.Instance.LambdaContext; + result.ContextsAtAwaitPoints.Add(ctx1?.FunctionName ?? "null"); + + await Task.Delay(Random.Shared.Next(5, 15)); + + var ctx2 = IdempotencyLib.Idempotency.Instance.LambdaContext; + result.ContextsAtAwaitPoints.Add(ctx2?.FunctionName ?? "null"); + + await Task.Delay(Random.Shared.Next(5, 15)); + + var ctx3 = IdempotencyLib.Idempotency.Instance.LambdaContext; + result.ContextsAtAwaitPoints.Add(ctx3?.FunctionName ?? "null"); + + return expectedResult; + }; + + barrier.SignalAndWait(); + + var handler = new IdempotencyAspectHandler( + targetFunc, $"HandlerAsyncFunction_{invocationIndex}", null, payload, context); + + result.ActualResult = await handler.Handle(); + result.ResultMatched = result.ActualResult == expectedResult; + result.ContextPreserved = result.ContextsAtAwaitPoints.All(fn => fn == expectedFunctionName); + } + catch (Exception ex) + { + result.ExceptionThrown = true; + result.ExceptionType = ex.GetType().Name; + result.ExceptionMessage = ex.Message; + } + + results.Add(result); + }); + } + + Task.WaitAll(tasks); + + Assert.All(results, r => Assert.False(r.ExceptionThrown, r.ExceptionMessage)); + Assert.All(results, r => Assert.True(r.ResultMatched, $"Expected '{r.ExpectedResult}' but got '{r.ActualResult}'")); + Assert.All(results, r => Assert.True(r.ContextPreserved, + $"Context not preserved: [{string.Join(", ", r.ContextsAtAwaitPoints)}]")); + } + + [Theory] + [InlineData(2)] + [InlineData(5)] + [InlineData(10)] + public async Task AsyncContext_WithConfigureAwaitFalse_ShouldPreserveContext(int concurrencyLevel) + { + var results = new ConcurrentBag(); + var barrier = new Barrier(concurrencyLevel); + var tasks = new Task[concurrencyLevel]; + + for (int i = 0; i < concurrencyLevel; i++) + { + int invocationIndex = i; + tasks[i] = Task.Run(async () => + { + var invocationId = Guid.NewGuid().ToString("N"); + var expectedFunctionName = $"ConfigureAwaitFalse_{invocationIndex}_{invocationId}"; + + var result = new AsyncContextResult + { + InvocationIndex = invocationIndex, + InvocationId = invocationId, + ExpectedFunctionName = expectedFunctionName + }; + + try + { + var context = CreateTestContext(expectedFunctionName, invocationId); + + barrier.SignalAndWait(); + + IdempotencyLib.Idempotency.RegisterLambdaContext(context); + + var ctxBefore = IdempotencyLib.Idempotency.Instance.LambdaContext; + result.FunctionNamesAtAwaitPoints.Add(ctxBefore?.FunctionName ?? "null"); + + await Task.Delay(Random.Shared.Next(10, 30)).ConfigureAwait(false); + + var ctxAfter = IdempotencyLib.Idempotency.Instance.LambdaContext; + result.FunctionNamesAtAwaitPoints.Add(ctxAfter?.FunctionName ?? "null"); + + await Task.Delay(Random.Shared.Next(10, 30)).ConfigureAwait(false); + + var ctxFinal = IdempotencyLib.Idempotency.Instance.LambdaContext; + result.FunctionNamesAtAwaitPoints.Add(ctxFinal?.FunctionName ?? "null"); + + result.ContextPreservedAcrossAllAwaits = + result.FunctionNamesAtAwaitPoints.All(fn => fn == expectedFunctionName); + } + catch (Exception ex) + { + result.ExceptionThrown = true; + result.ExceptionType = ex.GetType().Name; + result.ExceptionMessage = ex.Message; + } + + results.Add(result); + }); + } + + await Task.WhenAll(tasks); + + Assert.All(results, r => Assert.False(r.ExceptionThrown, r.ExceptionMessage)); + Assert.All(results, r => Assert.True(r.ContextPreservedAcrossAllAwaits, + $"Invocation {r.InvocationIndex} did not preserve context. Expected '{r.ExpectedFunctionName}', got: [{string.Join(", ", r.FunctionNamesAtAwaitPoints)}]")); + } + + [Theory] + [InlineData(2, 2)] + [InlineData(5, 3)] + [InlineData(10, 2)] + public async Task AsyncContext_NestedAsyncOperations_ShouldPreserveContext(int concurrencyLevel, int nestingDepth) + { + var results = new ConcurrentBag(); + var barrier = new Barrier(concurrencyLevel); + var tasks = new Task[concurrencyLevel]; + + for (int i = 0; i < concurrencyLevel; i++) + { + int invocationIndex = i; + tasks[i] = Task.Run(async () => + { + var invocationId = Guid.NewGuid().ToString("N"); + var expectedFunctionName = $"NestedAsync_{invocationIndex}_{invocationId}"; + + var result = new AsyncContextResult + { + InvocationIndex = invocationIndex, + InvocationId = invocationId, + ExpectedFunctionName = expectedFunctionName + }; + + try + { + var context = CreateTestContext(expectedFunctionName, invocationId); + + barrier.SignalAndWait(); + + IdempotencyLib.Idempotency.RegisterLambdaContext(context); + + async Task NestedAsync(int depth) + { + if (depth <= 0) return; + + await Task.Delay(Random.Shared.Next(1, 5)); + + var currentContext = IdempotencyLib.Idempotency.Instance.LambdaContext; + lock (result.FunctionNamesAtAwaitPoints) + { + result.FunctionNamesAtAwaitPoints.Add(currentContext?.FunctionName ?? "null"); + } + + await NestedAsync(depth - 1); + } + + await NestedAsync(nestingDepth); + + result.ContextPreservedAcrossAllAwaits = + result.FunctionNamesAtAwaitPoints.All(fn => fn == expectedFunctionName); + } + catch (Exception ex) + { + result.ExceptionThrown = true; + result.ExceptionType = ex.GetType().Name; + result.ExceptionMessage = ex.Message; + } + + results.Add(result); + }); + } + + await Task.WhenAll(tasks); + + Assert.All(results, r => Assert.False(r.ExceptionThrown, r.ExceptionMessage)); + Assert.All(results, r => Assert.True(r.ContextPreservedAcrossAllAwaits, + $"Invocation {r.InvocationIndex} did not preserve context in nested async. Expected '{r.ExpectedFunctionName}', got: [{string.Join(", ", r.FunctionNamesAtAwaitPoints)}]")); + } + + [Theory] + [InlineData(20)] + [InlineData(30)] + public async Task AsyncContext_HighConcurrency_ShouldPreserveContext(int concurrencyLevel) + { + var results = new ConcurrentBag(); + var barrier = new Barrier(concurrencyLevel); + var tasks = new Task[concurrencyLevel]; + + for (int i = 0; i < concurrencyLevel; i++) + { + int invocationIndex = i; + tasks[i] = Task.Run(async () => + { + var invocationId = Guid.NewGuid().ToString("N"); + var expectedFunctionName = $"StressAsync_{invocationIndex}_{invocationId}"; + + var result = new AsyncContextResult + { + InvocationIndex = invocationIndex, + InvocationId = invocationId, + ExpectedFunctionName = expectedFunctionName + }; + + try + { + var context = CreateTestContext(expectedFunctionName, invocationId); + + barrier.SignalAndWait(); + + IdempotencyLib.Idempotency.RegisterLambdaContext(context); + + for (int awaitPoint = 0; awaitPoint < 5; awaitPoint++) + { + await Task.Delay(Random.Shared.Next(1, 10)); + var currentContext = IdempotencyLib.Idempotency.Instance.LambdaContext; + result.FunctionNamesAtAwaitPoints.Add(currentContext?.FunctionName ?? "null"); + } + + result.ContextPreservedAcrossAllAwaits = + result.FunctionNamesAtAwaitPoints.All(fn => fn == expectedFunctionName); + } + catch (Exception ex) + { + result.ExceptionThrown = true; + result.ExceptionType = ex.GetType().Name; + result.ExceptionMessage = ex.Message; + } + + results.Add(result); + }); + } + + await Task.WhenAll(tasks); + + Assert.All(results, r => Assert.False(r.ExceptionThrown, r.ExceptionMessage)); + Assert.All(results, r => Assert.True(r.ContextPreservedAcrossAllAwaits, + $"Invocation {r.InvocationIndex} did not preserve context under stress. Expected '{r.ExpectedFunctionName}', got: [{string.Join(", ", r.FunctionNamesAtAwaitPoints)}]")); + } +} diff --git a/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/IdempotencyHandlerIsolationTests.cs b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/IdempotencyHandlerIsolationTests.cs new file mode 100644 index 000000000..efab09a3d --- /dev/null +++ b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/IdempotencyHandlerIsolationTests.cs @@ -0,0 +1,337 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +using System.Collections.Concurrent; +using System.Text.Json; +using Amazon.Lambda.TestUtilities; +using AWS.Lambda.Powertools.Idempotency.Internal; +using Xunit; +using IdempotencyLib = AWS.Lambda.Powertools.Idempotency; + +namespace AWS.Lambda.Powertools.ConcurrencyTests.Idempotency; + +/// +/// Tests for validating IdempotencyAspectHandler isolation under concurrent execution scenarios. +/// **Feature: idempotency-thread-safety, Property 5: IdempotencyAspectHandler Isolation** +/// **Validates: Requirements 4.1, 4.2, 4.3, 4.4** +/// +[Collection("Idempotency Handler Isolation Tests")] +public class IdempotencyHandlerIsolationTests : IDisposable +{ + private readonly ThreadSafeInMemoryPersistenceStore _store; + + public IdempotencyHandlerIsolationTests() + { + _store = new ThreadSafeInMemoryPersistenceStore(); + IdempotencyLib.Idempotency.Configure(builder => builder + .WithPersistenceStore(_store) + .WithOptions(opt => opt + .WithEventKeyJmesPath("id") + .WithExpiration(TimeSpan.FromMinutes(5)))); + } + + public void Dispose() + { + _store.Clear(); + _store.ResetCounters(); + } + + private class HandlerInvocationResult + { + public int InvocationIndex { get; set; } + public string InvocationId { get; set; } = string.Empty; + public string ExpectedResult { get; set; } = string.Empty; + public string? ActualResult { get; set; } + public bool ResultMatched { get; set; } + public bool ExceptionThrown { get; set; } + public string? ExceptionType { get; set; } + public string? ExceptionMessage { get; set; } + } + + private class TestRequest + { + [System.Text.Json.Serialization.JsonPropertyName("id")] + public string Id { get; set; } = string.Empty; + [System.Text.Json.Serialization.JsonPropertyName("data")] + public string Data { get; set; } = string.Empty; + } + + private class TestResponse + { + public string RequestId { get; set; } = string.Empty; + public string ProcessedData { get; set; } = string.Empty; + } + + private static JsonDocument CreatePayload(string id, string data) + { + var request = new TestRequest { Id = id, Data = data }; + return JsonDocument.Parse(JsonSerializer.Serialize(request)); + } + + private static TestLambdaContext CreateTestContext(string functionName, string requestId) + { + return new TestLambdaContext + { + FunctionName = functionName, + AwsRequestId = requestId, + RemainingTime = TimeSpan.FromMinutes(5) + }; + } + + /// + /// **Feature: idempotency-thread-safety, Property 5: IdempotencyAspectHandler Isolation** + /// **Validates: Requirements 4.1, 4.2, 4.3, 4.4** + /// + [Theory] + [InlineData(2)] + [InlineData(5)] + [InlineData(10)] + public void HandlerIsolation_ConcurrentInvocations_ShouldProcessIndependently(int concurrencyLevel) + { + var results = new ConcurrentBag(); + var barrier = new Barrier(concurrencyLevel); + var tasks = new Task[concurrencyLevel]; + + for (int i = 0; i < concurrencyLevel; i++) + { + int invocationIndex = i; + tasks[i] = Task.Run(async () => + { + var invocationId = Guid.NewGuid().ToString("N"); + var expectedResult = $"processed_{invocationId}"; + var result = new HandlerInvocationResult + { + InvocationIndex = invocationIndex, + InvocationId = invocationId, + ExpectedResult = expectedResult + }; + + try + { + var payload = CreatePayload(invocationId, $"data_{invocationIndex}"); + var context = CreateTestContext($"Function_{invocationIndex}", invocationId); + IdempotencyLib.Idempotency.RegisterLambdaContext(context); + + Func> targetFunc = async () => + { + await Task.Delay(Random.Shared.Next(5, 20)); + return new TestResponse { RequestId = invocationId, ProcessedData = expectedResult }; + }; + + barrier.SignalAndWait(); + + var handler = new IdempotencyAspectHandler( + targetFunc, $"TestFunction_{invocationIndex}", null, payload, context); + + var response = await handler.Handle(); + result.ActualResult = response?.ProcessedData; + result.ResultMatched = response?.ProcessedData == expectedResult && response?.RequestId == invocationId; + } + catch (Exception ex) + { + result.ExceptionThrown = true; + result.ExceptionType = ex.GetType().Name; + result.ExceptionMessage = ex.Message; + } + + results.Add(result); + }); + } + + Task.WaitAll(tasks); + + Assert.All(results, r => Assert.False(r.ExceptionThrown, $"{r.ExceptionType}: {r.ExceptionMessage}")); + Assert.All(results, r => Assert.True(r.ResultMatched, $"Expected '{r.ExpectedResult}' but got '{r.ActualResult}'")); + } + + [Theory] + [InlineData(2)] + [InlineData(5)] + [InlineData(8)] + public void HandlerIsolation_MultipleHandlerInstances_ShouldOperateIndependently(int concurrencyLevel) + { + var results = new ConcurrentBag(); + var barrier = new Barrier(concurrencyLevel); + var tasks = new Task[concurrencyLevel]; + + for (int i = 0; i < concurrencyLevel; i++) + { + int invocationIndex = i; + tasks[i] = Task.Run(async () => + { + var invocationId = Guid.NewGuid().ToString("N"); + var result = new HandlerInvocationResult + { + InvocationIndex = invocationIndex, + InvocationId = invocationId, + ExpectedResult = $"result_{invocationId}" + }; + + try + { + var payload = CreatePayload(invocationId, $"data_{invocationIndex}"); + var context = CreateTestContext($"IndependentFunction_{invocationIndex}", invocationId); + IdempotencyLib.Idempotency.RegisterLambdaContext(context); + + Func> targetFunc = async () => + { + await Task.Delay(Random.Shared.Next(1, 10)); + return result.ExpectedResult; + }; + + barrier.SignalAndWait(); + + var handler = new IdempotencyAspectHandler( + targetFunc, $"IndependentFunction_{invocationIndex}", null, payload, context); + + result.ActualResult = await handler.Handle(); + result.ResultMatched = result.ActualResult == result.ExpectedResult; + } + catch (Exception ex) + { + result.ExceptionThrown = true; + result.ExceptionType = ex.GetType().Name; + result.ExceptionMessage = ex.Message; + } + + results.Add(result); + }); + } + + Task.WaitAll(tasks); + + Assert.All(results, r => Assert.False(r.ExceptionThrown, r.ExceptionMessage)); + Assert.All(results, r => Assert.True(r.ResultMatched, $"Expected '{r.ExpectedResult}' but got '{r.ActualResult}'")); + } + + [Theory] + [InlineData(2)] + [InlineData(5)] + [InlineData(10)] + public void HandlerIsolation_ConcurrentHandleExecution_ShouldNotCrossContaminate(int concurrencyLevel) + { + var processedIds = new ConcurrentBag(); + var results = new ConcurrentBag(); + var barrier = new Barrier(concurrencyLevel); + var tasks = new Task[concurrencyLevel]; + + for (int i = 0; i < concurrencyLevel; i++) + { + int invocationIndex = i; + tasks[i] = Task.Run(async () => + { + var invocationId = Guid.NewGuid().ToString("N"); + var result = new HandlerInvocationResult { InvocationIndex = invocationIndex, InvocationId = invocationId }; + + try + { + var payload = CreatePayload(invocationId, $"concurrent_data_{invocationIndex}"); + var context = CreateTestContext($"ConcurrentFunction", invocationId); + IdempotencyLib.Idempotency.RegisterLambdaContext(context); + + string capturedId = invocationId; + Func> targetFunc = async () => + { + await Task.Delay(Random.Shared.Next(5, 15)); + processedIds.Add(capturedId); + return capturedId; + }; + + barrier.SignalAndWait(); + + var handler = new IdempotencyAspectHandler( + targetFunc, $"ConcurrentFunction_{invocationIndex}", null, payload, context); + + result.ActualResult = await handler.Handle(); + result.ExpectedResult = invocationId; + result.ResultMatched = result.ActualResult == invocationId; + } + catch (Exception ex) + { + result.ExceptionThrown = true; + result.ExceptionType = ex.GetType().Name; + result.ExceptionMessage = ex.Message; + } + + results.Add(result); + }); + } + + Task.WaitAll(tasks); + + Assert.All(results, r => Assert.False(r.ExceptionThrown, r.ExceptionMessage)); + Assert.All(results, r => Assert.True(r.ResultMatched, $"Expected '{r.ExpectedResult}' but got '{r.ActualResult}'")); + Assert.Equal(concurrencyLevel, processedIds.Distinct().Count()); + } + + [Theory] + [InlineData(20)] + [InlineData(30)] + public async Task HandlerIsolation_HighConcurrency_ShouldMaintainIsolation(int concurrencyLevel) + { + var results = new ConcurrentBag(); + var barrier = new Barrier(concurrencyLevel); + var tasks = new Task[concurrencyLevel]; + + for (int i = 0; i < concurrencyLevel; i++) + { + int invocationIndex = i; + tasks[i] = Task.Run(async () => + { + var invocationId = Guid.NewGuid().ToString("N"); + var result = new HandlerInvocationResult + { + InvocationIndex = invocationIndex, + InvocationId = invocationId, + ExpectedResult = $"stress_result_{invocationId}" + }; + + try + { + var payload = CreatePayload(invocationId, $"stress_data_{invocationIndex}"); + var context = CreateTestContext($"StressFunction_{invocationIndex}", invocationId); + IdempotencyLib.Idempotency.RegisterLambdaContext(context); + + Func> targetFunc = async () => + { + await Task.Delay(Random.Shared.Next(1, 5)); + return result.ExpectedResult; + }; + + barrier.SignalAndWait(); + + var handler = new IdempotencyAspectHandler( + targetFunc, $"StressFunction_{invocationIndex}", null, payload, context); + + result.ActualResult = await handler.Handle(); + result.ResultMatched = result.ActualResult == result.ExpectedResult; + } + catch (Exception ex) + { + result.ExceptionThrown = true; + result.ExceptionType = ex.GetType().Name; + result.ExceptionMessage = ex.Message; + } + + results.Add(result); + }); + } + + await Task.WhenAll(tasks); + + Assert.All(results, r => Assert.False(r.ExceptionThrown, r.ExceptionMessage)); + Assert.All(results, r => Assert.True(r.ResultMatched, $"Expected '{r.ExpectedResult}' but got '{r.ActualResult}'")); + } +} diff --git a/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/IdempotencyHashGenerationTests.cs b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/IdempotencyHashGenerationTests.cs new file mode 100644 index 000000000..ad220b83f --- /dev/null +++ b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/IdempotencyHashGenerationTests.cs @@ -0,0 +1,431 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +using System.Collections.Concurrent; +using System.Text.Json; +using System.Text.Json.Nodes; +using AWS.Lambda.Powertools.Idempotency; +using Xunit; + +namespace AWS.Lambda.Powertools.ConcurrencyTests.Idempotency; + +/// +/// Tests for validating hash generation thread safety under concurrent execution scenarios. +/// **Feature: idempotency-thread-safety, Property 6: Hash Generation Thread Safety** +/// **Validates: Requirements 1.4, 5.1, 5.2, 5.3** +/// +[Collection("Idempotency Hash Generation Tests")] +public class IdempotencyHashGenerationTests +{ + private class HashResult + { + public int ThreadIndex { get; set; } + public string Payload { get; set; } = string.Empty; + public string ExpectedHash { get; set; } = string.Empty; + public string ActualHash { get; set; } = string.Empty; + public bool Success { get; set; } + public bool ExceptionThrown { get; set; } + public string? ExceptionMessage { get; set; } + public string? ExceptionType { get; set; } + } + + /// + /// **Feature: idempotency-thread-safety, Property 6: Hash Generation Thread Safety** + /// **Validates: Requirements 1.4, 5.1, 5.2, 5.3** + /// + [Theory] + [InlineData(2, 5)] + [InlineData(5, 25)] + [InlineData(10, 50)] + public void HashGenerationThreadSafety_ConcurrentHashGeneration_ShouldProduceCorrectHashes(int concurrencyLevel, int operationsPerThread) + { + var persistenceStore = new ThreadSafeInMemoryPersistenceStore(); + persistenceStore.Configure(new IdempotencyOptionsBuilder().Build(), null, null); + + var results = new ConcurrentBag(); + var barrier = new Barrier(concurrencyLevel); + var tasks = new Task[concurrencyLevel]; + + for (int i = 0; i < concurrencyLevel; i++) + { + int threadIndex = i; + tasks[i] = Task.Run(() => + { + try + { + barrier.SignalAndWait(); + + for (int op = 0; op < operationsPerThread; op++) + { + var result = new HashResult { ThreadIndex = threadIndex }; + + try + { + string payload = $"thread_{threadIndex}_operation_{op}"; + result.Payload = payload; + + var jsonValue = JsonValue.Create(payload); + var jsonDoc = JsonDocument.Parse(jsonValue!.ToJsonString()); + var hash = persistenceStore.GenerateHash(jsonDoc.RootElement); + + var expectedHash = ComputeExpectedMd5Hash(payload); + result.ExpectedHash = expectedHash; + result.ActualHash = hash; + result.Success = hash == expectedHash; + } + catch (Exception ex) + { + result.ExceptionThrown = true; + result.ExceptionType = ex.GetType().Name; + result.ExceptionMessage = ex.Message; + } + + results.Add(result); + } + } + catch (Exception ex) + { + results.Add(new HashResult + { + ThreadIndex = threadIndex, + ExceptionThrown = true, + ExceptionType = ex.GetType().Name, + ExceptionMessage = ex.Message + }); + } + }); + } + + Task.WaitAll(tasks); + + Assert.All(results, r => Assert.True(r.Success && !r.ExceptionThrown, + $"Thread {r.ThreadIndex} failed: expected '{r.ExpectedHash}', got '{r.ActualHash}'. {r.ExceptionType}: {r.ExceptionMessage}")); + } + + [Theory] + [InlineData(2)] + [InlineData(5)] + [InlineData(10)] + public void HashGenerationThreadSafety_SamePayload_ShouldProduceSameHash(int concurrencyLevel) + { + var persistenceStore = new ThreadSafeInMemoryPersistenceStore(); + persistenceStore.Configure(new IdempotencyOptionsBuilder().Build(), null, null); + + const string sharedPayload = "shared_test_payload"; + var jsonValue = JsonValue.Create(sharedPayload); + var jsonString = jsonValue!.ToJsonString(); + + var hashes = new ConcurrentBag(); + var exceptions = new ConcurrentBag(); + var barrier = new Barrier(concurrencyLevel); + var tasks = new Task[concurrencyLevel]; + + for (int i = 0; i < concurrencyLevel; i++) + { + tasks[i] = Task.Run(() => + { + try + { + barrier.SignalAndWait(); + + for (int j = 0; j < 10; j++) + { + var jsonDoc = JsonDocument.Parse(jsonString); + var hash = persistenceStore.GenerateHash(jsonDoc.RootElement); + hashes.Add(hash); + } + } + catch (Exception ex) + { + exceptions.Add(ex); + } + }); + } + + Task.WaitAll(tasks); + + Assert.Empty(exceptions); + var distinctHashes = hashes.Distinct().ToList(); + Assert.Single(distinctHashes); + } + + [Theory] + [InlineData(2)] + [InlineData(5)] + [InlineData(10)] + public void HashGenerationThreadSafety_DifferentPayloads_ShouldProduceDifferentHashes(int concurrencyLevel) + { + var persistenceStore = new ThreadSafeInMemoryPersistenceStore(); + persistenceStore.Configure(new IdempotencyOptionsBuilder().Build(), null, null); + + var hashToPayload = new ConcurrentDictionary(); + var exceptions = new ConcurrentBag(); + var barrier = new Barrier(concurrencyLevel); + var tasks = new Task[concurrencyLevel]; + + for (int i = 0; i < concurrencyLevel; i++) + { + int threadIndex = i; + tasks[i] = Task.Run(() => + { + try + { + barrier.SignalAndWait(); + + for (int j = 0; j < 10; j++) + { + string payload = $"unique_payload_thread_{threadIndex}_op_{j}"; + var jsonValue = JsonValue.Create(payload); + var jsonDoc = JsonDocument.Parse(jsonValue!.ToJsonString()); + var hash = persistenceStore.GenerateHash(jsonDoc.RootElement); + + hashToPayload.TryAdd(hash, payload); + } + } + catch (Exception ex) + { + exceptions.Add(ex); + } + }); + } + + Task.WaitAll(tasks); + + Assert.Empty(exceptions); + int expectedUniquePayloads = concurrencyLevel * 10; + Assert.Equal(expectedUniquePayloads, hashToPayload.Count); + } + + [Theory] + [InlineData(2)] + [InlineData(5)] + [InlineData(10)] + public void HashGenerationThreadSafety_ComplexObjects_ShouldProduceCorrectHashes(int concurrencyLevel) + { + var persistenceStore = new ThreadSafeInMemoryPersistenceStore(); + persistenceStore.Configure(new IdempotencyOptionsBuilder().Build(), null, null); + + var results = new ConcurrentBag(); + var barrier = new Barrier(concurrencyLevel); + var tasks = new Task[concurrencyLevel]; + + for (int i = 0; i < concurrencyLevel; i++) + { + int threadIndex = i; + tasks[i] = Task.Run(() => + { + try + { + barrier.SignalAndWait(); + + for (int op = 0; op < 10; op++) + { + var result = new HashResult { ThreadIndex = threadIndex }; + + try + { + var complexObject = new + { + Id = threadIndex * 100 + op, + Name = $"Item_{threadIndex}_{op}", + Price = (threadIndex + 1) * 10.5 + op, + Tags = new[] { $"tag_{threadIndex}", $"tag_{op}" }, + Metadata = new { ThreadId = threadIndex, OperationId = op } + }; + + var jsonString = JsonSerializer.Serialize(complexObject); + result.Payload = jsonString; + + var jsonDoc = JsonDocument.Parse(jsonString); + var hash = persistenceStore.GenerateHash(jsonDoc.RootElement); + result.ActualHash = hash; + + result.Success = !string.IsNullOrEmpty(hash) && + hash.Length == 32 && + hash.All(c => char.IsLetterOrDigit(c)); + } + catch (Exception ex) + { + result.ExceptionThrown = true; + result.ExceptionType = ex.GetType().Name; + result.ExceptionMessage = ex.Message; + } + + results.Add(result); + } + } + catch (Exception ex) + { + results.Add(new HashResult + { + ThreadIndex = threadIndex, + ExceptionThrown = true, + ExceptionType = ex.GetType().Name, + ExceptionMessage = ex.Message + }); + } + }); + } + + Task.WaitAll(tasks); + + Assert.All(results, r => Assert.True(r.Success && !r.ExceptionThrown, + $"Thread {r.ThreadIndex} failed: {r.ExceptionType}: {r.ExceptionMessage}")); + } + + private static string ComputeExpectedMd5Hash(string input) + { + using var md5 = System.Security.Cryptography.MD5.Create(); + var inputBytes = System.Text.Encoding.UTF8.GetBytes(input); + var hashBytes = md5.ComputeHash(inputBytes); + return Convert.ToHexString(hashBytes).ToLowerInvariant(); + } + + [Theory] + [InlineData(20, 100)] + [InlineData(30, 50)] + public void HashGenerationThreadSafety_HighConcurrency_ShouldCompleteWithoutExceptions(int concurrencyLevel, int operationsPerThread) + { + var persistenceStore = new ThreadSafeInMemoryPersistenceStore(); + persistenceStore.Configure(new IdempotencyOptionsBuilder().Build(), null, null); + + var exceptions = new ConcurrentBag(); + var barrier = new Barrier(concurrencyLevel); + var tasks = new Task[concurrencyLevel]; + + for (int i = 0; i < concurrencyLevel; i++) + { + int threadIndex = i; + tasks[i] = Task.Run(() => + { + try + { + barrier.SignalAndWait(); + + for (int op = 0; op < operationsPerThread; op++) + { + string payload = $"stress_test_thread_{threadIndex}_op_{op}"; + var jsonValue = JsonValue.Create(payload); + var jsonDoc = JsonDocument.Parse(jsonValue!.ToJsonString()); + persistenceStore.GenerateHash(jsonDoc.RootElement); + } + } + catch (Exception ex) + { + exceptions.Add(ex); + } + }); + } + + Task.WaitAll(tasks); + + Assert.Empty(exceptions); + } + + [Fact] + public void HashGenerationThreadSafety_MultipleStoreInstances_ShouldProduceSameHashes() + { + const string testPayload = "test_payload_for_consistency"; + var jsonValue = JsonValue.Create(testPayload); + var jsonString = jsonValue!.ToJsonString(); + + var hashes = new ConcurrentBag(); + var exceptions = new ConcurrentBag(); + var barrier = new Barrier(10); + var tasks = new Task[10]; + + for (int i = 0; i < 10; i++) + { + tasks[i] = Task.Run(() => + { + try + { + var store = new ThreadSafeInMemoryPersistenceStore(); + store.Configure(new IdempotencyOptionsBuilder().Build(), null, null); + + barrier.SignalAndWait(); + + var jsonDoc = JsonDocument.Parse(jsonString); + var hash = store.GenerateHash(jsonDoc.RootElement); + hashes.Add(hash); + } + catch (Exception ex) + { + exceptions.Add(ex); + } + }); + } + + Task.WaitAll(tasks); + + Assert.Empty(exceptions); + var distinctHashes = hashes.Distinct().ToList(); + Assert.Single(distinctHashes); + } + + [Fact] + public void HashGenerationThreadSafety_VariousJsonTypes_ShouldProduceValidHashes() + { + var persistenceStore = new ThreadSafeInMemoryPersistenceStore(); + persistenceStore.Configure(new IdempotencyOptionsBuilder().Build(), null, null); + + var testCases = new[] + { + ("string", "\"test string\""), + ("number", "42"), + ("decimal", "3.14159"), + ("boolean_true", "true"), + ("boolean_false", "false"), + ("null", "null"), + ("array", "[1, 2, 3]"), + ("object", "{\"key\": \"value\"}") + }; + + var results = new ConcurrentBag<(string type, string hash, bool valid)>(); + var exceptions = new ConcurrentBag(); + var barrier = new Barrier(testCases.Length); + var tasks = new Task[testCases.Length]; + + for (int i = 0; i < testCases.Length; i++) + { + var (typeName, jsonValue) = testCases[i]; + tasks[i] = Task.Run(() => + { + try + { + barrier.SignalAndWait(); + + var jsonDoc = JsonDocument.Parse(jsonValue); + var hash = persistenceStore.GenerateHash(jsonDoc.RootElement); + + bool isValid = !string.IsNullOrEmpty(hash) && + hash.Length == 32 && + hash.All(c => char.IsLetterOrDigit(c)); + + results.Add((typeName, hash, isValid)); + } + catch (Exception ex) + { + exceptions.Add(ex); + } + }); + } + + Task.WaitAll(tasks); + + Assert.Empty(exceptions); + Assert.All(results, r => Assert.True(r.valid, $"Hash for type '{r.type}' was invalid: {r.hash}")); + } +} diff --git a/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/IdempotencyPersistenceStoreTests.cs b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/IdempotencyPersistenceStoreTests.cs new file mode 100644 index 000000000..10ae488e5 --- /dev/null +++ b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/IdempotencyPersistenceStoreTests.cs @@ -0,0 +1,715 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +using System.Collections.Concurrent; +using AWS.Lambda.Powertools.Idempotency; +using AWS.Lambda.Powertools.Idempotency.Persistence; +using Xunit; + +namespace AWS.Lambda.Powertools.ConcurrencyTests.Idempotency; + +/// +/// Tests for validating persistence store thread safety under concurrent execution scenarios. +/// +/// These tests verify that when multiple threads perform SaveInProgress, SaveSuccess, +/// GetRecord, and DeleteRecord operations simultaneously, all operations complete without +/// exceptions and each operation affects only its intended record. +/// +[Collection("Idempotency Persistence Store Tests")] +public class IdempotencyPersistenceStoreTests +{ + #region Helper Classes + + /// + /// Result of a persistence store operation for tracking test outcomes. + /// + private class PersistenceOperationResult + { + public int ThreadIndex { get; set; } + public string OperationType { get; set; } = string.Empty; + public string IdempotencyKey { get; set; } = string.Empty; + public bool Success { get; set; } + public bool ExceptionThrown { get; set; } + public string? ExceptionMessage { get; set; } + public string? ExceptionType { get; set; } + public DataRecord? RetrievedRecord { get; set; } + } + + /// + /// Creates a configured ThreadSafeInMemoryPersistenceStore for testing. + /// + private static ThreadSafeInMemoryPersistenceStore CreateConfiguredStore() + { + var store = new ThreadSafeInMemoryPersistenceStore(); + var options = new IdempotencyOptionsBuilder() + .WithEventKeyJmesPath("id") + .WithExpiration(TimeSpan.FromMinutes(5)) + .Build(); + store.Configure(options, "TestFunction", null); + return store; + } + + /// + /// Creates a DataRecord for testing purposes. + /// + private static DataRecord CreateTestRecord(string key, DataRecord.DataRecordStatus status, string? responseData = null) + { + return new DataRecord( + key, + status, + DateTimeOffset.UtcNow.AddMinutes(5).ToUnixTimeSeconds(), + responseData ?? $"response_{key}", + $"hash_{key}" + ); + } + + #endregion + + + #region Property 4: Persistence Store Operations Thread Safety + + /// + /// **Feature: idempotency-thread-safety, Property 4: Persistence Store Operations Thread Safety** + /// *For any* set of concurrent persistence store operations (SaveInProgress, SaveSuccess, GetRecord, DeleteRecord) + /// with different idempotency keys, all operations should complete without throwing concurrency-related exceptions + /// and each operation should affect only its intended record. + /// **Validates: Requirements 1.3, 3.1, 3.2, 3.3, 3.4, 3.5** + /// + [Theory] + [InlineData(2, 5)] + [InlineData(5, 10)] + [InlineData(10, 20)] + public void PersistenceStoreThreadSafety_ConcurrentOperations_ShouldCompleteWithoutExceptions(int concurrencyLevel, int operationsPerThread) + { + var store = CreateConfiguredStore(); + var results = new ConcurrentBag(); + var barrier = new Barrier(concurrencyLevel); + var tasks = new Task[concurrencyLevel]; + + for (int i = 0; i < concurrencyLevel; i++) + { + int threadIndex = i; + tasks[i] = Task.Run(async () => + { + try + { + barrier.SignalAndWait(); + + for (int op = 0; op < operationsPerThread; op++) + { + var result = new PersistenceOperationResult { ThreadIndex = threadIndex }; + string key = $"key_{threadIndex}_{op}"; + result.IdempotencyKey = key; + + try + { + // Mix of operations: PutRecord, GetRecord, UpdateRecord, DeleteRecord + int opType = (threadIndex + op) % 4; + var record = CreateTestRecord(key, DataRecord.DataRecordStatus.INPROGRESS); + + switch (opType) + { + case 0: // PutRecord + result.OperationType = "PutRecord"; + await store.PutRecord(record, DateTimeOffset.UtcNow); + break; + case 1: // GetRecord + result.OperationType = "GetRecord"; + result.RetrievedRecord = await store.GetRecord(key); + break; + case 2: // UpdateRecord + result.OperationType = "UpdateRecord"; + var completedRecord = CreateTestRecord(key, DataRecord.DataRecordStatus.COMPLETED, $"result_{threadIndex}_{op}"); + await store.UpdateRecord(completedRecord); + break; + case 3: // DeleteRecord + result.OperationType = "DeleteRecord"; + await store.DeleteRecord(key); + break; + } + + result.Success = true; + } + catch (Exception ex) + { + result.ExceptionThrown = true; + result.ExceptionType = ex.GetType().Name; + result.ExceptionMessage = ex.Message; + } + + results.Add(result); + } + } + catch (Exception ex) + { + results.Add(new PersistenceOperationResult + { + ThreadIndex = threadIndex, + OperationType = "Barrier", + ExceptionThrown = true, + ExceptionType = ex.GetType().Name, + ExceptionMessage = ex.Message + }); + } + }); + } + + Task.WaitAll(tasks); + + // All operations should complete without exceptions + Assert.All(results, r => Assert.True(r.Success && !r.ExceptionThrown, + $"Thread {r.ThreadIndex} {r.OperationType} for key '{r.IdempotencyKey}' failed: {r.ExceptionType}: {r.ExceptionMessage}")); + } + + /// + /// **Feature: idempotency-thread-safety, Property 4a: Concurrent SaveInProgress Operations** + /// *For any* set of concurrent SaveInProgress operations with different keys, + /// all operations should complete without interference. + /// **Validates: Requirements 3.1** + /// + [Theory] + [InlineData(2)] + [InlineData(5)] + [InlineData(10)] + public void PersistenceStoreThreadSafety_ConcurrentSaveInProgress_ShouldCompleteWithoutInterference(int concurrencyLevel) + { + var store = CreateConfiguredStore(); + var results = new ConcurrentBag(); + var barrier = new Barrier(concurrencyLevel); + var tasks = new Task[concurrencyLevel]; + + for (int i = 0; i < concurrencyLevel; i++) + { + int threadIndex = i; + tasks[i] = Task.Run(async () => + { + var result = new PersistenceOperationResult + { + ThreadIndex = threadIndex, + OperationType = "PutRecord", + IdempotencyKey = $"inprogress_key_{threadIndex}" + }; + + try + { + barrier.SignalAndWait(); + + var record = CreateTestRecord(result.IdempotencyKey, DataRecord.DataRecordStatus.INPROGRESS); + await store.PutRecord(record, DateTimeOffset.UtcNow); + + result.Success = true; + } + catch (Exception ex) + { + result.ExceptionThrown = true; + result.ExceptionType = ex.GetType().Name; + result.ExceptionMessage = ex.Message; + } + + results.Add(result); + }); + } + + Task.WaitAll(tasks); + + // All operations should succeed + Assert.All(results, r => Assert.True(r.Success && !r.ExceptionThrown, + $"Thread {r.ThreadIndex} failed: {r.ExceptionType}: {r.ExceptionMessage}")); + + // Verify all records were saved + for (int i = 0; i < concurrencyLevel; i++) + { + Assert.True(store.ContainsKey($"inprogress_key_{i}"), + $"Record for key 'inprogress_key_{i}' was not saved"); + } + } + + + /// + /// **Feature: idempotency-thread-safety, Property 4b: Concurrent SaveSuccess Operations** + /// *For any* set of concurrent SaveSuccess operations with different keys, + /// all records should be persisted correctly. + /// **Validates: Requirements 3.2** + /// + [Theory] + [InlineData(2)] + [InlineData(5)] + [InlineData(10)] + public void PersistenceStoreThreadSafety_ConcurrentSaveSuccess_ShouldPersistAllRecords(int concurrencyLevel) + { + var store = CreateConfiguredStore(); + var results = new ConcurrentBag(); + var barrier = new Barrier(concurrencyLevel); + var tasks = new Task[concurrencyLevel]; + var expectedResponses = new ConcurrentDictionary(); + + for (int i = 0; i < concurrencyLevel; i++) + { + int threadIndex = i; + tasks[i] = Task.Run(async () => + { + string key = $"success_key_{threadIndex}"; + string responseData = $"response_data_{threadIndex}"; + expectedResponses[key] = responseData; + + var result = new PersistenceOperationResult + { + ThreadIndex = threadIndex, + OperationType = "UpdateRecord", + IdempotencyKey = key + }; + + try + { + barrier.SignalAndWait(); + + var record = CreateTestRecord(key, DataRecord.DataRecordStatus.COMPLETED, responseData); + await store.UpdateRecord(record); + + result.Success = true; + } + catch (Exception ex) + { + result.ExceptionThrown = true; + result.ExceptionType = ex.GetType().Name; + result.ExceptionMessage = ex.Message; + } + + results.Add(result); + }); + } + + Task.WaitAll(tasks); + + // All operations should succeed + Assert.All(results, r => Assert.True(r.Success && !r.ExceptionThrown, + $"Thread {r.ThreadIndex} failed: {r.ExceptionType}: {r.ExceptionMessage}")); + + // Verify all records were saved with correct data + foreach (var kvp in expectedResponses) + { + Assert.True(store.TryGetRecord(kvp.Key, out var record), + $"Record for key '{kvp.Key}' was not found"); + Assert.Equal(kvp.Value, record?.ResponseData); + } + } + + /// + /// **Feature: idempotency-thread-safety, Property 4c: Concurrent GetRecord Operations** + /// *For any* set of concurrent GetRecord operations for different keys, + /// each operation should return the correct record for its key. + /// **Validates: Requirements 3.3** + /// + [Theory] + [InlineData(2)] + [InlineData(5)] + [InlineData(10)] + public void PersistenceStoreThreadSafety_ConcurrentGetRecord_ShouldReturnCorrectRecords(int concurrencyLevel) + { + var store = CreateConfiguredStore(); + + // Pre-populate store with known records + var expectedRecords = new Dictionary(); + for (int i = 0; i < concurrencyLevel; i++) + { + string key = $"get_key_{i}"; + var record = CreateTestRecord(key, DataRecord.DataRecordStatus.COMPLETED, $"response_{i}"); + store.PutRecord(record, DateTimeOffset.UtcNow).Wait(); + expectedRecords[key] = record; + } + + var results = new ConcurrentBag(); + var barrier = new Barrier(concurrencyLevel); + var tasks = new Task[concurrencyLevel]; + + for (int i = 0; i < concurrencyLevel; i++) + { + int threadIndex = i; + tasks[i] = Task.Run(async () => + { + string key = $"get_key_{threadIndex}"; + var result = new PersistenceOperationResult + { + ThreadIndex = threadIndex, + OperationType = "GetRecord", + IdempotencyKey = key + }; + + try + { + barrier.SignalAndWait(); + + result.RetrievedRecord = await store.GetRecord(key); + result.Success = true; + } + catch (Exception ex) + { + result.ExceptionThrown = true; + result.ExceptionType = ex.GetType().Name; + result.ExceptionMessage = ex.Message; + } + + results.Add(result); + }); + } + + Task.WaitAll(tasks); + + // All operations should succeed + Assert.All(results, r => Assert.True(r.Success && !r.ExceptionThrown, + $"Thread {r.ThreadIndex} failed: {r.ExceptionType}: {r.ExceptionMessage}")); + + // Verify all retrieved records match expected + foreach (var r in results) + { + Assert.NotNull(r.RetrievedRecord); + var expected = expectedRecords[r.IdempotencyKey]; + Assert.Equal(expected.IdempotencyKey, r.RetrievedRecord.IdempotencyKey); + Assert.Equal(expected.ResponseData, r.RetrievedRecord.ResponseData); + } + } + + + /// + /// **Feature: idempotency-thread-safety, Property 4d: Concurrent DeleteRecord Operations** + /// *For any* set of concurrent DeleteRecord operations for different keys, + /// only the specified records should be deleted. + /// **Validates: Requirements 3.4** + /// + [Theory] + [InlineData(2)] + [InlineData(5)] + [InlineData(10)] + public void PersistenceStoreThreadSafety_ConcurrentDeleteRecord_ShouldDeleteOnlySpecifiedRecords(int concurrencyLevel) + { + var store = CreateConfiguredStore(); + + // Pre-populate store with records to delete and records to keep + var keysToDelete = new List(); + var keysToKeep = new List(); + + for (int i = 0; i < concurrencyLevel; i++) + { + string deleteKey = $"delete_key_{i}"; + string keepKey = $"keep_key_{i}"; + + var deleteRecord = CreateTestRecord(deleteKey, DataRecord.DataRecordStatus.COMPLETED); + var keepRecord = CreateTestRecord(keepKey, DataRecord.DataRecordStatus.COMPLETED); + + store.PutRecord(deleteRecord, DateTimeOffset.UtcNow).Wait(); + store.PutRecord(keepRecord, DateTimeOffset.UtcNow).Wait(); + + keysToDelete.Add(deleteKey); + keysToKeep.Add(keepKey); + } + + var results = new ConcurrentBag(); + var barrier = new Barrier(concurrencyLevel); + var tasks = new Task[concurrencyLevel]; + + for (int i = 0; i < concurrencyLevel; i++) + { + int threadIndex = i; + tasks[i] = Task.Run(async () => + { + string key = $"delete_key_{threadIndex}"; + var result = new PersistenceOperationResult + { + ThreadIndex = threadIndex, + OperationType = "DeleteRecord", + IdempotencyKey = key + }; + + try + { + barrier.SignalAndWait(); + + await store.DeleteRecord(key); + result.Success = true; + } + catch (Exception ex) + { + result.ExceptionThrown = true; + result.ExceptionType = ex.GetType().Name; + result.ExceptionMessage = ex.Message; + } + + results.Add(result); + }); + } + + Task.WaitAll(tasks); + + // All operations should succeed + Assert.All(results, r => Assert.True(r.Success && !r.ExceptionThrown, + $"Thread {r.ThreadIndex} failed: {r.ExceptionType}: {r.ExceptionMessage}")); + + // Verify deleted keys are gone + foreach (var key in keysToDelete) + { + Assert.False(store.ContainsKey(key), $"Key '{key}' should have been deleted"); + } + + // Verify kept keys still exist + foreach (var key in keysToKeep) + { + Assert.True(store.ContainsKey(key), $"Key '{key}' should still exist"); + } + } + + /// + /// **Feature: idempotency-thread-safety, Property 4e: Mixed Concurrent Operations** + /// *For any* combination of concurrent save, get, and delete operations, + /// data integrity should be maintained across all operations. + /// **Validates: Requirements 3.5** + /// + [Theory] + [InlineData(4)] + [InlineData(8)] + [InlineData(12)] + public void PersistenceStoreThreadSafety_MixedOperations_ShouldMaintainDataIntegrity(int concurrencyLevel) + { + var store = CreateConfiguredStore(); + var results = new ConcurrentBag(); + var barrier = new Barrier(concurrencyLevel); + var tasks = new Task[concurrencyLevel]; + + // Pre-populate some records + for (int i = 0; i < concurrencyLevel / 2; i++) + { + var record = CreateTestRecord($"existing_key_{i}", DataRecord.DataRecordStatus.COMPLETED); + store.PutRecord(record, DateTimeOffset.UtcNow).Wait(); + } + + for (int i = 0; i < concurrencyLevel; i++) + { + int threadIndex = i; + tasks[i] = Task.Run(async () => + { + var result = new PersistenceOperationResult { ThreadIndex = threadIndex }; + + try + { + barrier.SignalAndWait(); + + // Different threads do different operations + int opType = threadIndex % 4; + + switch (opType) + { + case 0: // Put new record + result.OperationType = "PutRecord"; + result.IdempotencyKey = $"new_key_{threadIndex}"; + var newRecord = CreateTestRecord(result.IdempotencyKey, DataRecord.DataRecordStatus.INPROGRESS); + await store.PutRecord(newRecord, DateTimeOffset.UtcNow); + break; + + case 1: // Get existing record + result.OperationType = "GetRecord"; + result.IdempotencyKey = $"existing_key_{threadIndex % (concurrencyLevel / 2)}"; + result.RetrievedRecord = await store.GetRecord(result.IdempotencyKey); + break; + + case 2: // Update existing record + result.OperationType = "UpdateRecord"; + result.IdempotencyKey = $"existing_key_{threadIndex % (concurrencyLevel / 2)}"; + var updateRecord = CreateTestRecord(result.IdempotencyKey, DataRecord.DataRecordStatus.COMPLETED, $"updated_{threadIndex}"); + await store.UpdateRecord(updateRecord); + break; + + case 3: // Delete (non-existing key to avoid affecting other tests) + result.OperationType = "DeleteRecord"; + result.IdempotencyKey = $"delete_target_{threadIndex}"; + await store.DeleteRecord(result.IdempotencyKey); + break; + } + + result.Success = true; + } + catch (Exception ex) + { + result.ExceptionThrown = true; + result.ExceptionType = ex.GetType().Name; + result.ExceptionMessage = ex.Message; + } + + results.Add(result); + }); + } + + Task.WaitAll(tasks); + + // All operations should complete without exceptions + Assert.All(results, r => Assert.True(r.Success && !r.ExceptionThrown, + $"Thread {r.ThreadIndex} {r.OperationType} for key '{r.IdempotencyKey}' failed: {r.ExceptionType}: {r.ExceptionMessage}")); + } + + #endregion + + + #region Additional Stress Tests + + /// + /// High concurrency stress test for persistence store operations. + /// + [Theory] + [InlineData(20, 50)] + [InlineData(30, 30)] + public void PersistenceStoreThreadSafety_HighConcurrency_ShouldCompleteWithoutExceptions(int concurrencyLevel, int operationsPerThread) + { + var store = CreateConfiguredStore(); + var exceptions = new ConcurrentBag(); + var barrier = new Barrier(concurrencyLevel); + var tasks = new Task[concurrencyLevel]; + + for (int i = 0; i < concurrencyLevel; i++) + { + int threadIndex = i; + tasks[i] = Task.Run(async () => + { + try + { + barrier.SignalAndWait(); + + for (int op = 0; op < operationsPerThread; op++) + { + int opType = (threadIndex + op) % 4; + string key = $"stress_key_{threadIndex}_{op}"; + var record = CreateTestRecord(key, DataRecord.DataRecordStatus.INPROGRESS); + + switch (opType) + { + case 0: + await store.PutRecord(record, DateTimeOffset.UtcNow); + break; + case 1: + await store.GetRecord(key); + break; + case 2: + var completedRecord = CreateTestRecord(key, DataRecord.DataRecordStatus.COMPLETED); + await store.UpdateRecord(completedRecord); + break; + case 3: + await store.DeleteRecord(key); + break; + } + } + } + catch (Exception ex) + { + exceptions.Add(ex); + } + }); + } + + Task.WaitAll(tasks); + + Assert.Empty(exceptions); + } + + /// + /// Test that verifies operation counters are correctly incremented under concurrent access. + /// + [Fact] + public void PersistenceStoreThreadSafety_OperationCounters_ShouldBeAccurate() + { + var store = CreateConfiguredStore(); + int concurrencyLevel = 10; + int operationsPerThread = 20; + + var barrier = new Barrier(concurrencyLevel); + var tasks = new Task[concurrencyLevel]; + + for (int i = 0; i < concurrencyLevel; i++) + { + int threadIndex = i; + tasks[i] = Task.Run(async () => + { + barrier.SignalAndWait(); + + for (int op = 0; op < operationsPerThread; op++) + { + string key = $"counter_key_{threadIndex}_{op}"; + var record = CreateTestRecord(key, DataRecord.DataRecordStatus.COMPLETED); + + await store.PutRecord(record, DateTimeOffset.UtcNow); + await store.GetRecord(key); + await store.UpdateRecord(record); + await store.DeleteRecord(key); + } + }); + } + + Task.WaitAll(tasks); + + int expectedOperations = concurrencyLevel * operationsPerThread; + + Assert.Equal(expectedOperations, store.PutRecordCount); + Assert.Equal(expectedOperations, store.GetRecordCount); + Assert.Equal(expectedOperations, store.UpdateRecordCount); + Assert.Equal(expectedOperations, store.DeleteRecordCount); + } + + /// + /// Test that verifies concurrent operations on the same key are handled correctly. + /// + [Fact] + public void PersistenceStoreThreadSafety_SameKeyConcurrentOperations_ShouldNotCorruptData() + { + var store = CreateConfiguredStore(); + int concurrencyLevel = 10; + string sharedKey = "shared_key"; + + var barrier = new Barrier(concurrencyLevel); + var tasks = new Task[concurrencyLevel]; + var exceptions = new ConcurrentBag(); + + for (int i = 0; i < concurrencyLevel; i++) + { + int threadIndex = i; + tasks[i] = Task.Run(async () => + { + try + { + barrier.SignalAndWait(); + + // All threads operate on the same key + var record = CreateTestRecord(sharedKey, DataRecord.DataRecordStatus.COMPLETED, $"response_{threadIndex}"); + + await store.PutRecord(record, DateTimeOffset.UtcNow); + await store.GetRecord(sharedKey); + await store.UpdateRecord(record); + } + catch (Exception ex) + { + exceptions.Add(ex); + } + }); + } + + Task.WaitAll(tasks); + + // No exceptions should be thrown + Assert.Empty(exceptions); + + // The key should still exist with valid data + Assert.True(store.ContainsKey(sharedKey)); + Assert.True(store.TryGetRecord(sharedKey, out var finalRecord)); + Assert.NotNull(finalRecord); + Assert.Equal(sharedKey, finalRecord.IdempotencyKey); + } + + #endregion +} diff --git a/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/LRUCacheThreadSafetyTests.cs b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/LRUCacheThreadSafetyTests.cs new file mode 100644 index 000000000..1c6ac7b80 --- /dev/null +++ b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/LRUCacheThreadSafetyTests.cs @@ -0,0 +1,485 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +using System.Collections.Concurrent; +using Xunit; + +namespace AWS.Lambda.Powertools.ConcurrencyTests.Idempotency; + +/// +/// Tests for validating LRU Cache thread safety under concurrent execution scenarios. +/// +/// These tests verify that when multiple threads perform read, write, and delete +/// operations on the LRU cache simultaneously, all operations complete without +/// exceptions and without data corruption. +/// +[Collection("Idempotency LRU Cache Tests")] +public class LRUCacheThreadSafetyTests +{ + #region Helper Classes + + private class OperationResult + { + public int ThreadIndex { get; set; } + public string OperationType { get; set; } = string.Empty; + public bool Success { get; set; } + public bool ExceptionThrown { get; set; } + public string? ExceptionMessage { get; set; } + public string? ExceptionType { get; set; } + } + + /// + /// Wrapper to access internal LRUCache for testing purposes. + /// Uses reflection to create and interact with the internal LRUCache class. + /// + private class LRUCacheWrapper where TKey : notnull + { + private readonly object _cache; + private readonly Type _cacheType; + + public LRUCacheWrapper(int capacity) + { + var assembly = typeof(AWS.Lambda.Powertools.Idempotency.Idempotency).Assembly; + _cacheType = assembly.GetType("AWS.Lambda.Powertools.Idempotency.Internal.LRUCache`2")! + .MakeGenericType(typeof(TKey), typeof(TValue)); + _cache = Activator.CreateInstance(_cacheType, capacity)!; + } + + public bool TryGet(TKey key, out TValue? value) + { + var method = _cacheType.GetMethod("TryGet")!; + var parameters = new object?[] { key, null }; + var result = (bool)method.Invoke(_cache, parameters)!; + value = (TValue?)parameters[1]; + return result; + } + + public void Set(TKey key, TValue value) + { + var method = _cacheType.GetMethod("Set")!; + method.Invoke(_cache, new object?[] { key, value }); + } + + public void Delete(TKey key) + { + var method = _cacheType.GetMethod("Delete")!; + method.Invoke(_cache, new object?[] { key }); + } + + public int Count + { + get + { + var property = _cacheType.GetProperty("Count")!; + return (int)property.GetValue(_cache)!; + } + } + } + + #endregion + + #region Property 3: LRU Cache Thread Safety + + /// + /// **Feature: idempotency-thread-safety, Property 3: LRU Cache Thread Safety** + /// *For any* combination of concurrent read, write, and delete operations on the LRU cache, + /// all operations should complete without throwing exceptions and without data corruption + /// (reads return correct values, writes persist correctly, deletes remove only specified entries). + /// **Validates: Requirements 2.1, 2.2, 2.3, 2.4** + /// + [Theory] + [InlineData(2, 10)] + [InlineData(5, 50)] + [InlineData(10, 100)] + public void LRUCacheThreadSafety_ConcurrentOperations_ShouldCompleteWithoutExceptions(int concurrencyLevel, int operationsPerThread) + { + var cache = new LRUCacheWrapper(50); + var results = new ConcurrentBag(); + var barrier = new Barrier(concurrencyLevel); + var tasks = new Task[concurrencyLevel]; + + for (int i = 0; i < concurrencyLevel; i++) + { + int threadIndex = i; + tasks[i] = Task.Run(() => + { + try + { + barrier.SignalAndWait(); + + for (int op = 0; op < operationsPerThread; op++) + { + var result = new OperationResult { ThreadIndex = threadIndex }; + + try + { + // Mix of operations: write, read, delete + int opType = (threadIndex + op) % 3; + string key = $"key_{threadIndex}_{op % 20}"; // Reuse some keys + + switch (opType) + { + case 0: // Write + result.OperationType = "Set"; + cache.Set(key, $"value_{threadIndex}_{op}"); + break; + case 1: // Read + result.OperationType = "TryGet"; + cache.TryGet(key, out _); + break; + case 2: // Delete + result.OperationType = "Delete"; + cache.Delete(key); + break; + } + + result.Success = true; + } + catch (Exception ex) + { + result.ExceptionThrown = true; + result.ExceptionType = ex.GetType().Name; + result.ExceptionMessage = ex.Message; + } + + results.Add(result); + } + } + catch (Exception ex) + { + results.Add(new OperationResult + { + ThreadIndex = threadIndex, + OperationType = "Barrier", + ExceptionThrown = true, + ExceptionType = ex.GetType().Name, + ExceptionMessage = ex.Message + }); + } + }); + } + + Task.WaitAll(tasks); + + // All operations should complete without exceptions + Assert.All(results, r => Assert.True(r.Success && !r.ExceptionThrown, + $"Thread {r.ThreadIndex} {r.OperationType} failed: {r.ExceptionType}: {r.ExceptionMessage}")); + } + + /// + /// **Feature: idempotency-thread-safety, Property 3a: Concurrent Reads Return Correct Values** + /// *For any* set of concurrent read operations on the LRU cache, each read should return + /// the correct value that was previously written for that key. + /// **Validates: Requirements 2.1** + /// + [Theory] + [InlineData(2)] + [InlineData(5)] + [InlineData(10)] + public void LRUCacheThreadSafety_ConcurrentReads_ShouldReturnCorrectValues(int concurrencyLevel) + { + var cache = new LRUCacheWrapper(100); + + // Pre-populate cache with known values + for (int i = 0; i < concurrencyLevel * 5; i++) + { + cache.Set(i, $"value_{i}"); + } + + var results = new ConcurrentBag<(int key, string? value, bool found, bool correct)>(); + var barrier = new Barrier(concurrencyLevel); + var tasks = new Task[concurrencyLevel]; + + for (int i = 0; i < concurrencyLevel; i++) + { + int threadIndex = i; + tasks[i] = Task.Run(() => + { + barrier.SignalAndWait(); + + // Each thread reads multiple keys + for (int j = 0; j < concurrencyLevel * 5; j++) + { + bool found = cache.TryGet(j, out var value); + bool correct = !found || value == $"value_{j}"; + results.Add((j, value, found, correct)); + } + }); + } + + Task.WaitAll(tasks); + + // All reads that found a value should have the correct value + Assert.All(results, r => Assert.True(r.correct, + $"Key {r.key} returned incorrect value: expected 'value_{r.key}', got '{r.value}'")); + } + + /// + /// **Feature: idempotency-thread-safety, Property 3b: Concurrent Writes Complete Without Corruption** + /// *For any* set of concurrent write operations on the LRU cache, all writes should + /// complete and the final state should be consistent (no corrupted entries). + /// **Validates: Requirements 2.2** + /// + [Theory] + [InlineData(2)] + [InlineData(5)] + [InlineData(10)] + public void LRUCacheThreadSafety_ConcurrentWrites_ShouldCompleteWithoutCorruption(int concurrencyLevel) + { + int keysPerThread = 10; + + var cache = new LRUCacheWrapper(concurrencyLevel * keysPerThread); + var writtenValues = new ConcurrentDictionary(); + var barrier = new Barrier(concurrencyLevel); + var tasks = new Task[concurrencyLevel]; + var exceptions = new ConcurrentBag(); + + for (int i = 0; i < concurrencyLevel; i++) + { + int threadIndex = i; + tasks[i] = Task.Run(() => + { + try + { + barrier.SignalAndWait(); + + for (int j = 0; j < keysPerThread; j++) + { + string key = $"thread_{threadIndex}_key_{j}"; + int value = threadIndex * 1000 + j; + cache.Set(key, value); + writtenValues[key] = value; + } + } + catch (Exception ex) + { + exceptions.Add(ex); + } + }); + } + + Task.WaitAll(tasks); + + Assert.Empty(exceptions); + + // Verify all written values can be read back correctly + foreach (var kvp in writtenValues) + { + if (cache.TryGet(kvp.Key, out var value)) + { + Assert.Equal(kvp.Value, value); + } + } + } + + /// + /// **Feature: idempotency-thread-safety, Property 3c: Mixed Read/Write Operations Are Safe** + /// *For any* combination of concurrent read and write operations, the cache should + /// handle the concurrent access without throwing exceptions. + /// **Validates: Requirements 2.3** + /// + [Theory] + [InlineData(4)] + [InlineData(8)] + [InlineData(10)] + public void LRUCacheThreadSafety_MixedReadWrite_ShouldBeSafe(int concurrencyLevel) + { + int halfConcurrency = concurrencyLevel / 2; + + var cache = new LRUCacheWrapper(50); + var exceptions = new ConcurrentBag(); + var barrier = new Barrier(concurrencyLevel); + var tasks = new Task[concurrencyLevel]; + + // Half threads write, half threads read + for (int i = 0; i < concurrencyLevel; i++) + { + int threadIndex = i; + bool isWriter = threadIndex < halfConcurrency; + + tasks[i] = Task.Run(() => + { + try + { + barrier.SignalAndWait(); + + for (int j = 0; j < 100; j++) + { + int key = j % 30; // Shared key space + + if (isWriter) + { + cache.Set(key, $"value_{threadIndex}_{j}"); + } + else + { + cache.TryGet(key, out _); + } + } + } + catch (Exception ex) + { + exceptions.Add(ex); + } + }); + } + + Task.WaitAll(tasks); + + Assert.Empty(exceptions); + } + + /// + /// **Feature: idempotency-thread-safety, Property 3d: Eviction Under Load Is Safe** + /// *For any* set of concurrent writes that exceed cache capacity, the eviction + /// mechanism should work correctly without corruption. + /// **Validates: Requirements 2.4** + /// + [Theory] + [InlineData(2)] + [InlineData(5)] + [InlineData(10)] + public void LRUCacheThreadSafety_EvictionUnderLoad_ShouldBeSafe(int concurrencyLevel) + { + int cacheCapacity = 20; + int keysPerThread = 50; // More keys than capacity to force eviction + + var cache = new LRUCacheWrapper(cacheCapacity); + var exceptions = new ConcurrentBag(); + var barrier = new Barrier(concurrencyLevel); + var tasks = new Task[concurrencyLevel]; + + for (int i = 0; i < concurrencyLevel; i++) + { + int threadIndex = i; + tasks[i] = Task.Run(() => + { + try + { + barrier.SignalAndWait(); + + for (int j = 0; j < keysPerThread; j++) + { + string key = $"key_{threadIndex}_{j}"; + cache.Set(key, threadIndex * 1000 + j); + } + } + catch (Exception ex) + { + exceptions.Add(ex); + } + }); + } + + Task.WaitAll(tasks); + + Assert.Empty(exceptions); + + // Verify cache count doesn't exceed capacity + Assert.True(cache.Count <= cacheCapacity, + $"Cache count {cache.Count} exceeds capacity {cacheCapacity}"); + } + + #endregion + + #region Additional Stress Tests + + /// + /// High concurrency stress test for LRU cache operations. + /// + [Theory] + [InlineData(20, 200)] + [InlineData(30, 100)] + public void LRUCacheThreadSafety_HighConcurrency_ShouldCompleteWithoutExceptions(int concurrencyLevel, int operationsPerThread) + { + var cache = new LRUCacheWrapper(100); + var exceptions = new ConcurrentBag(); + var barrier = new Barrier(concurrencyLevel); + var tasks = new Task[concurrencyLevel]; + + for (int i = 0; i < concurrencyLevel; i++) + { + int threadIndex = i; + tasks[i] = Task.Run(() => + { + try + { + barrier.SignalAndWait(); + + for (int op = 0; op < operationsPerThread; op++) + { + int opType = (threadIndex + op) % 3; + string key = $"key_{op % 50}"; + + switch (opType) + { + case 0: + cache.Set(key, $"value_{threadIndex}_{op}"); + break; + case 1: + cache.TryGet(key, out _); + break; + case 2: + cache.Delete(key); + break; + } + } + } + catch (Exception ex) + { + exceptions.Add(ex); + } + }); + } + + Task.WaitAll(tasks); + + Assert.Empty(exceptions); + } + + /// + /// Test that verifies LRU eviction order is maintained under concurrent access. + /// + [Fact] + public void LRUCacheThreadSafety_EvictionOrder_ShouldMaintainLRUProperty() + { + var cache = new LRUCacheWrapper(5); + + // Add 5 items + for (int i = 0; i < 5; i++) + { + cache.Set(i, $"value_{i}"); + } + + // Access item 0 to make it most recently used + cache.TryGet(0, out _); + + // Add a new item, which should evict item 1 (least recently used) + cache.Set(5, "value_5"); + + // Item 0 should still exist (was accessed) + Assert.True(cache.TryGet(0, out var value0)); + Assert.Equal("value_0", value0); + + // Item 1 should be evicted + Assert.False(cache.TryGet(1, out _)); + + // Item 5 should exist + Assert.True(cache.TryGet(5, out var value5)); + Assert.Equal("value_5", value5); + } + + #endregion +} diff --git a/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/LambdaContextIsolationTests.cs b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/LambdaContextIsolationTests.cs new file mode 100644 index 000000000..d83de5fb9 --- /dev/null +++ b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/LambdaContextIsolationTests.cs @@ -0,0 +1,325 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +using Amazon.Lambda.Core; +using Amazon.Lambda.TestUtilities; +using Xunit; +using IdempotencyLib = AWS.Lambda.Powertools.Idempotency; + +namespace AWS.Lambda.Powertools.ConcurrencyTests.Idempotency; + +/// +/// Tests for validating LambdaContext isolation in Powertools Idempotency +/// under concurrent execution scenarios. +/// +/// These tests verify that when multiple Lambda invocations run concurrently, +/// each invocation's LambdaContext remains isolated from other invocations. +/// +/// The Idempotency implementation uses AsyncLocal storage to ensure +/// isolation between concurrent Lambda invocations. +/// +[Collection("Idempotency Tests")] +public class LambdaContextIsolationTests : IDisposable +{ + public LambdaContextIsolationTests() + { + // Configure Idempotency with a thread-safe in-memory store for testing + IdempotencyLib.Idempotency.Configure(builder => builder + .WithPersistenceStore(new ThreadSafeInMemoryPersistenceStore())); + } + + public void Dispose() + { + // Clean up after tests + } + + #region Helper Result Classes + + private class ContextIsolationResult + { + public string InvocationId { get; set; } = string.Empty; + public int InvocationIndex { get; set; } + public string ExpectedFunctionName { get; set; } = string.Empty; + public string ActualFunctionName { get; set; } = string.Empty; + public bool ContextMatched { get; set; } + public bool ExceptionThrown { get; set; } + public string? ExceptionMessage { get; set; } + } + + private class AsyncContextResult + { + public string InvocationId { get; set; } = string.Empty; + public string ExpectedFunctionName { get; set; } = string.Empty; + public string FunctionNameBeforeAwait { get; set; } = string.Empty; + public string FunctionNameAfterAwait { get; set; } = string.Empty; + public bool ContextPreservedAcrossAwait { get; set; } + public bool ExceptionThrown { get; set; } + public string? ExceptionMessage { get; set; } + } + + #endregion + + #region Property 2: LambdaContext Isolation + + /// + /// **Feature: idempotency-thread-safety, Property 2: LambdaContext Isolation** + /// *For any* set of concurrent invocations registering different LambdaContext instances, + /// each invocation should be able to retrieve its own context without interference from other invocations. + /// **Validates: Requirements 1.2** + /// + [Theory] + [InlineData(2)] + [InlineData(3)] + [InlineData(5)] + [InlineData(10)] + [InlineData(20)] + public void LambdaContextIsolation_ConcurrentInvocations_ShouldMaintainSeparateContexts( + int concurrencyLevel) + { + var results = new ContextIsolationResult[concurrencyLevel]; + var barrier = new Barrier(concurrencyLevel); + var tasks = new Task[concurrencyLevel]; + + for (int i = 0; i < concurrencyLevel; i++) + { + int invocationIndex = i; + + tasks[i] = Task.Run(() => + { + var invocationId = Guid.NewGuid().ToString("N"); + var expectedFunctionName = $"Function_{invocationIndex}_{invocationId}"; + + var result = new ContextIsolationResult + { + InvocationId = invocationId, + InvocationIndex = invocationIndex, + ExpectedFunctionName = expectedFunctionName + }; + + try + { + // Create a unique context for this invocation + var context = new TestLambdaContext + { + FunctionName = expectedFunctionName, + AwsRequestId = invocationId, + RemainingTime = TimeSpan.FromMinutes(5) + }; + + // Wait for all threads to be ready + barrier.SignalAndWait(); + + // Register the context + IdempotencyLib.Idempotency.RegisterLambdaContext(context); + + // Small delay to allow other threads to potentially interfere + Thread.Sleep(Random.Shared.Next(1, 10)); + + // Retrieve the context and verify it's the one we registered + var retrievedContext = IdempotencyLib.Idempotency.Instance.LambdaContext; + result.ActualFunctionName = retrievedContext?.FunctionName ?? "null"; + result.ContextMatched = retrievedContext?.FunctionName == expectedFunctionName; + } + catch (Exception ex) + { + result.ExceptionThrown = true; + result.ExceptionMessage = $"{ex.GetType().Name}: {ex.Message}"; + } + + results[invocationIndex] = result; + }); + } + + Task.WaitAll(tasks); + + // Verify no exceptions were thrown + Assert.All(results, r => Assert.False(r.ExceptionThrown, r.ExceptionMessage)); + + // Verify each invocation retrieved its own context + Assert.All(results, r => Assert.True(r.ContextMatched, + $"Invocation {r.InvocationIndex} expected '{r.ExpectedFunctionName}' but got '{r.ActualFunctionName}'")); + } + + /// + /// **Feature: idempotency-thread-safety, Property 2b: LambdaContext Isolation with Multiple Reads** + /// *For any* set of concurrent invocations, multiple reads of the LambdaContext within the same + /// invocation should consistently return the same context. + /// **Validates: Requirements 1.2** + /// + [Theory] + [InlineData(2, 5)] + [InlineData(5, 10)] + [InlineData(10, 3)] + public void LambdaContextIsolation_MultipleReads_ShouldReturnConsistentContext( + int concurrencyLevel, int readsPerInvocation) + { + var results = new List[concurrencyLevel]; + var barrier = new Barrier(concurrencyLevel); + var tasks = new Task[concurrencyLevel]; + + for (int i = 0; i < concurrencyLevel; i++) + { + int invocationIndex = i; + results[invocationIndex] = new List(); + + tasks[i] = Task.Run(() => + { + var invocationId = Guid.NewGuid().ToString("N"); + var expectedFunctionName = $"Function_{invocationIndex}_{invocationId}"; + + try + { + var context = new TestLambdaContext + { + FunctionName = expectedFunctionName, + AwsRequestId = invocationId, + RemainingTime = TimeSpan.FromMinutes(5) + }; + + barrier.SignalAndWait(); + + IdempotencyLib.Idempotency.RegisterLambdaContext(context); + + // Perform multiple reads with small delays + for (int r = 0; r < readsPerInvocation; r++) + { + Thread.Sleep(Random.Shared.Next(1, 5)); + + var retrievedContext = IdempotencyLib.Idempotency.Instance.LambdaContext; + var result = new ContextIsolationResult + { + InvocationId = invocationId, + InvocationIndex = invocationIndex, + ExpectedFunctionName = expectedFunctionName, + ActualFunctionName = retrievedContext?.FunctionName ?? "null", + ContextMatched = retrievedContext?.FunctionName == expectedFunctionName + }; + + lock (results[invocationIndex]) + { + results[invocationIndex].Add(result); + } + } + } + catch (Exception ex) + { + lock (results[invocationIndex]) + { + results[invocationIndex].Add(new ContextIsolationResult + { + InvocationId = invocationId, + InvocationIndex = invocationIndex, + ExceptionThrown = true, + ExceptionMessage = $"{ex.GetType().Name}: {ex.Message}" + }); + } + } + }); + } + + Task.WaitAll(tasks); + + // Verify all reads returned the correct context + foreach (var invocationResults in results) + { + Assert.All(invocationResults, r => Assert.False(r.ExceptionThrown, r.ExceptionMessage)); + Assert.All(invocationResults, r => Assert.True(r.ContextMatched, + $"Invocation {r.InvocationIndex} expected '{r.ExpectedFunctionName}' but got '{r.ActualFunctionName}'")); + } + } + + /// + /// **Feature: idempotency-thread-safety, Property 2c: LambdaContext Isolation Across Async Boundaries** + /// *For any* invocation with async operations, the LambdaContext should be preserved + /// across await points. + /// **Validates: Requirements 1.2** + /// + [Theory] + [InlineData(2)] + [InlineData(5)] + [InlineData(10)] + public async Task LambdaContextIsolation_AsyncOperations_ShouldPreserveContextAcrossAwait( + int concurrencyLevel) + { + var results = new AsyncContextResult[concurrencyLevel]; + var barrier = new Barrier(concurrencyLevel); + var tasks = new Task[concurrencyLevel]; + + for (int i = 0; i < concurrencyLevel; i++) + { + int invocationIndex = i; + + tasks[i] = Task.Run(async () => + { + var invocationId = Guid.NewGuid().ToString("N"); + var expectedFunctionName = $"AsyncFunction_{invocationIndex}_{invocationId}"; + + var result = new AsyncContextResult + { + InvocationId = invocationId, + ExpectedFunctionName = expectedFunctionName + }; + + try + { + var context = new TestLambdaContext + { + FunctionName = expectedFunctionName, + AwsRequestId = invocationId, + RemainingTime = TimeSpan.FromMinutes(5) + }; + + barrier.SignalAndWait(); + + IdempotencyLib.Idempotency.RegisterLambdaContext(context); + + // Read before await + var contextBeforeAwait = IdempotencyLib.Idempotency.Instance.LambdaContext; + result.FunctionNameBeforeAwait = contextBeforeAwait?.FunctionName ?? "null"; + + // Simulate async operation + await Task.Delay(Random.Shared.Next(10, 50)); + + // Read after await + var contextAfterAwait = IdempotencyLib.Idempotency.Instance.LambdaContext; + result.FunctionNameAfterAwait = contextAfterAwait?.FunctionName ?? "null"; + + result.ContextPreservedAcrossAwait = + result.FunctionNameBeforeAwait == expectedFunctionName && + result.FunctionNameAfterAwait == expectedFunctionName; + } + catch (Exception ex) + { + result.ExceptionThrown = true; + result.ExceptionMessage = $"{ex.GetType().Name}: {ex.Message}"; + } + + results[invocationIndex] = result; + }); + } + + await Task.WhenAll(tasks); + + // Verify no exceptions were thrown + Assert.All(results, r => Assert.False(r.ExceptionThrown, r.ExceptionMessage)); + + // Verify context was preserved across await + Assert.All(results, r => Assert.True(r.ContextPreservedAcrossAwait, + $"Context not preserved: expected '{r.ExpectedFunctionName}', " + + $"before await: '{r.FunctionNameBeforeAwait}', after await: '{r.FunctionNameAfterAwait}'")); + } + + #endregion +} diff --git a/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/ThreadSafeInMemoryPersistenceStore.cs b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/ThreadSafeInMemoryPersistenceStore.cs new file mode 100644 index 000000000..e9ba2004d --- /dev/null +++ b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/ThreadSafeInMemoryPersistenceStore.cs @@ -0,0 +1,183 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +using System.Collections.Concurrent; +using AWS.Lambda.Powertools.Idempotency.Persistence; + +namespace AWS.Lambda.Powertools.ConcurrencyTests.Idempotency; + +/// +/// A thread-safe in-memory persistence store for concurrency testing. +/// This implementation wraps all dictionary operations with proper synchronization +/// to ensure safe concurrent access from multiple threads. +/// +/// This store is designed for testing purposes only and should not be used in production. +/// +public class ThreadSafeInMemoryPersistenceStore : BasePersistenceStore +{ + /// + /// Thread-safe dictionary to store idempotency records. + /// Using ConcurrentDictionary for atomic operations. + /// + private readonly ConcurrentDictionary _records = new(); + + /// + /// Lock object for operations that require multiple steps to be atomic. + /// + private readonly object _lockObj = new(); + + /// + /// Counter for tracking the number of GetRecord operations (for testing purposes). + /// + private int _getRecordCount; + + /// + /// Counter for tracking the number of PutRecord operations (for testing purposes). + /// + private int _putRecordCount; + + /// + /// Counter for tracking the number of UpdateRecord operations (for testing purposes). + /// + private int _updateRecordCount; + + /// + /// Counter for tracking the number of DeleteRecord operations (for testing purposes). + /// + private int _deleteRecordCount; + + /// + /// Gets the number of GetRecord operations performed. + /// + public int GetRecordCount => _getRecordCount; + + /// + /// Gets the number of PutRecord operations performed. + /// + public int PutRecordCount => _putRecordCount; + + /// + /// Gets the number of UpdateRecord operations performed. + /// + public int UpdateRecordCount => _updateRecordCount; + + /// + /// Gets the number of DeleteRecord operations performed. + /// + public int DeleteRecordCount => _deleteRecordCount; + + /// + /// Gets the current number of records in the store. + /// + public int RecordCount => _records.Count; + + /// + public override Task GetRecord(string idempotencyKey) + { + Interlocked.Increment(ref _getRecordCount); + + _records.TryGetValue(idempotencyKey, out var record); + return Task.FromResult(record); + } + + /// + public override Task PutRecord(DataRecord record, DateTimeOffset now) + { + Interlocked.Increment(ref _putRecordCount); + + // Use AddOrUpdate to handle concurrent puts atomically + _records.AddOrUpdate( + record.IdempotencyKey, + record, + (_, _) => record); + + return Task.CompletedTask; + } + + /// + public override Task UpdateRecord(DataRecord record) + { + Interlocked.Increment(ref _updateRecordCount); + + // Use AddOrUpdate to handle concurrent updates atomically + _records.AddOrUpdate( + record.IdempotencyKey, + record, + (_, _) => record); + + return Task.CompletedTask; + } + + /// + public override Task DeleteRecord(string idempotencyKey) + { + Interlocked.Increment(ref _deleteRecordCount); + + _records.TryRemove(idempotencyKey, out _); + return Task.CompletedTask; + } + + /// + /// Clears all records from the store. + /// Thread-safe operation. + /// + public void Clear() + { + _records.Clear(); + } + + /// + /// Resets all operation counters to zero. + /// Thread-safe operation. + /// + public void ResetCounters() + { + Interlocked.Exchange(ref _getRecordCount, 0); + Interlocked.Exchange(ref _putRecordCount, 0); + Interlocked.Exchange(ref _updateRecordCount, 0); + Interlocked.Exchange(ref _deleteRecordCount, 0); + } + + /// + /// Gets all records currently in the store. + /// Returns a snapshot of the records at the time of the call. + /// + /// A dictionary containing all records. + public IDictionary GetAllRecords() + { + return new Dictionary(_records); + } + + /// + /// Checks if a record exists for the given idempotency key. + /// + /// The idempotency key to check. + /// True if a record exists, false otherwise. + public bool ContainsKey(string idempotencyKey) + { + return _records.ContainsKey(idempotencyKey); + } + + /// + /// Tries to get a record for the given idempotency key. + /// + /// The idempotency key to look up. + /// The record if found, null otherwise. + /// True if the record was found, false otherwise. + public bool TryGetRecord(string idempotencyKey, out DataRecord? record) + { + return _records.TryGetValue(idempotencyKey, out record); + } +} diff --git a/libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Persistence/BasePersistenceStoreTests.cs b/libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Persistence/BasePersistenceStoreTests.cs index 1afe2824e..5e4cd9f94 100644 --- a/libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Persistence/BasePersistenceStoreTests.cs +++ b/libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Persistence/BasePersistenceStoreTests.cs @@ -592,4 +592,261 @@ public async Task ProcessExistingRecord_WhenValidRecord_ShouldReturnRecordAndSav cache.TryGet("testFunction#5eff007a9ed2789a9f9f6bc182fc6ae6", out var cachedRecord).Should().BeTrue(); cachedRecord.Should().Be(existingRecord); } + + #region Configure Code Coverage Tests + + [Fact] + public async Task Configure_WhenUseLocalCacheIsFalse_ShouldNotCreateCache() + { + // Arrange + var persistenceStore = new InMemoryPersistenceStore(); + var request = LoadApiGatewayProxyRequest(); + + // Configure with UseLocalCache = false (default) + persistenceStore.Configure(new IdempotencyOptionsBuilder() + .WithUseLocalCache(false) + .Build(), null, null); + + var now = DateTimeOffset.UtcNow; + + // Act - SaveSuccess should work without cache + var product = new Product(34543, "product", 42); + await persistenceStore.SaveSuccess(JsonSerializer.SerializeToDocument(request)!, product, now); + + // Assert - Record should be saved to persistence store + var dr = persistenceStore.DataRecord; + dr.Status.Should().Be(DataRecord.DataRecordStatus.COMPLETED); + dr.IdempotencyKey.Should().Be("testFunction#5eff007a9ed2789a9f9f6bc182fc6ae6"); + persistenceStore.Status.Should().Be(2); + } + + [Fact] + public async Task Configure_WhenUseLocalCacheIsTrue_ShouldCreateCacheWithPublicMethod() + { + // Arrange - This test covers the positive path: if (useLocalCache) { _cache = new LRUCache... } + // Using the PUBLIC Configure method (not the internal one with cache parameter) + var persistenceStore = new InMemoryPersistenceStore(); + var request = LoadApiGatewayProxyRequest(); + + // Configure with UseLocalCache = true using the PUBLIC method + persistenceStore.Configure(new IdempotencyOptionsBuilder() + .WithUseLocalCache(true) + .Build(), null, null); + + var now = DateTimeOffset.UtcNow; + + // Act - SaveSuccess should save to the internally created cache + var product = new Product(34543, "product", 42); + await persistenceStore.SaveSuccess(JsonSerializer.SerializeToDocument(request)!, product, now); + + // Assert - Record should be saved to persistence store + var dr = persistenceStore.DataRecord; + dr.Status.Should().Be(DataRecord.DataRecordStatus.COMPLETED); + persistenceStore.Status.Should().Be(2); + + // Verify cache is working by getting the record (should come from cache, not persistence) + var record = await persistenceStore.GetRecord(JsonSerializer.SerializeToDocument(request)!, now); + record.Status.Should().Be(DataRecord.DataRecordStatus.COMPLETED); + // Status should still be 2 (not 0) because record came from cache, not from GetRecord override + persistenceStore.Status.Should().Be(2); + } + + [Fact] + public async Task Configure_WhenKeyPrefixIsSet_ShouldUsePrefixAsFunctionName() + { + // Arrange - This test covers the positive path: if (!string.IsNullOrEmpty(keyPrefix)) { _functionName = keyPrefix; } + var persistenceStore = new InMemoryPersistenceStore(); + var request = LoadApiGatewayProxyRequest(); + + // Configure with a non-empty keyPrefix + persistenceStore.Configure(new IdempotencyOptionsBuilder().Build(), "ignoredFunctionName", "MyKeyPrefix"); + + var now = DateTimeOffset.UtcNow; + + // Act + await persistenceStore.SaveInProgress(JsonSerializer.SerializeToDocument(request)!, now, null); + + // Assert - IdempotencyKey should use the keyPrefix, not the functionName + var dr = persistenceStore.DataRecord; + dr.IdempotencyKey.Should().StartWith("MyKeyPrefix#"); + dr.IdempotencyKey.Should().NotContain("ignoredFunctionName"); + } + + [Fact] + public async Task Configure_WhenPayloadValidationJmesPathIsSet_ShouldEnablePayloadValidation() + { + // Arrange - This test covers the positive path: if (!string.IsNullOrWhiteSpace(...PayloadValidationJmesPath)) { PayloadValidationEnabled = true; } + var persistenceStore = new InMemoryPersistenceStore(); + var request = LoadApiGatewayProxyRequest(); + + // Configure with a valid PayloadValidationJmesPath + persistenceStore.Configure(new IdempotencyOptionsBuilder() + .WithEventKeyJmesPath("powertools_json(Body).id") + .WithPayloadValidationJmesPath("powertools_json(Body).message") + .Build(), "myfunc", null); + + var now = DateTimeOffset.UtcNow; + + // Act + await persistenceStore.SaveInProgress(JsonSerializer.SerializeToDocument(request)!, now, null); + + // Assert - PayloadHash should NOT be empty when validation IS enabled + var dr = persistenceStore.DataRecord; + dr.PayloadHash.Should().NotBeEmpty(); + // The hash should be the MD5 of "Lambda rocks" (the message in the test payload) + dr.PayloadHash.Should().Be("70c24d88041893f7fbab4105b76fd9e1"); + } + + [Fact] + public async Task Configure_WhenKeyPrefixIsNull_ShouldUseFunctionNameFromEnvironment() + { + // Arrange + var persistenceStore = new InMemoryPersistenceStore(); + var request = LoadApiGatewayProxyRequest(); + + // Configure with null keyPrefix - should use default function name + persistenceStore.Configure(new IdempotencyOptionsBuilder().Build(), "myFunction", null); + + var now = DateTimeOffset.UtcNow; + + // Act + await persistenceStore.SaveInProgress(JsonSerializer.SerializeToDocument(request)!, now, null); + + // Assert - IdempotencyKey should include the function name + var dr = persistenceStore.DataRecord; + dr.IdempotencyKey.Should().Contain("myFunction"); + dr.IdempotencyKey.Should().Be("testFunction.myFunction#5eff007a9ed2789a9f9f6bc182fc6ae6"); + } + + [Fact] + public async Task Configure_WhenKeyPrefixIsEmpty_ShouldUseFunctionNameFromEnvironment() + { + // Arrange + var persistenceStore = new InMemoryPersistenceStore(); + var request = LoadApiGatewayProxyRequest(); + + // Configure with empty keyPrefix - should use default function name + persistenceStore.Configure(new IdempotencyOptionsBuilder().Build(), "anotherFunction", ""); + + var now = DateTimeOffset.UtcNow; + + // Act + await persistenceStore.SaveInProgress(JsonSerializer.SerializeToDocument(request)!, now, null); + + // Assert - IdempotencyKey should include the function name + var dr = persistenceStore.DataRecord; + dr.IdempotencyKey.Should().Contain("anotherFunction"); + dr.IdempotencyKey.Should().Be("testFunction.anotherFunction#5eff007a9ed2789a9f9f6bc182fc6ae6"); + } + + [Fact] + public async Task Configure_WhenPayloadValidationJmesPathIsNull_ShouldNotEnablePayloadValidation() + { + // Arrange + var persistenceStore = new InMemoryPersistenceStore(); + var request = LoadApiGatewayProxyRequest(); + + // Configure without PayloadValidationJmesPath + persistenceStore.Configure(new IdempotencyOptionsBuilder() + .WithEventKeyJmesPath("powertools_json(Body).id") + .Build(), "myfunc", null); + + var now = DateTimeOffset.UtcNow; + + // Act + await persistenceStore.SaveInProgress(JsonSerializer.SerializeToDocument(request)!, now, null); + + // Assert - PayloadHash should be empty when validation is not enabled + var dr = persistenceStore.DataRecord; + dr.PayloadHash.Should().BeEmpty(); + } + + [Fact] + public async Task Configure_WhenPayloadValidationJmesPathIsEmpty_ShouldNotEnablePayloadValidation() + { + // Arrange + var persistenceStore = new InMemoryPersistenceStore(); + var request = LoadApiGatewayProxyRequest(); + + // Configure with empty PayloadValidationJmesPath + persistenceStore.Configure(new IdempotencyOptionsBuilder() + .WithEventKeyJmesPath("powertools_json(Body).id") + .WithPayloadValidationJmesPath("") + .Build(), "myfunc", null); + + var now = DateTimeOffset.UtcNow; + + // Act + await persistenceStore.SaveInProgress(JsonSerializer.SerializeToDocument(request)!, now, null); + + // Assert - PayloadHash should be empty when validation is not enabled + var dr = persistenceStore.DataRecord; + dr.PayloadHash.Should().BeEmpty(); + } + + [Fact] + public async Task Configure_WhenPayloadValidationJmesPathIsWhitespace_ShouldNotEnablePayloadValidation() + { + // Arrange + var persistenceStore = new InMemoryPersistenceStore(); + var request = LoadApiGatewayProxyRequest(); + + // Configure with whitespace PayloadValidationJmesPath + persistenceStore.Configure(new IdempotencyOptionsBuilder() + .WithEventKeyJmesPath("powertools_json(Body).id") + .WithPayloadValidationJmesPath(" ") + .Build(), "myfunc", null); + + var now = DateTimeOffset.UtcNow; + + // Act + await persistenceStore.SaveInProgress(JsonSerializer.SerializeToDocument(request)!, now, null); + + // Assert - PayloadHash should be empty when validation is not enabled + var dr = persistenceStore.DataRecord; + dr.PayloadHash.Should().BeEmpty(); + } + + [Fact] + public async Task Configure_WhenFunctionNameIsNullAndKeyPrefixIsNull_ShouldUseDefaultFunctionName() + { + // Arrange + var persistenceStore = new InMemoryPersistenceStore(); + var request = LoadApiGatewayProxyRequest(); + + // Configure with both functionName and keyPrefix as null + persistenceStore.Configure(new IdempotencyOptionsBuilder().Build(), null, null); + + var now = DateTimeOffset.UtcNow; + + // Act + await persistenceStore.SaveInProgress(JsonSerializer.SerializeToDocument(request)!, now, null); + + // Assert - IdempotencyKey should use default "testFunction" + var dr = persistenceStore.DataRecord; + dr.IdempotencyKey.Should().StartWith("testFunction#"); + } + + [Fact] + public async Task Configure_WhenFunctionNameIsWhitespace_ShouldUseDefaultFunctionNameOnly() + { + // Arrange + var persistenceStore = new InMemoryPersistenceStore(); + var request = LoadApiGatewayProxyRequest(); + + // Configure with whitespace functionName + persistenceStore.Configure(new IdempotencyOptionsBuilder().Build(), " ", null); + + var now = DateTimeOffset.UtcNow; + + // Act + await persistenceStore.SaveInProgress(JsonSerializer.SerializeToDocument(request)!, now, null); + + // Assert - IdempotencyKey should use default "testFunction" without appending whitespace + var dr = persistenceStore.DataRecord; + dr.IdempotencyKey.Should().StartWith("testFunction#"); + dr.IdempotencyKey.Should().NotContain("testFunction. "); + } + + #endregion } \ No newline at end of file diff --git a/libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Persistence/DynamoDBPersistenceStoreTests.cs b/libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Persistence/DynamoDBPersistenceStoreTests.cs index 09b4c781d..1a3a8b39b 100644 --- a/libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Persistence/DynamoDBPersistenceStoreTests.cs +++ b/libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Persistence/DynamoDBPersistenceStoreTests.cs @@ -281,15 +281,21 @@ await _client.PutItemAsync(new PutItemRequest TableName = _tableName, Item = item }); - // enable payload validation - _dynamoDbPersistenceStore.Configure( + + // Create a new store instance with payload validation enabled + // (Configure is idempotent and thread-safe, so we need a fresh instance to change configuration) + var storeWithValidation = new DynamoDBPersistenceStoreBuilder() + .WithTableName(_tableName) + .WithDynamoDBClient(_client) + .Build(); + storeWithValidation.Configure( new IdempotencyOptionsBuilder().WithPayloadValidationJmesPath("path").Build(), null, null); // Act expiry = now.AddSeconds(3600).ToUnixTimeSeconds(); var record = new DataRecord("key", DataRecord.DataRecordStatus.COMPLETED, expiry, "Fake result", "hash"); - await _dynamoDbPersistenceStore.UpdateRecord(record); + await storeWithValidation.UpdateRecord(record); // Assert var itemInDb = (await _client.GetItemAsync(new GetItemRequest