A robust Task Parallel Library that combines Railway Oriented Programming with parallel execution patterns. Execute collections of tasks concurrently while maintaining functional programming principles and comprehensive error handling.
$ dotnet add package Mahamudra.Contemporary
Photo by Paul Engel on Unsplash
A robust Task Parallel Library that combines Railway Oriented Programming with parallel execution patterns. Execute collections of tasks concurrently while maintaining functional programming principles and comprehensive error handling.
Result<T, E> pattern for elegant error handlingConcurrentDictionary for safe parallel operationsdotnet add package Mahamudra.Contemporary
using Mahamudra.Contemporary;
using Microsoft.Extensions.Logging;
// Create factory with optional logging
var factory = new ParallelAsyncFactory(logger);
// Execute async operations in parallel
var results = await factory.ExecuteAsyncResult<int, string>(
numbers,
async number => await ProcessNumberAsync(number)
);
// Handle results using Railway Oriented Programming
var successes = results.Where(r => r.Key is Success<int, string>);
var failures = results.Where(r => r.Key is Failure<int, string>);
The async-first class providing true parallel execution with for optimal performance.
Task.WhenAll()// Execute async function with Result pattern (Railway Oriented)
Task<ConcurrentDictionary<Result<T, string>, M>> ExecuteAsyncResult<T, M>(
IEnumerable<T> list,
Func<T, Task<M>> function
)
Parameters:
list: Collection of items to processfunction: Async function to execute for each itemReturns: ConcurrentDictionary where:
Result<T, string> - Either Success<T> containing the original item, or Failure<string> with error detailsM - The result of the function execution (or default for failures)Behavior: Continues processing all items even if some fail. Failures are captured in the Result pattern.
// Execute async function with fail-fast behavior
Task<ConcurrentDictionary<T, M>> ExecuteAsync<T, M>(
IEnumerable<T> list,
Func<T, Task<M>> function
)
Parameters:
list: Collection of items to processfunction: Async function to execute for each itemReturns: ConcurrentDictionary<T, M> with results
Behavior: Throws exception immediately if any item fails. Use when all items must succeed.
Synchronous parallel execution using Parallel.ForEach for CPU-bound operations.
// Execute synchronous function with Result pattern
ConcurrentDictionary<Result<T, string>, M> ExecuteResult<T, M>(
IEnumerable<T> list,
Func<T, M> function
)
Parameters:
list: Collection of items to processfunction: Synchronous function to execute for each itemReturns: ConcurrentDictionary with Railway Oriented results
Behavior: Synchronous parallel execution, captures all errors.
// Execute synchronous function with fail-fast behavior
ConcurrentDictionary<T, M> Execute<T, M>(
IEnumerable<T> list,
Func<T, M> function
)
Parameters:
list: Collection of items to processfunction: Synchronous function to execute for each itemReturns: ConcurrentDictionary<T, M> with results
Behavior: Throws exception immediately on first failure.
// Execute async function synchronously with Result pattern
ConcurrentDictionary<Result<T, string>, M> ExecuteAsyncResult<T, M>(
IEnumerable<T> list,
Func<T, Task<M>> function
)
Parameters:
list: Collection of items to processfunction: Async function to execute synchronously for each itemReturns: ConcurrentDictionary with Railway Oriented results
Behavior: Converts async operations to sync using ToSync() extension method.
// Execute async function synchronously with fail-fast behavior
ConcurrentDictionary<T, M> ExecuteAsync<T, M>(
IEnumerable<T> list,
Func<T, Task<M>> function
)
Parameters:
list: Collection of items to processfunction: Async function to execute synchronously for each itemReturns: ConcurrentDictionary<T, M> with results
Behavior: Synchronous execution of async functions, throws on first error.
// Default constructor (uses NullLogger)
ParallelAsyncFactory()
ParallelFactory()
// Constructor with custom logger
ParallelAsyncFactory(ILogger logger)
ParallelFactory(ILogger logger)
| Use Case | Recommended Class | Reason |
|---|---|---|
| I/O-bound operations (HTTP, DB, File I/O) | ParallelAsyncFactory | True async parallelism, better performance |
| CPU-bound operations | ParallelFactory | Optimized thread pool usage |
| Legacy synchronous code | ParallelFactory | Native synchronous execution |
| Modern async APIs | ParallelAsyncFactory | Native async/await support |
| Mode | Method | Behavior | Use When |
|---|---|---|---|
| Fail-Fast | ExecuteAsync() / Execute() | Stops on first error, throws exception | All items must succeed (transactions, critical operations) |
| Railway Oriented | ExecuteAsyncResult() / ExecuteResult() | Processes all items, captures errors | Partial success acceptable (batch processing, data imports) |
var urls = new[] { "https://api1.com", "https://api2.com", "https://api3.com" };
var factory = new ParallelAsyncFactory(logger);
var results = await factory.ExecuteAsyncResult<string, HttpResponseMessage>(
urls,
async url => await httpClient.GetAsync(url)
);
// Process successful responses
foreach (var success in results.Where(r => r.Key is Success<string, string>))
{
var originalUrl = ((Success<string, string>)success.Key).Value;
var response = success.Value;
Console.WriteLine($"✅ {originalUrl}: {response.StatusCode}");
}
// Handle failures
foreach (var failure in results.Where(r => r.Key is Failure<string, string>))
{
var error = ((Failure<string, string>)failure.Key).Error;
Console.WriteLine($"❌ Error: {error}");
}
var dataIds = Enumerable.Range(1, 100);
var processor = new ParallelAsyncFactory(logger);
var processedData = await processor.ExecuteAsyncResult<int, ProcessedResult>(
dataIds,
async id => await ProcessDataAsync(id)
);
var successCount = processedData.Count(r => r.Key is Success<int, string>);
var errorCount = processedData.Count(r => r.Key is Failure<int, string>);
Console.WriteLine($"Processed: {successCount} successful, {errorCount} failed");
// Use ParallelFactory for CPU-intensive tasks
var numbers = Enumerable.Range(1, 1000);
var factory = new ParallelFactory(logger);
var results = factory.ExecuteResult<int, bool>(
numbers,
number => IsPrime(number) // Synchronous CPU-bound operation
);
var primeCount = results.Count(r => r.Key is Success<int, string> && r.Value);
Console.WriteLine($"Found {primeCount} prime numbers");
var userIds = await GetUserIdsAsync();
var factory = new ParallelAsyncFactory(logger);
// Update users in parallel with error handling
var updateResults = await factory.ExecuteAsyncResult<int, bool>(
userIds,
async userId =>
{
await dbContext.Users
.Where(u => u.Id == userId)
.ExecuteUpdateAsync(u => u.SetProperty(x => x.LastUpdated, DateTime.UtcNow));
return true;
}
);
// Log failures for retry
var failures = updateResults.Where(r => r.Key is Failure<int, string>);
foreach (var failure in failures)
{
var error = ((Failure<int, string>)failure.Key).Error;
logger.LogError($"Failed to update user: {error}");
}
public class OrderService
{
private readonly ParallelAsyncFactory _factory;
private readonly HttpClient _httpClient;
public async Task<OrderResult> ProcessOrderAsync(Order order)
{
var services = new[] { "inventory", "payment", "shipping" };
// Call multiple microservices in parallel
var results = await _factory.ExecuteAsyncResult<string, ServiceResponse>(
services,
async serviceName => await CallServiceAsync(serviceName, order)
);
// Check if all services succeeded
if (results.All(r => r.Key is Success<string, string>))
{
return OrderResult.Success();
}
// Rollback on any failure
var failures = results
.Where(r => r.Key is Failure<string, string>)
.Select(r => ((Failure<string, string>)r.Key).Error);
await RollbackServicesAsync(order);
return OrderResult.Failed(failures);
}
}
// Use ExecuteAsync when all items MUST succeed
try
{
var criticalTasks = new[] { "task1", "task2", "task3" };
var factory = new ParallelAsyncFactory(logger);
var results = await factory.ExecuteAsync<string, bool>(
criticalTasks,
async task => await ExecuteCriticalTaskAsync(task)
);
Console.WriteLine("All critical tasks completed successfully!");
}
catch (Exception ex)
{
// Immediate failure - transaction can be rolled back
logger.LogError($"Critical task failed: {ex.Message}");
await RollbackTransactionAsync();
}
The library integrates with Microsoft.Extensions.Logging. Configure your logger as usual:
// In Startup.cs or Program.cs
services.AddLogging(builder =>
{
builder.AddConsole();
builder.SetMinimumLevel(LogLevel.Information);
});
// Inject into your service
public class MyService
{
public MyService(ILogger<MyService> logger)
{
_factory = new ParallelAsyncFactory(logger);
}
}
Performance comparison between synchronous parallel execution and async parallel execution:
| Method | Mean | Error | StdDev | Allocated |
|---|---|---|---|---|
| ExecuteParallelResult | 731.6 ms | 15,086.4 ms | 826.9 ms | 307.29 KB |
| ExecuteAsyncResult | 206.5 ms | 243.0 ms | 13.32 ms | 270.69 KB |
Async execution shows ~3.5x better performance with lower memory allocation.
Run the test suite:
dotnet test
Example test demonstrating error handling:
[TestMethod]
public async Task ExecuteAsyncResult_HandlesPartialFailures()
{
var items = new[] { 1, 2, 3, 4, 5 };
var results = await factory.ExecuteAsyncResult<int, int>(
items,
async x => x == 3 ? throw new Exception("Test error") : x * 2
);
var successes = results.Where(r => r.Key is Success<int, string>);
var failures = results.Where(r => r.Key is Failure<int, string>);
Assert.AreEqual(4, successes.Count());
Assert.AreEqual(1, failures.Count());
Assert.AreEqual(16, successes.Sum(s => s.Value)); // 2+4+8+10 (3 failed, so 6 not included)
}
// ✅ GOOD: Use ParallelAsyncFactory for I/O operations
var factory = new ParallelAsyncFactory(logger);
var results = await factory.ExecuteAsyncResult(urls, async url => await httpClient.GetAsync(url));
// ❌ BAD: Using ParallelFactory for I/O (blocks threads unnecessarily)
var factory = new ParallelFactory(logger);
var results = factory.ExecuteAsyncResult(urls, async url => await httpClient.GetAsync(url));
// ✅ GOOD: Check result type before casting
foreach (var result in results)
{
if (result.Key is Success<int, string> success)
{
var value = success.Value;
var output = result.Value;
// Process success
}
else if (result.Key is Failure<int, string> failure)
{
var error = failure.Error;
// Handle error
}
}
// ❌ BAD: Casting without checking
var success = (Success<int, string>)result.Key; // May throw InvalidCastException
// ✅ GOOD: Process all items, collect errors for retry
var results = await factory.ExecuteAsyncResult(items, async item => await ProcessAsync(item));
var failures = results.Where(r => r.Key is Failure<T, string>);
await RetryFailedItemsAsync(failures);
// ❌ BAD: Fail-fast for batch operations loses progress
try
{
var results = await factory.ExecuteAsync(items, async item => await ProcessAsync(item));
}
catch
{
// Lost all progress, must restart from beginning
}
// ✅ GOOD: Use structured logging
services.AddLogging(builder =>
{
builder.AddConsole();
builder.AddFilter("Mahamudra.Contemporary", LogLevel.Information);
});
// 💡 TIP: Use Debug level for detailed execution tracking
builder.AddFilter("Mahamudra.Contemporary", LogLevel.Debug);
// ❌ BAD: Shared mutable state causes race conditions
var counter = 0;
await factory.ExecuteAsyncResult(items, async item =>
{
counter++; // Race condition!
return await ProcessAsync(item);
});
// ✅ GOOD: Use immutable patterns or thread-safe collections
var processed = await factory.ExecuteAsyncResult(items, async item => await ProcessAsync(item));
var counter = processed.Count(r => r.Key is Success<T, string>);
A: ParallelAsyncFactory uses true async parallelism with Task.WhenAll(), making it ideal for I/O-bound operations (HTTP, database, file operations). ParallelFactory uses Parallel.ForEach, which is optimized for CPU-bound synchronous operations. For most modern applications with async APIs, use ParallelAsyncFactory.
A: Use ExecuteAsync() (fail-fast) when all items must succeed, such as transactional operations where partial completion is unacceptable. Use ExecuteAsyncResult() (Railway pattern) for batch operations where you want to process as many items as possible and handle failures separately.
A: Currently, the library uses default .NET parallelism settings:
ParallelAsyncFactory: All tasks start simultaneously (bounded by system resources)ParallelFactory: Uses Parallel.ForEach default partitioningFor custom control, wrap your operations:
var semaphore = new SemaphoreSlim(10); // Max 10 concurrent operations
await factory.ExecuteAsyncResult(items, async item =>
{
await semaphore.WaitAsync();
try
{
return await ProcessAsync(item);
}
finally
{
semaphore.Release();
}
});
A: Yes! Both factories work without logging. Use the default constructor, which provides a NullLogger:
var factory = new ParallelAsyncFactory(); // No logging required
A:
var results = await factory.ExecuteAsyncResult(items, async item => await ProcessAsync(item));
// Extract failed items
var failedItems = results
.Where(r => r.Key is Failure<T, string> failure)
.Select(r => ((Failure<T, string>)r.Key).Error);
// Retry with exponential backoff
await RetryWithBackoffAsync(failedItems);
A: Yes! Results are stored in ConcurrentDictionary, which is thread-safe. However, your lambda functions must also be thread-safe. Avoid shared mutable state within your functions.
A: The library targets .NET Standard 2.1, making it compatible with:
Cause: Using ToSync() on async operations can cause deadlocks in certain contexts (e.g., ASP.NET synchronization contexts).
Solution: Use ParallelAsyncFactory instead:
// ❌ BAD: May deadlock
var factory = new ParallelFactory();
var results = factory.ExecuteAsyncResult(items, async item => await ProcessAsync(item));
// ✅ GOOD: Use async factory
var factory = new ParallelAsyncFactory();
var results = await factory.ExecuteAsyncResult(items, async item => await ProcessAsync(item));
Cause: Processing millions of items simultaneously can exhaust memory.
Solution: Batch your operations:
var batchSize = 1000;
var batches = items.Chunk(batchSize);
foreach (var batch in batches)
{
var results = await factory.ExecuteAsyncResult(batch, async item => await ProcessAsync(item));
await ProcessResultsAsync(results);
}
Cause: The library captures exception messages as strings.
Solution: Use custom error codes in your functions:
await factory.ExecuteAsyncResult(items, async item =>
{
try
{
return await ProcessAsync(item);
}
catch (HttpRequestException)
{
throw new Exception($"HTTP_ERROR:{item}");
}
catch (TimeoutException)
{
throw new Exception($"TIMEOUT:{item}");
}
});
Cause: Processing duplicate items with Railway pattern can cause key conflicts.
Solution: Ensure items are unique, or transform items to include unique identifiers:
var uniqueItems = items.Select((item, index) => new { Index = index, Item = item });
var results = await factory.ExecuteAsyncResult(uniqueItems, async x => await ProcessAsync(x.Item));
public class ProductSyncService
{
private readonly ParallelAsyncFactory _factory;
private readonly IProductRepository _repository;
public async Task<SyncResult> SyncProductsAsync(IEnumerable<string> productIds)
{
var results = await _factory.ExecuteAsyncResult(
productIds,
async productId =>
{
var externalData = await _externalApi.GetProductAsync(productId);
await _repository.UpdateAsync(productId, externalData);
return true;
}
);
return new SyncResult
{
TotalProcessed = results.Count,
Successful = results.Count(r => r.Key is Success<string, string>),
Failed = results.Count(r => r.Key is Failure<string, string>),
FailedIds = results
.Where(r => r.Key is Failure<string, string>)
.Select(r => ((Failure<string, string>)r.Key).Error)
.ToList()
};
}
}
public class MigrationService
{
public async Task<MigrationReport> MigrateCustomersAsync()
{
var legacyCustomers = await _legacyDb.GetAllCustomersAsync();
var factory = new ParallelAsyncFactory(_logger);
var results = await factory.ExecuteAsyncResult(
legacyCustomers,
async customer =>
{
// Transform legacy data model
var modernCustomer = TransformCustomer(customer);
// Validate business rules
await _validator.ValidateAsync(modernCustomer);
// Insert into new system
await _modernDb.InsertCustomerAsync(modernCustomer);
return modernCustomer.Id;
}
);
await GenerateMigrationReportAsync(results);
return BuildMigrationReport(results);
}
}
public class HealthCheckService
{
public async Task<HealthReport> CheckAllServicesAsync()
{
var endpoints = _configuration.GetServiceEndpoints();
var factory = new ParallelAsyncFactory(_logger);
var results = await factory.ExecuteAsyncResult(
endpoints,
async endpoint =>
{
var stopwatch = Stopwatch.StartNew();
var response = await _httpClient.GetAsync(endpoint.Url);
stopwatch.Stop();
return new HealthCheck
{
Service = endpoint.Name,
Status = response.IsSuccessStatusCode ? "Healthy" : "Unhealthy",
ResponseTime = stopwatch.ElapsedMilliseconds
};
}
);
return BuildHealthReport(results);
}
}
public class PortfolioService
{
public async Task<RebalanceResult> RebalancePortfoliosAsync(IEnumerable<int> portfolioIds)
{
var factory = new ParallelAsyncFactory(_logger);
// Use fail-fast for financial operations
try
{
var results = await factory.ExecuteAsync(
portfolioIds,
async portfolioId =>
{
using var transaction = await _dbContext.Database.BeginTransactionAsync();
var portfolio = await _repository.GetPortfolioAsync(portfolioId);
var trades = CalculateRebalancingTrades(portfolio);
foreach (var trade in trades)
{
await _tradingService.ExecuteTradeAsync(trade);
}
await transaction.CommitAsync();
return true;
}
);
return RebalanceResult.Success(results.Count);
}
catch (Exception ex)
{
_logger.LogError($"Rebalancing failed: {ex.Message}");
await RollbackAllTransactionsAsync();
return RebalanceResult.Failed(ex.Message);
}
}
}
// Instead of processing 1 million items at once
var results = await factory.ExecuteAsyncResult(millionItems, ProcessAsync);
// Batch into manageable chunks
const int batchSize = 1000;
var allResults = new ConcurrentBag<Result>();
foreach (var batch in millionItems.Chunk(batchSize))
{
var batchResults = await factory.ExecuteAsyncResult(batch, ProcessAsync);
foreach (var result in batchResults)
{
allResults.Add(result);
}
}
// Limit concurrent API calls to avoid rate limits
var semaphore = new SemaphoreSlim(10); // Max 10 concurrent
var results = await factory.ExecuteAsyncResult(items, async item =>
{
await semaphore.WaitAsync();
try
{
return await _apiClient.CallAsync(item);
}
finally
{
semaphore.Release();
}
});
// Instead of creating classes for results
var results = await factory.ExecuteAsyncResult(
items,
async item => new { Id = item.Id, Status = await ProcessAsync(item) }
);
// Use ValueTuple for better memory efficiency
var results = await factory.ExecuteAsyncResult(
items,
async item => (item.Id, await ProcessAsync(item))
);
// Create HttpClient once (not per request)
private static readonly HttpClient _httpClient = new HttpClient();
public async Task ProcessUrlsAsync(IEnumerable<string> urls)
{
var factory = new ParallelAsyncFactory(_logger);
var results = await factory.ExecuteAsyncResult(
urls,
async url => await _httpClient.GetStringAsync(url)
);
}
// In library code, avoid capturing synchronization context
var results = await factory.ExecuteAsyncResult(
items,
async item =>
{
var data = await FetchDataAsync(item).ConfigureAwait(false);
return await ProcessDataAsync(data).ConfigureAwait(false);
}
);
This project follows Semantic Versioning. For detailed changelog, see the Releases page.
Current Version: 3.0.2
Major Changes:
Contributions are welcome! Please follow these guidelines:
Fork the repository
git clone https://github.com/YOUR_USERNAME/mahamudra-contemporary.git
Create a feature branch
git checkout -b feature/your-feature-name
Make your changes
Add tests for new functionality
cd tests/UnitTests/UnitTestsContemporary
# Add test methods to appropriate test class
Run the test suite
dotnet test
Build and verify
dotnet build --configuration Release
Submit a pull request
This project is licensed under the MIT License - see the LICENSE file for details.
Copyright (c) 2025 Mahamudra Contemporary Contributors
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
If you find this library useful, please consider giving it a star on GitHub! It helps others discover the project.
Repository: https://github.com/janmaru/mahamudra-contemporary NuGet Package: Mahamudra.Contemporary License: MIT Current Version: 3.0.2