A sophisticated .NET pipeline processing library that enables building complex data processing workflows through a fluent API. It provides a robust and extensible framework for processing data through a series of configurable, composable, and resilient steps (pipes). Features include:

- Sequential, parallel, conditional, and sub-pipeline execution
- Resource sharing with thread-safe operations
- Comprehensive error handling (retry, circuit breaker, and fallback policies; circuit breaker and retry-with-backoff recovery strategies)
- Pipeline validation with dependency checking
- Performance monitoring with metrics collection
- Cancellation support and asynchronous operations
$ dotnet add package InzSoftwares.NetPipeline
InzSoftwares.NetPipeline is a powerful .NET 9.0 library that implements a fluent pipeline pattern for processing data through a series of configurable, composable, and resilient steps (pipes). The library provides an intuitive API for building data processing pipelines where each pipe performs a specific transformation or operation on data as it flows through the pipeline.
The architecture follows the pipeline pattern with a fluent builder interface that allows for sequential, parallel, conditional, and sub-pipeline execution. InzSoftwares.NetPipeline provides comprehensive error handling, performance monitoring, cancellation support, and resource sharing capabilities to create robust and maintainable data processing workflows.
Execute pipes one after another in a specified order with full control over data flow:
var pipeline = new PipelineBuilder<InputData, OutputData>();
await pipeline.SetSource(input)
.AttachContext(context)
.AttachPipe(new StepOnePipe())
.AttachPipe(new StepTwoPipe())
.AttachPipe(new StepThreePipe())
.Flush();

Execute multiple pipes concurrently to improve performance for independent operations:
await pipeline
.AttachParallelPipes(
new StepOnePipe(),
new StepTwoPipe(),
new StepThreePipe()
)
.Flush();

Execute pipes based on runtime conditions, allowing for dynamic pipeline behavior:
await pipeline.SetSource(input)
.AttachContext(context)
.AttachPipe(new StepOnePipe())
.AttachConditionalPipe(new StepTwoPipe(), context => context.Input.CanExecutePipeTwo)
.AttachConditionalPipe(new StepThreePipe(), context => !context.Input.CanExecutePipeTwo)
.Flush();

Create reusable pipeline components for better composability and maintainability:
var dataProcessingPipeline = new SubPipeline<InputData, OutputData>(subBuilder =>
subBuilder
.AttachPipe(new ValidateInputPipe())
.AttachPipe(new TransformDataPipe())
.AttachPipe(new EnrichDataPipe())
);
await pipeline
.AttachSubPipeline(dataProcessingPipeline)
.Flush();

Share data between pipes through the context repository with thread-safe operations:
// In one pipe
context.AddResource("userId", 123);
// In another pipe
var userId = context.GetResource<int>("userId");
// Thread-safe operations
context.TryAddResource("cacheKey", "value"); // Returns false if key exists
context.UpdateResource("existingKey", "newValue");

Comprehensive error handling with multiple policies and strategies for resilience:
Policies (applied during execution):

- Retry Policy: automatically retries failed operations
- Circuit Breaker Policy: prevents cascading failures by short-circuiting calls to a failing pipe
- Fallback Policy: executes an alternative pipe when the primary fails

Recovery Strategies (applied after failure occurs):

- Circuit Breaker Strategy: reactive circuit breaker for post-failure recovery
- Retry With Backoff Strategy: retries failed operations with increasing delays
For more detailed information on implementing these error handling techniques, see the Error Handling Guide in the main repository.
Comprehensive validation system to catch configuration issues early:
// Attach a custom validator (optional - if not attached, default validation is used)
var validator = new DefaultPipelineValidator<InputData, OutputData>();
await pipeline.AttachValidator(validator);
// Validate the pipeline configuration
await pipeline.SetSource(input)
.AttachContext(context)
.AttachPipe(new StepOnePipe())
.AttachPipe(new StepTwoPipe())
.ValidateConfiguration(); // This will throw an exception if validation fails
// Or get validation results without throwing
var (errors, warnings) = await pipeline.ValidateConfigurationForResult();
if (errors.Any())
{
// Handle errors
Console.WriteLine($"Validation errors: {string.Join(", ", errors)}");
}
// Pipes can declare their resource dependencies to enable validation:
public class MyPipe : IPipe<InputData, OutputData>
{
public async Task Handle(IPipelineContext<InputData, OutputData> context, CancellationToken cancellationToken = default)
{
// Pipe implementation
await Task.CompletedTask;
}
// Declare required resources
public IEnumerable<string> GetRequiredResources() => ["user_id", "session_token"];
// Declare provided resources
public IEnumerable<string> GetProvidedResources() => ["processed_data"];
}

Detailed performance tracking with comprehensive metrics collection:
await pipeline.SetSource(input)
.AttachContext(context)
.EnablePerformanceMetrics("my-correlation-id")
.AttachPipe(new StepOnePipe())
.AttachPipe(new StepTwoPipe())
.Flush();
Console.WriteLine(context.GetPerformanceMetricsSummary());
// Or access individual metrics
Console.WriteLine($"Total Duration: {context.PerformanceMetrics?.TotalDurationMs:F2} ms");
Console.WriteLine($"Initial Memory: {context.PerformanceMetrics?.MemoryMetrics?.InitialMemoryBytes} bytes");
Console.WriteLine($"Final Memory: {context.PerformanceMetrics?.MemoryMetrics?.FinalMemoryBytes} bytes");

Granular control over cancellation, with proper propagation to sub-pipelines and support for the ICancellablePipe interface:
var cancellationTokenSource = new CancellationTokenSource();
// Use cancellation with pipeline execution
await pipeline.SetSource(input)
.AttachContext(context)
.AttachPipe(new StepOnePipe())
.AttachPipe(new StepTwoPipe())
.Flush(cancellationTokenSource.Token);
// Later cancel the operation
cancellationTokenSource.Cancel();

For pipes that need to handle cancellation more gracefully and perform cleanup operations, implement ICancellablePipe:
public class MyCancellablePipe : ICancellablePipe<InputData, OutputData>
{
public async Task Handle(IPipelineContext<InputData, OutputData> context, CancellationToken cancellationToken = default)
{
// Check for cancellation periodically during long-running operations
for (int i = 0; i < 100; i++)
{
cancellationToken.ThrowIfCancellationRequested();
await Task.Delay(100, cancellationToken);
// Perform work
}
}
public async Task HandleCancellation(IPipelineContext<InputData, OutputData> context, CancellationToken cancellationToken = default)
{
// Perform cleanup operations when cancellation is requested
// This method is called before the pipe execution is terminated
// allowing for proper resource cleanup and state management
Console.WriteLine("Performing cleanup before cancellation...");
await Task.CompletedTask;
}
}

Full support for strongly-typed generic pipelines with async/await patterns:
public class PipelineBuilder<TIn, TOut> where TOut : class
{
// Strongly typed pipeline with compile-time safety
}

The InzSoftwares.NetPipeline library is available as a NuGet package:
Install-Package InzSoftwares.NetPipeline
dotnet add package InzSoftwares.NetPipeline
<PackageReference Include="InzSoftwares.NetPipeline" Version="x.x.x" />

Install the NuGet package into your project as shown above.
Define your data models:
public class InputData
{
public string Name { get; set; } = string.Empty;
public int Age { get; set; }
public string Email { get; set; } = string.Empty;
}
public class OutputData
{
public bool IsValid { get; set; }
public string ProcessedName { get; set; } = string.Empty;
public string HashedEmail { get; set; } = string.Empty;
public List<string> Warnings { get; set; } = new();
}

Define a custom pipeline context:

using InzPipeline.Core;
public class ProcessingContext : PipelineContext<InputData, OutputData>
{
public ProcessingContext()
{
Input = new InputData();
Output = new OutputData();
}
}

Implement your pipes:

using InzPipeline.Core.Contracts;
public class ValidateInputPipe : IPipe<InputData, OutputData>
{
public async Task Handle(IPipelineContext<InputData, OutputData> context,
CancellationToken cancellationToken = default)
{
if (string.IsNullOrEmpty(context.Input.Name))
{
context.Output.Warnings.Add("Name is empty");
}
context.Output.IsValid = !string.IsNullOrEmpty(context.Input.Email) && context.Input.Age >= 0;
await Task.CompletedTask;
}
}
public class ProcessNamePipe : IPipe<InputData, OutputData>
{
public async Task Handle(IPipelineContext<InputData, OutputData> context,
CancellationToken cancellationToken = default)
{
context.Output.ProcessedName = context.Input.Name.ToUpper();
await Task.CompletedTask;
}
}

Build and run the pipeline:

using InzPipeline.Core;
var pipeline = new PipelineBuilder<InputData, OutputData>();
var context = new ProcessingContext();
await pipeline.SetSource(new InputData { Name = "John Doe", Email = "john@example.com", Age = 30 })
.AttachContext(context)
.AttachPipe(new ValidateInputPipe())
.AttachPipe(new ProcessNamePipe())
.Flush();
Console.WriteLine($"Result: {context.Output.ProcessedName}, Valid: {context.Output.IsValid}");

For comprehensive documentation on specific features and implementation details, visit the main repository:
Here's a complete example of using the InzSoftwares.NetPipeline library:
public class UserInput
{
public string Name { get; set; } = string.Empty;
public int Age { get; set; }
public string Email { get; set; } = string.Empty;
}
public class UserOutput
{
public bool IsValid { get; set; }
public string ProcessedName { get; set; } = string.Empty;
public string HashedEmail { get; set; } = string.Empty;
public List<string> Warnings { get; set; } = new();
}

public class UserProcessingContext : PipelineContext<UserInput, UserOutput>
{
public UserProcessingContext()
{
Input = new UserInput();
Output = new UserOutput();
}
}

public class ValidateUserPipe : IPipe<UserInput, UserOutput>
{
public async Task Handle(IPipelineContext<UserInput, UserOutput> context,
CancellationToken cancellationToken = default)
{
if (string.IsNullOrEmpty(context.Input.Name))
{
context.Output.Warnings.Add("Name is empty");
}
context.Output.IsValid = !string.IsNullOrEmpty(context.Input.Email) && context.Input.Age >= 0;
await Task.CompletedTask;
}
}
public class ProcessUserNamePipe : IPipe<UserInput, UserOutput>
{
public async Task Handle(IPipelineContext<UserInput, UserOutput> context,
CancellationToken cancellationToken = default)
{
context.Output.ProcessedName = context.Input.Name.ToUpper();
await Task.CompletedTask;
}
}
public class HashEmailPipe : IPipe<UserInput, UserOutput>
{
public async Task Handle(IPipelineContext<UserInput, UserOutput> context,
CancellationToken cancellationToken = default)
{
// Example implementation: hash the email with SHA-256, then Base64-encode the digest
// (requires using System.Security.Cryptography and System.Text)
var emailHash = SHA256.HashData(Encoding.UTF8.GetBytes(context.Input.Email));
context.Output.HashedEmail = Convert.ToBase64String(emailHash);
await Task.CompletedTask;
}
}

var pipeline = new PipelineBuilder<UserInput, UserOutput>();
var context = new UserProcessingContext();
await pipeline.SetSource(new UserInput { Name = "John Doe", Email = "john@example.com", Age = 30 })
.AttachContext(context)
.AttachPipe(new ValidateUserPipe())
.AttachPipe(new ProcessUserNamePipe())
.AttachPipe(new HashEmailPipe())
.Flush();
Console.WriteLine($"Result: {context.Output.ProcessedName}, Valid: {context.Output.IsValid}");

// Retry Policy - automatically retry failed operations during execution
await pipeline.SetSource(input)
.AttachContext(context)
.AttachPipeWithRetryPolicy(new ExternalServicePipe(), maxAttempts: 3, delay: TimeSpan.FromSeconds(1))
.Flush();
// Circuit Breaker Policy - prevent cascading failures during execution
await pipeline.SetSource(input)
.AttachContext(context)
.AttachPipeWithCircuitBreakerPolicy(new DatabasePipe(), failureThreshold: 5, timeout: TimeSpan.FromMinutes(2))
.Flush();
// Fallback Policy - execute alternative operations when primary fails during execution
await pipeline.SetSource(input)
.AttachContext(context)
.AttachPipeWithFallbackPolicy(new PrimaryServicePipe(), new FallbackServicePipe())
.Flush();

// Circuit Breaker Strategy - reactive circuit breaker for post-failure recovery
await pipeline.SetSource(input)
.AttachContext(context)
.AttachPipeWithCircuitBreakerStrategy(new UnreliableServicePipe(), failureThreshold: 2, timeout: TimeSpan.FromMinutes(1))
.Flush();
// Retry With Backoff Strategy - retry failed operations after failure occurs
await pipeline.SetSource(input)
.AttachContext(context)
.AttachPipeWithRetryStrategy(new NetworkOperationPipe(), maxAttempts: 3, initialDelay: TimeSpan.FromSeconds(1))
.Flush();
// Global Recovery Strategy - apply recovery strategy to all pipes in the pipeline
await pipeline.SetSource(input)
.AttachContext(context)
.WithRecoveryStrategy(new RetryWithBackoffStrategy<InputData, OutputData>(maxAttempts: 2))
.AttachPipe(new FlakyOperationPipe())
.Flush();

await pipeline.SetSource(input)
.AttachContext(context)
.EnablePerformanceMetrics("my-correlation-id")
.AttachPipe(new StepOnePipe())
.AttachPipe(new StepTwoPipe())
.Flush();
Console.WriteLine($"Total Duration: {context.PerformanceMetrics.TotalDurationMs}ms");

PipelineBuilder<TIn, TOut>: The main entry point for building pipelines. Provides methods to attach pipes, set source data, and configure various pipeline behaviors including sequential, parallel, conditional, and sub-pipeline execution. Supports all error handling mechanisms and performance metrics collection.
PipelineContext<TIn, TOut>: Manages data flow between pipes. Provides resource sharing backed by a ConcurrentDictionary for thread-safe operations, error handling with detailed error tracking, and performance metrics collection, along with methods for adding, getting, updating, and removing resources from the shared repository.
IPipe<TIn, TOut>: Interface that all pipe implementations must implement. Contains a Handle method that performs the pipe's operation and methods for declaring resource dependencies via GetRequiredResources() and GetProvidedResources().
SubPipeline<TIn, TOut>: Allows for creating reusable pipeline components that can be attached to parent pipelines, enabling composition and reuse of pipeline segments.
ICancellablePipe<TIn, TOut>: Specialized interface for pipes that need to handle cancellation more gracefully, implementing a HandleCancellation method to perform cleanup operations when cancellation is requested.
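The error-continuation settings on the context work together with the builder API shown earlier. Below is a minimal sketch, reusing the ProcessingContext and pipes from the Getting Started section, and assuming that entries in the PipelineErrors collection render usefully when printed:

```csharp
var pipeline = new PipelineBuilder<InputData, OutputData>();
var context = new ProcessingContext();

// Continue executing the remaining pipes even if one fails
context.ContinueOnFailure = true;

await pipeline.SetSource(new InputData { Name = "Jane", Email = "jane@example.com", Age = 25 })
    .AttachContext(context)
    .AttachPipe(new ValidateInputPipe()) // may record an error
    .AttachPipe(new ProcessNamePipe())   // still runs when ContinueOnFailure is true
    .Flush();

// Inspect any errors collected during execution
foreach (var error in context.PipelineErrors)
{
    Console.WriteLine(error);
}
```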
Set context.ContinueOnFailure = true if you want to continue after an error. Use the PipelineErrors collection to access detailed error information.

InzSoftwares.NetPipeline provides comprehensive performance monitoring capabilities:
await pipeline.SetSource(input)
.AttachContext(context)
.EnablePerformanceMetrics("my-correlation-id")
.AttachPipe(new StepOnePipe())
.AttachPipe(new StepTwoPipe())
.Flush();
// Get a summary of all metrics
Console.WriteLine(context.GetPerformanceMetricsSummary());
// Access specific metrics
Console.WriteLine($"Total Duration: {context.PerformanceMetrics.TotalDurationMs}ms");
Console.WriteLine($"Memory Increase: {context.PerformanceMetrics.MemoryMetrics.MemoryIncrease} bytes");

The InzSoftwares.NetPipeline library is part of the InzPipeline project, which includes:
InzPipeline/
├── InzPipeline.Core/ # Core library (this package)
│ ├── Cancellation/ # Cancellation support
│ ├── Configuration/ # Pipeline configuration options
│ ├── Contracts/ # Interface definitions
│ ├── ErrorHandling/ # Error handling policies and strategies
│ ├── Models/ # Data models used internally
│ ├── Steps/ # Pipeline step implementations
│ ├── Validation/ # Pipeline validation components
│ ├── PipelineBuilder.cs # Main pipeline orchestration
│ ├── PipelineContext.cs # Pipeline context management
│ └── SubPipeline.cs # Sub-pipeline functionality
├── InzPipeline.Sample/ # Sample application demonstrating usage
└── InzPipeline.Tests/ # Unit and integration tests
If you have questions, issues, or suggestions about InzSoftwares.NetPipeline, please open an issue on the repository.
Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.
1. Create a feature branch: git checkout -b feature/AmazingFeature
2. Run the tests: dotnet test
3. Commit your changes: git commit -m 'Add some AmazingFeature'
4. Push to the branch: git push origin feature/AmazingFeature

We use Semantic Versioning (SemVer) for versioning. For the versions available, see the tags on this repository.
This project is licensed under the MIT License - see the LICENSE file for details.
InzSoftwares.NetPipeline - A powerful and flexible pipeline processing library for .NET applications