diff --git a/libraries/src/AWS.Lambda.Powertools.Idempotency/Idempotency.cs b/libraries/src/AWS.Lambda.Powertools.Idempotency/Idempotency.cs
index 196767b5c..e35551577 100644
--- a/libraries/src/AWS.Lambda.Powertools.Idempotency/Idempotency.cs
+++ b/libraries/src/AWS.Lambda.Powertools.Idempotency/Idempotency.cs
@@ -1,5 +1,6 @@
using System;
using System.Text.Json.Serialization;
+using System.Threading;
using Amazon.Lambda.Core;
using AWS.Lambda.Powertools.Common;
using AWS.Lambda.Powertools.Idempotency.Internal.Serializers;
@@ -16,6 +17,12 @@ namespace AWS.Lambda.Powertools.Idempotency;
///
public sealed class Idempotency
{
+ ///
+ /// AsyncLocal storage for per-invocation LambdaContext.
+ /// This ensures each concurrent Lambda invocation has its own isolated context.
+ ///
+ private static readonly AsyncLocal _lambdaContext = new();
+
///
/// The general configurations for the idempotency
///
@@ -66,18 +73,25 @@ public static void Configure(Action configurationAction)
}
///
- /// Holds ILambdaContext
+ /// Holds ILambdaContext using AsyncLocal for per-invocation isolation.
+ /// Each concurrent Lambda invocation will have its own isolated context.
///
- public ILambdaContext LambdaContext { get; private set; }
+ public ILambdaContext LambdaContext
+ {
+ get => _lambdaContext.Value;
+ private set => _lambdaContext.Value = value;
+ }
///
/// Can be used in a method which is not the handler to capture the Lambda context,
/// to calculate the remaining time before the invocation times out.
+ /// This method is thread-safe and stores the context in AsyncLocal storage,
+ /// ensuring isolation between concurrent Lambda invocations.
///
- ///
+ /// The Lambda context for the current invocation
public static void RegisterLambdaContext(ILambdaContext context)
{
- Instance.LambdaContext = context;
+ _lambdaContext.Value = context;
}
///
diff --git a/libraries/src/AWS.Lambda.Powertools.Idempotency/InternalsVisibleTo.cs b/libraries/src/AWS.Lambda.Powertools.Idempotency/InternalsVisibleTo.cs
index 3d73602cf..e0ccd4b0a 100644
--- a/libraries/src/AWS.Lambda.Powertools.Idempotency/InternalsVisibleTo.cs
+++ b/libraries/src/AWS.Lambda.Powertools.Idempotency/InternalsVisibleTo.cs
@@ -15,4 +15,5 @@
using System.Runtime.CompilerServices;
-[assembly: InternalsVisibleTo("AWS.Lambda.Powertools.Idempotency.Tests")]
\ No newline at end of file
+[assembly: InternalsVisibleTo("AWS.Lambda.Powertools.Idempotency.Tests")]
+[assembly: InternalsVisibleTo("AWS.Lambda.Powertools.ConcurrencyTests")]
\ No newline at end of file
diff --git a/libraries/src/AWS.Lambda.Powertools.Idempotency/Persistence/BasePersistenceStore.cs b/libraries/src/AWS.Lambda.Powertools.Idempotency/Persistence/BasePersistenceStore.cs
index 603f981e9..14918c767 100644
--- a/libraries/src/AWS.Lambda.Powertools.Idempotency/Persistence/BasePersistenceStore.cs
+++ b/libraries/src/AWS.Lambda.Powertools.Idempotency/Persistence/BasePersistenceStore.cs
@@ -18,6 +18,16 @@ namespace AWS.Lambda.Powertools.Idempotency.Persistence;
///
public abstract class BasePersistenceStore : IPersistenceStore
{
+ ///
+ /// Lock object for thread-safe configuration
+ ///
+ private readonly object _configureLock = new object();
+
+ ///
+ /// Flag indicating whether the store has been configured
+ ///
+ private volatile bool _isConfigured;
+
///
/// Idempotency Options
///
@@ -39,50 +49,95 @@ public abstract class BasePersistenceStore : IPersistenceStore
private LRUCache _cache = null!;
///
- /// Initialize the base persistence layer from the configuration settings
+ /// Initialize the base persistence layer from the configuration settings.
+ /// This method is thread-safe and idempotent - multiple calls with the same parameters are safe.
///
/// Idempotency configuration settings
/// The name of the function being decorated
///
public void Configure(IdempotencyOptions idempotencyOptions, string functionName, string keyPrefix)
{
- if (!string.IsNullOrEmpty(keyPrefix))
+ // Fast path - already configured
+ if (_isConfigured) return;
+
+ lock (_configureLock)
{
- _functionName = keyPrefix;
- }
- else
- {
- var funcEnv = Environment.GetEnvironmentVariable(Constants.LambdaFunctionNameEnv);
-
- _functionName = funcEnv ?? "testFunction";
- if (!string.IsNullOrWhiteSpace(functionName))
+ // Double-check pattern
+ if (_isConfigured) return;
+
+ if (!string.IsNullOrEmpty(keyPrefix))
{
- _functionName += "." + functionName;
+ _functionName = keyPrefix;
}
- }
+ else
+ {
+ var funcEnv = Environment.GetEnvironmentVariable(Constants.LambdaFunctionNameEnv);
- _idempotencyOptions = idempotencyOptions;
+ _functionName = funcEnv ?? "testFunction";
+ if (!string.IsNullOrWhiteSpace(functionName))
+ {
+ _functionName += "." + functionName;
+ }
+ }
- if (!string.IsNullOrWhiteSpace(_idempotencyOptions.PayloadValidationJmesPath))
- {
- PayloadValidationEnabled = true;
- }
+ _idempotencyOptions = idempotencyOptions;
- var useLocalCache = _idempotencyOptions.UseLocalCache;
- if (useLocalCache)
- {
- _cache = new LRUCache(_idempotencyOptions.LocalCacheMaxItems);
+ if (!string.IsNullOrWhiteSpace(_idempotencyOptions.PayloadValidationJmesPath))
+ {
+ PayloadValidationEnabled = true;
+ }
+
+ var useLocalCache = _idempotencyOptions.UseLocalCache;
+ if (useLocalCache)
+ {
+ _cache = new LRUCache(_idempotencyOptions.LocalCacheMaxItems);
+ }
+
+ _isConfigured = true;
}
}
///
- /// For test purpose only (adding a cache to mock)
+ /// For test purpose only (adding a cache to mock).
+ /// This method is thread-safe and idempotent.
///
internal void Configure(IdempotencyOptions options, string functionName, string keyPrefix,
LRUCache cache)
{
- Configure(options, functionName, keyPrefix);
- _cache = cache;
+ // Fast path - already configured
+ if (_isConfigured) return;
+
+ lock (_configureLock)
+ {
+ // Double-check pattern
+ if (_isConfigured) return;
+
+ if (!string.IsNullOrEmpty(keyPrefix))
+ {
+ _functionName = keyPrefix;
+ }
+ else
+ {
+ var funcEnv = Environment.GetEnvironmentVariable(Constants.LambdaFunctionNameEnv);
+
+ _functionName = funcEnv ?? "testFunction";
+ if (!string.IsNullOrWhiteSpace(functionName))
+ {
+ _functionName += "." + functionName;
+ }
+ }
+
+ _idempotencyOptions = options;
+
+ if (!string.IsNullOrWhiteSpace(_idempotencyOptions.PayloadValidationJmesPath))
+ {
+ PayloadValidationEnabled = true;
+ }
+
+ _cache = cache;
+
+ _isConfigured = true;
+ }
}
///
diff --git a/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/AWS.Lambda.Powertools.ConcurrencyTests.csproj b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/AWS.Lambda.Powertools.ConcurrencyTests.csproj
index 50d191a83..63fc33c03 100644
--- a/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/AWS.Lambda.Powertools.ConcurrencyTests.csproj
+++ b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/AWS.Lambda.Powertools.ConcurrencyTests.csproj
@@ -21,9 +21,11 @@
all
runtime; build; native; contentfiles; analyzers; buildtransitive
+
+
diff --git a/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/ConfigurationThreadSafetyTests.cs b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/ConfigurationThreadSafetyTests.cs
new file mode 100644
index 000000000..0fc92cc54
--- /dev/null
+++ b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/ConfigurationThreadSafetyTests.cs
@@ -0,0 +1,257 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+using Xunit;
+
+namespace AWS.Lambda.Powertools.ConcurrencyTests.Idempotency;
+
+///
+/// Tests for validating thread-safe configuration in BasePersistenceStore
+/// under concurrent execution scenarios.
+///
+/// These tests verify that when multiple threads attempt to configure
+/// the persistence store simultaneously, the configuration completes
+/// without exceptions and the resulting state is consistent.
+///
+[Collection("Idempotency Configuration Tests")]
+public class ConfigurationThreadSafetyTests
+{
+ #region Helper Classes
+
+ private class ConfigurationResult
+ {
+ public int ThreadIndex { get; set; }
+ public bool Success { get; set; }
+ public bool ExceptionThrown { get; set; }
+ public string? ExceptionMessage { get; set; }
+ public string? ExceptionType { get; set; }
+ }
+
+ #endregion
+
+ #region Property 1: Configuration Thread Safety
+
+ ///
+ /// **Feature: idempotency-thread-safety, Property 1: Configuration Thread Safety**
+ /// *For any* number of concurrent invocations calling Configure() simultaneously,
+ /// the configuration should complete without exceptions and the resulting configuration
+ /// should be consistent (not corrupted by interleaved operations).
+ /// **Validates: Requirements 1.1**
+ ///
+ [Theory]
+ [InlineData(2)]
+ [InlineData(5)]
+ [InlineData(8)]
+ [InlineData(10)]
+ public void ConfigurationThreadSafety_ConcurrentConfigure_ShouldCompleteWithoutExceptions(int concurrencyLevel)
+ {
+ // Create a fresh persistence store for each test run
+ var store = new ThreadSafeInMemoryPersistenceStore();
+ var results = new ConfigurationResult[concurrencyLevel];
+ var barrier = new Barrier(concurrencyLevel);
+ var tasks = new Task[concurrencyLevel];
+
+ // Create idempotency options
+ var options = new AWS.Lambda.Powertools.Idempotency.IdempotencyOptionsBuilder()
+ .WithEventKeyJmesPath("id")
+ .WithExpiration(TimeSpan.FromMinutes(5))
+ .Build();
+
+ for (int i = 0; i < concurrencyLevel; i++)
+ {
+ int threadIndex = i;
+ tasks[i] = Task.Run(() =>
+ {
+ var result = new ConfigurationResult { ThreadIndex = threadIndex };
+
+ try
+ {
+ // Wait for all threads to be ready
+ barrier.SignalAndWait();
+
+ // All threads attempt to configure simultaneously
+ store.Configure(options, $"Function_{threadIndex}", null);
+
+ result.Success = true;
+ }
+ catch (Exception ex)
+ {
+ result.ExceptionThrown = true;
+ result.ExceptionType = ex.GetType().Name;
+ result.ExceptionMessage = ex.Message;
+ }
+
+ results[threadIndex] = result;
+ });
+ }
+
+ Task.WaitAll(tasks);
+
+ // All threads should complete without exceptions
+ Assert.All(results, r => Assert.True(r.Success,
+ $"Thread {r.ThreadIndex} failed: {r.ExceptionType}: {r.ExceptionMessage}"));
+ Assert.All(results, r => Assert.False(r.ExceptionThrown,
+ $"Thread {r.ThreadIndex} threw exception: {r.ExceptionType}: {r.ExceptionMessage}"));
+ }
+
+ ///
+ /// **Feature: idempotency-thread-safety, Property 1b: Configuration Idempotency**
+ /// *For any* persistence store, calling Configure() multiple times with the same
+ /// parameters should be safe and idempotent (subsequent calls are no-ops).
+ /// **Validates: Requirements 1.1**
+ ///
+ [Theory]
+ [InlineData(2)]
+ [InlineData(3)]
+ [InlineData(5)]
+ public void ConfigurationIdempotency_MultipleCalls_ShouldBeNoOp(int numberOfCalls)
+ {
+ var store = new ThreadSafeInMemoryPersistenceStore();
+ var options = new AWS.Lambda.Powertools.Idempotency.IdempotencyOptionsBuilder()
+ .WithEventKeyJmesPath("id")
+ .Build();
+
+ var exceptions = new List();
+
+ // Call Configure multiple times sequentially
+ for (int i = 0; i < numberOfCalls; i++)
+ {
+ try
+ {
+ store.Configure(options, "TestFunction", null);
+ }
+ catch (Exception ex)
+ {
+ exceptions.Add(ex);
+ }
+ }
+
+ Assert.Empty(exceptions);
+ }
+
+ ///
+ /// **Feature: idempotency-thread-safety, Property 1c: Configuration Thread Safety with Different Parameters**
+ /// *For any* set of concurrent threads calling Configure() with different function names,
+ /// the first configuration should win and subsequent calls should be no-ops.
+ /// **Validates: Requirements 1.1, 1.3**
+ ///
+ [Theory]
+ [InlineData(2)]
+ [InlineData(5)]
+ [InlineData(8)]
+ public void ConfigurationThreadSafety_DifferentParameters_FirstConfigurationWins(int concurrencyLevel)
+ {
+ var store = new ThreadSafeInMemoryPersistenceStore();
+ var results = new ConfigurationResult[concurrencyLevel];
+ var barrier = new Barrier(concurrencyLevel);
+ var tasks = new Task[concurrencyLevel];
+
+ for (int i = 0; i < concurrencyLevel; i++)
+ {
+ int threadIndex = i;
+ var options = new AWS.Lambda.Powertools.Idempotency.IdempotencyOptionsBuilder()
+ .WithEventKeyJmesPath($"id_{threadIndex}")
+ .Build();
+
+ tasks[i] = Task.Run(() =>
+ {
+ var result = new ConfigurationResult { ThreadIndex = threadIndex };
+
+ try
+ {
+ barrier.SignalAndWait();
+
+ // Each thread tries to configure with different parameters
+ store.Configure(options, $"Function_{threadIndex}", null);
+
+ result.Success = true;
+ }
+ catch (Exception ex)
+ {
+ result.ExceptionThrown = true;
+ result.ExceptionType = ex.GetType().Name;
+ result.ExceptionMessage = ex.Message;
+ }
+
+ results[threadIndex] = result;
+ });
+ }
+
+ Task.WaitAll(tasks);
+
+ // All threads should complete without exceptions (even if their config was ignored)
+ Assert.All(results, r => Assert.True(r.Success,
+ $"Thread {r.ThreadIndex} failed: {r.ExceptionType}: {r.ExceptionMessage}"));
+ Assert.All(results, r => Assert.False(r.ExceptionThrown,
+ $"Thread {r.ThreadIndex} threw exception: {r.ExceptionType}: {r.ExceptionMessage}"));
+ }
+
+ #endregion
+
+ #region Additional Concurrency Tests
+
+ ///
+ /// Stress test for configuration thread safety with high concurrency.
+ /// This test uses a higher number of threads to stress test the locking mechanism.
+ ///
+ [Theory]
+ [InlineData(20)]
+ [InlineData(30)]
+ public void ConfigurationThreadSafety_HighConcurrency_ShouldCompleteWithoutExceptions(int concurrencyLevel)
+ {
+ var store = new ThreadSafeInMemoryPersistenceStore();
+ var results = new ConfigurationResult[concurrencyLevel];
+ var barrier = new Barrier(concurrencyLevel);
+ var tasks = new Task[concurrencyLevel];
+
+ var options = new AWS.Lambda.Powertools.Idempotency.IdempotencyOptionsBuilder()
+ .WithEventKeyJmesPath("id")
+ .Build();
+
+ for (int i = 0; i < concurrencyLevel; i++)
+ {
+ int threadIndex = i;
+ tasks[i] = Task.Run(() =>
+ {
+ var result = new ConfigurationResult { ThreadIndex = threadIndex };
+
+ try
+ {
+ barrier.SignalAndWait();
+ store.Configure(options, $"Function_{threadIndex}", null);
+ result.Success = true;
+ }
+ catch (Exception ex)
+ {
+ result.ExceptionThrown = true;
+ result.ExceptionType = ex.GetType().Name;
+ result.ExceptionMessage = ex.Message;
+ }
+
+ results[threadIndex] = result;
+ });
+ }
+
+ Task.WaitAll(tasks);
+
+ // Verify all threads completed without exceptions
+ Assert.All(results, r => Assert.True(r.Success,
+ $"Thread {r.ThreadIndex} failed: {r.ExceptionType}: {r.ExceptionMessage}"));
+ Assert.All(results, r => Assert.False(r.ExceptionThrown,
+ $"Thread {r.ThreadIndex} threw exception: {r.ExceptionType}: {r.ExceptionMessage}"));
+ }
+
+ #endregion
+}
diff --git a/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/IdempotencyAsyncContextTests.cs b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/IdempotencyAsyncContextTests.cs
new file mode 100644
index 000000000..3dc60685d
--- /dev/null
+++ b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/IdempotencyAsyncContextTests.cs
@@ -0,0 +1,476 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+using System.Collections.Concurrent;
+using System.Text.Json;
+using Amazon.Lambda.TestUtilities;
+using AWS.Lambda.Powertools.Idempotency.Internal;
+using Xunit;
+using IdempotencyLib = AWS.Lambda.Powertools.Idempotency;
+
+namespace AWS.Lambda.Powertools.ConcurrencyTests.Idempotency;
+
+///
+/// Tests for validating async context preservation in Powertools Idempotency.
+/// **Feature: idempotency-thread-safety, Property 7: Async Context Preservation**
+/// **Validates: Requirements 6.1, 6.2, 6.3**
+///
+[Collection("Idempotency Async Context Tests")]
+public class IdempotencyAsyncContextTests : IDisposable
+{
+ private readonly ThreadSafeInMemoryPersistenceStore _store;
+
+ public IdempotencyAsyncContextTests()
+ {
+ _store = new ThreadSafeInMemoryPersistenceStore();
+ IdempotencyLib.Idempotency.Configure(builder => builder
+ .WithPersistenceStore(_store)
+ .WithOptions(opt => opt
+ .WithEventKeyJmesPath("id")
+ .WithExpiration(TimeSpan.FromMinutes(5))));
+ }
+
+ public void Dispose()
+ {
+ _store.Clear();
+ _store.ResetCounters();
+ }
+
+ private class AsyncContextResult
+ {
+ public int InvocationIndex { get; set; }
+ public string InvocationId { get; set; } = string.Empty;
+ public string ExpectedFunctionName { get; set; } = string.Empty;
+ public List FunctionNamesAtAwaitPoints { get; set; } = new();
+ public bool ContextPreservedAcrossAllAwaits { get; set; }
+ public bool ExceptionThrown { get; set; }
+ public string? ExceptionType { get; set; }
+ public string? ExceptionMessage { get; set; }
+ }
+
+ private class HandlerAsyncResult
+ {
+ public int InvocationIndex { get; set; }
+ public string InvocationId { get; set; } = string.Empty;
+ public string ExpectedResult { get; set; } = string.Empty;
+ public string? ActualResult { get; set; }
+ public bool ResultMatched { get; set; }
+ public List ContextsAtAwaitPoints { get; set; } = new();
+ public bool ContextPreserved { get; set; }
+ public bool ExceptionThrown { get; set; }
+ public string? ExceptionType { get; set; }
+ public string? ExceptionMessage { get; set; }
+ }
+
+ private class TestRequest
+ {
+ [System.Text.Json.Serialization.JsonPropertyName("id")]
+ public string Id { get; set; } = string.Empty;
+ [System.Text.Json.Serialization.JsonPropertyName("data")]
+ public string Data { get; set; } = string.Empty;
+ }
+
+ private static JsonDocument CreatePayload(string id, string data)
+ {
+ var request = new TestRequest { Id = id, Data = data };
+ return JsonDocument.Parse(JsonSerializer.Serialize(request));
+ }
+
+ private static TestLambdaContext CreateTestContext(string functionName, string requestId)
+ {
+ return new TestLambdaContext
+ {
+ FunctionName = functionName,
+ AwsRequestId = requestId,
+ RemainingTime = TimeSpan.FromMinutes(5)
+ };
+ }
+
+ ///
+ /// **Feature: idempotency-thread-safety, Property 7: Async Context Preservation**
+ /// **Validates: Requirements 6.1, 6.2, 6.3**
+ ///
+ [Theory]
+ [InlineData(1)]
+ [InlineData(3)]
+ [InlineData(5)]
+ public void AsyncContextPreservation_AcrossAwaitPoints_ShouldPreserveContext(int awaitCount)
+ {
+ var invocationId = Guid.NewGuid().ToString("N");
+ var expectedFunctionName = $"AsyncFunction_{invocationId}";
+ var functionsAtAwaitPoints = new List();
+
+ var context = CreateTestContext(expectedFunctionName, invocationId);
+ IdempotencyLib.Idempotency.RegisterLambdaContext(context);
+
+ var contextBefore = IdempotencyLib.Idempotency.Instance.LambdaContext;
+ Assert.Equal(expectedFunctionName, contextBefore?.FunctionName);
+
+ var task = Task.Run(async () =>
+ {
+ for (int i = 0; i < awaitCount; i++)
+ {
+ await Task.Delay(Random.Shared.Next(1, 10));
+ var currentContext = IdempotencyLib.Idempotency.Instance.LambdaContext;
+ lock (functionsAtAwaitPoints)
+ {
+ functionsAtAwaitPoints.Add(currentContext?.FunctionName ?? "null");
+ }
+ }
+ });
+
+ task.Wait();
+
+ Assert.All(functionsAtAwaitPoints, fn => Assert.Equal(expectedFunctionName, fn));
+ }
+
+ [Theory]
+ [InlineData(2)]
+ [InlineData(5)]
+ [InlineData(10)]
+ public void AsyncContextPreservation_ConcurrentAsyncInvocations_ShouldMaintainIsolation(int concurrencyLevel)
+ {
+ var results = new ConcurrentBag();
+ var barrier = new Barrier(concurrencyLevel);
+ var tasks = new Task[concurrencyLevel];
+
+ for (int i = 0; i < concurrencyLevel; i++)
+ {
+ int invocationIndex = i;
+ tasks[i] = Task.Run(async () =>
+ {
+ var invocationId = Guid.NewGuid().ToString("N");
+ var expectedFunctionName = $"ConcurrentAsync_{invocationIndex}_{invocationId}";
+
+ var result = new AsyncContextResult
+ {
+ InvocationIndex = invocationIndex,
+ InvocationId = invocationId,
+ ExpectedFunctionName = expectedFunctionName
+ };
+
+ try
+ {
+ var context = CreateTestContext(expectedFunctionName, invocationId);
+
+ barrier.SignalAndWait();
+
+ IdempotencyLib.Idempotency.RegisterLambdaContext(context);
+
+ for (int awaitPoint = 0; awaitPoint < 3; awaitPoint++)
+ {
+ await Task.Delay(Random.Shared.Next(5, 20));
+ var currentContext = IdempotencyLib.Idempotency.Instance.LambdaContext;
+ result.FunctionNamesAtAwaitPoints.Add(currentContext?.FunctionName ?? "null");
+ }
+
+ result.ContextPreservedAcrossAllAwaits =
+ result.FunctionNamesAtAwaitPoints.All(fn => fn == expectedFunctionName);
+ }
+ catch (Exception ex)
+ {
+ result.ExceptionThrown = true;
+ result.ExceptionType = ex.GetType().Name;
+ result.ExceptionMessage = ex.Message;
+ }
+
+ results.Add(result);
+ });
+ }
+
+ Task.WaitAll(tasks);
+
+ Assert.All(results, r => Assert.False(r.ExceptionThrown, r.ExceptionMessage));
+ Assert.All(results, r => Assert.True(r.ContextPreservedAcrossAllAwaits,
+ $"Invocation {r.InvocationIndex} did not preserve context. Expected '{r.ExpectedFunctionName}', got: [{string.Join(", ", r.FunctionNamesAtAwaitPoints)}]"));
+ }
+
+ [Theory]
+ [InlineData(2)]
+ [InlineData(5)]
+ [InlineData(8)]
+ public void AsyncContextPreservation_InHandler_ShouldPreserveContext(int concurrencyLevel)
+ {
+ var results = new ConcurrentBag();
+ var barrier = new Barrier(concurrencyLevel);
+ var tasks = new Task[concurrencyLevel];
+
+ for (int i = 0; i < concurrencyLevel; i++)
+ {
+ int invocationIndex = i;
+ tasks[i] = Task.Run(async () =>
+ {
+ var invocationId = Guid.NewGuid().ToString("N");
+ var expectedFunctionName = $"HandlerAsync_{invocationIndex}_{invocationId}";
+ var expectedResult = $"result_{invocationId}";
+
+ var result = new HandlerAsyncResult
+ {
+ InvocationIndex = invocationIndex,
+ InvocationId = invocationId,
+ ExpectedResult = expectedResult
+ };
+
+ try
+ {
+ var payload = CreatePayload(invocationId, $"async_data_{invocationIndex}");
+ var context = CreateTestContext(expectedFunctionName, invocationId);
+
+ IdempotencyLib.Idempotency.RegisterLambdaContext(context);
+
+ Func> targetFunc = async () =>
+ {
+ var ctx1 = IdempotencyLib.Idempotency.Instance.LambdaContext;
+ result.ContextsAtAwaitPoints.Add(ctx1?.FunctionName ?? "null");
+
+ await Task.Delay(Random.Shared.Next(5, 15));
+
+ var ctx2 = IdempotencyLib.Idempotency.Instance.LambdaContext;
+ result.ContextsAtAwaitPoints.Add(ctx2?.FunctionName ?? "null");
+
+ await Task.Delay(Random.Shared.Next(5, 15));
+
+ var ctx3 = IdempotencyLib.Idempotency.Instance.LambdaContext;
+ result.ContextsAtAwaitPoints.Add(ctx3?.FunctionName ?? "null");
+
+ return expectedResult;
+ };
+
+ barrier.SignalAndWait();
+
+ var handler = new IdempotencyAspectHandler(
+ targetFunc, $"HandlerAsyncFunction_{invocationIndex}", null, payload, context);
+
+ result.ActualResult = await handler.Handle();
+ result.ResultMatched = result.ActualResult == expectedResult;
+ result.ContextPreserved = result.ContextsAtAwaitPoints.All(fn => fn == expectedFunctionName);
+ }
+ catch (Exception ex)
+ {
+ result.ExceptionThrown = true;
+ result.ExceptionType = ex.GetType().Name;
+ result.ExceptionMessage = ex.Message;
+ }
+
+ results.Add(result);
+ });
+ }
+
+ Task.WaitAll(tasks);
+
+ Assert.All(results, r => Assert.False(r.ExceptionThrown, r.ExceptionMessage));
+ Assert.All(results, r => Assert.True(r.ResultMatched, $"Expected '{r.ExpectedResult}' but got '{r.ActualResult}'"));
+ Assert.All(results, r => Assert.True(r.ContextPreserved,
+ $"Context not preserved: [{string.Join(", ", r.ContextsAtAwaitPoints)}]"));
+ }
+
+ [Theory]
+ [InlineData(2)]
+ [InlineData(5)]
+ [InlineData(10)]
+ public async Task AsyncContext_WithConfigureAwaitFalse_ShouldPreserveContext(int concurrencyLevel)
+ {
+ var results = new ConcurrentBag();
+ var barrier = new Barrier(concurrencyLevel);
+ var tasks = new Task[concurrencyLevel];
+
+ for (int i = 0; i < concurrencyLevel; i++)
+ {
+ int invocationIndex = i;
+ tasks[i] = Task.Run(async () =>
+ {
+ var invocationId = Guid.NewGuid().ToString("N");
+ var expectedFunctionName = $"ConfigureAwaitFalse_{invocationIndex}_{invocationId}";
+
+ var result = new AsyncContextResult
+ {
+ InvocationIndex = invocationIndex,
+ InvocationId = invocationId,
+ ExpectedFunctionName = expectedFunctionName
+ };
+
+ try
+ {
+ var context = CreateTestContext(expectedFunctionName, invocationId);
+
+ barrier.SignalAndWait();
+
+ IdempotencyLib.Idempotency.RegisterLambdaContext(context);
+
+ var ctxBefore = IdempotencyLib.Idempotency.Instance.LambdaContext;
+ result.FunctionNamesAtAwaitPoints.Add(ctxBefore?.FunctionName ?? "null");
+
+ await Task.Delay(Random.Shared.Next(10, 30)).ConfigureAwait(false);
+
+ var ctxAfter = IdempotencyLib.Idempotency.Instance.LambdaContext;
+ result.FunctionNamesAtAwaitPoints.Add(ctxAfter?.FunctionName ?? "null");
+
+ await Task.Delay(Random.Shared.Next(10, 30)).ConfigureAwait(false);
+
+ var ctxFinal = IdempotencyLib.Idempotency.Instance.LambdaContext;
+ result.FunctionNamesAtAwaitPoints.Add(ctxFinal?.FunctionName ?? "null");
+
+ result.ContextPreservedAcrossAllAwaits =
+ result.FunctionNamesAtAwaitPoints.All(fn => fn == expectedFunctionName);
+ }
+ catch (Exception ex)
+ {
+ result.ExceptionThrown = true;
+ result.ExceptionType = ex.GetType().Name;
+ result.ExceptionMessage = ex.Message;
+ }
+
+ results.Add(result);
+ });
+ }
+
+ await Task.WhenAll(tasks);
+
+ Assert.All(results, r => Assert.False(r.ExceptionThrown, r.ExceptionMessage));
+ Assert.All(results, r => Assert.True(r.ContextPreservedAcrossAllAwaits,
+ $"Invocation {r.InvocationIndex} did not preserve context. Expected '{r.ExpectedFunctionName}', got: [{string.Join(", ", r.FunctionNamesAtAwaitPoints)}]"));
+ }
+
+ [Theory]
+ [InlineData(2, 2)]
+ [InlineData(5, 3)]
+ [InlineData(10, 2)]
+ public async Task AsyncContext_NestedAsyncOperations_ShouldPreserveContext(int concurrencyLevel, int nestingDepth)
+ {
+ var results = new ConcurrentBag();
+ var barrier = new Barrier(concurrencyLevel);
+ var tasks = new Task[concurrencyLevel];
+
+ for (int i = 0; i < concurrencyLevel; i++)
+ {
+ int invocationIndex = i;
+ tasks[i] = Task.Run(async () =>
+ {
+ var invocationId = Guid.NewGuid().ToString("N");
+ var expectedFunctionName = $"NestedAsync_{invocationIndex}_{invocationId}";
+
+ var result = new AsyncContextResult
+ {
+ InvocationIndex = invocationIndex,
+ InvocationId = invocationId,
+ ExpectedFunctionName = expectedFunctionName
+ };
+
+ try
+ {
+ var context = CreateTestContext(expectedFunctionName, invocationId);
+
+ barrier.SignalAndWait();
+
+ IdempotencyLib.Idempotency.RegisterLambdaContext(context);
+
+ async Task NestedAsync(int depth)
+ {
+ if (depth <= 0) return;
+
+ await Task.Delay(Random.Shared.Next(1, 5));
+
+ var currentContext = IdempotencyLib.Idempotency.Instance.LambdaContext;
+ lock (result.FunctionNamesAtAwaitPoints)
+ {
+ result.FunctionNamesAtAwaitPoints.Add(currentContext?.FunctionName ?? "null");
+ }
+
+ await NestedAsync(depth - 1);
+ }
+
+ await NestedAsync(nestingDepth);
+
+ result.ContextPreservedAcrossAllAwaits =
+ result.FunctionNamesAtAwaitPoints.All(fn => fn == expectedFunctionName);
+ }
+ catch (Exception ex)
+ {
+ result.ExceptionThrown = true;
+ result.ExceptionType = ex.GetType().Name;
+ result.ExceptionMessage = ex.Message;
+ }
+
+ results.Add(result);
+ });
+ }
+
+ await Task.WhenAll(tasks);
+
+ Assert.All(results, r => Assert.False(r.ExceptionThrown, r.ExceptionMessage));
+ Assert.All(results, r => Assert.True(r.ContextPreservedAcrossAllAwaits,
+ $"Invocation {r.InvocationIndex} did not preserve context in nested async. Expected '{r.ExpectedFunctionName}', got: [{string.Join(", ", r.FunctionNamesAtAwaitPoints)}]"));
+ }
+
+ [Theory]
+ [InlineData(20)]
+ [InlineData(30)]
+ public async Task AsyncContext_HighConcurrency_ShouldPreserveContext(int concurrencyLevel)
+ {
+ var results = new ConcurrentBag();
+ var barrier = new Barrier(concurrencyLevel);
+ var tasks = new Task[concurrencyLevel];
+
+ for (int i = 0; i < concurrencyLevel; i++)
+ {
+ int invocationIndex = i;
+ tasks[i] = Task.Run(async () =>
+ {
+ var invocationId = Guid.NewGuid().ToString("N");
+ var expectedFunctionName = $"StressAsync_{invocationIndex}_{invocationId}";
+
+ var result = new AsyncContextResult
+ {
+ InvocationIndex = invocationIndex,
+ InvocationId = invocationId,
+ ExpectedFunctionName = expectedFunctionName
+ };
+
+ try
+ {
+ var context = CreateTestContext(expectedFunctionName, invocationId);
+
+ barrier.SignalAndWait();
+
+ IdempotencyLib.Idempotency.RegisterLambdaContext(context);
+
+ for (int awaitPoint = 0; awaitPoint < 5; awaitPoint++)
+ {
+ await Task.Delay(Random.Shared.Next(1, 10));
+ var currentContext = IdempotencyLib.Idempotency.Instance.LambdaContext;
+ result.FunctionNamesAtAwaitPoints.Add(currentContext?.FunctionName ?? "null");
+ }
+
+ result.ContextPreservedAcrossAllAwaits =
+ result.FunctionNamesAtAwaitPoints.All(fn => fn == expectedFunctionName);
+ }
+ catch (Exception ex)
+ {
+ result.ExceptionThrown = true;
+ result.ExceptionType = ex.GetType().Name;
+ result.ExceptionMessage = ex.Message;
+ }
+
+ results.Add(result);
+ });
+ }
+
+ await Task.WhenAll(tasks);
+
+ Assert.All(results, r => Assert.False(r.ExceptionThrown, r.ExceptionMessage));
+ Assert.All(results, r => Assert.True(r.ContextPreservedAcrossAllAwaits,
+ $"Invocation {r.InvocationIndex} did not preserve context under stress. Expected '{r.ExpectedFunctionName}', got: [{string.Join(", ", r.FunctionNamesAtAwaitPoints)}]"));
+ }
+}
diff --git a/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/IdempotencyHandlerIsolationTests.cs b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/IdempotencyHandlerIsolationTests.cs
new file mode 100644
index 000000000..efab09a3d
--- /dev/null
+++ b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/IdempotencyHandlerIsolationTests.cs
@@ -0,0 +1,337 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+using System.Collections.Concurrent;
+using System.Text.Json;
+using Amazon.Lambda.TestUtilities;
+using AWS.Lambda.Powertools.Idempotency.Internal;
+using Xunit;
+using IdempotencyLib = AWS.Lambda.Powertools.Idempotency;
+
+namespace AWS.Lambda.Powertools.ConcurrencyTests.Idempotency;
+
+///
+/// Tests for validating IdempotencyAspectHandler isolation under concurrent execution scenarios.
+/// **Feature: idempotency-thread-safety, Property 5: IdempotencyAspectHandler Isolation**
+/// **Validates: Requirements 4.1, 4.2, 4.3, 4.4**
+///
+[Collection("Idempotency Handler Isolation Tests")]
+public class IdempotencyHandlerIsolationTests : IDisposable
+{
+ private readonly ThreadSafeInMemoryPersistenceStore _store;
+
+ public IdempotencyHandlerIsolationTests()
+ {
+ _store = new ThreadSafeInMemoryPersistenceStore();
+ IdempotencyLib.Idempotency.Configure(builder => builder
+ .WithPersistenceStore(_store)
+ .WithOptions(opt => opt
+ .WithEventKeyJmesPath("id")
+ .WithExpiration(TimeSpan.FromMinutes(5))));
+ }
+
+ public void Dispose()
+ {
+ _store.Clear();
+ _store.ResetCounters();
+ }
+
+ private class HandlerInvocationResult
+ {
+ public int InvocationIndex { get; set; }
+ public string InvocationId { get; set; } = string.Empty;
+ public string ExpectedResult { get; set; } = string.Empty;
+ public string? ActualResult { get; set; }
+ public bool ResultMatched { get; set; }
+ public bool ExceptionThrown { get; set; }
+ public string? ExceptionType { get; set; }
+ public string? ExceptionMessage { get; set; }
+ }
+
+ private class TestRequest
+ {
+ [System.Text.Json.Serialization.JsonPropertyName("id")]
+ public string Id { get; set; } = string.Empty;
+ [System.Text.Json.Serialization.JsonPropertyName("data")]
+ public string Data { get; set; } = string.Empty;
+ }
+
+ private class TestResponse
+ {
+ public string RequestId { get; set; } = string.Empty;
+ public string ProcessedData { get; set; } = string.Empty;
+ }
+
+ private static JsonDocument CreatePayload(string id, string data)
+ {
+ var request = new TestRequest { Id = id, Data = data };
+ return JsonDocument.Parse(JsonSerializer.Serialize(request));
+ }
+
+ private static TestLambdaContext CreateTestContext(string functionName, string requestId)
+ {
+ return new TestLambdaContext
+ {
+ FunctionName = functionName,
+ AwsRequestId = requestId,
+ RemainingTime = TimeSpan.FromMinutes(5)
+ };
+ }
+
+ ///
+ /// **Feature: idempotency-thread-safety, Property 5: IdempotencyAspectHandler Isolation**
+ /// **Validates: Requirements 4.1, 4.2, 4.3, 4.4**
+ ///
+ [Theory]
+ [InlineData(2)]
+ [InlineData(5)]
+ [InlineData(10)]
+ public void HandlerIsolation_ConcurrentInvocations_ShouldProcessIndependently(int concurrencyLevel)
+ {
+ var results = new ConcurrentBag();
+ var barrier = new Barrier(concurrencyLevel);
+ var tasks = new Task[concurrencyLevel];
+
+ for (int i = 0; i < concurrencyLevel; i++)
+ {
+ int invocationIndex = i;
+ tasks[i] = Task.Run(async () =>
+ {
+ var invocationId = Guid.NewGuid().ToString("N");
+ var expectedResult = $"processed_{invocationId}";
+ var result = new HandlerInvocationResult
+ {
+ InvocationIndex = invocationIndex,
+ InvocationId = invocationId,
+ ExpectedResult = expectedResult
+ };
+
+ try
+ {
+ var payload = CreatePayload(invocationId, $"data_{invocationIndex}");
+ var context = CreateTestContext($"Function_{invocationIndex}", invocationId);
+ IdempotencyLib.Idempotency.RegisterLambdaContext(context);
+
+ Func> targetFunc = async () =>
+ {
+ await Task.Delay(Random.Shared.Next(5, 20));
+ return new TestResponse { RequestId = invocationId, ProcessedData = expectedResult };
+ };
+
+ barrier.SignalAndWait();
+
+ var handler = new IdempotencyAspectHandler(
+ targetFunc, $"TestFunction_{invocationIndex}", null, payload, context);
+
+ var response = await handler.Handle();
+ result.ActualResult = response?.ProcessedData;
+ result.ResultMatched = response?.ProcessedData == expectedResult && response?.RequestId == invocationId;
+ }
+ catch (Exception ex)
+ {
+ result.ExceptionThrown = true;
+ result.ExceptionType = ex.GetType().Name;
+ result.ExceptionMessage = ex.Message;
+ }
+
+ results.Add(result);
+ });
+ }
+
+ Task.WaitAll(tasks);
+
+ Assert.All(results, r => Assert.False(r.ExceptionThrown, $"{r.ExceptionType}: {r.ExceptionMessage}"));
+ Assert.All(results, r => Assert.True(r.ResultMatched, $"Expected '{r.ExpectedResult}' but got '{r.ActualResult}'"));
+ }
+
+ [Theory]
+ [InlineData(2)]
+ [InlineData(5)]
+ [InlineData(8)]
+ public void HandlerIsolation_MultipleHandlerInstances_ShouldOperateIndependently(int concurrencyLevel)
+ {
+ var results = new ConcurrentBag();
+ var barrier = new Barrier(concurrencyLevel);
+ var tasks = new Task[concurrencyLevel];
+
+ for (int i = 0; i < concurrencyLevel; i++)
+ {
+ int invocationIndex = i;
+ tasks[i] = Task.Run(async () =>
+ {
+ var invocationId = Guid.NewGuid().ToString("N");
+ var result = new HandlerInvocationResult
+ {
+ InvocationIndex = invocationIndex,
+ InvocationId = invocationId,
+ ExpectedResult = $"result_{invocationId}"
+ };
+
+ try
+ {
+ var payload = CreatePayload(invocationId, $"data_{invocationIndex}");
+ var context = CreateTestContext($"IndependentFunction_{invocationIndex}", invocationId);
+ IdempotencyLib.Idempotency.RegisterLambdaContext(context);
+
+ Func> targetFunc = async () =>
+ {
+ await Task.Delay(Random.Shared.Next(1, 10));
+ return result.ExpectedResult;
+ };
+
+ barrier.SignalAndWait();
+
+ var handler = new IdempotencyAspectHandler(
+ targetFunc, $"IndependentFunction_{invocationIndex}", null, payload, context);
+
+ result.ActualResult = await handler.Handle();
+ result.ResultMatched = result.ActualResult == result.ExpectedResult;
+ }
+ catch (Exception ex)
+ {
+ result.ExceptionThrown = true;
+ result.ExceptionType = ex.GetType().Name;
+ result.ExceptionMessage = ex.Message;
+ }
+
+ results.Add(result);
+ });
+ }
+
+ Task.WaitAll(tasks);
+
+ Assert.All(results, r => Assert.False(r.ExceptionThrown, r.ExceptionMessage));
+ Assert.All(results, r => Assert.True(r.ResultMatched, $"Expected '{r.ExpectedResult}' but got '{r.ActualResult}'"));
+ }
+
+ [Theory]
+ [InlineData(2)]
+ [InlineData(5)]
+ [InlineData(10)]
+ public void HandlerIsolation_ConcurrentHandleExecution_ShouldNotCrossContaminate(int concurrencyLevel)
+ {
+ var processedIds = new ConcurrentBag();
+ var results = new ConcurrentBag();
+ var barrier = new Barrier(concurrencyLevel);
+ var tasks = new Task[concurrencyLevel];
+
+ for (int i = 0; i < concurrencyLevel; i++)
+ {
+ int invocationIndex = i;
+ tasks[i] = Task.Run(async () =>
+ {
+ var invocationId = Guid.NewGuid().ToString("N");
+ var result = new HandlerInvocationResult { InvocationIndex = invocationIndex, InvocationId = invocationId };
+
+ try
+ {
+ var payload = CreatePayload(invocationId, $"concurrent_data_{invocationIndex}");
+ var context = CreateTestContext($"ConcurrentFunction", invocationId);
+ IdempotencyLib.Idempotency.RegisterLambdaContext(context);
+
+ string capturedId = invocationId;
+ Func> targetFunc = async () =>
+ {
+ await Task.Delay(Random.Shared.Next(5, 15));
+ processedIds.Add(capturedId);
+ return capturedId;
+ };
+
+ barrier.SignalAndWait();
+
+ var handler = new IdempotencyAspectHandler(
+ targetFunc, $"ConcurrentFunction_{invocationIndex}", null, payload, context);
+
+ result.ActualResult = await handler.Handle();
+ result.ExpectedResult = invocationId;
+ result.ResultMatched = result.ActualResult == invocationId;
+ }
+ catch (Exception ex)
+ {
+ result.ExceptionThrown = true;
+ result.ExceptionType = ex.GetType().Name;
+ result.ExceptionMessage = ex.Message;
+ }
+
+ results.Add(result);
+ });
+ }
+
+ Task.WaitAll(tasks);
+
+ Assert.All(results, r => Assert.False(r.ExceptionThrown, r.ExceptionMessage));
+ Assert.All(results, r => Assert.True(r.ResultMatched, $"Expected '{r.ExpectedResult}' but got '{r.ActualResult}'"));
+ Assert.Equal(concurrencyLevel, processedIds.Distinct().Count());
+ }
+
+ [Theory]
+ [InlineData(20)]
+ [InlineData(30)]
+ public async Task HandlerIsolation_HighConcurrency_ShouldMaintainIsolation(int concurrencyLevel)
+ {
+ var results = new ConcurrentBag();
+ var barrier = new Barrier(concurrencyLevel);
+ var tasks = new Task[concurrencyLevel];
+
+ for (int i = 0; i < concurrencyLevel; i++)
+ {
+ int invocationIndex = i;
+ tasks[i] = Task.Run(async () =>
+ {
+ var invocationId = Guid.NewGuid().ToString("N");
+ var result = new HandlerInvocationResult
+ {
+ InvocationIndex = invocationIndex,
+ InvocationId = invocationId,
+ ExpectedResult = $"stress_result_{invocationId}"
+ };
+
+ try
+ {
+ var payload = CreatePayload(invocationId, $"stress_data_{invocationIndex}");
+ var context = CreateTestContext($"StressFunction_{invocationIndex}", invocationId);
+ IdempotencyLib.Idempotency.RegisterLambdaContext(context);
+
+ Func> targetFunc = async () =>
+ {
+ await Task.Delay(Random.Shared.Next(1, 5));
+ return result.ExpectedResult;
+ };
+
+ barrier.SignalAndWait();
+
+ var handler = new IdempotencyAspectHandler(
+ targetFunc, $"StressFunction_{invocationIndex}", null, payload, context);
+
+ result.ActualResult = await handler.Handle();
+ result.ResultMatched = result.ActualResult == result.ExpectedResult;
+ }
+ catch (Exception ex)
+ {
+ result.ExceptionThrown = true;
+ result.ExceptionType = ex.GetType().Name;
+ result.ExceptionMessage = ex.Message;
+ }
+
+ results.Add(result);
+ });
+ }
+
+ await Task.WhenAll(tasks);
+
+ Assert.All(results, r => Assert.False(r.ExceptionThrown, r.ExceptionMessage));
+ Assert.All(results, r => Assert.True(r.ResultMatched, $"Expected '{r.ExpectedResult}' but got '{r.ActualResult}'"));
+ }
+}
diff --git a/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/IdempotencyHashGenerationTests.cs b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/IdempotencyHashGenerationTests.cs
new file mode 100644
index 000000000..ad220b83f
--- /dev/null
+++ b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/IdempotencyHashGenerationTests.cs
@@ -0,0 +1,431 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+using System.Collections.Concurrent;
+using System.Text.Json;
+using System.Text.Json.Nodes;
+using AWS.Lambda.Powertools.Idempotency;
+using Xunit;
+
+namespace AWS.Lambda.Powertools.ConcurrencyTests.Idempotency;
+
+///
+/// Tests for validating hash generation thread safety under concurrent execution scenarios.
+/// **Feature: idempotency-thread-safety, Property 6: Hash Generation Thread Safety**
+/// **Validates: Requirements 1.4, 5.1, 5.2, 5.3**
+///
+[Collection("Idempotency Hash Generation Tests")]
+public class IdempotencyHashGenerationTests
+{
+ private class HashResult
+ {
+ public int ThreadIndex { get; set; }
+ public string Payload { get; set; } = string.Empty;
+ public string ExpectedHash { get; set; } = string.Empty;
+ public string ActualHash { get; set; } = string.Empty;
+ public bool Success { get; set; }
+ public bool ExceptionThrown { get; set; }
+ public string? ExceptionMessage { get; set; }
+ public string? ExceptionType { get; set; }
+ }
+
+ ///
+ /// **Feature: idempotency-thread-safety, Property 6: Hash Generation Thread Safety**
+ /// **Validates: Requirements 1.4, 5.1, 5.2, 5.3**
+ ///
+ [Theory]
+ [InlineData(2, 5)]
+ [InlineData(5, 25)]
+ [InlineData(10, 50)]
+ public void HashGenerationThreadSafety_ConcurrentHashGeneration_ShouldProduceCorrectHashes(int concurrencyLevel, int operationsPerThread)
+ {
+ var persistenceStore = new ThreadSafeInMemoryPersistenceStore();
+ persistenceStore.Configure(new IdempotencyOptionsBuilder().Build(), null, null);
+
+ var results = new ConcurrentBag();
+ var barrier = new Barrier(concurrencyLevel);
+ var tasks = new Task[concurrencyLevel];
+
+ for (int i = 0; i < concurrencyLevel; i++)
+ {
+ int threadIndex = i;
+ tasks[i] = Task.Run(() =>
+ {
+ try
+ {
+ barrier.SignalAndWait();
+
+ for (int op = 0; op < operationsPerThread; op++)
+ {
+ var result = new HashResult { ThreadIndex = threadIndex };
+
+ try
+ {
+ string payload = $"thread_{threadIndex}_operation_{op}";
+ result.Payload = payload;
+
+ var jsonValue = JsonValue.Create(payload);
+ var jsonDoc = JsonDocument.Parse(jsonValue!.ToJsonString());
+ var hash = persistenceStore.GenerateHash(jsonDoc.RootElement);
+
+ var expectedHash = ComputeExpectedMd5Hash(payload);
+ result.ExpectedHash = expectedHash;
+ result.ActualHash = hash;
+ result.Success = hash == expectedHash;
+ }
+ catch (Exception ex)
+ {
+ result.ExceptionThrown = true;
+ result.ExceptionType = ex.GetType().Name;
+ result.ExceptionMessage = ex.Message;
+ }
+
+ results.Add(result);
+ }
+ }
+ catch (Exception ex)
+ {
+ results.Add(new HashResult
+ {
+ ThreadIndex = threadIndex,
+ ExceptionThrown = true,
+ ExceptionType = ex.GetType().Name,
+ ExceptionMessage = ex.Message
+ });
+ }
+ });
+ }
+
+ Task.WaitAll(tasks);
+
+ Assert.All(results, r => Assert.True(r.Success && !r.ExceptionThrown,
+ $"Thread {r.ThreadIndex} failed: expected '{r.ExpectedHash}', got '{r.ActualHash}'. {r.ExceptionType}: {r.ExceptionMessage}"));
+ }
+
+ [Theory]
+ [InlineData(2)]
+ [InlineData(5)]
+ [InlineData(10)]
+ public void HashGenerationThreadSafety_SamePayload_ShouldProduceSameHash(int concurrencyLevel)
+ {
+ var persistenceStore = new ThreadSafeInMemoryPersistenceStore();
+ persistenceStore.Configure(new IdempotencyOptionsBuilder().Build(), null, null);
+
+ const string sharedPayload = "shared_test_payload";
+ var jsonValue = JsonValue.Create(sharedPayload);
+ var jsonString = jsonValue!.ToJsonString();
+
+ var hashes = new ConcurrentBag();
+ var exceptions = new ConcurrentBag();
+ var barrier = new Barrier(concurrencyLevel);
+ var tasks = new Task[concurrencyLevel];
+
+ for (int i = 0; i < concurrencyLevel; i++)
+ {
+ tasks[i] = Task.Run(() =>
+ {
+ try
+ {
+ barrier.SignalAndWait();
+
+ for (int j = 0; j < 10; j++)
+ {
+ var jsonDoc = JsonDocument.Parse(jsonString);
+ var hash = persistenceStore.GenerateHash(jsonDoc.RootElement);
+ hashes.Add(hash);
+ }
+ }
+ catch (Exception ex)
+ {
+ exceptions.Add(ex);
+ }
+ });
+ }
+
+ Task.WaitAll(tasks);
+
+ Assert.Empty(exceptions);
+ var distinctHashes = hashes.Distinct().ToList();
+ Assert.Single(distinctHashes);
+ }
+
+ [Theory]
+ [InlineData(2)]
+ [InlineData(5)]
+ [InlineData(10)]
+ public void HashGenerationThreadSafety_DifferentPayloads_ShouldProduceDifferentHashes(int concurrencyLevel)
+ {
+ var persistenceStore = new ThreadSafeInMemoryPersistenceStore();
+ persistenceStore.Configure(new IdempotencyOptionsBuilder().Build(), null, null);
+
+ var hashToPayload = new ConcurrentDictionary();
+ var exceptions = new ConcurrentBag();
+ var barrier = new Barrier(concurrencyLevel);
+ var tasks = new Task[concurrencyLevel];
+
+ for (int i = 0; i < concurrencyLevel; i++)
+ {
+ int threadIndex = i;
+ tasks[i] = Task.Run(() =>
+ {
+ try
+ {
+ barrier.SignalAndWait();
+
+ for (int j = 0; j < 10; j++)
+ {
+ string payload = $"unique_payload_thread_{threadIndex}_op_{j}";
+ var jsonValue = JsonValue.Create(payload);
+ var jsonDoc = JsonDocument.Parse(jsonValue!.ToJsonString());
+ var hash = persistenceStore.GenerateHash(jsonDoc.RootElement);
+
+ hashToPayload.TryAdd(hash, payload);
+ }
+ }
+ catch (Exception ex)
+ {
+ exceptions.Add(ex);
+ }
+ });
+ }
+
+ Task.WaitAll(tasks);
+
+ Assert.Empty(exceptions);
+ int expectedUniquePayloads = concurrencyLevel * 10;
+ Assert.Equal(expectedUniquePayloads, hashToPayload.Count);
+ }
+
+ [Theory]
+ [InlineData(2)]
+ [InlineData(5)]
+ [InlineData(10)]
+ public void HashGenerationThreadSafety_ComplexObjects_ShouldProduceCorrectHashes(int concurrencyLevel)
+ {
+ var persistenceStore = new ThreadSafeInMemoryPersistenceStore();
+ persistenceStore.Configure(new IdempotencyOptionsBuilder().Build(), null, null);
+
+ var results = new ConcurrentBag();
+ var barrier = new Barrier(concurrencyLevel);
+ var tasks = new Task[concurrencyLevel];
+
+ for (int i = 0; i < concurrencyLevel; i++)
+ {
+ int threadIndex = i;
+ tasks[i] = Task.Run(() =>
+ {
+ try
+ {
+ barrier.SignalAndWait();
+
+ for (int op = 0; op < 10; op++)
+ {
+ var result = new HashResult { ThreadIndex = threadIndex };
+
+ try
+ {
+ var complexObject = new
+ {
+ Id = threadIndex * 100 + op,
+ Name = $"Item_{threadIndex}_{op}",
+ Price = (threadIndex + 1) * 10.5 + op,
+ Tags = new[] { $"tag_{threadIndex}", $"tag_{op}" },
+ Metadata = new { ThreadId = threadIndex, OperationId = op }
+ };
+
+ var jsonString = JsonSerializer.Serialize(complexObject);
+ result.Payload = jsonString;
+
+ var jsonDoc = JsonDocument.Parse(jsonString);
+ var hash = persistenceStore.GenerateHash(jsonDoc.RootElement);
+ result.ActualHash = hash;
+
+ result.Success = !string.IsNullOrEmpty(hash) &&
+ hash.Length == 32 &&
+ hash.All(c => char.IsLetterOrDigit(c));
+ }
+ catch (Exception ex)
+ {
+ result.ExceptionThrown = true;
+ result.ExceptionType = ex.GetType().Name;
+ result.ExceptionMessage = ex.Message;
+ }
+
+ results.Add(result);
+ }
+ }
+ catch (Exception ex)
+ {
+ results.Add(new HashResult
+ {
+ ThreadIndex = threadIndex,
+ ExceptionThrown = true,
+ ExceptionType = ex.GetType().Name,
+ ExceptionMessage = ex.Message
+ });
+ }
+ });
+ }
+
+ Task.WaitAll(tasks);
+
+ Assert.All(results, r => Assert.True(r.Success && !r.ExceptionThrown,
+ $"Thread {r.ThreadIndex} failed: {r.ExceptionType}: {r.ExceptionMessage}"));
+ }
+
+ private static string ComputeExpectedMd5Hash(string input)
+ {
+ using var md5 = System.Security.Cryptography.MD5.Create();
+ var inputBytes = System.Text.Encoding.UTF8.GetBytes(input);
+ var hashBytes = md5.ComputeHash(inputBytes);
+ return Convert.ToHexString(hashBytes).ToLowerInvariant();
+ }
+
+ [Theory]
+ [InlineData(20, 100)]
+ [InlineData(30, 50)]
+ public void HashGenerationThreadSafety_HighConcurrency_ShouldCompleteWithoutExceptions(int concurrencyLevel, int operationsPerThread)
+ {
+ var persistenceStore = new ThreadSafeInMemoryPersistenceStore();
+ persistenceStore.Configure(new IdempotencyOptionsBuilder().Build(), null, null);
+
+ var exceptions = new ConcurrentBag();
+ var barrier = new Barrier(concurrencyLevel);
+ var tasks = new Task[concurrencyLevel];
+
+ for (int i = 0; i < concurrencyLevel; i++)
+ {
+ int threadIndex = i;
+ tasks[i] = Task.Run(() =>
+ {
+ try
+ {
+ barrier.SignalAndWait();
+
+ for (int op = 0; op < operationsPerThread; op++)
+ {
+ string payload = $"stress_test_thread_{threadIndex}_op_{op}";
+ var jsonValue = JsonValue.Create(payload);
+ var jsonDoc = JsonDocument.Parse(jsonValue!.ToJsonString());
+ persistenceStore.GenerateHash(jsonDoc.RootElement);
+ }
+ }
+ catch (Exception ex)
+ {
+ exceptions.Add(ex);
+ }
+ });
+ }
+
+ Task.WaitAll(tasks);
+
+ Assert.Empty(exceptions);
+ }
+
+ [Fact]
+ public void HashGenerationThreadSafety_MultipleStoreInstances_ShouldProduceSameHashes()
+ {
+ const string testPayload = "test_payload_for_consistency";
+ var jsonValue = JsonValue.Create(testPayload);
+ var jsonString = jsonValue!.ToJsonString();
+
+ var hashes = new ConcurrentBag();
+ var exceptions = new ConcurrentBag();
+ var barrier = new Barrier(10);
+ var tasks = new Task[10];
+
+ for (int i = 0; i < 10; i++)
+ {
+ tasks[i] = Task.Run(() =>
+ {
+ try
+ {
+ var store = new ThreadSafeInMemoryPersistenceStore();
+ store.Configure(new IdempotencyOptionsBuilder().Build(), null, null);
+
+ barrier.SignalAndWait();
+
+ var jsonDoc = JsonDocument.Parse(jsonString);
+ var hash = store.GenerateHash(jsonDoc.RootElement);
+ hashes.Add(hash);
+ }
+ catch (Exception ex)
+ {
+ exceptions.Add(ex);
+ }
+ });
+ }
+
+ Task.WaitAll(tasks);
+
+ Assert.Empty(exceptions);
+ var distinctHashes = hashes.Distinct().ToList();
+ Assert.Single(distinctHashes);
+ }
+
+ [Fact]
+ public void HashGenerationThreadSafety_VariousJsonTypes_ShouldProduceValidHashes()
+ {
+ var persistenceStore = new ThreadSafeInMemoryPersistenceStore();
+ persistenceStore.Configure(new IdempotencyOptionsBuilder().Build(), null, null);
+
+ var testCases = new[]
+ {
+ ("string", "\"test string\""),
+ ("number", "42"),
+ ("decimal", "3.14159"),
+ ("boolean_true", "true"),
+ ("boolean_false", "false"),
+ ("null", "null"),
+ ("array", "[1, 2, 3]"),
+ ("object", "{\"key\": \"value\"}")
+ };
+
+ var results = new ConcurrentBag<(string type, string hash, bool valid)>();
+ var exceptions = new ConcurrentBag();
+ var barrier = new Barrier(testCases.Length);
+ var tasks = new Task[testCases.Length];
+
+ for (int i = 0; i < testCases.Length; i++)
+ {
+ var (typeName, jsonValue) = testCases[i];
+ tasks[i] = Task.Run(() =>
+ {
+ try
+ {
+ barrier.SignalAndWait();
+
+ var jsonDoc = JsonDocument.Parse(jsonValue);
+ var hash = persistenceStore.GenerateHash(jsonDoc.RootElement);
+
+ bool isValid = !string.IsNullOrEmpty(hash) &&
+ hash.Length == 32 &&
+ hash.All(c => char.IsLetterOrDigit(c));
+
+ results.Add((typeName, hash, isValid));
+ }
+ catch (Exception ex)
+ {
+ exceptions.Add(ex);
+ }
+ });
+ }
+
+ Task.WaitAll(tasks);
+
+ Assert.Empty(exceptions);
+ Assert.All(results, r => Assert.True(r.valid, $"Hash for type '{r.type}' was invalid: {r.hash}"));
+ }
+}
diff --git a/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/IdempotencyPersistenceStoreTests.cs b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/IdempotencyPersistenceStoreTests.cs
new file mode 100644
index 000000000..10ae488e5
--- /dev/null
+++ b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/IdempotencyPersistenceStoreTests.cs
@@ -0,0 +1,715 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+using System.Collections.Concurrent;
+using AWS.Lambda.Powertools.Idempotency;
+using AWS.Lambda.Powertools.Idempotency.Persistence;
+using Xunit;
+
+namespace AWS.Lambda.Powertools.ConcurrencyTests.Idempotency;
+
+///
+/// Tests for validating persistence store thread safety under concurrent execution scenarios.
+///
+/// These tests verify that when multiple threads perform SaveInProgress, SaveSuccess,
+/// GetRecord, and DeleteRecord operations simultaneously, all operations complete without
+/// exceptions and each operation affects only its intended record.
+///
+[Collection("Idempotency Persistence Store Tests")]
+public class IdempotencyPersistenceStoreTests
+{
+ #region Helper Classes
+
+ ///
+ /// Result of a persistence store operation for tracking test outcomes.
+ ///
+ private class PersistenceOperationResult
+ {
+ public int ThreadIndex { get; set; }
+ public string OperationType { get; set; } = string.Empty;
+ public string IdempotencyKey { get; set; } = string.Empty;
+ public bool Success { get; set; }
+ public bool ExceptionThrown { get; set; }
+ public string? ExceptionMessage { get; set; }
+ public string? ExceptionType { get; set; }
+ public DataRecord? RetrievedRecord { get; set; }
+ }
+
+ ///
+ /// Creates a configured ThreadSafeInMemoryPersistenceStore for testing.
+ ///
+ private static ThreadSafeInMemoryPersistenceStore CreateConfiguredStore()
+ {
+ var store = new ThreadSafeInMemoryPersistenceStore();
+ var options = new IdempotencyOptionsBuilder()
+ .WithEventKeyJmesPath("id")
+ .WithExpiration(TimeSpan.FromMinutes(5))
+ .Build();
+ store.Configure(options, "TestFunction", null);
+ return store;
+ }
+
+ ///
+ /// Creates a DataRecord for testing purposes.
+ ///
+ private static DataRecord CreateTestRecord(string key, DataRecord.DataRecordStatus status, string? responseData = null)
+ {
+ return new DataRecord(
+ key,
+ status,
+ DateTimeOffset.UtcNow.AddMinutes(5).ToUnixTimeSeconds(),
+ responseData ?? $"response_{key}",
+ $"hash_{key}"
+ );
+ }
+
+ #endregion
+
+
+ #region Property 4: Persistence Store Operations Thread Safety
+
+ ///
+ /// **Feature: idempotency-thread-safety, Property 4: Persistence Store Operations Thread Safety**
+ /// *For any* set of concurrent persistence store operations (SaveInProgress, SaveSuccess, GetRecord, DeleteRecord)
+ /// with different idempotency keys, all operations should complete without throwing concurrency-related exceptions
+ /// and each operation should affect only its intended record.
+ /// **Validates: Requirements 1.3, 3.1, 3.2, 3.3, 3.4, 3.5**
+ ///
+ [Theory]
+ [InlineData(2, 5)]
+ [InlineData(5, 10)]
+ [InlineData(10, 20)]
+ public void PersistenceStoreThreadSafety_ConcurrentOperations_ShouldCompleteWithoutExceptions(int concurrencyLevel, int operationsPerThread)
+ {
+ var store = CreateConfiguredStore();
+ var results = new ConcurrentBag();
+ var barrier = new Barrier(concurrencyLevel);
+ var tasks = new Task[concurrencyLevel];
+
+ for (int i = 0; i < concurrencyLevel; i++)
+ {
+ int threadIndex = i;
+ tasks[i] = Task.Run(async () =>
+ {
+ try
+ {
+ barrier.SignalAndWait();
+
+ for (int op = 0; op < operationsPerThread; op++)
+ {
+ var result = new PersistenceOperationResult { ThreadIndex = threadIndex };
+ string key = $"key_{threadIndex}_{op}";
+ result.IdempotencyKey = key;
+
+ try
+ {
+ // Mix of operations: PutRecord, GetRecord, UpdateRecord, DeleteRecord
+ int opType = (threadIndex + op) % 4;
+ var record = CreateTestRecord(key, DataRecord.DataRecordStatus.INPROGRESS);
+
+ switch (opType)
+ {
+ case 0: // PutRecord
+ result.OperationType = "PutRecord";
+ await store.PutRecord(record, DateTimeOffset.UtcNow);
+ break;
+ case 1: // GetRecord
+ result.OperationType = "GetRecord";
+ result.RetrievedRecord = await store.GetRecord(key);
+ break;
+ case 2: // UpdateRecord
+ result.OperationType = "UpdateRecord";
+ var completedRecord = CreateTestRecord(key, DataRecord.DataRecordStatus.COMPLETED, $"result_{threadIndex}_{op}");
+ await store.UpdateRecord(completedRecord);
+ break;
+ case 3: // DeleteRecord
+ result.OperationType = "DeleteRecord";
+ await store.DeleteRecord(key);
+ break;
+ }
+
+ result.Success = true;
+ }
+ catch (Exception ex)
+ {
+ result.ExceptionThrown = true;
+ result.ExceptionType = ex.GetType().Name;
+ result.ExceptionMessage = ex.Message;
+ }
+
+ results.Add(result);
+ }
+ }
+ catch (Exception ex)
+ {
+ results.Add(new PersistenceOperationResult
+ {
+ ThreadIndex = threadIndex,
+ OperationType = "Barrier",
+ ExceptionThrown = true,
+ ExceptionType = ex.GetType().Name,
+ ExceptionMessage = ex.Message
+ });
+ }
+ });
+ }
+
+ Task.WaitAll(tasks);
+
+ // All operations should complete without exceptions
+ Assert.All(results, r => Assert.True(r.Success && !r.ExceptionThrown,
+ $"Thread {r.ThreadIndex} {r.OperationType} for key '{r.IdempotencyKey}' failed: {r.ExceptionType}: {r.ExceptionMessage}"));
+ }
+
+ ///
+ /// **Feature: idempotency-thread-safety, Property 4a: Concurrent SaveInProgress Operations**
+ /// *For any* set of concurrent SaveInProgress operations with different keys,
+ /// all operations should complete without interference.
+ /// **Validates: Requirements 3.1**
+ ///
+ [Theory]
+ [InlineData(2)]
+ [InlineData(5)]
+ [InlineData(10)]
+ public void PersistenceStoreThreadSafety_ConcurrentSaveInProgress_ShouldCompleteWithoutInterference(int concurrencyLevel)
+ {
+ var store = CreateConfiguredStore();
+ var results = new ConcurrentBag();
+ var barrier = new Barrier(concurrencyLevel);
+ var tasks = new Task[concurrencyLevel];
+
+ for (int i = 0; i < concurrencyLevel; i++)
+ {
+ int threadIndex = i;
+ tasks[i] = Task.Run(async () =>
+ {
+ var result = new PersistenceOperationResult
+ {
+ ThreadIndex = threadIndex,
+ OperationType = "PutRecord",
+ IdempotencyKey = $"inprogress_key_{threadIndex}"
+ };
+
+ try
+ {
+ barrier.SignalAndWait();
+
+ var record = CreateTestRecord(result.IdempotencyKey, DataRecord.DataRecordStatus.INPROGRESS);
+ await store.PutRecord(record, DateTimeOffset.UtcNow);
+
+ result.Success = true;
+ }
+ catch (Exception ex)
+ {
+ result.ExceptionThrown = true;
+ result.ExceptionType = ex.GetType().Name;
+ result.ExceptionMessage = ex.Message;
+ }
+
+ results.Add(result);
+ });
+ }
+
+ Task.WaitAll(tasks);
+
+ // All operations should succeed
+ Assert.All(results, r => Assert.True(r.Success && !r.ExceptionThrown,
+ $"Thread {r.ThreadIndex} failed: {r.ExceptionType}: {r.ExceptionMessage}"));
+
+ // Verify all records were saved
+ for (int i = 0; i < concurrencyLevel; i++)
+ {
+ Assert.True(store.ContainsKey($"inprogress_key_{i}"),
+ $"Record for key 'inprogress_key_{i}' was not saved");
+ }
+ }
+
+
+ ///
+ /// **Feature: idempotency-thread-safety, Property 4b: Concurrent SaveSuccess Operations**
+ /// *For any* set of concurrent SaveSuccess operations with different keys,
+ /// all records should be persisted correctly.
+ /// **Validates: Requirements 3.2**
+ ///
+ [Theory]
+ [InlineData(2)]
+ [InlineData(5)]
+ [InlineData(10)]
+ public void PersistenceStoreThreadSafety_ConcurrentSaveSuccess_ShouldPersistAllRecords(int concurrencyLevel)
+ {
+ var store = CreateConfiguredStore();
+ var results = new ConcurrentBag();
+ var barrier = new Barrier(concurrencyLevel);
+ var tasks = new Task[concurrencyLevel];
+ var expectedResponses = new ConcurrentDictionary();
+
+ for (int i = 0; i < concurrencyLevel; i++)
+ {
+ int threadIndex = i;
+ tasks[i] = Task.Run(async () =>
+ {
+ string key = $"success_key_{threadIndex}";
+ string responseData = $"response_data_{threadIndex}";
+ expectedResponses[key] = responseData;
+
+ var result = new PersistenceOperationResult
+ {
+ ThreadIndex = threadIndex,
+ OperationType = "UpdateRecord",
+ IdempotencyKey = key
+ };
+
+ try
+ {
+ barrier.SignalAndWait();
+
+ var record = CreateTestRecord(key, DataRecord.DataRecordStatus.COMPLETED, responseData);
+ await store.UpdateRecord(record);
+
+ result.Success = true;
+ }
+ catch (Exception ex)
+ {
+ result.ExceptionThrown = true;
+ result.ExceptionType = ex.GetType().Name;
+ result.ExceptionMessage = ex.Message;
+ }
+
+ results.Add(result);
+ });
+ }
+
+ Task.WaitAll(tasks);
+
+ // All operations should succeed
+ Assert.All(results, r => Assert.True(r.Success && !r.ExceptionThrown,
+ $"Thread {r.ThreadIndex} failed: {r.ExceptionType}: {r.ExceptionMessage}"));
+
+ // Verify all records were saved with correct data
+ foreach (var kvp in expectedResponses)
+ {
+ Assert.True(store.TryGetRecord(kvp.Key, out var record),
+ $"Record for key '{kvp.Key}' was not found");
+ Assert.Equal(kvp.Value, record?.ResponseData);
+ }
+ }
+
+ ///
+ /// **Feature: idempotency-thread-safety, Property 4c: Concurrent GetRecord Operations**
+ /// *For any* set of concurrent GetRecord operations for different keys,
+ /// each operation should return the correct record for its key.
+ /// **Validates: Requirements 3.3**
+ ///
+ [Theory]
+ [InlineData(2)]
+ [InlineData(5)]
+ [InlineData(10)]
+ public void PersistenceStoreThreadSafety_ConcurrentGetRecord_ShouldReturnCorrectRecords(int concurrencyLevel)
+ {
+ var store = CreateConfiguredStore();
+
+ // Pre-populate store with known records
+ var expectedRecords = new Dictionary();
+ for (int i = 0; i < concurrencyLevel; i++)
+ {
+ string key = $"get_key_{i}";
+ var record = CreateTestRecord(key, DataRecord.DataRecordStatus.COMPLETED, $"response_{i}");
+ store.PutRecord(record, DateTimeOffset.UtcNow).Wait();
+ expectedRecords[key] = record;
+ }
+
+ var results = new ConcurrentBag();
+ var barrier = new Barrier(concurrencyLevel);
+ var tasks = new Task[concurrencyLevel];
+
+ for (int i = 0; i < concurrencyLevel; i++)
+ {
+ int threadIndex = i;
+ tasks[i] = Task.Run(async () =>
+ {
+ string key = $"get_key_{threadIndex}";
+ var result = new PersistenceOperationResult
+ {
+ ThreadIndex = threadIndex,
+ OperationType = "GetRecord",
+ IdempotencyKey = key
+ };
+
+ try
+ {
+ barrier.SignalAndWait();
+
+ result.RetrievedRecord = await store.GetRecord(key);
+ result.Success = true;
+ }
+ catch (Exception ex)
+ {
+ result.ExceptionThrown = true;
+ result.ExceptionType = ex.GetType().Name;
+ result.ExceptionMessage = ex.Message;
+ }
+
+ results.Add(result);
+ });
+ }
+
+ Task.WaitAll(tasks);
+
+ // All operations should succeed
+ Assert.All(results, r => Assert.True(r.Success && !r.ExceptionThrown,
+ $"Thread {r.ThreadIndex} failed: {r.ExceptionType}: {r.ExceptionMessage}"));
+
+ // Verify all retrieved records match expected
+ foreach (var r in results)
+ {
+ Assert.NotNull(r.RetrievedRecord);
+ var expected = expectedRecords[r.IdempotencyKey];
+ Assert.Equal(expected.IdempotencyKey, r.RetrievedRecord.IdempotencyKey);
+ Assert.Equal(expected.ResponseData, r.RetrievedRecord.ResponseData);
+ }
+ }
+
+
+ ///
+ /// **Feature: idempotency-thread-safety, Property 4d: Concurrent DeleteRecord Operations**
+ /// *For any* set of concurrent DeleteRecord operations for different keys,
+ /// only the specified records should be deleted.
+ /// **Validates: Requirements 3.4**
+ ///
+ [Theory]
+ [InlineData(2)]
+ [InlineData(5)]
+ [InlineData(10)]
+ public void PersistenceStoreThreadSafety_ConcurrentDeleteRecord_ShouldDeleteOnlySpecifiedRecords(int concurrencyLevel)
+ {
+ var store = CreateConfiguredStore();
+
+ // Pre-populate store with records to delete and records to keep
+ var keysToDelete = new List();
+ var keysToKeep = new List();
+
+ for (int i = 0; i < concurrencyLevel; i++)
+ {
+ string deleteKey = $"delete_key_{i}";
+ string keepKey = $"keep_key_{i}";
+
+ var deleteRecord = CreateTestRecord(deleteKey, DataRecord.DataRecordStatus.COMPLETED);
+ var keepRecord = CreateTestRecord(keepKey, DataRecord.DataRecordStatus.COMPLETED);
+
+ store.PutRecord(deleteRecord, DateTimeOffset.UtcNow).Wait();
+ store.PutRecord(keepRecord, DateTimeOffset.UtcNow).Wait();
+
+ keysToDelete.Add(deleteKey);
+ keysToKeep.Add(keepKey);
+ }
+
+ var results = new ConcurrentBag();
+ var barrier = new Barrier(concurrencyLevel);
+ var tasks = new Task[concurrencyLevel];
+
+ for (int i = 0; i < concurrencyLevel; i++)
+ {
+ int threadIndex = i;
+ tasks[i] = Task.Run(async () =>
+ {
+ string key = $"delete_key_{threadIndex}";
+ var result = new PersistenceOperationResult
+ {
+ ThreadIndex = threadIndex,
+ OperationType = "DeleteRecord",
+ IdempotencyKey = key
+ };
+
+ try
+ {
+ barrier.SignalAndWait();
+
+ await store.DeleteRecord(key);
+ result.Success = true;
+ }
+ catch (Exception ex)
+ {
+ result.ExceptionThrown = true;
+ result.ExceptionType = ex.GetType().Name;
+ result.ExceptionMessage = ex.Message;
+ }
+
+ results.Add(result);
+ });
+ }
+
+ Task.WaitAll(tasks);
+
+ // All operations should succeed
+ Assert.All(results, r => Assert.True(r.Success && !r.ExceptionThrown,
+ $"Thread {r.ThreadIndex} failed: {r.ExceptionType}: {r.ExceptionMessage}"));
+
+ // Verify deleted keys are gone
+ foreach (var key in keysToDelete)
+ {
+ Assert.False(store.ContainsKey(key), $"Key '{key}' should have been deleted");
+ }
+
+ // Verify kept keys still exist
+ foreach (var key in keysToKeep)
+ {
+ Assert.True(store.ContainsKey(key), $"Key '{key}' should still exist");
+ }
+ }
+
+ ///
+ /// **Feature: idempotency-thread-safety, Property 4e: Mixed Concurrent Operations**
+ /// *For any* combination of concurrent save, get, and delete operations,
+ /// data integrity should be maintained across all operations.
+ /// **Validates: Requirements 3.5**
+ ///
+ [Theory]
+ [InlineData(4)]
+ [InlineData(8)]
+ [InlineData(12)]
+ public void PersistenceStoreThreadSafety_MixedOperations_ShouldMaintainDataIntegrity(int concurrencyLevel)
+ {
+ var store = CreateConfiguredStore();
+ var results = new ConcurrentBag();
+ var barrier = new Barrier(concurrencyLevel);
+ var tasks = new Task[concurrencyLevel];
+
+ // Pre-populate some records
+ for (int i = 0; i < concurrencyLevel / 2; i++)
+ {
+ var record = CreateTestRecord($"existing_key_{i}", DataRecord.DataRecordStatus.COMPLETED);
+ store.PutRecord(record, DateTimeOffset.UtcNow).Wait();
+ }
+
+ for (int i = 0; i < concurrencyLevel; i++)
+ {
+ int threadIndex = i;
+ tasks[i] = Task.Run(async () =>
+ {
+ var result = new PersistenceOperationResult { ThreadIndex = threadIndex };
+
+ try
+ {
+ barrier.SignalAndWait();
+
+ // Different threads do different operations
+ int opType = threadIndex % 4;
+
+ switch (opType)
+ {
+ case 0: // Put new record
+ result.OperationType = "PutRecord";
+ result.IdempotencyKey = $"new_key_{threadIndex}";
+ var newRecord = CreateTestRecord(result.IdempotencyKey, DataRecord.DataRecordStatus.INPROGRESS);
+ await store.PutRecord(newRecord, DateTimeOffset.UtcNow);
+ break;
+
+ case 1: // Get existing record
+ result.OperationType = "GetRecord";
+ result.IdempotencyKey = $"existing_key_{threadIndex % (concurrencyLevel / 2)}";
+ result.RetrievedRecord = await store.GetRecord(result.IdempotencyKey);
+ break;
+
+ case 2: // Update existing record
+ result.OperationType = "UpdateRecord";
+ result.IdempotencyKey = $"existing_key_{threadIndex % (concurrencyLevel / 2)}";
+ var updateRecord = CreateTestRecord(result.IdempotencyKey, DataRecord.DataRecordStatus.COMPLETED, $"updated_{threadIndex}");
+ await store.UpdateRecord(updateRecord);
+ break;
+
+ case 3: // Delete (non-existing key to avoid affecting other tests)
+ result.OperationType = "DeleteRecord";
+ result.IdempotencyKey = $"delete_target_{threadIndex}";
+ await store.DeleteRecord(result.IdempotencyKey);
+ break;
+ }
+
+ result.Success = true;
+ }
+ catch (Exception ex)
+ {
+ result.ExceptionThrown = true;
+ result.ExceptionType = ex.GetType().Name;
+ result.ExceptionMessage = ex.Message;
+ }
+
+ results.Add(result);
+ });
+ }
+
+ Task.WaitAll(tasks);
+
+ // All operations should complete without exceptions
+ Assert.All(results, r => Assert.True(r.Success && !r.ExceptionThrown,
+ $"Thread {r.ThreadIndex} {r.OperationType} for key '{r.IdempotencyKey}' failed: {r.ExceptionType}: {r.ExceptionMessage}"));
+ }
+
+ #endregion
+
+
+ #region Additional Stress Tests
+
+ ///
+ /// High concurrency stress test for persistence store operations.
+ ///
+ [Theory]
+ [InlineData(20, 50)]
+ [InlineData(30, 30)]
+ public void PersistenceStoreThreadSafety_HighConcurrency_ShouldCompleteWithoutExceptions(int concurrencyLevel, int operationsPerThread)
+ {
+ var store = CreateConfiguredStore();
+ var exceptions = new ConcurrentBag();
+ var barrier = new Barrier(concurrencyLevel);
+ var tasks = new Task[concurrencyLevel];
+
+ for (int i = 0; i < concurrencyLevel; i++)
+ {
+ int threadIndex = i;
+ tasks[i] = Task.Run(async () =>
+ {
+ try
+ {
+ barrier.SignalAndWait();
+
+ for (int op = 0; op < operationsPerThread; op++)
+ {
+ int opType = (threadIndex + op) % 4;
+ string key = $"stress_key_{threadIndex}_{op}";
+ var record = CreateTestRecord(key, DataRecord.DataRecordStatus.INPROGRESS);
+
+ switch (opType)
+ {
+ case 0:
+ await store.PutRecord(record, DateTimeOffset.UtcNow);
+ break;
+ case 1:
+ await store.GetRecord(key);
+ break;
+ case 2:
+ var completedRecord = CreateTestRecord(key, DataRecord.DataRecordStatus.COMPLETED);
+ await store.UpdateRecord(completedRecord);
+ break;
+ case 3:
+ await store.DeleteRecord(key);
+ break;
+ }
+ }
+ }
+ catch (Exception ex)
+ {
+ exceptions.Add(ex);
+ }
+ });
+ }
+
+ Task.WaitAll(tasks);
+
+ Assert.Empty(exceptions);
+ }
+
+ ///
+ /// Test that verifies operation counters are correctly incremented under concurrent access.
+ ///
+ [Fact]
+ public void PersistenceStoreThreadSafety_OperationCounters_ShouldBeAccurate()
+ {
+ var store = CreateConfiguredStore();
+ int concurrencyLevel = 10;
+ int operationsPerThread = 20;
+
+ var barrier = new Barrier(concurrencyLevel);
+ var tasks = new Task[concurrencyLevel];
+
+ for (int i = 0; i < concurrencyLevel; i++)
+ {
+ int threadIndex = i;
+ tasks[i] = Task.Run(async () =>
+ {
+ barrier.SignalAndWait();
+
+ for (int op = 0; op < operationsPerThread; op++)
+ {
+ string key = $"counter_key_{threadIndex}_{op}";
+ var record = CreateTestRecord(key, DataRecord.DataRecordStatus.COMPLETED);
+
+ await store.PutRecord(record, DateTimeOffset.UtcNow);
+ await store.GetRecord(key);
+ await store.UpdateRecord(record);
+ await store.DeleteRecord(key);
+ }
+ });
+ }
+
+ Task.WaitAll(tasks);
+
+ int expectedOperations = concurrencyLevel * operationsPerThread;
+
+ Assert.Equal(expectedOperations, store.PutRecordCount);
+ Assert.Equal(expectedOperations, store.GetRecordCount);
+ Assert.Equal(expectedOperations, store.UpdateRecordCount);
+ Assert.Equal(expectedOperations, store.DeleteRecordCount);
+ }
+
+ ///
+ /// Test that verifies concurrent operations on the same key are handled correctly.
+ ///
+ [Fact]
+ public void PersistenceStoreThreadSafety_SameKeyConcurrentOperations_ShouldNotCorruptData()
+ {
+ var store = CreateConfiguredStore();
+ int concurrencyLevel = 10;
+ string sharedKey = "shared_key";
+
+ var barrier = new Barrier(concurrencyLevel);
+ var tasks = new Task[concurrencyLevel];
+ var exceptions = new ConcurrentBag();
+
+ for (int i = 0; i < concurrencyLevel; i++)
+ {
+ int threadIndex = i;
+ tasks[i] = Task.Run(async () =>
+ {
+ try
+ {
+ barrier.SignalAndWait();
+
+ // All threads operate on the same key
+ var record = CreateTestRecord(sharedKey, DataRecord.DataRecordStatus.COMPLETED, $"response_{threadIndex}");
+
+ await store.PutRecord(record, DateTimeOffset.UtcNow);
+ await store.GetRecord(sharedKey);
+ await store.UpdateRecord(record);
+ }
+ catch (Exception ex)
+ {
+ exceptions.Add(ex);
+ }
+ });
+ }
+
+ Task.WaitAll(tasks);
+
+ // No exceptions should be thrown
+ Assert.Empty(exceptions);
+
+ // The key should still exist with valid data
+ Assert.True(store.ContainsKey(sharedKey));
+ Assert.True(store.TryGetRecord(sharedKey, out var finalRecord));
+ Assert.NotNull(finalRecord);
+ Assert.Equal(sharedKey, finalRecord.IdempotencyKey);
+ }
+
+ #endregion
+}
diff --git a/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/LRUCacheThreadSafetyTests.cs b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/LRUCacheThreadSafetyTests.cs
new file mode 100644
index 000000000..1c6ac7b80
--- /dev/null
+++ b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/LRUCacheThreadSafetyTests.cs
@@ -0,0 +1,485 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+using System.Collections.Concurrent;
+using Xunit;
+
+namespace AWS.Lambda.Powertools.ConcurrencyTests.Idempotency;
+
+///
+/// Tests for validating LRU Cache thread safety under concurrent execution scenarios.
+///
+/// These tests verify that when multiple threads perform read, write, and delete
+/// operations on the LRU cache simultaneously, all operations complete without
+/// exceptions and without data corruption.
+///
+[Collection("Idempotency LRU Cache Tests")]
+public class LRUCacheThreadSafetyTests
+{
+ #region Helper Classes
+
+ private class OperationResult
+ {
+ public int ThreadIndex { get; set; }
+ public string OperationType { get; set; } = string.Empty;
+ public bool Success { get; set; }
+ public bool ExceptionThrown { get; set; }
+ public string? ExceptionMessage { get; set; }
+ public string? ExceptionType { get; set; }
+ }
+
+ ///
+ /// Wrapper to access internal LRUCache for testing purposes.
+ /// Uses reflection to create and interact with the internal LRUCache class.
+ ///
+ private class LRUCacheWrapper where TKey : notnull
+ {
+ private readonly object _cache;
+ private readonly Type _cacheType;
+
+ public LRUCacheWrapper(int capacity)
+ {
+ var assembly = typeof(AWS.Lambda.Powertools.Idempotency.Idempotency).Assembly;
+ _cacheType = assembly.GetType("AWS.Lambda.Powertools.Idempotency.Internal.LRUCache`2")!
+ .MakeGenericType(typeof(TKey), typeof(TValue));
+ _cache = Activator.CreateInstance(_cacheType, capacity)!;
+ }
+
+ public bool TryGet(TKey key, out TValue? value)
+ {
+ var method = _cacheType.GetMethod("TryGet")!;
+ var parameters = new object?[] { key, null };
+ var result = (bool)method.Invoke(_cache, parameters)!;
+ value = (TValue?)parameters[1];
+ return result;
+ }
+
+ public void Set(TKey key, TValue value)
+ {
+ var method = _cacheType.GetMethod("Set")!;
+ method.Invoke(_cache, new object?[] { key, value });
+ }
+
+ public void Delete(TKey key)
+ {
+ var method = _cacheType.GetMethod("Delete")!;
+ method.Invoke(_cache, new object?[] { key });
+ }
+
+ public int Count
+ {
+ get
+ {
+ var property = _cacheType.GetProperty("Count")!;
+ return (int)property.GetValue(_cache)!;
+ }
+ }
+ }
+
+ #endregion
+
+ #region Property 3: LRU Cache Thread Safety
+
+ ///
+ /// **Feature: idempotency-thread-safety, Property 3: LRU Cache Thread Safety**
+ /// *For any* combination of concurrent read, write, and delete operations on the LRU cache,
+ /// all operations should complete without throwing exceptions and without data corruption
+ /// (reads return correct values, writes persist correctly, deletes remove only specified entries).
+ /// **Validates: Requirements 2.1, 2.2, 2.3, 2.4**
+ ///
+ [Theory]
+ [InlineData(2, 10)]
+ [InlineData(5, 50)]
+ [InlineData(10, 100)]
+ public void LRUCacheThreadSafety_ConcurrentOperations_ShouldCompleteWithoutExceptions(int concurrencyLevel, int operationsPerThread)
+ {
+ var cache = new LRUCacheWrapper(50);
+ var results = new ConcurrentBag();
+ var barrier = new Barrier(concurrencyLevel);
+ var tasks = new Task[concurrencyLevel];
+
+ for (int i = 0; i < concurrencyLevel; i++)
+ {
+ int threadIndex = i;
+ tasks[i] = Task.Run(() =>
+ {
+ try
+ {
+ barrier.SignalAndWait();
+
+ for (int op = 0; op < operationsPerThread; op++)
+ {
+ var result = new OperationResult { ThreadIndex = threadIndex };
+
+ try
+ {
+ // Mix of operations: write, read, delete
+ int opType = (threadIndex + op) % 3;
+ string key = $"key_{threadIndex}_{op % 20}"; // Reuse some keys
+
+ switch (opType)
+ {
+ case 0: // Write
+ result.OperationType = "Set";
+ cache.Set(key, $"value_{threadIndex}_{op}");
+ break;
+ case 1: // Read
+ result.OperationType = "TryGet";
+ cache.TryGet(key, out _);
+ break;
+ case 2: // Delete
+ result.OperationType = "Delete";
+ cache.Delete(key);
+ break;
+ }
+
+ result.Success = true;
+ }
+ catch (Exception ex)
+ {
+ result.ExceptionThrown = true;
+ result.ExceptionType = ex.GetType().Name;
+ result.ExceptionMessage = ex.Message;
+ }
+
+ results.Add(result);
+ }
+ }
+ catch (Exception ex)
+ {
+ results.Add(new OperationResult
+ {
+ ThreadIndex = threadIndex,
+ OperationType = "Barrier",
+ ExceptionThrown = true,
+ ExceptionType = ex.GetType().Name,
+ ExceptionMessage = ex.Message
+ });
+ }
+ });
+ }
+
+ Task.WaitAll(tasks);
+
+ // All operations should complete without exceptions
+ Assert.All(results, r => Assert.True(r.Success && !r.ExceptionThrown,
+ $"Thread {r.ThreadIndex} {r.OperationType} failed: {r.ExceptionType}: {r.ExceptionMessage}"));
+ }
+
+ ///
+ /// **Feature: idempotency-thread-safety, Property 3a: Concurrent Reads Return Correct Values**
+ /// *For any* set of concurrent read operations on the LRU cache, each read should return
+ /// the correct value that was previously written for that key.
+ /// **Validates: Requirements 2.1**
+ ///
+ [Theory]
+ [InlineData(2)]
+ [InlineData(5)]
+ [InlineData(10)]
+ public void LRUCacheThreadSafety_ConcurrentReads_ShouldReturnCorrectValues(int concurrencyLevel)
+ {
+ var cache = new LRUCacheWrapper(100);
+
+ // Pre-populate cache with known values
+ for (int i = 0; i < concurrencyLevel * 5; i++)
+ {
+ cache.Set(i, $"value_{i}");
+ }
+
+ var results = new ConcurrentBag<(int key, string? value, bool found, bool correct)>();
+ var barrier = new Barrier(concurrencyLevel);
+ var tasks = new Task[concurrencyLevel];
+
+ for (int i = 0; i < concurrencyLevel; i++)
+ {
+ int threadIndex = i;
+ tasks[i] = Task.Run(() =>
+ {
+ barrier.SignalAndWait();
+
+ // Each thread reads multiple keys
+ for (int j = 0; j < concurrencyLevel * 5; j++)
+ {
+ bool found = cache.TryGet(j, out var value);
+ bool correct = !found || value == $"value_{j}";
+ results.Add((j, value, found, correct));
+ }
+ });
+ }
+
+ Task.WaitAll(tasks);
+
+ // All reads that found a value should have the correct value
+ Assert.All(results, r => Assert.True(r.correct,
+ $"Key {r.key} returned incorrect value: expected 'value_{r.key}', got '{r.value}'"));
+ }
+
+ ///
+ /// **Feature: idempotency-thread-safety, Property 3b: Concurrent Writes Complete Without Corruption**
+ /// *For any* set of concurrent write operations on the LRU cache, all writes should
+ /// complete and the final state should be consistent (no corrupted entries).
+ /// **Validates: Requirements 2.2**
+ ///
+ [Theory]
+ [InlineData(2)]
+ [InlineData(5)]
+ [InlineData(10)]
+ public void LRUCacheThreadSafety_ConcurrentWrites_ShouldCompleteWithoutCorruption(int concurrencyLevel)
+ {
+ int keysPerThread = 10;
+
+ var cache = new LRUCacheWrapper(concurrencyLevel * keysPerThread);
+ var writtenValues = new ConcurrentDictionary();
+ var barrier = new Barrier(concurrencyLevel);
+ var tasks = new Task[concurrencyLevel];
+ var exceptions = new ConcurrentBag();
+
+ for (int i = 0; i < concurrencyLevel; i++)
+ {
+ int threadIndex = i;
+ tasks[i] = Task.Run(() =>
+ {
+ try
+ {
+ barrier.SignalAndWait();
+
+ for (int j = 0; j < keysPerThread; j++)
+ {
+ string key = $"thread_{threadIndex}_key_{j}";
+ int value = threadIndex * 1000 + j;
+ cache.Set(key, value);
+ writtenValues[key] = value;
+ }
+ }
+ catch (Exception ex)
+ {
+ exceptions.Add(ex);
+ }
+ });
+ }
+
+ Task.WaitAll(tasks);
+
+ Assert.Empty(exceptions);
+
+ // Verify all written values can be read back correctly
+ foreach (var kvp in writtenValues)
+ {
+ if (cache.TryGet(kvp.Key, out var value))
+ {
+ Assert.Equal(kvp.Value, value);
+ }
+ }
+ }
+
+ ///
+ /// **Feature: idempotency-thread-safety, Property 3c: Mixed Read/Write Operations Are Safe**
+ /// *For any* combination of concurrent read and write operations, the cache should
+ /// handle the concurrent access without throwing exceptions.
+ /// **Validates: Requirements 2.3**
+ ///
+ [Theory]
+ [InlineData(4)]
+ [InlineData(8)]
+ [InlineData(10)]
+ public void LRUCacheThreadSafety_MixedReadWrite_ShouldBeSafe(int concurrencyLevel)
+ {
+ int halfConcurrency = concurrencyLevel / 2;
+
+ var cache = new LRUCacheWrapper(50);
+ var exceptions = new ConcurrentBag();
+ var barrier = new Barrier(concurrencyLevel);
+ var tasks = new Task[concurrencyLevel];
+
+ // Half threads write, half threads read
+ for (int i = 0; i < concurrencyLevel; i++)
+ {
+ int threadIndex = i;
+ bool isWriter = threadIndex < halfConcurrency;
+
+ tasks[i] = Task.Run(() =>
+ {
+ try
+ {
+ barrier.SignalAndWait();
+
+ for (int j = 0; j < 100; j++)
+ {
+ int key = j % 30; // Shared key space
+
+ if (isWriter)
+ {
+ cache.Set(key, $"value_{threadIndex}_{j}");
+ }
+ else
+ {
+ cache.TryGet(key, out _);
+ }
+ }
+ }
+ catch (Exception ex)
+ {
+ exceptions.Add(ex);
+ }
+ });
+ }
+
+ Task.WaitAll(tasks);
+
+ Assert.Empty(exceptions);
+ }
+
+ ///
+ /// **Feature: idempotency-thread-safety, Property 3d: Eviction Under Load Is Safe**
+ /// *For any* set of concurrent writes that exceed cache capacity, the eviction
+ /// mechanism should work correctly without corruption.
+ /// **Validates: Requirements 2.4**
+ ///
+ [Theory]
+ [InlineData(2)]
+ [InlineData(5)]
+ [InlineData(10)]
+ public void LRUCacheThreadSafety_EvictionUnderLoad_ShouldBeSafe(int concurrencyLevel)
+ {
+ int cacheCapacity = 20;
+ int keysPerThread = 50; // More keys than capacity to force eviction
+
+ var cache = new LRUCacheWrapper(cacheCapacity);
+ var exceptions = new ConcurrentBag();
+ var barrier = new Barrier(concurrencyLevel);
+ var tasks = new Task[concurrencyLevel];
+
+ for (int i = 0; i < concurrencyLevel; i++)
+ {
+ int threadIndex = i;
+ tasks[i] = Task.Run(() =>
+ {
+ try
+ {
+ barrier.SignalAndWait();
+
+ for (int j = 0; j < keysPerThread; j++)
+ {
+ string key = $"key_{threadIndex}_{j}";
+ cache.Set(key, threadIndex * 1000 + j);
+ }
+ }
+ catch (Exception ex)
+ {
+ exceptions.Add(ex);
+ }
+ });
+ }
+
+ Task.WaitAll(tasks);
+
+ Assert.Empty(exceptions);
+
+ // Verify cache count doesn't exceed capacity
+ Assert.True(cache.Count <= cacheCapacity,
+ $"Cache count {cache.Count} exceeds capacity {cacheCapacity}");
+ }
+
+ #endregion
+
+ #region Additional Stress Tests
+
+ ///
+ /// High concurrency stress test for LRU cache operations.
+ ///
+ [Theory]
+ [InlineData(20, 200)]
+ [InlineData(30, 100)]
+ public void LRUCacheThreadSafety_HighConcurrency_ShouldCompleteWithoutExceptions(int concurrencyLevel, int operationsPerThread)
+ {
+ var cache = new LRUCacheWrapper(100);
+ var exceptions = new ConcurrentBag();
+ var barrier = new Barrier(concurrencyLevel);
+ var tasks = new Task[concurrencyLevel];
+
+ for (int i = 0; i < concurrencyLevel; i++)
+ {
+ int threadIndex = i;
+ tasks[i] = Task.Run(() =>
+ {
+ try
+ {
+ barrier.SignalAndWait();
+
+ for (int op = 0; op < operationsPerThread; op++)
+ {
+ int opType = (threadIndex + op) % 3;
+ string key = $"key_{op % 50}";
+
+ switch (opType)
+ {
+ case 0:
+ cache.Set(key, $"value_{threadIndex}_{op}");
+ break;
+ case 1:
+ cache.TryGet(key, out _);
+ break;
+ case 2:
+ cache.Delete(key);
+ break;
+ }
+ }
+ }
+ catch (Exception ex)
+ {
+ exceptions.Add(ex);
+ }
+ });
+ }
+
+ Task.WaitAll(tasks);
+
+ Assert.Empty(exceptions);
+ }
+
+ ///
+ /// Test that verifies LRU eviction order is maintained under concurrent access.
+ ///
+ [Fact]
+ public void LRUCacheThreadSafety_EvictionOrder_ShouldMaintainLRUProperty()
+ {
+ var cache = new LRUCacheWrapper(5);
+
+ // Add 5 items
+ for (int i = 0; i < 5; i++)
+ {
+ cache.Set(i, $"value_{i}");
+ }
+
+ // Access item 0 to make it most recently used
+ cache.TryGet(0, out _);
+
+ // Add a new item, which should evict item 1 (least recently used)
+ cache.Set(5, "value_5");
+
+ // Item 0 should still exist (was accessed)
+ Assert.True(cache.TryGet(0, out var value0));
+ Assert.Equal("value_0", value0);
+
+ // Item 1 should be evicted
+ Assert.False(cache.TryGet(1, out _));
+
+ // Item 5 should exist
+ Assert.True(cache.TryGet(5, out var value5));
+ Assert.Equal("value_5", value5);
+ }
+
+ #endregion
+}
diff --git a/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/LambdaContextIsolationTests.cs b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/LambdaContextIsolationTests.cs
new file mode 100644
index 000000000..d83de5fb9
--- /dev/null
+++ b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/LambdaContextIsolationTests.cs
@@ -0,0 +1,325 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+using Amazon.Lambda.Core;
+using Amazon.Lambda.TestUtilities;
+using Xunit;
+using IdempotencyLib = AWS.Lambda.Powertools.Idempotency;
+
+namespace AWS.Lambda.Powertools.ConcurrencyTests.Idempotency;
+
+///
+/// Tests for validating LambdaContext isolation in Powertools Idempotency
+/// under concurrent execution scenarios.
+///
+/// These tests verify that when multiple Lambda invocations run concurrently,
+/// each invocation's LambdaContext remains isolated from other invocations.
+///
+/// The Idempotency implementation uses AsyncLocal storage to ensure
+/// isolation between concurrent Lambda invocations.
+///
+[Collection("Idempotency Tests")]
+public class LambdaContextIsolationTests : IDisposable
+{
+ public LambdaContextIsolationTests()
+ {
+ // Configure Idempotency with a thread-safe in-memory store for testing
+ IdempotencyLib.Idempotency.Configure(builder => builder
+ .WithPersistenceStore(new ThreadSafeInMemoryPersistenceStore()));
+ }
+
+ public void Dispose()
+ {
+ // Clean up after tests
+ }
+
+ #region Helper Result Classes
+
+ private class ContextIsolationResult
+ {
+ public string InvocationId { get; set; } = string.Empty;
+ public int InvocationIndex { get; set; }
+ public string ExpectedFunctionName { get; set; } = string.Empty;
+ public string ActualFunctionName { get; set; } = string.Empty;
+ public bool ContextMatched { get; set; }
+ public bool ExceptionThrown { get; set; }
+ public string? ExceptionMessage { get; set; }
+ }
+
+ private class AsyncContextResult
+ {
+ public string InvocationId { get; set; } = string.Empty;
+ public string ExpectedFunctionName { get; set; } = string.Empty;
+ public string FunctionNameBeforeAwait { get; set; } = string.Empty;
+ public string FunctionNameAfterAwait { get; set; } = string.Empty;
+ public bool ContextPreservedAcrossAwait { get; set; }
+ public bool ExceptionThrown { get; set; }
+ public string? ExceptionMessage { get; set; }
+ }
+
+ #endregion
+
+ #region Property 2: LambdaContext Isolation
+
+ ///
+ /// **Feature: idempotency-thread-safety, Property 2: LambdaContext Isolation**
+ /// *For any* set of concurrent invocations registering different LambdaContext instances,
+ /// each invocation should be able to retrieve its own context without interference from other invocations.
+ /// **Validates: Requirements 1.2**
+ ///
+ [Theory]
+ [InlineData(2)]
+ [InlineData(3)]
+ [InlineData(5)]
+ [InlineData(10)]
+ [InlineData(20)]
+ public void LambdaContextIsolation_ConcurrentInvocations_ShouldMaintainSeparateContexts(
+ int concurrencyLevel)
+ {
+ var results = new ContextIsolationResult[concurrencyLevel];
+ var barrier = new Barrier(concurrencyLevel);
+ var tasks = new Task[concurrencyLevel];
+
+ for (int i = 0; i < concurrencyLevel; i++)
+ {
+ int invocationIndex = i;
+
+ tasks[i] = Task.Run(() =>
+ {
+ var invocationId = Guid.NewGuid().ToString("N");
+ var expectedFunctionName = $"Function_{invocationIndex}_{invocationId}";
+
+ var result = new ContextIsolationResult
+ {
+ InvocationId = invocationId,
+ InvocationIndex = invocationIndex,
+ ExpectedFunctionName = expectedFunctionName
+ };
+
+ try
+ {
+ // Create a unique context for this invocation
+ var context = new TestLambdaContext
+ {
+ FunctionName = expectedFunctionName,
+ AwsRequestId = invocationId,
+ RemainingTime = TimeSpan.FromMinutes(5)
+ };
+
+ // Wait for all threads to be ready
+ barrier.SignalAndWait();
+
+ // Register the context
+ IdempotencyLib.Idempotency.RegisterLambdaContext(context);
+
+ // Small delay to allow other threads to potentially interfere
+ Thread.Sleep(Random.Shared.Next(1, 10));
+
+ // Retrieve the context and verify it's the one we registered
+ var retrievedContext = IdempotencyLib.Idempotency.Instance.LambdaContext;
+ result.ActualFunctionName = retrievedContext?.FunctionName ?? "null";
+ result.ContextMatched = retrievedContext?.FunctionName == expectedFunctionName;
+ }
+ catch (Exception ex)
+ {
+ result.ExceptionThrown = true;
+ result.ExceptionMessage = $"{ex.GetType().Name}: {ex.Message}";
+ }
+
+ results[invocationIndex] = result;
+ });
+ }
+
+ Task.WaitAll(tasks);
+
+ // Verify no exceptions were thrown
+ Assert.All(results, r => Assert.False(r.ExceptionThrown, r.ExceptionMessage));
+
+ // Verify each invocation retrieved its own context
+ Assert.All(results, r => Assert.True(r.ContextMatched,
+ $"Invocation {r.InvocationIndex} expected '{r.ExpectedFunctionName}' but got '{r.ActualFunctionName}'"));
+ }
+
+ ///
+ /// **Feature: idempotency-thread-safety, Property 2b: LambdaContext Isolation with Multiple Reads**
+ /// *For any* set of concurrent invocations, multiple reads of the LambdaContext within the same
+ /// invocation should consistently return the same context.
+ /// **Validates: Requirements 1.2**
+ ///
+ [Theory]
+ [InlineData(2, 5)]
+ [InlineData(5, 10)]
+ [InlineData(10, 3)]
+ public void LambdaContextIsolation_MultipleReads_ShouldReturnConsistentContext(
+ int concurrencyLevel, int readsPerInvocation)
+ {
+ var results = new List[concurrencyLevel];
+ var barrier = new Barrier(concurrencyLevel);
+ var tasks = new Task[concurrencyLevel];
+
+ for (int i = 0; i < concurrencyLevel; i++)
+ {
+ int invocationIndex = i;
+ results[invocationIndex] = new List();
+
+ tasks[i] = Task.Run(() =>
+ {
+ var invocationId = Guid.NewGuid().ToString("N");
+ var expectedFunctionName = $"Function_{invocationIndex}_{invocationId}";
+
+ try
+ {
+ var context = new TestLambdaContext
+ {
+ FunctionName = expectedFunctionName,
+ AwsRequestId = invocationId,
+ RemainingTime = TimeSpan.FromMinutes(5)
+ };
+
+ barrier.SignalAndWait();
+
+ IdempotencyLib.Idempotency.RegisterLambdaContext(context);
+
+ // Perform multiple reads with small delays
+ for (int r = 0; r < readsPerInvocation; r++)
+ {
+ Thread.Sleep(Random.Shared.Next(1, 5));
+
+ var retrievedContext = IdempotencyLib.Idempotency.Instance.LambdaContext;
+ var result = new ContextIsolationResult
+ {
+ InvocationId = invocationId,
+ InvocationIndex = invocationIndex,
+ ExpectedFunctionName = expectedFunctionName,
+ ActualFunctionName = retrievedContext?.FunctionName ?? "null",
+ ContextMatched = retrievedContext?.FunctionName == expectedFunctionName
+ };
+
+ lock (results[invocationIndex])
+ {
+ results[invocationIndex].Add(result);
+ }
+ }
+ }
+ catch (Exception ex)
+ {
+ lock (results[invocationIndex])
+ {
+ results[invocationIndex].Add(new ContextIsolationResult
+ {
+ InvocationId = invocationId,
+ InvocationIndex = invocationIndex,
+ ExceptionThrown = true,
+ ExceptionMessage = $"{ex.GetType().Name}: {ex.Message}"
+ });
+ }
+ }
+ });
+ }
+
+ Task.WaitAll(tasks);
+
+ // Verify all reads returned the correct context
+ foreach (var invocationResults in results)
+ {
+ Assert.All(invocationResults, r => Assert.False(r.ExceptionThrown, r.ExceptionMessage));
+ Assert.All(invocationResults, r => Assert.True(r.ContextMatched,
+ $"Invocation {r.InvocationIndex} expected '{r.ExpectedFunctionName}' but got '{r.ActualFunctionName}'"));
+ }
+ }
+
+ ///
+ /// **Feature: idempotency-thread-safety, Property 2c: LambdaContext Isolation Across Async Boundaries**
+ /// *For any* invocation with async operations, the LambdaContext should be preserved
+ /// across await points.
+ /// **Validates: Requirements 1.2**
+ ///
+ [Theory]
+ [InlineData(2)]
+ [InlineData(5)]
+ [InlineData(10)]
+ public async Task LambdaContextIsolation_AsyncOperations_ShouldPreserveContextAcrossAwait(
+ int concurrencyLevel)
+ {
+ var results = new AsyncContextResult[concurrencyLevel];
+ var barrier = new Barrier(concurrencyLevel);
+ var tasks = new Task[concurrencyLevel];
+
+ for (int i = 0; i < concurrencyLevel; i++)
+ {
+ int invocationIndex = i;
+
+ tasks[i] = Task.Run(async () =>
+ {
+ var invocationId = Guid.NewGuid().ToString("N");
+ var expectedFunctionName = $"AsyncFunction_{invocationIndex}_{invocationId}";
+
+ var result = new AsyncContextResult
+ {
+ InvocationId = invocationId,
+ ExpectedFunctionName = expectedFunctionName
+ };
+
+ try
+ {
+ var context = new TestLambdaContext
+ {
+ FunctionName = expectedFunctionName,
+ AwsRequestId = invocationId,
+ RemainingTime = TimeSpan.FromMinutes(5)
+ };
+
+ barrier.SignalAndWait();
+
+ IdempotencyLib.Idempotency.RegisterLambdaContext(context);
+
+ // Read before await
+ var contextBeforeAwait = IdempotencyLib.Idempotency.Instance.LambdaContext;
+ result.FunctionNameBeforeAwait = contextBeforeAwait?.FunctionName ?? "null";
+
+ // Simulate async operation
+ await Task.Delay(Random.Shared.Next(10, 50));
+
+ // Read after await
+ var contextAfterAwait = IdempotencyLib.Idempotency.Instance.LambdaContext;
+ result.FunctionNameAfterAwait = contextAfterAwait?.FunctionName ?? "null";
+
+ result.ContextPreservedAcrossAwait =
+ result.FunctionNameBeforeAwait == expectedFunctionName &&
+ result.FunctionNameAfterAwait == expectedFunctionName;
+ }
+ catch (Exception ex)
+ {
+ result.ExceptionThrown = true;
+ result.ExceptionMessage = $"{ex.GetType().Name}: {ex.Message}";
+ }
+
+ results[invocationIndex] = result;
+ });
+ }
+
+ await Task.WhenAll(tasks);
+
+ // Verify no exceptions were thrown
+ Assert.All(results, r => Assert.False(r.ExceptionThrown, r.ExceptionMessage));
+
+ // Verify context was preserved across await
+ Assert.All(results, r => Assert.True(r.ContextPreservedAcrossAwait,
+ $"Context not preserved: expected '{r.ExpectedFunctionName}', " +
+ $"before await: '{r.FunctionNameBeforeAwait}', after await: '{r.FunctionNameAfterAwait}'"));
+ }
+
+ #endregion
+}
diff --git a/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/ThreadSafeInMemoryPersistenceStore.cs b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/ThreadSafeInMemoryPersistenceStore.cs
new file mode 100644
index 000000000..e9ba2004d
--- /dev/null
+++ b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Idempotency/ThreadSafeInMemoryPersistenceStore.cs
@@ -0,0 +1,183 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+using System.Collections.Concurrent;
+using AWS.Lambda.Powertools.Idempotency.Persistence;
+
+namespace AWS.Lambda.Powertools.ConcurrencyTests.Idempotency;
+
+///
+/// A thread-safe in-memory persistence store for concurrency testing.
+/// This implementation wraps all dictionary operations with proper synchronization
+/// to ensure safe concurrent access from multiple threads.
+///
+/// This store is designed for testing purposes only and should not be used in production.
+///
+public class ThreadSafeInMemoryPersistenceStore : BasePersistenceStore
+{
+ ///
+ /// Thread-safe dictionary to store idempotency records.
+ /// Using ConcurrentDictionary for atomic operations.
+ ///
+ private readonly ConcurrentDictionary _records = new();
+
+ ///
+ /// Lock object for operations that require multiple steps to be atomic.
+ ///
+ private readonly object _lockObj = new();
+
+ ///
+ /// Counter for tracking the number of GetRecord operations (for testing purposes).
+ ///
+ private int _getRecordCount;
+
+ ///
+ /// Counter for tracking the number of PutRecord operations (for testing purposes).
+ ///
+ private int _putRecordCount;
+
+ ///
+ /// Counter for tracking the number of UpdateRecord operations (for testing purposes).
+ ///
+ private int _updateRecordCount;
+
+ ///
+ /// Counter for tracking the number of DeleteRecord operations (for testing purposes).
+ ///
+ private int _deleteRecordCount;
+
+ ///
+ /// Gets the number of GetRecord operations performed.
+ ///
+ public int GetRecordCount => _getRecordCount;
+
+ ///
+ /// Gets the number of PutRecord operations performed.
+ ///
+ public int PutRecordCount => _putRecordCount;
+
+ ///
+ /// Gets the number of UpdateRecord operations performed.
+ ///
+ public int UpdateRecordCount => _updateRecordCount;
+
+ ///
+ /// Gets the number of DeleteRecord operations performed.
+ ///
+ public int DeleteRecordCount => _deleteRecordCount;
+
+ ///
+ /// Gets the current number of records in the store.
+ ///
+ public int RecordCount => _records.Count;
+
+ ///
+ public override Task GetRecord(string idempotencyKey)
+ {
+ Interlocked.Increment(ref _getRecordCount);
+
+ _records.TryGetValue(idempotencyKey, out var record);
+ return Task.FromResult(record);
+ }
+
+ ///
+ public override Task PutRecord(DataRecord record, DateTimeOffset now)
+ {
+ Interlocked.Increment(ref _putRecordCount);
+
+ // Use AddOrUpdate to handle concurrent puts atomically
+ _records.AddOrUpdate(
+ record.IdempotencyKey,
+ record,
+ (_, _) => record);
+
+ return Task.CompletedTask;
+ }
+
+ ///
+ public override Task UpdateRecord(DataRecord record)
+ {
+ Interlocked.Increment(ref _updateRecordCount);
+
+ // Use AddOrUpdate to handle concurrent updates atomically
+ _records.AddOrUpdate(
+ record.IdempotencyKey,
+ record,
+ (_, _) => record);
+
+ return Task.CompletedTask;
+ }
+
+ ///
+ public override Task DeleteRecord(string idempotencyKey)
+ {
+ Interlocked.Increment(ref _deleteRecordCount);
+
+ _records.TryRemove(idempotencyKey, out _);
+ return Task.CompletedTask;
+ }
+
+ ///
+ /// Clears all records from the store.
+ /// Thread-safe operation.
+ ///
+ public void Clear()
+ {
+ _records.Clear();
+ }
+
+ ///
+ /// Resets all operation counters to zero.
+ /// Thread-safe operation.
+ ///
+ public void ResetCounters()
+ {
+ Interlocked.Exchange(ref _getRecordCount, 0);
+ Interlocked.Exchange(ref _putRecordCount, 0);
+ Interlocked.Exchange(ref _updateRecordCount, 0);
+ Interlocked.Exchange(ref _deleteRecordCount, 0);
+ }
+
+ ///
+ /// Gets all records currently in the store.
+ /// Returns a snapshot of the records at the time of the call.
+ ///
+ /// A dictionary containing all records.
+ public IDictionary GetAllRecords()
+ {
+ return new Dictionary(_records);
+ }
+
+ ///
+ /// Checks if a record exists for the given idempotency key.
+ ///
+ /// The idempotency key to check.
+ /// True if a record exists, false otherwise.
+ public bool ContainsKey(string idempotencyKey)
+ {
+ return _records.ContainsKey(idempotencyKey);
+ }
+
+ ///
+ /// Tries to get a record for the given idempotency key.
+ ///
+ /// The idempotency key to look up.
+ /// The record if found, null otherwise.
+ /// True if the record was found, false otherwise.
+ public bool TryGetRecord(string idempotencyKey, out DataRecord? record)
+ {
+ return _records.TryGetValue(idempotencyKey, out record);
+ }
+}
diff --git a/libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Persistence/BasePersistenceStoreTests.cs b/libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Persistence/BasePersistenceStoreTests.cs
index 1afe2824e..5e4cd9f94 100644
--- a/libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Persistence/BasePersistenceStoreTests.cs
+++ b/libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Persistence/BasePersistenceStoreTests.cs
@@ -592,4 +592,261 @@ public async Task ProcessExistingRecord_WhenValidRecord_ShouldReturnRecordAndSav
cache.TryGet("testFunction#5eff007a9ed2789a9f9f6bc182fc6ae6", out var cachedRecord).Should().BeTrue();
cachedRecord.Should().Be(existingRecord);
}
+
+ #region Configure Code Coverage Tests
+
+ [Fact]
+ public async Task Configure_WhenUseLocalCacheIsFalse_ShouldNotCreateCache()
+ {
+ // Arrange
+ var persistenceStore = new InMemoryPersistenceStore();
+ var request = LoadApiGatewayProxyRequest();
+
+ // Configure with UseLocalCache = false (default)
+ persistenceStore.Configure(new IdempotencyOptionsBuilder()
+ .WithUseLocalCache(false)
+ .Build(), null, null);
+
+ var now = DateTimeOffset.UtcNow;
+
+ // Act - SaveSuccess should work without cache
+ var product = new Product(34543, "product", 42);
+ await persistenceStore.SaveSuccess(JsonSerializer.SerializeToDocument(request)!, product, now);
+
+ // Assert - Record should be saved to persistence store
+ var dr = persistenceStore.DataRecord;
+ dr.Status.Should().Be(DataRecord.DataRecordStatus.COMPLETED);
+ dr.IdempotencyKey.Should().Be("testFunction#5eff007a9ed2789a9f9f6bc182fc6ae6");
+ persistenceStore.Status.Should().Be(2);
+ }
+
+ [Fact]
+ public async Task Configure_WhenUseLocalCacheIsTrue_ShouldCreateCacheWithPublicMethod()
+ {
+ // Arrange - This test covers the positive path: if (useLocalCache) { _cache = new LRUCache... }
+ // Using the PUBLIC Configure method (not the internal one with cache parameter)
+ var persistenceStore = new InMemoryPersistenceStore();
+ var request = LoadApiGatewayProxyRequest();
+
+ // Configure with UseLocalCache = true using the PUBLIC method
+ persistenceStore.Configure(new IdempotencyOptionsBuilder()
+ .WithUseLocalCache(true)
+ .Build(), null, null);
+
+ var now = DateTimeOffset.UtcNow;
+
+ // Act - SaveSuccess should save to the internally created cache
+ var product = new Product(34543, "product", 42);
+ await persistenceStore.SaveSuccess(JsonSerializer.SerializeToDocument(request)!, product, now);
+
+ // Assert - Record should be saved to persistence store
+ var dr = persistenceStore.DataRecord;
+ dr.Status.Should().Be(DataRecord.DataRecordStatus.COMPLETED);
+ persistenceStore.Status.Should().Be(2);
+
+ // Verify cache is working by getting the record (should come from cache, not persistence)
+ var record = await persistenceStore.GetRecord(JsonSerializer.SerializeToDocument(request)!, now);
+ record.Status.Should().Be(DataRecord.DataRecordStatus.COMPLETED);
+ // Status should still be 2 (not 0) because record came from cache, not from GetRecord override
+ persistenceStore.Status.Should().Be(2);
+ }
+
+ [Fact]
+ public async Task Configure_WhenKeyPrefixIsSet_ShouldUsePrefixAsFunctionName()
+ {
+ // Arrange - This test covers the positive path: if (!string.IsNullOrEmpty(keyPrefix)) { _functionName = keyPrefix; }
+ var persistenceStore = new InMemoryPersistenceStore();
+ var request = LoadApiGatewayProxyRequest();
+
+ // Configure with a non-empty keyPrefix
+ persistenceStore.Configure(new IdempotencyOptionsBuilder().Build(), "ignoredFunctionName", "MyKeyPrefix");
+
+ var now = DateTimeOffset.UtcNow;
+
+ // Act
+ await persistenceStore.SaveInProgress(JsonSerializer.SerializeToDocument(request)!, now, null);
+
+ // Assert - IdempotencyKey should use the keyPrefix, not the functionName
+ var dr = persistenceStore.DataRecord;
+ dr.IdempotencyKey.Should().StartWith("MyKeyPrefix#");
+ dr.IdempotencyKey.Should().NotContain("ignoredFunctionName");
+ }
+
+ [Fact]
+ public async Task Configure_WhenPayloadValidationJmesPathIsSet_ShouldEnablePayloadValidation()
+ {
+ // Arrange - This test covers the positive path: if (!string.IsNullOrWhiteSpace(...PayloadValidationJmesPath)) { PayloadValidationEnabled = true; }
+ var persistenceStore = new InMemoryPersistenceStore();
+ var request = LoadApiGatewayProxyRequest();
+
+ // Configure with a valid PayloadValidationJmesPath
+ persistenceStore.Configure(new IdempotencyOptionsBuilder()
+ .WithEventKeyJmesPath("powertools_json(Body).id")
+ .WithPayloadValidationJmesPath("powertools_json(Body).message")
+ .Build(), "myfunc", null);
+
+ var now = DateTimeOffset.UtcNow;
+
+ // Act
+ await persistenceStore.SaveInProgress(JsonSerializer.SerializeToDocument(request)!, now, null);
+
+ // Assert - PayloadHash should NOT be empty when validation IS enabled
+ var dr = persistenceStore.DataRecord;
+ dr.PayloadHash.Should().NotBeEmpty();
+ // The hash should be the MD5 of "Lambda rocks" (the message in the test payload)
+ dr.PayloadHash.Should().Be("70c24d88041893f7fbab4105b76fd9e1");
+ }
+
+ [Fact]
+ public async Task Configure_WhenKeyPrefixIsNull_ShouldUseFunctionNameFromEnvironment()
+ {
+ // Arrange
+ var persistenceStore = new InMemoryPersistenceStore();
+ var request = LoadApiGatewayProxyRequest();
+
+ // Configure with null keyPrefix - should use default function name
+ persistenceStore.Configure(new IdempotencyOptionsBuilder().Build(), "myFunction", null);
+
+ var now = DateTimeOffset.UtcNow;
+
+ // Act
+ await persistenceStore.SaveInProgress(JsonSerializer.SerializeToDocument(request)!, now, null);
+
+ // Assert - IdempotencyKey should include the function name
+ var dr = persistenceStore.DataRecord;
+ dr.IdempotencyKey.Should().Contain("myFunction");
+ dr.IdempotencyKey.Should().Be("testFunction.myFunction#5eff007a9ed2789a9f9f6bc182fc6ae6");
+ }
+
+ [Fact]
+ public async Task Configure_WhenKeyPrefixIsEmpty_ShouldUseFunctionNameFromEnvironment()
+ {
+ // Arrange
+ var persistenceStore = new InMemoryPersistenceStore();
+ var request = LoadApiGatewayProxyRequest();
+
+ // Configure with empty keyPrefix - should use default function name
+ persistenceStore.Configure(new IdempotencyOptionsBuilder().Build(), "anotherFunction", "");
+
+ var now = DateTimeOffset.UtcNow;
+
+ // Act
+ await persistenceStore.SaveInProgress(JsonSerializer.SerializeToDocument(request)!, now, null);
+
+ // Assert - IdempotencyKey should include the function name
+ var dr = persistenceStore.DataRecord;
+ dr.IdempotencyKey.Should().Contain("anotherFunction");
+ dr.IdempotencyKey.Should().Be("testFunction.anotherFunction#5eff007a9ed2789a9f9f6bc182fc6ae6");
+ }
+
+ [Fact]
+ public async Task Configure_WhenPayloadValidationJmesPathIsNull_ShouldNotEnablePayloadValidation()
+ {
+ // Arrange
+ var persistenceStore = new InMemoryPersistenceStore();
+ var request = LoadApiGatewayProxyRequest();
+
+ // Configure without PayloadValidationJmesPath
+ persistenceStore.Configure(new IdempotencyOptionsBuilder()
+ .WithEventKeyJmesPath("powertools_json(Body).id")
+ .Build(), "myfunc", null);
+
+ var now = DateTimeOffset.UtcNow;
+
+ // Act
+ await persistenceStore.SaveInProgress(JsonSerializer.SerializeToDocument(request)!, now, null);
+
+ // Assert - PayloadHash should be empty when validation is not enabled
+ var dr = persistenceStore.DataRecord;
+ dr.PayloadHash.Should().BeEmpty();
+ }
+
+ [Fact]
+ public async Task Configure_WhenPayloadValidationJmesPathIsEmpty_ShouldNotEnablePayloadValidation()
+ {
+ // Arrange
+ var persistenceStore = new InMemoryPersistenceStore();
+ var request = LoadApiGatewayProxyRequest();
+
+ // Configure with empty PayloadValidationJmesPath
+ persistenceStore.Configure(new IdempotencyOptionsBuilder()
+ .WithEventKeyJmesPath("powertools_json(Body).id")
+ .WithPayloadValidationJmesPath("")
+ .Build(), "myfunc", null);
+
+ var now = DateTimeOffset.UtcNow;
+
+ // Act
+ await persistenceStore.SaveInProgress(JsonSerializer.SerializeToDocument(request)!, now, null);
+
+ // Assert - PayloadHash should be empty when validation is not enabled
+ var dr = persistenceStore.DataRecord;
+ dr.PayloadHash.Should().BeEmpty();
+ }
+
+ [Fact]
+ public async Task Configure_WhenPayloadValidationJmesPathIsWhitespace_ShouldNotEnablePayloadValidation()
+ {
+ // Arrange
+ var persistenceStore = new InMemoryPersistenceStore();
+ var request = LoadApiGatewayProxyRequest();
+
+ // Configure with whitespace PayloadValidationJmesPath
+ persistenceStore.Configure(new IdempotencyOptionsBuilder()
+ .WithEventKeyJmesPath("powertools_json(Body).id")
+ .WithPayloadValidationJmesPath(" ")
+ .Build(), "myfunc", null);
+
+ var now = DateTimeOffset.UtcNow;
+
+ // Act
+ await persistenceStore.SaveInProgress(JsonSerializer.SerializeToDocument(request)!, now, null);
+
+ // Assert - PayloadHash should be empty when validation is not enabled
+ var dr = persistenceStore.DataRecord;
+ dr.PayloadHash.Should().BeEmpty();
+ }
+
+ [Fact]
+ public async Task Configure_WhenFunctionNameIsNullAndKeyPrefixIsNull_ShouldUseDefaultFunctionName()
+ {
+ // Arrange
+ var persistenceStore = new InMemoryPersistenceStore();
+ var request = LoadApiGatewayProxyRequest();
+
+ // Configure with both functionName and keyPrefix as null
+ persistenceStore.Configure(new IdempotencyOptionsBuilder().Build(), null, null);
+
+ var now = DateTimeOffset.UtcNow;
+
+ // Act
+ await persistenceStore.SaveInProgress(JsonSerializer.SerializeToDocument(request)!, now, null);
+
+ // Assert - IdempotencyKey should use default "testFunction"
+ var dr = persistenceStore.DataRecord;
+ dr.IdempotencyKey.Should().StartWith("testFunction#");
+ }
+
+ [Fact]
+ public async Task Configure_WhenFunctionNameIsWhitespace_ShouldUseDefaultFunctionNameOnly()
+ {
+ // Arrange
+ var persistenceStore = new InMemoryPersistenceStore();
+ var request = LoadApiGatewayProxyRequest();
+
+ // Configure with whitespace functionName
+ persistenceStore.Configure(new IdempotencyOptionsBuilder().Build(), " ", null);
+
+ var now = DateTimeOffset.UtcNow;
+
+ // Act
+ await persistenceStore.SaveInProgress(JsonSerializer.SerializeToDocument(request)!, now, null);
+
+ // Assert - IdempotencyKey should use default "testFunction" without appending whitespace
+ var dr = persistenceStore.DataRecord;
+ dr.IdempotencyKey.Should().StartWith("testFunction#");
+ dr.IdempotencyKey.Should().NotContain("testFunction. ");
+ }
+
+ #endregion
}
\ No newline at end of file
diff --git a/libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Persistence/DynamoDBPersistenceStoreTests.cs b/libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Persistence/DynamoDBPersistenceStoreTests.cs
index 09b4c781d..1a3a8b39b 100644
--- a/libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Persistence/DynamoDBPersistenceStoreTests.cs
+++ b/libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Persistence/DynamoDBPersistenceStoreTests.cs
@@ -281,15 +281,21 @@ await _client.PutItemAsync(new PutItemRequest
TableName = _tableName,
Item = item
});
- // enable payload validation
- _dynamoDbPersistenceStore.Configure(
+
+ // Create a new store instance with payload validation enabled
+ // (Configure is idempotent and thread-safe, so we need a fresh instance to change configuration)
+ var storeWithValidation = new DynamoDBPersistenceStoreBuilder()
+ .WithTableName(_tableName)
+ .WithDynamoDBClient(_client)
+ .Build();
+ storeWithValidation.Configure(
new IdempotencyOptionsBuilder().WithPayloadValidationJmesPath("path").Build(),
null, null);
// Act
expiry = now.AddSeconds(3600).ToUnixTimeSeconds();
var record = new DataRecord("key", DataRecord.DataRecordStatus.COMPLETED, expiry, "Fake result", "hash");
- await _dynamoDbPersistenceStore.UpdateRecord(record);
+ await storeWithValidation.UpdateRecord(record);
// Assert
var itemInDb = (await _client.GetItemAsync(new GetItemRequest