Buildersoft Cortex Mediator is a library for .NET applications that implements the mediator pattern. It reduces dependencies between objects by providing in-process messaging: instead of calling each other directly, objects communicate through Cortex Mediator, which makes them less coupled and easier to maintain.
Cortex.Mediator is a lightweight and extensible implementation of the Mediator pattern for .NET applications, designed to power clean, modular architectures like Vertical Slice Architecture and CQRS.
Built as part of the Cortex Data Framework, this library simplifies command and query handling with built-in support for:
dotnet add package Cortex.Mediator
In Program.cs or Startup.cs:
builder.Services.AddCortexMediator(
new[] { typeof(Program) }, // Assemblies to scan for handlers
options => options.AddDefaultBehaviors() // Logging
);
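The first argument tells the mediator which assemblies to scan for handlers. As a rough illustration of what such a scan involves (this is not Cortex.Mediator's actual code), the sketch below uses reflection to find concrete types that implement an open generic handler interface. `ICommandHandler<,>` is redeclared locally as a stand-in so the snippet compiles on its own, and `HandlerScanner`, `PingCommand`, and `PingHandler` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Local stand-in for a handler interface, declared here so the sketch is self-contained.
public interface ICommandHandler<TCommand, TResult> { }

public sealed class PingCommand { }
public sealed class PingHandler : ICommandHandler<PingCommand, string> { }

public static class HandlerScanner
{
    // Returns (handler interface, implementation) pairs found in the given assembly.
    public static IReadOnlyList<(Type Service, Type Implementation)> Scan(Assembly assembly)
    {
        return assembly.GetTypes()
            .Where(type => type.IsClass && !type.IsAbstract)
            .SelectMany(type => type.GetInterfaces()
                .Where(i => i.IsGenericType &&
                            i.GetGenericTypeDefinition() == typeof(ICommandHandler<,>))
                .Select(i => (Service: i, Implementation: type)))
            .ToList();
    }
}
```

Each pair found this way could then be registered with the DI container, for example with `services.AddTransient(service, implementation)`.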
Features/
CreateUser/
CreateUserCommand.cs
CreateUserCommandHandler.cs
CreateUserValidator.cs
CreateUserEndpoint.cs
public class CreateUserCommand : ICommand<Guid>
{
public string UserName { get; set; }
public string Email { get; set; }
}
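public class CreateUserCommandHandler : ICommandHandler<CreateUserCommand, Guid>
{
public Task<Guid> Handle(CreateUserCommand command, CancellationToken cancellationToken)
{
// Your logic here (for example, persist the user), then return the new user's ID
var userId = Guid.NewGuid();
return Task.FromResult(userId);
}
}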
Simplified API (Recommended) - Type is automatically inferred:
// Using extension methods - no need to specify type parameters!
var userId = await mediator.SendAsync(command);
// For void commands (no return value)
await mediator.SendAsync(new DeleteUserCommand { UserId = userId });
Explicit Type Parameters (Legacy):
var userId = await mediator.SendCommandAsync<CreateUserCommand, Guid>(command);
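To see why the simplified call needs no type arguments, here is a minimal, self-contained sketch of a mediator whose `SendAsync` takes an `ICommand<TResult>` parameter, which lets the compiler infer `TResult` from the command itself. It is an illustration of the idea only, not the library's implementation: `MiniMediator`, `EchoCommand`, and `EchoHandler` are hypothetical, and the two interfaces are local stand-ins that mirror the shapes used in this README.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Local stand-ins that mirror the shapes used in this README.
public interface ICommand<TResult> { }

public interface ICommandHandler<TCommand, TResult> where TCommand : ICommand<TResult>
{
    Task<TResult> Handle(TCommand command, CancellationToken cancellationToken);
}

public sealed class MiniMediator
{
    // One route per command type; each route invokes the registered handler.
    private readonly Dictionary<Type, Func<object, CancellationToken, Task<object?>>> _routes = new();

    public void Register<TCommand, TResult>(ICommandHandler<TCommand, TResult> handler)
        where TCommand : ICommand<TResult>
    {
        _routes[typeof(TCommand)] = async (command, ct) =>
            await handler.Handle((TCommand)command, ct);
    }

    // TResult is inferred from the ICommand<TResult> the argument implements,
    // so callers can write SendAsync(command) without type arguments.
    public async Task<TResult> SendAsync<TResult>(
        ICommand<TResult> command, CancellationToken cancellationToken = default)
    {
        if (!_routes.TryGetValue(command.GetType(), out var route))
            throw new InvalidOperationException($"No handler for {command.GetType().Name}.");

        var result = await route(command, cancellationToken);
        return (TResult)result!;
    }
}

public sealed class EchoCommand : ICommand<string>
{
    public string Text { get; set; } = "";
}

public sealed class EchoHandler : ICommandHandler<EchoCommand, string>
{
    public Task<string> Handle(EchoCommand command, CancellationToken cancellationToken)
        => Task.FromResult(command.Text.ToUpperInvariant());
}
```

With this shape, `await mediator.SendAsync(new EchoCommand { Text = "hi" })` is typed as `string` without the caller spelling out either type.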
public class CreateUserValidator : AbstractValidator<CreateUserCommand>
{
public CreateUserValidator()
{
RuleFor(x => x.UserName).NotEmpty();
RuleFor(x => x.Email).NotEmpty().EmailAddress();
}
}
public class GetUserQuery : IQuery<GetUserResponse>
{
public int UserId { get; set; }
}
public class GetUserQueryHandler : IQueryHandler<GetUserQuery, GetUserResponse>
{
public async Task<GetUserResponse> Handle(GetUserQuery query, CancellationToken cancellationToken)
{
return new GetUserResponse { UserId = query.UserId, UserName = "Andy" };
}
}
Simplified API (Recommended) - Type is automatically inferred:
// Using extension methods - no need to specify type parameters!
var user = await mediator.QueryAsync(new GetUserQuery { UserId = 1 });
Explicit Type Parameters (Legacy):
var user = await mediator.SendQueryAsync<GetUserQuery, GetUserResponse>(query);
public class UserCreatedNotification : INotification
{
public string UserName { get; set; }
}
public class SendWelcomeEmailHandler : INotificationHandler<UserCreatedNotification>
{
public async Task Handle(UserCreatedNotification notification, CancellationToken cancellationToken)
{
// Send email...
}
}
await mediator.PublishAsync(new UserCreatedNotification { UserName = "Andy" });
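Unlike a command, a notification can have any number of handlers. The following self-contained sketch shows the fan-out idea with a hypothetical `MiniPublisher`. The interfaces are local stand-ins with the same shape as the ones above, and invoking handlers one after another in registration order is an assumption of this sketch, not a statement about how Cortex.Mediator schedules them.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Local stand-ins with the same shape as the interfaces used in this README.
public interface INotification { }

public interface INotificationHandler<TNotification> where TNotification : INotification
{
    Task Handle(TNotification notification, CancellationToken cancellationToken);
}

public sealed class MiniPublisher<TNotification> where TNotification : INotification
{
    private readonly List<INotificationHandler<TNotification>> _handlers = new();

    public void Subscribe(INotificationHandler<TNotification> handler) => _handlers.Add(handler);

    // Invokes every subscribed handler in registration order (an assumption of this sketch).
    public async Task PublishAsync(
        TNotification notification, CancellationToken cancellationToken = default)
    {
        foreach (var handler in _handlers)
            await handler.Handle(notification, cancellationToken);
    }
}

public sealed class UserCreatedNotification : INotification
{
    public string UserName { get; set; } = "";
}

// Records each notification it receives so the fan-out is observable.
public sealed class RecordingHandler : INotificationHandler<UserCreatedNotification>
{
    private readonly string _label;
    private readonly List<string> _log;

    public RecordingHandler(string label, List<string> log)
    {
        _label = label;
        _log = log;
    }

    public Task Handle(UserCreatedNotification notification, CancellationToken cancellationToken)
    {
        _log.Add($"{_label}:{notification.UserName}");
        return Task.CompletedTask;
    }
}
```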
Out of the box, Cortex.Mediator supports:
LoggingCommandBehavior - Logs command execution with timing
LoggingQueryBehavior - Logs query execution with timing
LoggingNotificationBehavior - Logs notification publishing with timing
ExceptionHandlingCommandBehavior - Centralized exception handling for commands
ExceptionHandlingQueryBehavior - Centralized exception handling for queries
ExceptionHandlingNotificationBehavior - Centralized exception handling for notifications
ValidationCommandBehavior - FluentValidation support (via Cortex.Mediator.Behaviors.FluentValidation)
// Add default logging behaviors
options.AddDefaultBehaviors();
// Add exception handling behaviors
options.AddExceptionHandlingBehaviors();
// Add both logging and exception handling
options.AddDefaultBehaviorsWithExceptionHandling();
// Custom behaviors
options.AddOpenCommandPipelineBehavior(typeof(MyCustomBehavior<,>));
options.AddOpenQueryPipelineBehavior(typeof(MyCustomQueryBehavior<,>));
options.AddOpenNotificationPipelineBehavior(typeof(MyCustomNotificationBehavior<>));
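A pipeline behavior wraps the handler: it runs some code, calls `next()`, and runs more code once the inner call returns. The sketch below shows that nesting in isolation. The `HandlerDelegate<TResult>` delegate, the `IPipelineBehavior<,>` interface, `TraceBehavior`, and `Pipeline.Build` are all hypothetical stand-ins written for this example, and the library's real interface and delegate names may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-ins; the library's actual delegate and interface names may differ.
public delegate Task<TResult> HandlerDelegate<TResult>();

public interface IPipelineBehavior<TCommand, TResult>
{
    Task<TResult> Handle(
        TCommand command, HandlerDelegate<TResult> next, CancellationToken cancellationToken);
}

public sealed class TraceBehavior<TCommand, TResult> : IPipelineBehavior<TCommand, TResult>
{
    private readonly string _name;
    private readonly List<string> _trace;

    public TraceBehavior(string name, List<string> trace)
    {
        _name = name;
        _trace = trace;
    }

    public async Task<TResult> Handle(
        TCommand command, HandlerDelegate<TResult> next, CancellationToken cancellationToken)
    {
        _trace.Add($"{_name} before");
        var result = await next();   // Calls the next behavior, or the handler at the end of the chain.
        _trace.Add($"{_name} after");
        return result;
    }
}

public static class Pipeline
{
    // Wraps the handler so that the first behavior in the list becomes the outermost layer.
    public static HandlerDelegate<TResult> Build<TCommand, TResult>(
        TCommand command,
        IEnumerable<IPipelineBehavior<TCommand, TResult>> behaviors,
        HandlerDelegate<TResult> handler,
        CancellationToken cancellationToken = default)
    {
        return behaviors.Reverse().Aggregate(handler, (next, behavior) =>
            () => behavior.Handle(command, next, cancellationToken));
    }
}
```

In this sketch, the order in which behaviors are passed to `Build` decides which one sees the request first and the result last.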
The exception handling behaviors provide centralized exception handling with optional fallback results.
builder.Services.AddCortexMediator(
new[] { typeof(Program) },
options => options.AddExceptionHandlingBehaviors()
);
Implement IExceptionHandler to customize exception handling:
public class MyExceptionHandler : IExceptionHandler
{
private readonly ILogger<MyExceptionHandler> _logger;
public MyExceptionHandler(ILogger<MyExceptionHandler> logger)
{
_logger = logger;
}
public Task<bool> HandleAsync(
Exception exception,
Type requestType,
object request,
CancellationToken cancellationToken)
{
_logger.LogError(exception, "Error processing {RequestType}", requestType.Name);
// Return true to suppress the exception, false to rethrow
return Task.FromResult(false);
}
}
// Register in DI
services.AddSingleton<IExceptionHandler, MyExceptionHandler>();
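The boolean returned from `HandleAsync` decides whether the exception is swallowed or rethrown. A minimal sketch of how a wrapping behavior might consult the handler is shown here. `ExceptionGuard` and `FixedAnswerHandler` are hypothetical, and `IExceptionHandler` is redeclared locally with the signature shown above so the snippet compiles on its own; the library's built-in behaviors may do more, such as logging.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Same shape as the IExceptionHandler shown above, redeclared so the sketch is self-contained.
public interface IExceptionHandler
{
    Task<bool> HandleAsync(
        Exception exception, Type requestType, object request, CancellationToken cancellationToken);
}

public static class ExceptionGuard
{
    // Returns true when next() completed, false when an exception was suppressed.
    public static async Task<bool> RunAsync(
        object request, Func<Task> next, IExceptionHandler exceptionHandler,
        CancellationToken cancellationToken = default)
    {
        try
        {
            await next();
            return true;
        }
        catch (Exception exception)
        {
            var suppress = await exceptionHandler.HandleAsync(
                exception, request.GetType(), request, cancellationToken);
            if (!suppress)
                throw;   // Rethrows the original exception.
            return false;
        }
    }
}

// Test double that always gives the same answer.
public sealed class FixedAnswerHandler : IExceptionHandler
{
    private readonly bool _suppress;

    public FixedAnswerHandler(bool suppress) => _suppress = suppress;

    public Task<bool> HandleAsync(
        Exception exception, Type requestType, object request, CancellationToken cancellationToken)
        => Task.FromResult(_suppress);
}
```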
For commands and queries that return a value, implement IExceptionHandler<TResult>:
public class FallbackExceptionHandler : IExceptionHandler<ApiResponse>
{
public Task<(bool handled, ApiResponse? result)> HandleWithResultAsync(
Exception exception,
Type requestType,
object request,
CancellationToken cancellationToken)
{
var fallback = new ApiResponse
{
Success = false,
Error = exception.Message
};
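// Spell out the tuple type so its nullability matches the declared return type
return Task.FromResult<(bool handled, ApiResponse? result)>((true, fallback));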
}
public Task<bool> HandleAsync(Exception exception, Type requestType, object request, CancellationToken cancellationToken)
=> Task.FromResult(false);
}
For notifications, you can suppress exceptions to allow other handlers to continue:
// The ExceptionHandlingNotificationBehavior has a suppressExceptions parameter
// When true, exceptions are logged but not rethrown
The caching behavior provides automatic caching of query results to improve performance.
// Add caching services
builder.Services.AddMediatorCaching(options =>
{
options.DefaultAbsoluteExpiration = TimeSpan.FromMinutes(5);
options.DefaultSlidingExpiration = TimeSpan.FromMinutes(1);
options.CacheKeyPrefix = "MyApp";
});
// Add mediator with caching behavior
builder.Services.AddCortexMediator(
new[] { typeof(Program) },
options => options.AddCachingBehavior()
);
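The two expiration settings interact. Assuming the conventional semantics (the same ones used by .NET's `IMemoryCache`), a sliding expiration is renewed on every hit, while the absolute expiration is a hard limit that hits never extend. The sketch below illustrates that behavior with a hypothetical `MiniCache` that takes the current time as a parameter so it is easy to reason about; it is not the caching behavior shipped with the library.

```csharp
using System;
using System.Collections.Generic;

public sealed class MiniCache<TValue>
{
    private sealed class Entry
    {
        public TValue Value = default!;
        public DateTimeOffset AbsoluteDeadline;   // Hard limit, never extended.
        public DateTimeOffset LastAccess;         // Refreshed on every hit.
    }

    private readonly Dictionary<string, Entry> _entries = new();
    private readonly TimeSpan _absolute;
    private readonly TimeSpan _sliding;

    public MiniCache(TimeSpan absoluteExpiration, TimeSpan slidingExpiration)
    {
        _absolute = absoluteExpiration;
        _sliding = slidingExpiration;
    }

    public void Set(string key, TValue value, DateTimeOffset now) =>
        _entries[key] = new Entry
        {
            Value = value,
            AbsoluteDeadline = now + _absolute,
            LastAccess = now
        };

    public bool TryGet(string key, DateTimeOffset now, out TValue value)
    {
        if (_entries.TryGetValue(key, out var entry)
            && now < entry.AbsoluteDeadline
            && now - entry.LastAccess < _sliding)
        {
            entry.LastAccess = now;   // A hit renews the sliding window.
            value = entry.Value;
            return true;
        }

        _entries.Remove(key);         // Expired or missing.
        value = default!;
        return false;
    }
}
```

With the five-minute absolute and one-minute sliding values configured above, an entry read every 50 seconds stays cached until the five-minute mark, while an entry left untouched for a full minute expires early.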
Mark your query classes with the [Cacheable] attribute:
[Cacheable(AbsoluteExpirationSeconds = 300, SlidingExpirationSeconds = 60)]
public class GetUserQuery : IQuery<UserDto>
{
public int UserId { get; set; }
}
For more control, implement ICacheableQuery:
public class GetProductQuery : IQuery<ProductDto>, ICacheableQuery
{
public int ProductId { get; set; }
// Custom cache key
public string? CacheKey => $"product-{ProductId}";
// Custom expiration times
public TimeSpan? AbsoluteExpiration => TimeSpan.FromMinutes(10);
public TimeSpan? SlidingExpiration => TimeSpan.FromMinutes(2);
}
Use ICacheInvalidator to manually invalidate cached results:
public class UpdateUserCommandHandler : ICommandHandler<UpdateUserCommand>
{
private readonly ICacheInvalidator _cacheInvalidator;
public UpdateUserCommandHandler(ICacheInvalidator cacheInvalidator)
{
_cacheInvalidator = cacheInvalidator;
}
public async Task Handle(UpdateUserCommand command, CancellationToken cancellationToken)
{
// Update user in database...
// Invalidate the cached query result
_cacheInvalidator.Invalidate<GetUserQuery, UserDto>(
new GetUserQuery { UserId = command.UserId });
}
}
Implement ICacheKeyGenerator for custom key generation:
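public class MyCacheKeyGenerator : ICacheKeyGenerator
{
public string GenerateKey<TQuery, TResult>(TQuery query)
where TQuery : IQuery<TResult>
{
// Custom key generation logic. Build the key from the query's values
// (requires using System.Text.Json;). The default GetHashCode() of a class is
// reference-based, so two queries with identical values would get different keys.
return $"MyApp:{typeof(TQuery).Name}:{JsonSerializer.Serialize(query, query.GetType())}";
}
}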
// Register custom generator
services.AddMediatorCaching<MyCacheKeyGenerator>();
Cortex.Mediator supports streaming queries that return IAsyncEnumerable<T>, perfect for handling large datasets efficiently without loading everything into memory.
// Define the streaming query
public class GetAllUsersQuery : IStreamQuery<UserDto>
{
public int PageSize { get; set; } = 100;
}
// Implement the streaming handler
public class GetAllUsersQueryHandler : IStreamQueryHandler<GetAllUsersQuery, UserDto>
{
private readonly IDbConnection _db;
public GetAllUsersQueryHandler(IDbConnection db)
{
_db = db;
}
public async IAsyncEnumerable<UserDto> Handle(
GetAllUsersQuery query,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
// Stream results from the database one at a time
// (StreamUsersAsync stands in for your own data-access method)
await foreach (var user in _db.StreamUsersAsync(query.PageSize, cancellationToken))
{
yield return new UserDto
{
Id = user.Id,
Name = user.Name,
Email = user.Email
};
}
}
}
// Using the StreamAsync extension method (recommended)
await foreach (var user in mediator.StreamAsync(new GetAllUsersQuery()))
{
Console.WriteLine($"Processing: {user.Name}");
// Process each user as it arrives - no need to wait for all results
}
// Or with explicit type parameters
await foreach (var user in mediator.CreateStream<GetAllUsersQuery, UserDto>(query))
{
Console.WriteLine(user.Name);
}
using var cts = new CancellationTokenSource();
await foreach (var item in mediator.StreamAsync(query, cts.Token))
{
if (ShouldStop(item))
{
cts.Cancel(); // Gracefully stop streaming
break;
}
Process(item);
}
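The streaming model relies on standard C# async iterators, which produce items only as the consumer asks for them. The self-contained sketch below uses nothing but the base class library to show that leaving an `await foreach` loop early stops the producer as well. `NumberSource` and `StreamDemo` are hypothetical names, and `Task.Yield()` stands in for real asynchronous I/O.

```csharp
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public sealed class NumberSource
{
    // How many items the producer has generated so far.
    public int Produced { get; private set; }

    public async IAsyncEnumerable<int> StreamAsync(
        int max, [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        for (var i = 1; i <= max; i++)
        {
            cancellationToken.ThrowIfCancellationRequested();
            await Task.Yield();   // Stands in for asynchronous I/O such as a database read.
            Produced++;
            yield return i;
        }
    }
}

public static class StreamDemo
{
    // Consumes items until stopAt is seen, then leaves the loop early.
    public static async Task<int> ConsumeUntilAsync(NumberSource source, int stopAt)
    {
        var consumed = 0;
        await foreach (var item in source.StreamAsync(1_000_000))
        {
            consumed++;
            if (item == stopAt)
                break;   // Leaving the loop disposes the iterator, so production stops here.
        }
        return consumed;
    }
}
```

After `ConsumeUntilAsync(source, 3)` completes, `source.Produced` equals the number of items consumed, even though the stream could have yielded a million.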
Register pipeline behaviors for streaming queries:
// Create a custom streaming behavior
public class MetricsStreamBehavior<TQuery, TResult> : IStreamQueryPipelineBehavior<TQuery, TResult>
where TQuery : IStreamQuery<TResult>
{
public async IAsyncEnumerable<TResult> Handle(
TQuery query,
StreamQueryHandlerDelegate<TResult> next,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
var count = 0;
await foreach (var item in next().WithCancellation(cancellationToken))
{
count++;
yield return item;
}
Console.WriteLine($"Streamed {count} items");
}
}
// Register the behavior
services.AddCortexMediator(
new[] { typeof(Program) },
options => options.AddOpenStreamQueryPipelineBehavior(typeof(MetricsStreamBehavior<,>))
);
// Use the built-in logging behavior
options.AddOpenStreamQueryPipelineBehavior(typeof(LoggingStreamQueryBehavior<,>));
Pre-processors run before the handler executes, and post-processors run after. They're simpler than pipeline behaviors and are ideal for cross-cutting concerns.
// Register processor behaviors
services.AddCortexMediator(
new[] { typeof(Program) },
options => options.AddProcessorBehaviors()
);
Pre-processors run before the handler and can be used for validation, authorization, or data enrichment:
public class LoggingPreProcessor<TRequest> : IRequestPreProcessor<TRequest>
{
private readonly ILogger<LoggingPreProcessor<TRequest>> _logger;
public LoggingPreProcessor(ILogger<LoggingPreProcessor<TRequest>> logger)
{
_logger = logger;
}
public Task ProcessAsync(TRequest request, CancellationToken cancellationToken)
{
_logger.LogInformation("Processing {RequestType}", typeof(TRequest).Name);
return Task.CompletedTask;
}
}
// Register for a specific request type
services.AddTransient<IRequestPreProcessor<CreateOrderCommand>, OrderValidationPreProcessor>();
// Or register for all requests (generic)
services.AddTransient(typeof(IRequestPreProcessor<>), typeof(LoggingPreProcessor<>));
Post-processors run after successful handler execution. Use them for logging, auditing, or triggering side effects:
// Post-processor for commands/queries that return a result
public class AuditPostProcessor<TRequest, TResponse> : IRequestPostProcessor<TRequest, TResponse>
{
private readonly IAuditService _auditService;
public AuditPostProcessor(IAuditService auditService)
{
_auditService = auditService;
}
public async Task ProcessAsync(TRequest request, TResponse response, CancellationToken cancellationToken)
{
await _auditService.LogAsync(new AuditEntry
{
RequestType = typeof(TRequest).Name,
ResponseType = typeof(TResponse).Name,
Timestamp = DateTime.UtcNow
});
}
}
// Post-processor for void commands
public class NotificationPostProcessor : IRequestPostProcessor<CreateOrderCommand>
{
private readonly IMediator _mediator;
public NotificationPostProcessor(IMediator mediator)
{
_mediator = mediator;
}
public async Task ProcessAsync(CreateOrderCommand request, CancellationToken cancellationToken)
{
// Publish a notification after the command completes
await _mediator.PublishAsync(new OrderCreatedNotification { /* ... */ }, cancellationToken);
}
}
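Put together, the order of execution is pre-processors, then the handler, then post-processors, with post-processors skipped when the handler throws. The sketch below spells out that order using a hypothetical `ProcessorPipeline` helper and two logging processors. The two interfaces are redeclared locally with the signatures used above so the snippet compiles on its own, and the library's own behavior may differ in details.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Same shapes as the interfaces used above, redeclared so the sketch is self-contained.
public interface IRequestPreProcessor<TRequest>
{
    Task ProcessAsync(TRequest request, CancellationToken cancellationToken);
}

public interface IRequestPostProcessor<TRequest, TResponse>
{
    Task ProcessAsync(TRequest request, TResponse response, CancellationToken cancellationToken);
}

public static class ProcessorPipeline
{
    public static async Task<TResponse> RunAsync<TRequest, TResponse>(
        TRequest request,
        IEnumerable<IRequestPreProcessor<TRequest>> preProcessors,
        Func<TRequest, CancellationToken, Task<TResponse>> handler,
        IEnumerable<IRequestPostProcessor<TRequest, TResponse>> postProcessors,
        CancellationToken cancellationToken = default)
    {
        foreach (var pre in preProcessors)
            await pre.ProcessAsync(request, cancellationToken);

        // If the handler throws, the exception propagates and the post-processors are skipped.
        var response = await handler(request, cancellationToken);

        foreach (var post in postProcessors)
            await post.ProcessAsync(request, response, cancellationToken);

        return response;
    }
}

public sealed class LogPre<TRequest> : IRequestPreProcessor<TRequest>
{
    private readonly List<string> _log;
    public LogPre(List<string> log) => _log = log;

    public Task ProcessAsync(TRequest request, CancellationToken cancellationToken)
    {
        _log.Add("pre");
        return Task.CompletedTask;
    }
}

public sealed class LogPost<TRequest, TResponse> : IRequestPostProcessor<TRequest, TResponse>
{
    private readonly List<string> _log;
    public LogPost(List<string> log) => _log = log;

    public Task ProcessAsync(TRequest request, TResponse response, CancellationToken cancellationToken)
    {
        _log.Add($"post:{response}");
        return Task.CompletedTask;
    }
}
```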
We welcome contributions from the community! Whether it's reporting bugs, suggesting features, or submitting pull requests, your involvement helps improve Cortex for everyone.
git checkout -b feature/YourFeature
git commit -m "Add your feature"
git push origin feature/YourFeature
Describe your changes and submit the pull request for review.
This project is licensed under the MIT License.
Cortex is an open-source project maintained by BuilderSoft. Your support helps us continue developing and improving Cortex. Consider sponsoring us to contribute to the future of resilient streaming platforms.
Contact Us: cortex@buildersoft.io
We'd love to hear from you! Whether you have questions, feedback, or need support, feel free to reach out.

Thank you for using Cortex Data Framework! We hope it empowers you to build scalable and efficient data processing pipelines effortlessly.
Built with ❤️ by the Buildersoft team.