⚠ Deprecated: Legacy
Package name changed
Suggested alternative: CounterpointCollective.Dataflow.Composable
Composable dataflow blocks for building robust, reusable data processing pipelines in .NET.
$ dotnet add package CounterpointCollective.ComposableDataflowBlocks
ComposableDataflowBlocks is a library designed to simplify the construction of TPL Dataflow pipelines. It provides a collection of modular, reusable blocks for processing and transforming data, enabling developers to build complex workflows with composability, scalability, and clarity.
Documentation: Browse the docs
Source & contributions: GitLab repository
The ComposableDataflowBlocks library makes building complex, high-performance data processing pipelines in .NET easier, safer, and more flexible. It extends TPL Dataflow with a set of modular building blocks that solve real-world problems developers frequently face but that the base library does not address.
In short, the library helps you construct powerful, adaptable, and production-grade Dataflow solutions with minimal friction, while staying fully compatible with the familiar TPL Dataflow model.
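For context, here is the base TPL Dataflow model the library builds on: a minimal, self-contained pipeline using only the standard System.Threading.Tasks.Dataflow types (no ComposableDataflowBlocks types involved).

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    static async Task Main()
    {
        // A plain TPL Dataflow pipeline: buffer -> transform -> action.
        var buffer = new BufferBlock<int>();
        var square = new TransformBlock<int, int>(i => i * i);
        var print = new ActionBlock<int>(i => Console.WriteLine(i));

        // PropagateCompletion lets Complete() flow through the whole pipeline.
        var link = new DataflowLinkOptions { PropagateCompletion = true };
        buffer.LinkTo(square, link);
        square.LinkTo(print, link);

        for (var i = 1; i <= 3; i++)
            buffer.Post(i);

        buffer.Complete();
        await print.Completion; // prints 1, 4, 9
    }
}
```

The blocks shown in the examples below (BoundedPropagatorBlock, ResizableBatchTransformBlock) plug into this same model: they implement the standard dataflow interfaces, so they link to and from built-in blocks like any other.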
Clone the repository:
git clone https://gitlab.com/counterpointcollective/composabledataflowblocks.git
Install dependencies (if any):
# Example for .NET/C# projects
dotnet restore
// Example: using a BoundedPropagatorBlock to add bounded capacity and item counting
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Xunit;
using CounterpointCollective.DataFlow;

// First, we create a DataflowBlock of our own, composed of multiple sub-blocks.
var b = new BufferBlock<int>(new() { BoundedCapacity = DataflowBlockOptions.Unbounded });
var t = new TransformBlock<int, int>(async i =>
{
await Task.Yield(); // Simulate some work.
return i + 1;
}, new() { BoundedCapacity = DataflowBlockOptions.Unbounded });
b.LinkTo(t, new DataflowLinkOptions() { PropagateCompletion = true });
var ourOwnDataflowBlock = DataflowBlock.Encapsulate(b, t);
// Now we want to:
// - put a bounded capacity on our new block, and
// - count how many items it contains at any given time.
// This is what a BoundedPropagatorBlock is for.
var testSubject = new BoundedPropagatorBlock<int, int>(ourOwnDataflowBlock, boundedCapacity: 2000);
// This gives our custom DataflowBlock a bounded capacity of 2000 messages and real-time item counting!
Assert.Equal(0, testSubject.Count);
// We should be able to post synchronously up to the bounded capacity.
for (var i = 0; i < 2000; i++)
{
Assert.True(testSubject.Post(i));
Assert.Equal(i + 1, testSubject.Count); // The count is tracked correctly.
}
// Example: dynamically resizing the bounded capacity of any block by wrapping it in a BoundedPropagatorBlock
var bufferBlock = new BufferBlock<int>(new() { BoundedCapacity = DataflowBlockOptions.Unbounded });
var resizableBufferBlock = new BoundedPropagatorBlock<int, int>(bufferBlock);
//We did not specify a bounded capacity, so it defaults to DataflowBlockOptions.Unbounded
Assert.True(resizableBufferBlock.Post(1));
//But we can dynamically set the bounded capacity at any point.
resizableBufferBlock.BoundedCapacity = 2;
Assert.True(resizableBufferBlock.Post(2));
Assert.False(resizableBufferBlock.Post(3));
resizableBufferBlock.BoundedCapacity = 3;
Assert.True(resizableBufferBlock.Post(3));
// Example: Using AutoScaling on a ResizableBatchTransformBlock
// For demonstration: assume our workload performs best when batch size is ~100 items.
async Task<IEnumerable<int>> ProcessBatch(int[] batch)
{
var distanceFromOptimal = Math.Abs(batch.Length - 100);
await Task.Delay(distanceFromOptimal * batch.Length); // Simulate slower processing when batch isn't optimal
return batch;
}
// Create a ResizableBatchTransformBlock using our ProcessBatch function.
var testSubject = new ResizableBatchTransformBlock<int, int>(
ProcessBatch,
initialBatchSize: 1,
new ExecutionDataflowBlockOptions { BoundedCapacity = 10000 }
);
// Batch size can be manually adjusted:
testSubject.BatchSize = 5;
// Or automatically optimized using AutoScaling:
testSubject.EnableAutoScaling(
new DefaultBatchSizeStrategy(minBatchSize: 1, maxBatchSize: 200)
);
// Send some work:
for (var i = 0; i < 10000; i++)
{
await testSubject.SendAsync(i);
}
// Process outputs while AutoScaling gradually converges toward the optimal batch size (~100).
for (var i = 0; i < 10000; i++)
{
var result = await testSubject.ReceiveAsync();
}
ComposableDataflowBlocks is written in C#.
Contributions are welcome! If you'd like to suggest improvements, report bugs, or contribute new dataflow blocks, please open an issue or submit a pull request.
Fork the project.
Create your feature branch (git checkout -b feature/MyFeature).
Commit your changes (git commit -am 'Add new feature').
Push to the branch (git push origin feature/MyFeature).
Open a pull request.

Distributed under the MIT License. See LICENSE for more information.