HashiCorp Consul provider for MultiLock - Leader Election framework for distributed systems. This package includes the core MultiLock framework.
$ dotnet add package MultiLock.ConsulA comprehensive .NET framework for implementing the Leader Election pattern with support for multiple providers including Azure Blob Storage, SQL Server, Redis, File System, In-Memory, Consul, and ZooKeeper.
┌────────────────────────────────────────────────────────────────────────────┐
│ APPLICATIONS │
│ │
│ ┌──────────────┐ ┌──────────────────┐ ┌──────────────────┐ │
│ │ Sample App │ │ Multi-Provider │ │ Your Application │ │
│ │ │ │ Demo │ │ │ │
│ └──────┬───────┘ └────────┬─────────┘ └────────┬─────────┘ │
│ │ │ │ │
│ └─────────────────────┼───────────────────────┘ │
│ │ │
└─────────────────────────────────┼──────────────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────────────────────────────────┐
│ CORE FRAMEWORK │
│ │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ ILeaderElectionService (Interface) │ │
│ │ • StartAsync() / StopAsync() │ │
│ │ • IsLeader / GetCurrentLeaderAsync() │ │
│ │ • GetLeadershipChangesAsync() │ │
│ └────────────────────────┬─────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ LeaderElectionService (Implementation) │ │
│ │ • Election Logic │ │
│ │ • Heartbeat Monitoring │ │
│ │ • Event Publishing │ │
│ └────────────────────────┬─────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ ILeaderElectionProvider (Interface) │ │
│ │ • TryAcquireLeadershipAsync() │ │
│ │ • ReleaseLeadershipAsync() │ │
│ │ • UpdateHeartbeatAsync() │ │
│ └────────────────────────┬─────────────────────────────────────┘ │
│ │ │
└─────────────────────────────┼──────────────────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────────────────────────────────┐
│ PROVIDERS │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ InMemory │ │ FileSystem │ │ SQL Server │ │ PostgreSQL │ │
│ │ │ │ │ │ │ │ │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Azure Blob │ │ Redis │ │ Consul │ │ ZooKeeper │ │
│ │ Storage │ │ │ │ │ │ │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │
└────────────────────────────────────────────────────────────────────────────┘
| Provider | Package | Description |
|---|---|---|
| Azure Blob Storage | MultiLock.AzureBlobStorage | Uses Azure Blob Storage for coordination |
| SQL Server | MultiLock.SqlServer | Uses SQL Server database for coordination |
| PostgreSQL | MultiLock.PostgreSQL | Uses PostgreSQL database for coordination |
| Redis | MultiLock.Redis | Uses Redis for coordination |
| File System | MultiLock.FileSystem | Uses local file system (single machine) |
| In-Memory | MultiLock.InMemory | In-memory provider for testing |
| Consul | MultiLock.Consul | Uses HashiCorp Consul for coordination |
| ZooKeeper | MultiLock.ZooKeeper | Uses Apache ZooKeeper for coordination |
# For SQL Server
dotnet add package MultiLock.SqlServer
# For Redis
dotnet add package MultiLock.Redis
# For PostgreSQL
dotnet add package MultiLock.PostgreSQL
# For In-Memory (testing)
dotnet add package MultiLock.InMemory
using MultiLock;
using MultiLock.InMemory;
var builder = WebApplication.CreateBuilder(args);
// Add leader election with In-Memory provider
builder.Services.AddLeaderElection<InMemoryLeaderElectionProvider>(options =>
{
options.ElectionGroup = "my-service";
options.HeartbeatInterval = TimeSpan.FromSeconds(30);
options.HeartbeatTimeout = TimeSpan.FromSeconds(90);
});
var app = builder.Build();
public class MyBackgroundService : BackgroundService
{
private readonly ILeaderElectionService _leaderElection;
private readonly ILogger<MyBackgroundService> _logger;
public MyBackgroundService(
ILeaderElectionService leaderElection,
ILogger<MyBackgroundService> logger)
{
_leaderElection = leaderElection;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// Start listening to leadership changes using AsyncEnumerable
var leadershipTask = Task.Run(async () =>
{
await foreach (var change in _leaderElection.GetLeadershipChangesAsync(stoppingToken))
{
if (change.BecameLeader)
{
_logger.LogInformation("🎉 Leadership acquired!");
}
else if (change.LostLeadership)
{
_logger.LogWarning("😞 Leadership lost!");
}
}
}, stoppingToken);
while (!stoppingToken.IsCancellationRequested)
{
if (_leaderElection.IsLeader)
{
// Perform leader-only work
_logger.LogInformation("🏆 Performing leader work...");
await DoLeaderWork(stoppingToken);
}
else
{
// Perform follower work or wait
_logger.LogInformation("👥 Waiting as follower...");
await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken);
}
}
// Wait for leadership monitoring to complete
try
{
await leadershipTask;
}
catch (OperationCanceledException)
{
// Expected when stopping
}
}
private async Task DoLeaderWork(CancellationToken cancellationToken)
{
// Your leader-specific logic here
await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
}
}
The leader election service provides a modern AsyncEnumerable API for consuming leadership change events:
// Subscribe to all leadership changes
await foreach (var change in leaderElection.GetLeadershipChangesAsync(cancellationToken))
{
if (change.BecameLeader)
{
Console.WriteLine("I am now the leader!");
}
else if (change.LostLeadership)
{
Console.WriteLine("I lost leadership!");
}
}
// Only receive leadership acquired events
await foreach (var change in leaderElection.GetLeadershipChangesAsync(
LeadershipEventType.Acquired, cancellationToken))
{
Console.WriteLine("Leadership acquired!");
}
// Receive both acquired and lost events
await foreach (var change in leaderElection.GetLeadershipChangesAsync(
LeadershipEventType.Acquired | LeadershipEventType.Lost, cancellationToken))
{
// Handle events
}
The framework provides powerful LINQ-style extension methods:
// Take only the first leadership acquisition
await foreach (var change in leaderElection.GetLeadershipChangesAsync(cancellationToken)
.TakeUntilLeader(cancellationToken))
{
Console.WriteLine("Waiting for leadership...");
}
// Process events while we are the leader
await foreach (var change in leaderElection.GetLeadershipChangesAsync(cancellationToken)
.WhileLeader(cancellationToken))
{
Console.WriteLine("Still the leader!");
}
// Execute callbacks on leadership transitions
await leaderElection.GetLeadershipChangesAsync(cancellationToken)
.OnLeadershipTransition(
onAcquired: e => Console.WriteLine("Acquired!"),
onLost: e => Console.WriteLine("Lost!"),
cancellationToken);
// Combine multiple extension methods
await foreach (var change in leaderElection.GetLeadershipChangesAsync(cancellationToken)
.Where(e => e.BecameLeader || e.LostLeadership, cancellationToken)
.DistinctUntilChanged(cancellationToken)
.Take(5, cancellationToken))
{
// Process filtered events
}
The MultiLock framework is designed to be fully thread-safe and handle concurrent operations gracefully:
StartAsync(), StopAsync(), IsLeaderAsync(), and other methods from multiple threads concurrently.IDisposable and IAsyncDisposable with proper coordination to prevent race conditions during shutdown.ObjectDisposedException and ensure clean shutdown.All provider implementations are thread-safe and support concurrent operations:
GetCurrentLeaderAsync() and IsLeaderAsync() can be called concurrently without issues.The framework has been extensively tested for various concurrent scenarios:
When multiple participants attempt to acquire leadership simultaneously:
// Safe to run from multiple instances/threads
var tasks = Enumerable.Range(0, 10).Select(async i =>
{
var service = serviceProvider.GetRequiredService<ILeaderElectionService>();
await service.StartAsync();
// Only one will become leader
}).ToArray();
await Task.WhenAll(tasks);
Exactly one participant will become the leader. All others will fail gracefully and continue monitoring for leadership opportunities.
The framework handles rapid leadership transitions without race conditions:
// Safe to rapidly acquire and release leadership
for (int i = 0; i < 100; i++)
{
await service.StartAsync();
await Task.Delay(10);
await service.StopAsync();
}
Each cycle completes cleanly without resource leaks or state corruption.
Multiple threads can safely update heartbeats:
// Safe concurrent heartbeat updates
var tasks = Enumerable.Range(0, 50).Select(async i =>
{
await provider.UpdateHeartbeatAsync("election-group", "participant-id");
}).ToArray();
await Task.WhenAll(tasks);
All heartbeat updates are processed atomically. The last update wins.
When a leader fails and multiple participants compete to take over:
// Current leader releases leadership
await currentLeader.StopAsync();
// Multiple waiting participants compete
// Only one will successfully acquire leadership
Clean failover without split-brain scenarios. Exactly one new leader is elected.
services.AddSingleton<ILeaderElectionService, LeaderElectionService>();
Avoid manual synchronization: The framework handles all necessary synchronization internally. Don't wrap calls in your own locks.
Use async disposal: When shutting down, prefer DisposeAsync() over Dispose() for providers that support it (Consul, ZooKeeper).
await using var provider = new ConsulLeaderElectionProvider(options, logger);
// Provider will be properly disposed asynchronously
CancellationToken to async methods and handle OperationCanceledException.try
{
await service.StartAsync(cancellationToken);
}
catch (OperationCanceledException)
{
// Clean shutdown requested
}
GetLeadershipChangesAsync() API to react to leadership transitions rather than polling.await foreach (var change in service.GetLeadershipChangesAsync(cancellationToken))
{
if (change.BecameLeader)
{
// Start leader-only work
}
else if (change.LostLeadership)
{
// Stop leader-only work
}
}
builder.Services.AddSqlServerLeaderElection(
connectionString: "Server=localhost;Database=MyApp;Trusted_Connection=true;",
options =>
{
options.ElectionGroup = "my-service";
options.HeartbeatInterval = TimeSpan.FromSeconds(30);
options.HeartbeatTimeout = TimeSpan.FromSeconds(90);
});
builder.Services.AddPostgreSQLLeaderElection(
connectionString: "Host=localhost;Database=MyApp;Username=user;Password=password;",
options =>
{
options.ElectionGroup = "my-service";
options.HeartbeatInterval = TimeSpan.FromSeconds(30);
options.HeartbeatTimeout = TimeSpan.FromSeconds(90);
});
builder.Services.AddRedisLeaderElection(
connectionString: "localhost:6379",
options =>
{
options.ElectionGroup = "my-service";
options.HeartbeatInterval = TimeSpan.FromSeconds(30);
options.HeartbeatTimeout = TimeSpan.FromSeconds(90);
});
builder.Services.AddAzureBlobStorageLeaderElection(
connectionString: "DefaultEndpointsProtocol=https;AccountName=...",
options =>
{
options.ElectionGroup = "my-service";
options.HeartbeatInterval = TimeSpan.FromSeconds(30);
options.HeartbeatTimeout = TimeSpan.FromSeconds(90);
});
builder.Services.AddConsulLeaderElection(
address: "http://localhost:8500",
options =>
{
options.ElectionGroup = "my-service";
options.HeartbeatInterval = TimeSpan.FromSeconds(30);
options.HeartbeatTimeout = TimeSpan.FromSeconds(90);
});
builder.Services.AddZooKeeperLeaderElection(
connectionString: "localhost:2181",
options =>
{
options.ElectionGroup = "my-service";
options.HeartbeatInterval = TimeSpan.FromSeconds(30);
options.HeartbeatTimeout = TimeSpan.FromSeconds(90);
});
PostgreSQL with Custom Schema and Table:
builder.Services.AddPostgreSQLLeaderElection(options =>
{
options.ConnectionString = "Host=localhost;Database=myapp;Username=user;Password=pass";
options.TableName = "custom_leader_election";
options.SchemaName = "leader_election";
options.AutoCreateTable = true;
options.CommandTimeoutSeconds = 60;
}, leaderElectionOptions =>
{
leaderElectionOptions.ElectionGroup = "my-service";
leaderElectionOptions.ParticipantId = Environment.MachineName;
leaderElectionOptions.HeartbeatInterval = TimeSpan.FromSeconds(15);
leaderElectionOptions.HeartbeatTimeout = TimeSpan.FromSeconds(45);
leaderElectionOptions.EnableDetailedLogging = true;
});
ZooKeeper with Custom Session Settings:
builder.Services.AddZooKeeperLeaderElection(options =>
{
options.ConnectionString = "zk1:2181,zk2:2181,zk3:2181";
options.RootPath = "/my-app/leader-election";
options.SessionTimeout = TimeSpan.FromSeconds(30);
options.ConnectionTimeout = TimeSpan.FromSeconds(10);
options.MaxRetries = 5;
options.RetryDelay = TimeSpan.FromSeconds(2);
options.AutoCreateRootPath = true;
}, leaderElectionOptions =>
{
leaderElectionOptions.ElectionGroup = "my-service";
leaderElectionOptions.ParticipantId = $"{Environment.MachineName}-{Environment.ProcessId}";
leaderElectionOptions.HeartbeatInterval = TimeSpan.FromSeconds(10);
leaderElectionOptions.HeartbeatTimeout = TimeSpan.FromSeconds(30);
});
The service provides an AsyncEnumerable API for observing leadership changes:
Use GetLeadershipChangesAsync() to subscribe to these events and optionally filter by event type using LeadershipEventType flags.
The framework includes comprehensive test coverage:
# Run all tests
dotnet test
# Run specific test project
dotnet test tests/MultiLock.Tests/
dotnet test tests/MultiLock.IntegrationTests/
Check out the sample applications:
samples/MultiLock.Sample/ - Single provider demonstrationsamples/MultiLock.MultiProvider/ - Multiple instances competing# Run the basic sample with different providers
cd samples/MultiLock.Sample
dotnet run inmemory
dotnet run filesystem
dotnet run sqlserver "Server=localhost;Database=Test;Trusted_Connection=true;"
dotnet run postgresql "Host=localhost;Database=leaderelection;Username=user;Password=pass"
dotnet run redis "localhost:6379"
dotnet run consul "http://localhost:8500"
dotnet run zookeeper "localhost:2181"
# Run the multi-provider demo
cd samples/MultiLock.MultiProvider
dotnet run
The project includes comprehensive unit tests and integration tests. Integration tests use Docker containers to test against real services.
# Run unit tests
dotnet test tests/MultiLock.Tests/
# Run integration tests (requires Docker)
docker-compose up -d
dotnet test tests/MultiLock.IntegrationTests/
docker-compose down
The included docker-compose.yml file provides all the necessary services for testing:
# Start all services
docker-compose up -d
# Check service health
docker-compose ps
# View logs for a specific service
docker-compose logs postgres
docker-compose logs zookeeper
# Stop all services
docker-compose down
# Stop and remove volumes (clean slate)
docker-compose down -v
The framework works seamlessly in containerized environments. See docker-compose.yml for examples of running with various backing services.
Contributions are welcome! Please see our contributing guidelines and code of conduct.
This project is licensed under the MIT License - see the LICENSE file for details.