Extension methods that add parallel ForEach iterations to IEnumerable<T>, IAsyncEnumerable<T>, and Channel<T>. Process items concurrently with configurable limits, collect results, and control per-key concurrency.
$ dotnet add package Polluxs.ForEach
Make .NET concurrency simple.
Extension methods for parallel processing with fluent syntax. Built on Parallel.ForEachAsync with extras like result collection and per-key limits.
⚠️ Warning: This package is in active development and may introduce breaking changes between versions.
// Process 100 URLs, max 20 at a time
await urls.ForEachParallelAsync(async (url, ct) =>
{
var response = await httpClient.GetAsync(url, ct);
Console.WriteLine($"{url}: {response.StatusCode}");
}, maxParallel: 20);
// Download and collect all results
var results = await urls.ForEachParallelAsync(async (url, ct) =>
{
var content = await httpClient.GetStringAsync(url, ct);
return (url, content.Length);
}, maxParallel: 20);
foreach (var (url, size) in results)
Console.WriteLine($"{url}: {size} bytes");
// Max 50 total, but only 2 per customer
await orders.ForEachKeyParallelAsync(
keySelector: order => order.CustomerId,
body: async (order, ct) => await ProcessOrderAsync(order, ct),
maxTotalParallel: 50,
maxPerKey: 2);
// Process items in batches of 50, with max 5 batches running concurrently
await records.ForEachBatchParallelAsync(async (batch, ct) =>
{
await database.BulkInsertAsync(batch, ct);
}, maxPerBatch: 50, maxConcurrent: 5);
For IEnumerable<T>, IAsyncEnumerable<T>, and Channel<T>:
| Method | Purpose |
|---|---|
ForEachParallelAsync | Process items concurrently with a global limit |
ForEachParallelAsync<T,TResult> | Process items concurrently and collect results |
ForEachKeyParallelAsync | Process items with both global and per-key concurrency limits |
ForEachBatchParallelAsync | Process items in batches with parallel batch execution |
For Channel<T> only:
| Method | Purpose |
|---|---|
ForEachAsync | Process items sequentially |
ReadAllAsync | Convert channel to IAsyncEnumerable<T> |
WriteAllAsync | Write items from IEnumerable<T> or IAsyncEnumerable<T> into channel |
All methods support cancellation tokens at two levels:
1. Method-level cancellation (always available):
var cts = new CancellationTokenSource();
await items.ForEachParallelAsync(async item =>
{
await ProcessAsync(item);
}, maxParallel: 10, ct: cts.Token); // ← Pass CT here
2. Body-level cancellation (optional - use when your work needs it):
await items.ForEachParallelAsync(async (item, ct) => // ← CT parameter
{
await ProcessAsync(item, ct); // ← Pass to operations
}, maxParallel: 10);
Don't need cancellation? Just omit it:
// Simplest form - no cancellation token needed
await items.ForEachParallelAsync(async item =>
{
await ProcessAsync(item);
}, maxParallel: 10);
Run async operations for an enumerable with a concurrency limit.
using ForEach.Enumerable; // or ForEach.AsyncEnumerable
await files.ForEachParallelAsync(async (path, ct) =>
{
var content = await File.ReadAllTextAsync(path, ct);
var upper = content.ToUpperInvariant();
await File.WriteAllTextAsync($"{path}.out", upper, ct);
}, maxParallel: 8);
maxParallelIEnumerable<T>: Aggregates via Parallel.ForEachAsync → AggregateExceptionIAsyncEnumerable<T>: Aggregates via Task.WhenAll → AggregateExceptionProcess items concurrently and collect results.
var results = await urls.ForEachParallelAsync(async (url, ct) =>
{
using var r = await httpClient.GetAsync(url, ct);
return (url, r.StatusCode);
}, maxParallel: 16);
foreach (var (url, code) in results)
Console.WriteLine($"{url} → {code}");
ConcurrentBag under the hoodForEachParallelAsync (inherits from Parallel.ForEachAsync)Limit concurrency globally AND per key.
await jobs.ForEachKeyParallelAsync(
keySelector: j => j.AccountId,
body: async (job, ct) =>
{
await HandleJobAsync(job, ct);
},
maxTotalParallel: 64,
maxPerKey: 2);
maxTotalParallel (actual items being processed concurrently)maxPerKey (items per key being processed concurrently)min(maxTotalParallel, maxPerKey) - if maxPerKey > maxTotalParallel, the global limit winsTask.WhenAll - multiple failures collected into an AggregateExceptionProcess items in batches with concurrent batch execution.
using ForEach.Enumerable; // or ForEach.AsyncEnumerable
// Bulk insert records in batches
await records.ForEachBatchParallelAsync(async (batch, ct) =>
{
await database.BulkInsertAsync(batch, ct);
Console.WriteLine($"Inserted batch of {batch.Count} records");
}, maxPerBatch: 100, maxConcurrent: 4);
maxPerBatch itemsmaxConcurrent batches processed in parallelList<T> to body functionTask.WhenAll → AggregateExceptionProcess channel items sequentially.
await channel.ForEachAsync(async (item, ct) =>
await ProcessAsync(item, ct));
Convert channel to IAsyncEnumerable<T>.
await foreach (var item in channel.ReadAllAsync())
Process(item);
Write items into channel.
var channel = Channel.CreateUnbounded<int>();
// From IEnumerable<T>
await channel.WriteAllAsync(Enumerable.Range(1, 100));
// From IAsyncEnumerable<T>
await channel.WriteAllAsync(FetchDataAsync());
channel.Writer.Complete();
MIT = copy, use, modify, ignore.