Production-ready distributed coordination library for .NET Standard 2.0+
- Distributed Lock (Redis-based, safe acquire/release, auto-expiration)
- Idempotency Helper (retry-safe execution, webhook protection, API patterns)
- Saga Coordination (step-by-step orchestration, crash recovery, idempotent steps)
- Multiple Backend Support (Redis, InMemory, Composite, Custom stores)
- Cancellation Token Support (graceful shutdown handling)
- DI-First Design (seamless ASP.NET Core integration)
- Unit Testing Ready (InMemoryKeyValueStore for testing)
Perfect for microservices, distributed systems, background jobs, webhooks, message queues, and payment processing.
Key Features:
- Production-ready Redis implementation with Lua scripts
- Thread-safe in-memory store for testing
- Complete IKeyValueStore interface with all essential operations
- TTL-based auto-release prevents deadlocks
- Lock ownership validation for safe release
- Atomic compare-and-delete operations
- Comprehensive examples and documentation
.NET Standard 2.0 compatible: works with .NET Core 2.0+, .NET 5+, .NET 6+, .NET 7+, .NET 8+, .NET 9+
$ dotnet add package Chd.Coordination
Chd.Coordination is a .NET Standard 2.0 compatible, Redis-based, DI-first distributed coordination library.
It provides essential primitives for distributed locks, idempotency, and minimal saga coordination, designed for scalable microservices and distributed systems.
V2.0.4: Available
- RedisKeyValueStore - Production-ready Redis implementation
- InMemoryKeyValueStore - Fast, lightweight, perfect for testing
- CompositeKeyValueStore - Flexible multi-backend composition
V2.x: Available Now
Not in Scope
dotnet add package Chd.Coordination
For OpenTelemetry/Prometheus Metrics (Optional):
# For ASP.NET Core with Prometheus exporter
dotnet add package OpenTelemetry.Exporter.Prometheus.AspNetCore
dotnet add package OpenTelemetry.Extensions.Hosting
using OpenTelemetry.Metrics;
var builder = WebApplication.CreateBuilder(args);
// 1. Register coordination with metrics
builder.Services.AddCoordination(opt =>
{
opt.RedisConnectionString = "localhost:6379";
});
// 2. Enable OpenTelemetry metrics
builder.Services.AddCoordinationMetrics(); // <-- NEW!
// 3. Configure OpenTelemetry exporter
builder.Services.AddOpenTelemetry()
.WithMetrics(metrics => metrics
.AddMeter("Chd.Coordination") // <-- Add our meter
.AddPrometheusExporter());
var app = builder.Build();
// 4. Map Prometheus scrape endpoint
app.MapPrometheusScrapingEndpoint(); // /metrics endpoint
app.Run();
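To make the `AddMeter("Chd.Coordination")` step less opaque, here is a minimal sketch of how a named meter is observed in .NET. It uses only `System.Diagnostics.Metrics` (built into .NET 6+), not the library itself: the meter and counter created here are stand-ins named after the metrics listed in this README, and the `resource` tag is an assumption based on the PromQL examples.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;

// Stand-in meter and counter; the real ones live inside the library.
var meter = new Meter("Chd.Coordination");
var acquired = meter.CreateCounter<long>("chd_coordination_lock_acquired_total");

long total = 0;
using var listener = new MeterListener();

// Subscribe only to instruments published by the meter we care about.
// This is roughly what naming a meter in AddMeter(...) selects.
listener.InstrumentPublished = (instrument, l) =>
{
    if (instrument.Meter.Name == "Chd.Coordination")
        l.EnableMeasurementEvents(instrument);
};
listener.SetMeasurementEventCallback<long>(
    (instrument, value, tags, state) => total += value);
listener.Start();

// Each Add call is delivered synchronously to the callback above.
acquired.Add(1, new KeyValuePair<string, object?>("resource", "order:123"));
acquired.Add(1, new KeyValuePair<string, object?>("resource", "order:456"));

Console.WriteLine($"Observed lock acquisitions: {total}");
```

If the meter name passed to `AddMeter` does not match the name the instruments were created under, nothing is exported, which is a common reason for an empty `/metrics` endpoint.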
Lock Metrics:
- chd_coordination_lock_acquired_total - Total locks acquired
- chd_coordination_lock_failed_total - Failed lock attempts
- chd_coordination_lock_released_total - Locks released
- chd_coordination_lock_timeout_total - Lock acquisition timeouts
- chd_coordination_lock_acquisition_duration_seconds - Lock acquisition time
- chd_coordination_lock_contention_attempts - Retry attempts before success
Idempotency Metrics:
- chd_coordination_idempotency_hit_total - Cache hits (duplicate requests)
- chd_coordination_idempotency_miss_total - Cache misses (new requests)
- chd_coordination_idempotency_executed_total - Operations executed
- chd_coordination_idempotency_failed_total - Failed operations
- chd_coordination_idempotency_execution_duration_seconds - Execution time
Saga Metrics:
- chd_coordination_saga_started_total - Sagas started
- chd_coordination_saga_completed_total - Successfully completed sagas
- chd_coordination_saga_failed_total - Failed sagas
- chd_coordination_saga_compensated_total - Compensated steps
- chd_coordination_saga_duration_seconds - Total saga duration
- chd_coordination_saga_step_duration_seconds - Individual step duration
# Lock acquisition rate (per second)
rate(chd_coordination_lock_acquired_total[5m])
# Average lock acquisition time
rate(chd_coordination_lock_acquisition_duration_seconds_sum[5m]) /
rate(chd_coordination_lock_acquisition_duration_seconds_count[5m])
# Lock failure percentage
(rate(chd_coordination_lock_failed_total[5m]) /
(rate(chd_coordination_lock_acquired_total[5m]) + rate(chd_coordination_lock_failed_total[5m]))) * 100
# Idempotency hit ratio
rate(chd_coordination_idempotency_hit_total[5m]) /
(rate(chd_coordination_idempotency_hit_total[5m]) + rate(chd_coordination_idempotency_miss_total[5m]))
# Top contended resources
topk(10, sum by (resource) (chd_coordination_lock_contention_attempts))
Prevents thundering herd problem by adding randomization to retry delays.
using Chd.Coordination.Policies;
// Use default exponential backoff (50 retries, 100ms-10s, with jitter)
var policy = ExponentialBackoffRetryPolicy.Default;
var result = await policy.ExecuteAsync(async ct =>
{
return await CallUnreliableApiAsync(ct);
});
var policy = new ExponentialBackoffRetryPolicy(
maxRetryCount: 20,
initialDelay: TimeSpan.FromMilliseconds(200),
maxDelay: TimeSpan.FromSeconds(5),
useJitter: true,
backoffMultiplier: 2.0);
await policy.ExecuteAsync(async ct =>
{
await ProcessPaymentAsync(ct);
});
var policy = RetryPolicyBuilder.CreateDefault()
.WithMaxRetryCount(30)
.WithInitialDelay(TimeSpan.FromMilliseconds(100))
.WithMaxDelay(TimeSpan.FromSeconds(15))
.WithJitter(enabled: true)
.WithBackoffMultiplier(2.5)
.Build();
await policy.ExecuteAsync(async ct =>
{
return await FetchDataAsync(ct);
});
Prevents cascading failures by opening circuit after consecutive failures.
var policy = RetryPolicyBuilder.CreateDefault()
.WithMaxRetryCount(10)
.WithCircuitBreaker(
failureThreshold: 5, // Open after 5 consecutive failures
resetTimeout: TimeSpan.FromMinutes(1)) // Try again after 1 minute
.BuildWithCircuitBreaker();
try
{
await policy.ExecuteAsync(async ct =>
{
await CallExternalServiceAsync(ct);
});
}
catch (InvalidOperationException ex) when (ex.Message.Contains("Circuit breaker"))
{
// Circuit is open - service is down
Console.WriteLine("Service unavailable, fast-failing");
}
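The behaviour described above (open after consecutive failures, fail fast, try again after a reset timeout) can be sketched as a small state machine. This is an illustration under assumptions, not the library's implementation: the variable names, the threshold of 3, and the explicit `now` parameter (used to keep the example deterministic) are all hypothetical.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical breaker state for the sketch.
int failureThreshold = 3;
TimeSpan resetTimeout = TimeSpan.FromSeconds(30);
int consecutiveFailures = 0;
DateTime? openedAt = null;

async Task ExecuteAsync(Func<Task> action, DateTime now)
{
    // While the circuit is open, fail fast without calling the action.
    if (openedAt is DateTime opened && now - opened < resetTimeout)
        throw new InvalidOperationException("Circuit breaker is open");

    try
    {
        await action();
        consecutiveFailures = 0;   // a success closes the circuit
        openedAt = null;
    }
    catch
    {
        consecutiveFailures++;
        if (consecutiveFailures >= failureThreshold)
            openedAt = now;        // trip, or re-trip after a failed trial call
        throw;
    }
}

var t0 = new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc);
int calls = 0;
Func<Task> failing = () =>
{
    calls++;
    return Task.FromException(new TimeoutException("service down"));
};

// Three consecutive failures trip the breaker.
for (int i = 0; i < 3; i++)
{
    try { await ExecuteAsync(failing, t0); } catch (TimeoutException) { }
}

// The next call is rejected without invoking the action.
bool fastFailed = false;
try { await ExecuteAsync(failing, t0.AddSeconds(1)); }
catch (InvalidOperationException) { fastFailed = true; }

// After the reset timeout a trial call is allowed; success closes the circuit.
await ExecuteAsync(() => Task.CompletedTask, t0.AddSeconds(31));

Console.WriteLine($"calls={calls}, fastFailed={fastFailed}, failures={consecutiveFailures}");
```

The fast-fail path is what protects a struggling dependency: callers stop adding load while the circuit is open.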
// Fast: 10 retries, 50ms-1s (low latency scenarios)
var fast = ExponentialBackoffRetryPolicy.Fast;
// Default: 50 retries, 100ms-10s (balanced)
var balanced = ExponentialBackoffRetryPolicy.Default;
// Conservative: 20 retries, 500ms-30s (high contention)
var conservative = ExponentialBackoffRetryPolicy.Conservative;
var policy = ExponentialBackoffRetryPolicy.Default;
for (int attempt = 0; attempt < 5; attempt++)
{
var delay = policy.GetDelay(attempt);
Console.WriteLine($"Attempt {attempt}: {delay.TotalMilliseconds:F0}ms");
}
// Output (approximate with jitter):
// Attempt 0: 95ms
// Attempt 1: 220ms
// Attempt 2: 380ms
// Attempt 3: 850ms
// Attempt 4: 1640ms
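For intuition about where numbers like these come from, the following sketch computes an exponential delay capped at a maximum and then applies random jitter. It is not the library's `GetDelay` implementation: the function below is a hypothetical stand-in, the parameters mirror the Default preset described above, and the ±20% jitter range is an arbitrary choice for illustration.

```csharp
using System;

// Assumed parameters, mirroring the Default preset (100ms to 10s, multiplier 2.0).
TimeSpan initialDelay = TimeSpan.FromMilliseconds(100);
TimeSpan maxDelay = TimeSpan.FromSeconds(10);
double multiplier = 2.0;
var rng = new Random();

TimeSpan GetDelay(int attempt, bool useJitter)
{
    // Exponential growth: initialDelay * multiplier^attempt, capped at maxDelay.
    double baseMs = initialDelay.TotalMilliseconds * Math.Pow(multiplier, attempt);
    double cappedMs = Math.Min(baseMs, maxDelay.TotalMilliseconds);
    if (!useJitter)
        return TimeSpan.FromMilliseconds(cappedMs);

    // Jitter: scale by a random factor in [0.8, 1.2) so that clients that
    // failed together do not all retry at the same instant.
    double factor = 0.8 + rng.NextDouble() * 0.4;
    double jitteredMs = Math.Min(cappedMs * factor, maxDelay.TotalMilliseconds);
    return TimeSpan.FromMilliseconds(jitteredMs);
}

for (int attempt = 0; attempt < 5; attempt++)
{
    Console.WriteLine(
        $"Attempt {attempt}: base {GetDelay(attempt, false).TotalMilliseconds:F0}ms, " +
        $"jittered {GetDelay(attempt, true).TotalMilliseconds:F0}ms");
}
```

Without jitter the base delays are 100, 200, 400, 800 and 1600 ms; the jittered values vary from run to run.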
using Chd.Coordination;
using Microsoft.Extensions.DependencyInjection;
// 1. Register services
services.AddCoordination(opt =>
{
opt.RedisConnectionString = "localhost:6379";
});
// 2. Inject and use
public class OrderService
{
private readonly ICoordinator _coordinator;
public OrderService(ICoordinator coordinator)
{
_coordinator = coordinator;
}
public async Task ProcessOrderAsync(string orderId)
{
// Distributed lock
await _coordinator.Lock.RunAsync(
$"order:{orderId}",
TimeSpan.FromSeconds(30),
async ct =>
{
// Only one instance processes this order at a time
await DoWorkAsync(orderId, ct);
});
}
}
using Chd.Coordination;
using Microsoft.Extensions.DependencyInjection;
// In your Startup.cs or Program.cs
services.AddCoordination(opt =>
{
opt.RedisConnectionString = "localhost:6379";
opt.DatabaseNumber = 0; // Optional: Redis database number
});
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddCoordination(opt =>
{
opt.RedisConnectionString = builder.Configuration
.GetConnectionString("Redis");
});
var app = builder.Build();
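app.MapPost("/orders/{orderId}/process", async (
    string orderId,
    ICoordinator coordinator) =>
{
    await coordinator.Lock.RunAsync(
        $"order:{orderId}",
        TimeSpan.FromSeconds(30),
        async ct =>
        {
            // Process order while holding the lock
            await Task.CompletedTask;
        });
    // Return the result from the endpoint delegate, not from inside the lock callback
    return Results.Ok($"Order {orderId} processed");
});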
services.AddSingleton<IKeyValueStore, InMemoryKeyValueStore>();
services.AddCoordination();
await _coordinator.Lock.RunAsync(
key: "order:123",
ttl: TimeSpan.FromSeconds(30),
async ct =>
{
Console.WriteLine("Processing order 123...");
await UpdateInventoryAsync("order:123", ct);
});
public class OrderProcessor
{
private readonly ICoordinator _coordinator;
private readonly IOrderRepository _orderRepo;
private readonly ILogger<OrderProcessor> _logger;
public async Task<bool> ProcessOrderAsync(
string orderId,
CancellationToken ct = default)
{
var lockKey = $"order:process:{orderId}";
try
{
await _coordinator.Lock.RunAsync(
lockKey,
TimeSpan.FromMinutes(5),
async lockCt =>
{
var order = await _orderRepo.GetAsync(orderId, lockCt);
if (order.Status != OrderStatus.Pending)
{
throw new InvalidOperationException(
$"Order {orderId} already processed");
}
// Process payment
await ChargePaymentAsync(order, lockCt);
// Update inventory
await DecrementStockAsync(order.Items, lockCt);
// Update order status
order.Status = OrderStatus.Completed;
await _orderRepo.UpdateAsync(order, lockCt);
_logger.LogInformation(
"Order {OrderId} processed successfully", orderId);
},
ct);
return true;
}
catch (Exception ex)
{
_logger.LogError(ex,
"Failed to process order {OrderId}", orderId);
return false;
}
}
}
using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(10));
try
{
await _coordinator.Lock.RunAsync(
"resource:critical",
TimeSpan.FromSeconds(30),
async ct =>
{
await LongRunningOperationAsync(ct);
},
cancellationToken: cts.Token);
}
catch (OperationCanceledException)
{
Console.WriteLine("Operation was cancelled");
}
public class ScheduledJobService : BackgroundService
{
private readonly ICoordinator _coordinator;
private readonly ILogger<ScheduledJobService> _logger;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
// Ensure only one instance runs this job
await _coordinator.Lock.RunAsync(
"scheduled:daily-report",
TimeSpan.FromMinutes(30),
async ct =>
{
_logger.LogInformation("Generating daily report...");
await GenerateDailyReportAsync(ct);
},
stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to generate daily report");
}
await Task.Delay(TimeSpan.FromHours(24), stoppingToken);
}
}
}
await _coordinator.Idempotency.RunAsync(
key: "payment:order:123",
ttl: TimeSpan.FromMinutes(10),
async () =>
{
await ProcessPaymentAsync();
});
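Conceptually, an idempotency guard is an atomic "set if not exists" on the key, followed by the action only when the set succeeded. The sketch below shows that idea with an in-memory dictionary. It is a simplified illustration, not the library's code: `RunOnceAsync` is a hypothetical helper, TTL handling is omitted, and releasing the key on failure is a design choice of this sketch that the library may or may not share.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// In-memory stand-in for the shared key-value store (no TTL in this sketch).
var seen = new ConcurrentDictionary<string, byte>();

async Task<bool> RunOnceAsync(string key, Func<Task> action)
{
    // TryAdd is atomic: only the first caller for a given key gets through.
    if (!seen.TryAdd(key, 0))
        return false;              // duplicate request: skip the action

    try
    {
        await action();
        return true;
    }
    catch
    {
        // Free the key so that a later retry can run the action again.
        seen.TryRemove(key, out _);
        throw;
    }
}

int charges = 0;
bool first = await RunOnceAsync("payment:order:123",
    () => { charges++; return Task.CompletedTask; });
bool second = await RunOnceAsync("payment:order:123",
    () => { charges++; return Task.CompletedTask; });

Console.WriteLine($"{first} {second} {charges}");   // prints: True False 1
```

Note that the duplicate call returns without running the action, so any value the action would have produced has to be stored elsewhere if callers need it.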
public class PaymentService
{
private readonly ICoordinator _coordinator;
private readonly IPaymentGateway _paymentGateway;
private readonly ILogger<PaymentService> _logger;
public async Task<PaymentResult> ProcessPaymentAsync(
string orderId,
decimal amount,
string customerId)
{
var idempotencyKey = $"payment:{orderId}:{customerId}";
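// Note: if this key was already processed, the callback is skipped and
// the caller receives this empty result rather than the original outcome.
var result = new PaymentResult();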
await _coordinator.Idempotency.RunAsync(
key: idempotencyKey,
ttl: TimeSpan.FromHours(24),
async () =>
{
_logger.LogInformation(
"Processing payment for order {OrderId}", orderId);
var response = await _paymentGateway.ChargeAsync(
customerId,
amount,
$"Order {orderId}");
result.TransactionId = response.TransactionId;
result.Success = response.IsSuccess;
result.Message = response.Message;
_logger.LogInformation(
"Payment processed: {TransactionId}",
response.TransactionId);
});
return result;
}
}
[ApiController]
[Route("api/webhooks")]
public class WebhookController : ControllerBase
{
private readonly ICoordinator _coordinator;
private readonly ILogger<WebhookController> _logger;
[HttpPost("payment")]
public async Task<IActionResult> HandlePaymentWebhook(
[FromBody] PaymentWebhookDto webhook,
[FromHeader(Name = "X-Webhook-Id")] string webhookId)
{
try
{
await _coordinator.Idempotency.RunAsync(
key: $"webhook:payment:{webhookId}",
ttl: TimeSpan.FromDays(7),
async () =>
{
_logger.LogInformation(
"Processing webhook {WebhookId}", webhookId);
await ProcessPaymentWebhookAsync(webhook);
});
return Ok(new { processed = true });
}
catch (Exception ex)
{
_logger.LogError(ex,
"Failed to process webhook {WebhookId}", webhookId);
return StatusCode(500);
}
}
}
public class MessageConsumer
{
private readonly ICoordinator _coordinator;
private readonly ILogger<MessageConsumer> _logger;
public async Task HandleMessageAsync(Message message)
{
await _coordinator.Idempotency.RunAsync(
key: $"message:{message.Id}",
ttl: TimeSpan.FromHours(1),
async () =>
{
_logger.LogInformation(
"Processing message {MessageId}", message.Id);
await ProcessMessageAsync(message);
});
}
private async Task ProcessMessageAsync(Message message)
{
// Your message processing logic
await Task.Delay(100);
}
}
[HttpPost("orders")]
public async Task<IActionResult> CreateOrder(
[FromBody] CreateOrderDto dto,
[FromHeader(Name = "Idempotency-Key")] string idempotencyKey)
{
if (string.IsNullOrEmpty(idempotencyKey))
{
return BadRequest("Idempotency-Key header is required");
}
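// Note: on a duplicate request the callback is skipped, so orderId stays empty;
// store the created ID under the idempotency key if retries must return the same response.
var orderId = string.Empty;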
await _coordinator.Idempotency.RunAsync(
key: $"order:create:{idempotencyKey}",
ttl: TimeSpan.FromHours(24),
async () =>
{
orderId = await CreateOrderInternalAsync(dto);
});
return CreatedAtAction(
nameof(GetOrder),
new { id = orderId },
new { orderId });
}
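When a retried request must receive the same response as the original, the result has to be remembered alongside the key. One way to sketch this within a single process is to cache the in-flight task per idempotency key. This is an assumption-laden illustration, not a feature of the library: `GetOrCreateAsync` is a hypothetical helper, and a multi-instance deployment would need to persist the result in the shared store instead of process memory.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Per-key cache of the creation task. Lazy ensures the factory runs once
// even if two requests with the same key race into GetOrAdd.
var results = new ConcurrentDictionary<string, Lazy<Task<string>>>();

Task<string> GetOrCreateAsync(string idempotencyKey, Func<Task<string>> create) =>
    results.GetOrAdd(idempotencyKey, _ => new Lazy<Task<string>>(create)).Value;

int created = 0;
Func<Task<string>> createOrder = async () =>
{
    await Task.Delay(10);          // simulate the database write
    return $"order-{++created}";
};

string a = await GetOrCreateAsync("key-1", createOrder);
string b = await GetOrCreateAsync("key-1", createOrder);   // retry with same key
string c = await GetOrCreateAsync("key-2", createOrder);   // different key

Console.WriteLine($"{a} {b} {c}");   // prints: order-1 order-1 order-2
```

A caveat of this sketch is that a failed task stays cached, so a production version would evict faulted entries and expire old keys.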
await _coordinator.Saga.RunAsync("order:123", async saga =>
{
await saga.Step("reserve-stock", ReserveStockAsync);
await saga.Step("charge-payment", ChargePaymentAsync);
await saga.Step("notify-user", NotifyUserAsync);
});
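Crash recovery with idempotent steps usually rests on checkpointing: each completed step is recorded under the saga ID, and a re-run skips the steps already recorded. The sketch below illustrates that idea only. `StepAsync` and the in-memory `completed` set are hypothetical stand-ins, and the library's actual persistence format is not shown in this README.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Stand-in for persisted saga state (a shared store in a real deployment).
var completed = new HashSet<string>();

async Task StepAsync(string sagaId, string name, Func<Task> action)
{
    string marker = $"{sagaId}:{name}";
    if (completed.Contains(marker))
        return;                    // finished in an earlier run: skip
    await action();
    completed.Add(marker);         // checkpoint only after the step succeeded
}

var log = new List<string>();
bool crash = true;                 // make the first run fail at step two

async Task RunSagaAsync()
{
    await StepAsync("order:123", "reserve-stock",
        () => { log.Add("reserve-stock"); return Task.CompletedTask; });
    await StepAsync("order:123", "charge-payment", () =>
    {
        if (crash)
        {
            crash = false;
            throw new InvalidOperationException("simulated crash");
        }
        log.Add("charge-payment");
        return Task.CompletedTask;
    });
    await StepAsync("order:123", "notify-user",
        () => { log.Add("notify-user"); return Task.CompletedTask; });
}

try { await RunSagaAsync(); } catch (InvalidOperationException) { }
await RunSagaAsync();              // resume: step one is skipped, not repeated

Console.WriteLine(string.Join(", ", log));
// prints: reserve-stock, charge-payment, notify-user
```

Because a step can fail after its side effect but before the checkpoint is written, each step body should itself tolerate being run twice.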
public class OrderSagaCoordinator
{
private readonly ICoordinator _coordinator;
private readonly IInventoryService _inventory;
private readonly IPaymentService _payment;
private readonly IShippingService _shipping;
private readonly INotificationService _notification;
private readonly ILogger<OrderSagaCoordinator> _logger;
public async Task<OrderResult> CreateOrderAsync(CreateOrderRequest request)
{
var orderId = Guid.NewGuid().ToString();
var sagaId = $"order:saga:{orderId}";
try
{
await _coordinator.Saga.RunAsync(sagaId, async saga =>
{
// Step 1: Reserve inventory
await saga.Step("reserve-inventory", async () =>
{
_logger.LogInformation(
"Reserving inventory for order {OrderId}", orderId);
await _inventory.ReserveAsync(orderId, request.Items);
});
// Step 2: Process payment
await saga.Step("process-payment", async () =>
{
_logger.LogInformation(
"Processing payment for order {OrderId}", orderId);
await _payment.ChargeAsync(
orderId,
request.CustomerId,
request.TotalAmount);
});
// Step 3: Create shipment
await saga.Step("create-shipment", async () =>
{
_logger.LogInformation(
"Creating shipment for order {OrderId}", orderId);
await _shipping.CreateShipmentAsync(
orderId,
request.ShippingAddress);
});
// Step 4: Send notifications
await saga.Step("send-notifications", async () =>
{
_logger.LogInformation(
"Sending notifications for order {OrderId}", orderId);
await _notification.SendOrderConfirmationAsync(
request.CustomerId,
orderId);
});
});
return OrderResult.Success(orderId);
}
catch (Exception ex)
{
_logger.LogError(ex,
"Order saga failed for {OrderId}", orderId);
await CompensateFailedOrderAsync(orderId);
return OrderResult.Failed(ex.Message);
}
}
private async Task CompensateFailedOrderAsync(string orderId)
{
// Rollback operations
await _inventory.ReleaseReservationAsync(orderId);
await _payment.RefundAsync(orderId);
}
}
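The compensation method above releases the reservation and refunds the payment regardless of which step failed. A variant that undoes only the steps that actually completed can be sketched with a stack of compensating actions, run in reverse order on failure. This is an illustrative pattern, not part of the library's API; `RunStepAsync` and the step names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var undo = new Stack<Func<Task>>();
var log = new List<string>();

async Task RunStepAsync(Func<Task> action, Func<Task> compensate)
{
    await action();
    undo.Push(compensate);         // register only after the step succeeded
}

Func<Task> Record(string entry) =>
    () => { log.Add(entry); return Task.CompletedTask; };

try
{
    await RunStepAsync(Record("reserve"), Record("release"));
    await RunStepAsync(Record("charge"), Record("refund"));
    // The third step fails, so its compensation is never registered.
    await RunStepAsync(
        () => Task.FromException(new InvalidOperationException("shipping failed")),
        Record("cancel-shipment"));
}
catch (InvalidOperationException)
{
    // Undo completed steps in reverse order.
    while (undo.Count > 0)
        await undo.Pop()();
}

Console.WriteLine(string.Join(", ", log));
// prints: reserve, charge, refund, release
```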
public class DataImportSaga
{
private readonly ICoordinator _coordinator;
public async Task ProcessLargeDataImportAsync(
string importId,
List<DataBatch> batches)
{
var sagaId = $"import:{importId}";
await _coordinator.Saga.RunAsync(sagaId, async saga =>
{
await saga.Step("validate-data", async () =>
{
await ValidateDataAsync(batches);
});
await saga.Step("transform-data", async () =>
{
await TransformDataAsync(batches);
});
// Process in batches - each is idempotent
for (int i = 0; i < batches.Count; i++)
{
var stepName = $"process-batch-{i}";
var batchIndex = i; // Capture for closure
await saga.Step(stepName, async () =>
{
await ProcessBatchAsync(batches[batchIndex]);
Console.WriteLine(
$"Batch {batchIndex + 1}/{batches.Count} completed");
});
}
await saga.Step("finalize-import", async () =>
{
await FinalizeImportAsync(importId);
});
});
}
}
public class TravelBookingSaga
{
private readonly ICoordinator _coordinator;
public async Task BookTravelAsync(TravelBookingRequest request)
{
var bookingId = Guid.NewGuid().ToString();
await _coordinator.Saga.RunAsync($"travel:{bookingId}", async saga =>
{
await saga.Step("book-flight", async () =>
{
await BookFlightAsync(request.FlightDetails);
});
await saga.Step("book-hotel", async () =>
{
await BookHotelAsync(request.HotelDetails);
});
await saga.Step("book-car-rental", async () =>
{
await BookCarRentalAsync(request.CarRentalDetails);
});
await saga.Step("process-payment", async () =>
{
await ProcessTravelPaymentAsync(bookingId, request.Total);
});
await saga.Step("send-confirmation", async () =>
{
await SendBookingConfirmationAsync(bookingId, request.Email);
});
});
}
}
// Create a new coordination context
var context = CoordinationContext.Create();
Console.WriteLine($"Correlation ID: {context.CorrelationId}");
// Create context with lock key
var lockedContext = new CoordinationContext(
context.CorrelationId,
lockKey: "order:123");
// Create context with saga ID
var sagaContext = new CoordinationContext(
context.CorrelationId,
sagaId: "order:123");
[ApiController]
[Route("api/orders")]
public class OrderController : ControllerBase
{
private readonly ICoordinator _coordinator;
private readonly ILogger<OrderController> _logger;
[HttpPost]
public async Task<IActionResult> CreateOrder([FromBody] CreateOrderDto dto)
{
// Create context with correlation ID from request
var correlationId = HttpContext.TraceIdentifier;
var context = new CoordinationContext(correlationId);
_logger.LogInformation(
"Creating order with correlation ID: {CorrelationId}",
context.CorrelationId);
try
{
var orderId = await ProcessOrderWithContextAsync(dto, context);
return Ok(new
{
orderId,
correlationId = context.CorrelationId
});
}
catch (Exception ex)
{
_logger.LogError(ex,
"Failed to create order. CorrelationId: {CorrelationId}",
context.CorrelationId);
return StatusCode(500);
}
}
private async Task<string> ProcessOrderWithContextAsync(
CreateOrderDto dto,
CoordinationContext context)
{
var orderId = Guid.NewGuid().ToString();
var lockContext = new CoordinationContext(
context.CorrelationId,
lockKey: $"order:create:{dto.CustomerId}");
await _coordinator.Lock.RunAsync(
lockContext.LockKey,
TimeSpan.FromSeconds(30),
async ct =>
{
_logger.LogInformation(
"Processing order {OrderId} - CorrelationId: {CorrelationId}",
orderId,
context.CorrelationId);
await SaveOrderAsync(orderId, dto, ct);
});
return orderId;
}
}
services.AddCoordination(opt =>
{
opt.RedisConnectionString = "localhost:6379";
opt.DatabaseNumber = 0;
});
Features:
services.AddSingleton<IKeyValueStore, InMemoryKeyValueStore>();
services.AddCoordination();
Features:
ConcurrentDictionary
var primary = new RedisKeyValueStore(redisDb);
var composite = new CompositeKeyValueStore(primary);
services.AddSingleton<IKeyValueStore>(composite);
Features:
public class MyCustomStore : IKeyValueStore
{
private readonly IMyDatabase _database;
public MyCustomStore(IMyDatabase database)
{
_database = database;
}
public async Task<bool> TryAcquireAsync(
string key,
string value,
TimeSpan ttl,
CancellationToken ct = default)
{
return await _database.SetIfNotExistsAsync(key, value, ttl, ct);
}
public async Task<bool> ReleaseAsync(
string key,
string value,
CancellationToken ct = default)
{
return await CompareAndDeleteAsync(key, value, ct);
}
public async Task<bool> ExistsAsync(
string key,
CancellationToken ct = default)
{
return await _database.KeyExistsAsync(key, ct);
}
public async Task SetAsync(
string key,
string value,
TimeSpan ttl,
CancellationToken ct = default)
{
await _database.SetAsync(key, value, ttl, ct);
}
public async Task<bool> TrySetAsync(
string key,
string value,
TimeSpan ttl,
CancellationToken ct = default)
{
return await _database.SetIfNotExistsAsync(key, value, ttl, ct);
}
public async Task<bool> CompareAndDeleteAsync(
string key,
string expectedValue,
CancellationToken ct = default)
{
return await _database.DeleteIfMatchAsync(key, expectedValue, ct);
}
public async Task<string?> GetAsync(
string key,
CancellationToken ct = default)
{
return await _database.GetAsync(key, ct);
}
}
// Register in DI
services.AddSingleton<IKeyValueStore, MyCustomStore>();
services.AddCoordination();
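A custom store is only safe if set-if-not-exists and compare-and-delete are atomic and respect the TTL. The sketch below spells out those semantics with an in-memory dictionary and a lock. It is an assumption-based illustration rather than the library's `InMemoryKeyValueStore`: the function names are hypothetical, and time is passed in explicitly to keep the example deterministic.

```csharp
using System;
using System.Collections.Generic;

var gate = new object();
var entries = new Dictionary<string, (string Value, DateTime ExpiresAt)>();

bool TrySet(string key, string value, TimeSpan ttl, DateTime now)
{
    lock (gate)
    {
        // An expired entry counts as absent; this is what auto-releases
        // a lock whose owner crashed without releasing it.
        if (entries.TryGetValue(key, out var e) && e.ExpiresAt > now)
            return false;
        entries[key] = (value, now + ttl);
        return true;
    }
}

bool CompareAndDelete(string key, string expectedValue, DateTime now)
{
    lock (gate)
    {
        if (!entries.TryGetValue(key, out var e) || e.ExpiresAt <= now)
            return false;
        if (e.Value != expectedValue)
            return false;          // owned by someone else: leave it alone
        return entries.Remove(key);
    }
}

var t0 = new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc);
var ttl = TimeSpan.FromSeconds(30);

bool aAcquired = TrySet("order:1", "owner-A", ttl, t0);
bool bBlocked = !TrySet("order:1", "owner-B", ttl, t0.AddSeconds(1));
// A's TTL lapses, so B can take over.
bool bAcquired = TrySet("order:1", "owner-B", ttl, t0.AddSeconds(31));
// A's late release must not delete B's lock.
bool aLateRelease = CompareAndDelete("order:1", "owner-A", t0.AddSeconds(32));
bool bReleased = CompareAndDelete("order:1", "owner-B", t0.AddSeconds(32));

Console.WriteLine($"{aAcquired} {bBlocked} {bAcquired} {aLateRelease} {bReleased}");
// prints: True True True False True
```

The late release by owner A returning false is the ownership check in action: without the value comparison, A would delete a lock that now belongs to B.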
public class ComplexOrderProcessor
{
private readonly ICoordinator _coordinator;
public async Task ProcessComplexOrderAsync(string orderId, string customerId)
{
// Use lock to prevent concurrent processing
await _coordinator.Lock.RunAsync(
$"order:lock:{orderId}",
TimeSpan.FromMinutes(5),
async lockCt =>
{
// Use idempotency for payment
await _coordinator.Idempotency.RunAsync(
$"payment:{orderId}",
TimeSpan.FromHours(24),
async () =>
{
await ProcessPaymentAsync(orderId);
});
// Use saga for multi-step process
await _coordinator.Saga.RunAsync(
$"order:saga:{orderId}",
async saga =>
{
await saga.Step("allocate-inventory",
() => AllocateInventoryAsync(orderId));
await saga.Step("generate-invoice",
() => GenerateInvoiceAsync(orderId));
await saga.Step("schedule-shipping",
() => ScheduleShippingAsync(orderId));
});
});
}
}
public class ResilientProcessor
{
private readonly ICoordinator _coordinator;
private readonly ILogger<ResilientProcessor> _logger;
public async Task<bool> ProcessWithRetryAsync(
string resourceId,
int maxRetries = 3)
{
int retryCount = 0;
while (retryCount < maxRetries)
{
try
{
await _coordinator.Lock.RunAsync(
$"resource:{resourceId}",
TimeSpan.FromSeconds(30),
async ct =>
{
await ProcessResourceAsync(resourceId, ct);
});
return true;
}
catch (Exception ex)
{
retryCount++;
_logger.LogWarning(ex,
"Attempt {RetryCount}/{MaxRetries} failed",
retryCount, maxRetries);
if (retryCount >= maxRetries)
{
_logger.LogError(
"All retry attempts exhausted for {ResourceId}",
resourceId);
throw;
}
// Exponential backoff
var delay = TimeSpan.FromSeconds(Math.Pow(2, retryCount));
await Task.Delay(delay);
}
}
return false;
}
}
public class BatchProcessor
{
private readonly ICoordinator _coordinator;
public async Task ProcessBatchInParallelAsync(List<string> items)
{
var tasks = items.Select(item => ProcessItemAsync(item));
await Task.WhenAll(tasks);
}
private async Task ProcessItemAsync(string item)
{
// Each item gets its own lock
await _coordinator.Lock.RunAsync(
$"item:{item}",
TimeSpan.FromMinutes(1),
async ct =>
{
await ProcessSingleItemAsync(item, ct);
});
}
}
public class OrderServiceTests : IDisposable
{
private readonly ServiceProvider _serviceProvider;
private readonly ICoordinator _coordinator;
private readonly OrderService _orderService;
public OrderServiceTests()
{
var services = new ServiceCollection();
// Use in-memory store for testing
services.AddSingleton<IKeyValueStore, InMemoryKeyValueStore>();
services.AddCoordination();
services.AddLogging();
// Add your services
services.AddTransient<OrderService>();
_serviceProvider = services.BuildServiceProvider();
_coordinator = _serviceProvider.GetRequiredService<ICoordinator>();
_orderService = _serviceProvider.GetRequiredService<OrderService>();
}
[Fact]
public async Task ProcessOrder_WithLock_PreventsConcurrentExecution()
{
var orderId = "test-order-123";
var executionCount = 0;
// Try to execute same order twice concurrently
var task1 = _coordinator.Lock.RunAsync(
$"order:{orderId}",
TimeSpan.FromSeconds(2),
async ct =>
{
Interlocked.Increment(ref executionCount);
await Task.Delay(1000, ct);
});
var task2 = _coordinator.Lock.RunAsync(
$"order:{orderId}",
TimeSpan.FromSeconds(2),
async ct =>
{
Interlocked.Increment(ref executionCount);
await Task.Delay(1000, ct);
});
await Task.WhenAll(task1, task2);
// Only one should have executed
Assert.Equal(1, executionCount);
}
[Fact]
public async Task Idempotency_PreventsDuplicateExecution()
{
var paymentId = "payment-123";
var executionCount = 0;
// Execute twice with same key
await _coordinator.Idempotency.RunAsync(
$"payment:{paymentId}",
TimeSpan.FromMinutes(1),
async () =>
{
Interlocked.Increment(ref executionCount);
await Task.CompletedTask;
});
await _coordinator.Idempotency.RunAsync(
$"payment:{paymentId}",
TimeSpan.FromMinutes(1),
async () =>
{
Interlocked.Increment(ref executionCount);
await Task.CompletedTask;
});
// Should only execute once
Assert.Equal(1, executionCount);
}
[Fact]
public async Task Saga_ExecutesAllSteps_InOrder()
{
var steps = new List<string>();
var sagaId = "saga-123";
await _coordinator.Saga.RunAsync(sagaId, async saga =>
{
await saga.Step("step1", async () =>
{
steps.Add("step1");
await Task.CompletedTask;
});
await saga.Step("step2", async () =>
{
steps.Add("step2");
await Task.CompletedTask;
});
await saga.Step("step3", async () =>
{
steps.Add("step3");
await Task.CompletedTask;
});
});
Assert.Equal(new[] { "step1", "step2", "step3" }, steps);
}
public void Dispose()
{
_serviceProvider?.Dispose();
}
}
IServiceCollection
Enhanced IKeyValueStore Interface:
- TryAcquireAsync() - Acquire distributed locks with TTL
- ReleaseAsync() - Safely release locks with owner validation
- ExistsAsync() - Check key existence with TTL awareness
- SetAsync() - Set key-value pairs with TTL
- TrySetAsync() - Conditional set (SET NX)
- GetAsync() - Retrieve values
- CompareAndDeleteAsync() - Atomic compare-and-delete operations
Production-Ready InMemoryKeyValueStore:
- ConcurrentDictionary
Complete RedisKeyValueStore Implementation:
- CancellationToken support throughout
CompositeKeyValueStore:
- IKeyValueStore interface
- InMemoryKeyValueStore enables easy unit testing
- CancellationToken
Contributions are welcome! Please feel free to submit a Pull Request.
This project is licensed under the MIT License.
Documentation & Code Quality Update
None - This is a documentation-only update!
Simply update your package reference:
dotnet add package Chd.Coordination --version 2.0.6
No code changes required. Fully backward compatible with 2.0.5.
Made with ❤️ by Mehmet Yoldaş