.NET library to implement the scatter-gather pattern using MongoDB to store distributed progress state
```shell
$ dotnet add package ScatterGather.MongoDB
```

| NuGet package | Link |
|---|---|
| ScatterGather | |
| ScatterGather.DynamoDB | |
| ScatterGather.MongoDB | |
| ScatterGather.Postgres | |
Scatter-gather is an enterprise integration pattern where a single large operation is split into a number of sub-operations, usually performed by separate workers. Then, another operation must be carried out after all sub-operations have finished.
This library provides a means to keep track of how many scattered sub-operations have been completed (i.e., gathered), using a database (currently Amazon DynamoDB, MongoDB, and PostgreSQL are supported) to store that state and a gateway to manage it.
A typical scenario involves a component that is asked to process a large operation and a set of workers to which this component wants to delegate parts of that large operation. For example, the first component may want to send messages to workers using a message queue, keeping the processing of parts asynchronous.
Once a worker completes processing of a part, it may send a message back to the first component, or even to a third one, which acts as an aggregator for the processed parts. When all parts have been processed, the large operation may move forward.
The first component performs a scatter of the large operation, while each worker performs a gather.
When a worker performs a gather, and the scatter-gather gateway provided by this library notices it is the last part left, it calls a callback function in the context of that worker, and only that worker.
The scatter-gather pattern may be implemented on AWS using the Map state of Step Functions. Using Map, you can split an operation across multiple workers. If you need to process a large number of sub-operations, you can run a so-called Distributed Map state, which takes its input from a file saved in an S3 bucket, containing the list of parts to scatter.
The Map state of Step Functions will wait for all workers to complete before moving forward. In case of errors, the Map state will produce reports containing the failed or pending parts (saving them into an S3 bucket, in the case of a Distributed Map state), which you can use to construct a new input to resume the failed state machine.
As an alternative, you might want to use the scatter-gather gateway provided by this library in the following cases:
The ScatterGatherGateway class in the ScatterGather namespace is the entry point for functionality of the scatter-gather gateway provided by this library. A scatter-gather gateway needs an IScatterGatherStore to persist progress. Factory methods are provided for Amazon DynamoDB, MongoDB and PostgreSQL.
Each scatter-gather request must be identified by a unique ScatterRequestId generated by the application. Each scattered part must be identified by a ScatterPartId that must be unique in that scatter-gather operation. Both types are just simple wrappers for a string, to make these values type safe.
Both the scattering component and gathering workers must create a ScatterGatherGateway using one of the provided constructors.
Typically, the scattering component creates a unique ScatterRequestId, creates a scatter scope using CreateScatterScopeAsync and calls AddAsync on the scope, perhaps multiple times, to add parts (the scatter-gather gateway is "stream-friendly"). Each worker calls GatherAsync to mark each part as completed.
Performance-wise, the run time of each method is proportional to the number of elements passed to that method, not to the total number of elements in the whole scatter-gather operation.
```csharp
public class ScatterGatherGateway(IScatterGatherStore store)
{
    public async Task<ScatterScope<TContext>> CreateScatterScopeAsync<TContext>(
        ScatterRequestId requestId,
        TContext context,
        Func<TContext, Task> handleCompletion);
}

public sealed class ScatterScope<TContext> : IAsyncDisposable
{
    public async Task AddAsync(IEnumerable<ScatterPartId> partIds, Func<Task> process);
    public async ValueTask DisposeAsync();
}
```

CreateScatterScopeAsync initializes a new scatter-gather request and returns a ScatterScope used to add parts to the request. The requestId parameter identifies the request globally. The context parameter allows storing any data that the application may need when handling completion; the size of the context data is limited by the underlying storage: once serialized as JSON, plus some overhead, it must fit within 400 KB for DynamoDB, 16 MB for MongoDB and 1 GB for Postgres. The handleCompletion function is called when the whole scatter-gather operation is completed.
AddAsync on the returned ScatterScope tells the scatter-gather gateway that one or more new parts are about to be scattered for the request identified by requestId. Part identifiers are listed in partIds. The process function executes a specific action on the added part(s), for example sending a message to a worker.
Disposing the returned ScatterScope signals that all parts have been scattered, thus it is now possible to expect completion of the whole scatter-gather operation. If, by the time the scope is disposed, all parts have already been gathered and the scatter-gather operation is complete, the handleCompletion function is called immediately.
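The scatter side can be sketched as follows. This is a minimal sketch, assuming ScatterRequestId and ScatterPartId accept a string in their constructor (they are described above as simple string wrappers); the store variable and the SendPartMessagesAsync helper are hypothetical placeholders, not part of the library:

```csharp
// Hypothetical scattering component: creates a request, adds 100 parts,
// then seals the scope by disposing it.
var gateway = new ScatterGatherGateway(store); // store: any IScatterGatherStore

var requestId = new ScatterRequestId($"batch-{Guid.NewGuid()}");

await using (var scope = await gateway.CreateScatterScopeAsync(
    requestId,
    context: "nightly-batch",                     // any serializable context data
    handleCompletion: ctx =>                      // runs once the whole request completes
        Console.Out.WriteLineAsync($"Batch {ctx} completed")))
{
    // Parts may be added across several AddAsync calls (the gateway is stream-friendly).
    var partIds = Enumerable.Range(0, 100)
        .Select(i => new ScatterPartId($"part-{i}"))
        .ToList();
    await scope.AddAsync(partIds,
        process: () => SendPartMessagesAsync(requestId, partIds)); // hypothetical: enqueue work
}
// Disposing the scope signals that all parts have been scattered.
```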
```csharp
public class ScatterGatherGateway(IScatterGatherStore store)
{
    public async Task GatherAsync<TContext>(
        ScatterRequestId requestId,
        IReadOnlyCollection<ScatterPartId> partIds,
        Func<Task> process,
        Func<TContext, Task> handleCompletion);
}
```

A worker calls GatherAsync when processing its part(s). The gathered parts (maybe just one) are identified by partIds within requestId. Both requestId and partIds are expected to be provided to the worker by the scattering component, for example within a message. The process function specifies any work to do on the parts being gathered. The handleCompletion function is executed if the scatter-gather gateway notices that the last part has just been gathered.
Note that only one worker will be able to call the completion handler function, because the scatter-gather gateway treats it as a critical section. Also note that, in case of errors during the completion handler function, restarting the worker that was processing the last GatherAsync will restart the completion handler function (that is, the critical section is re-entrant).
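The worker side can be sketched as follows; the PartMessage type and ProcessPartAsync helper are hypothetical, and the TContext type argument (string here) is assumed to match the context stored by the scattering component:

```csharp
// Hypothetical worker: handles one message carrying a request id and a part id.
async Task HandleMessageAsync(PartMessage message, ScatterGatherGateway gateway)
{
    await gateway.GatherAsync<string>(
        new ScatterRequestId(message.RequestId),
        new[] { new ScatterPartId(message.PartId) },
        process: () => ProcessPartAsync(message),   // the actual work on this part
        handleCompletion: ctx =>                    // runs in the last worker only
            Console.Out.WriteLineAsync($"All parts of {ctx} gathered"));
}
```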
This library provides implementations of the IScatterGatherStore interface, allowing you to choose between Amazon DynamoDB, MongoDB and PostgreSQL as the storage that persists the progress state of a scatter-gather operation among distributed workers.
The scatter-gather gateway for DynamoDB is provided by the ScatterGather.DynamoDB library.
The scatter-gather gateway needs a pair of master-detail DynamoDB tables to keep its state: one to store current scatter requests and one to list scattered parts.
The scatter-gather gateway will try to create those tables if they don't already exist, otherwise it will use the ones already present. Considering the typical usage pattern of the scatter-gather gateway, the tables are created with a "pay per request" billing mode.
If you want more control over the billing mode and provisioned capacity, you can create the pair of tables manually, as follows:
- the request table, with a RequestId string field as the partition key
- the part table, with a RequestId string field as the partition key and a PartId string field as the sort key

Note that your application will need the following permissions on those two tables: dynamodb:CreateTable, dynamodb:DescribeTable, dynamodb:Query, dynamodb:PutItem, dynamodb:DeleteItem, dynamodb:UpdateItem and dynamodb:BatchWriteItem.
For billing, given a scatter-gather operation consisting of N scattered parts, account for about 4 write request units for the request table, and about 2N write request units and N read request units for the part table. This is for normal operation with no errors and restarts involved.
To construct the scatter-gather gateway using DynamoDB as storage, use one of the factory methods of the ScatterGatherFactory class in the ScatterGather.DynamoDB namespace:
```csharp
public static class ScatterGatherFactory
{
    public static ScatterGatherGateway Create(string requestTableName, string partTableName);
    public static ScatterGatherGateway Create(string dynamoDbServiceUrlOption, string requestTableName, string partTableName);
}
```

Pass either factory method the names of the request table and the part table, which may or may not already exist. The second factory method allows you to pass a custom URL for the DynamoDB service, useful when you want to use, for example, a local DynamoDB for development and testing.
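For example (table names are hypothetical; the local endpoint assumes the dynamodb-local container mapped to port 8998, as in the test setup described below):

```csharp
// Against the real DynamoDB service:
var gateway = ScatterGatherFactory.Create("ScatterRequests", "ScatterParts");

// Against a local DynamoDB instance for development and testing:
var localGateway = ScatterGatherFactory.Create(
    "http://localhost:8998",   // custom DynamoDB service URL
    "ScatterRequests",
    "ScatterParts");
```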
The scatter-gather gateway for MongoDB is provided by the ScatterGather.MongoDB library.
To construct the scatter-gather gateway use the factory method of the ScatterGatherFactory class in the ScatterGather.MongoDB namespace:
```csharp
public static class ScatterGatherFactory
{
    public static ScatterGatherGateway Create(IMongoDatabase mongoDatabase, string collectionNamePrefix);
}
```

Pass the instance of the Mongo database that will store the scatter-gather state and a string that will be used as a name prefix for the collections used by the scatter-gather gateway. Two collections are used, named <collectionNamePrefix>.Requests and <collectionNamePrefix>.Parts, storing scatter-gather operations and sub-operations respectively.
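For example, using the MongoDB .NET driver (connection string, database name and prefix are hypothetical; with prefix "Scatter", the gateway uses collections Scatter.Requests and Scatter.Parts):

```csharp
// Obtain an IMongoDatabase from the MongoDB .NET driver, then create the gateway.
var client = new MongoClient("mongodb://localhost:27017");
var database = client.GetDatabase("MyAppState");
var gateway = ScatterGatherFactory.Create(database, "Scatter");
```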
The scatter-gather gateway for Postgres is provided by the ScatterGather.Postgres library.
The scatter-gather gateway needs a pair of master-detail tables to keep its state: one to store current scatter requests and one to list scattered parts. The scatter-gather gateway will try to create those tables if they don't already exist, otherwise it will use the ones already present.
To construct the scatter-gather gateway using Postgres as storage, use the factory method of the ScatterGatherFactory class in the ScatterGather.Postgres namespace:
```csharp
public static class ScatterGatherFactory
{
    public static ScatterGatherGateway Create(string connectionString, string requestTableName, string partTableName);
}
```

Pass the Postgres connection string and the names of the request table and the part table, which may or may not already exist.
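For example (the connection string and table names are hypothetical; as noted above, the tables are created on first use if they do not already exist):

```csharp
// Create the gateway backed by a local Postgres instance.
var gateway = ScatterGatherFactory.Create(
    "Host=localhost;Port=5432;Database=myapp;Username=myuser;Password=secret",
    "scatter_requests",
    "scatter_parts");
```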
A full example using either database-specific implementation is provided in the Example directory of the repository.
Automated tests are run against containers created from the dynamodb-local, mongo and postgres Docker images. A docker-compose file is provided so that you can just run docker-compose up to run the DynamoDB, MongoDB and Postgres containers before running tests. The three servers are mapped to TCP ports 8998, 27017 and 5432 on the host, respectively.
MIT License - see LICENSE file for details.
Created and maintained by Salvatore "Salvo" Isaja. Feedback is always welcome.
Thanks to Matteo Pierangeli for the initial review and helpful comments.