Library for coordinating distributed worker instances in multi-pod and multi-cluster environments
$ dotnet add package Logx.WorkerCoordinationA lightweight, database-backed coordination library for distributed .NET workers that provides leader election, worker synchronization, and global coordination across distributed environments.
TODO
<!-- ```bash dotnet add package Logx.WorkerCoordination ``` -->// Add worker coordination to your service collection
builder.Services.AddWorkerCoordination(options =>
options.UseNpgsql("Host=localhost;Database=worker_coordination;Username=user;Password=pw"));
// Enable database setup
app.UseWorkerCoordination();
// Add your coordinated worker
builder.Services.AddHostedService<MyClusterWorker>();
The library provides three base worker types to handle different coordination scenarios:
Ensures that only one worker instance runs in each cluster/group.
public class IndexingWorker : SingleInstanceClusterWorker
{
public IndexingWorker(ILogger<IndexingWorker> logger,
IServiceProvider serviceProvider,
INodeIdentityProvider nodeIdentityProvider)
: base(logger, serviceProvider, nodeIdentityProvider, "indexing-worker")
{
}
protected override async Task ExecuteLeaderWorkAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Executing as the cluster leader");
await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
}
}Ensures that only one worker instance runs across all clusters/groups.
public class GlobalTaskWorker : SingleInstanceGlobalWorker
{
public GlobalTaskWorker(ILogger<GlobalTaskWorker> logger,
INodeIdentityProvider nodeIdentityProvider,
IServiceProvider serviceProvider)
: base(logger, serviceProvider, nodeIdentityProvider, "global-task")
{
}
protected override async Task ExecuteGlobalLeaderWorkAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Executing as the global leader");
await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
}
}All instances run but coordinate work distribution.
public class DataProcessingWorker : GloballyCoordinatedWorker
{
public DataProcessingWorker(ILogger<DataProcessingWorker> logger,
INodeIdentityProvider nodeIdentityProvider,
IServiceProvider serviceProvider)
: base(logger, serviceProvider, nodeIdentityProvider, "data-processor")
{
}
protected override async Task ExecuteCoordinatedWorkAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Worker {Index} of {Total} processing data",
GlobalWorkerIndex + 1, GlobalWorkerCount);
// Process work for this worker's share
var myPartition = GlobalWorkerIndex;
var totalPartitions = GlobalWorkerCount;
await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
}
}By default, the library detects the node's identity from the environment. You can customize this behavior:
// Use the default environment-based identity provider
builder.Services.AddSingleton<INodeIdentityProvider, EnvironmentIdentityProvider>();
// OR provide explicit identity
builder.Services.AddSingleton<INodeIdentityProvider>(new EnvironmentIdentityProvider(
"my-worker-node-1", "production-group"));
// OR use custom environment variable names
builder.Services.AddSingleton<INodeIdentityProvider>(new EnvironmentIdentityProvider(
nodeIdEnvVar: "MY_NODE_ID",
groupNameEnvVar: "DEPLOYMENT_GROUP"));Environment idenity profiver by default uses these Kubernetes compatible env variables:
HOSTNAME for node id within cluster,CLUSTER_NAMERefer to WorkerCoordintation.Sample
// Add package: Npgsql.EntityFrameworkCore.PostgreSQL
builder.Services.AddWorkerCoordination(options =>
options.UseNpgsql("Host=localhost;Database=worker_coordination;Username=user;Password=pw",
npgsqlOptions => npgsqlOptions.EnableRetryOnFailure()));// Add package: Microsoft.EntityFrameworkCore.SqlServer
builder.Services.AddWorkerCoordination(options =>
options.UseSqlServer("Server=myServerAddress;Database=worker_coordination;User Id=user;Password=pw;",
sqlOptions => sqlOptions.EnableRetryOnFailure()));// Add package: Microsoft.EntityFrameworkCore.Sqlite
builder.Services.AddWorkerCoordination(options =>
options.UseSqlite("Data Source=worker_coordination.db"));
```