A high-performance .NET Mediator pattern implementation using source generation.
$ dotnet add package Mediator.SourceGenerator
Note: Want to contribute? See the Contributing Guide for information on building, testing, and submitting changes.
This is a high performance .NET implementation of the Mediator pattern using source generators. It provides a similar API to the great MediatR library while delivering better performance and full Native AOT support. Packages target .NET Standard 2.0 and .NET 8.
The mediator pattern is great for implementing cross-cutting concerns (logging, metrics, etc.) and avoiding "fat" constructors caused by many injected services.
Goals for this library
In particular, a source generator in this library is used to
- IMediator implementation
  - Send methods are monomorphized
  - object/generic arguments (takes effect above a certain project size threshold)
  - IMediator and the concrete Mediator class, the latter allows for better performance
NuGet packages:
dotnet add package Mediator.SourceGenerator --version 3.0.*
dotnet add package Mediator.Abstractions --version 3.0.*
or
<PackageReference Include="Mediator.SourceGenerator" Version="3.0.*">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets>
</PackageReference>
<PackageReference Include="Mediator.Abstractions" Version="3.0.*" />
See this great video by @Elfocrash / Nick Chapsas, covering both similarities and differences between Mediator and MediatR
Here is a brief comparison benchmark highlighting the difference between MediatR and this library using Singleton lifetime.
Note that this library yields the best performance when using the Singleton service lifetime.
- <ColdStart | Notification | Request | StreamRequest>_Mediator: the concrete Mediator class generated by this library
- <ColdStart | Notification | Request | StreamRequest>_IMediator: call through the IMediator interface in this library
- <ColdStart | Notification | Request | StreamRequest>_MediatR: the MediatR library
Benchmark category descriptions
- ColdStart - time to resolve IMediator from IServiceProvider and send a single request
- Notification - publish a single notification
- Request - publish a single request
- StreamRequest - stream a single request which yields 3 responses without delay
See the benchmarks/ folder for more detailed information, including varying lifetimes and project sizes.

There are two NuGet packages needed to use this library:
- Mediator.SourceGenerator: IMediator implementation and dependency injection setup.
- Mediator.Abstractions: message types (IRequest<,>, INotification), handler types (IRequestHandler<,>, INotificationHandler<>), pipeline types (IPipelineBehavior)
You install the source generator package into your edge/outermost project (e.g. an ASP.NET Core application or a background worker project),
and then use the Mediator.Abstractions package wherever you define message types and handlers.
Standard message handlers are automatically picked up and added to the DI container in the generated AddMediator method.
Pipeline behaviors need to be added manually (including pre/post/exception behaviors).
For example implementations, see the /samples folder. See the ASP.NET Core clean architecture sample for a more real world setup.
- IMessage - marker interface
- IStreamMessage - marker interface
- IBaseRequest - marker interface for requests
- IRequest - a request message, no return value (ValueTask<Unit>)
- IRequest<out TResponse> - a request message with a response (ValueTask<TResponse>)
- IStreamRequest<out TResponse> - a request message with a streaming response (IAsyncEnumerable<TResponse>)
- IBaseCommand - marker interface for commands
- ICommand - a command message, no return value (ValueTask<Unit>)
- ICommand<out TResponse> - a command message with a response (ValueTask<TResponse>)
- IStreamCommand<out TResponse> - a command message with a streaming response (IAsyncEnumerable<TResponse>)
- IBaseQuery - marker interface for queries
- IQuery<out TResponse> - a query message with a response (ValueTask<TResponse>)
- IStreamQuery<out TResponse> - a query message with a streaming response (IAsyncEnumerable<TResponse>)
- INotification - a notification message, no return value (ValueTask)
As you can see, you can achieve the exact same thing with requests, commands and queries. But I find the distinction in naming useful if you for example use the CQRS pattern or for some reason have a preference on naming in your application.
- IRequestHandler<in TRequest>
- IRequestHandler<in TRequest, TResponse>
- IStreamRequestHandler<in TRequest, out TResponse>
- ICommandHandler<in TCommand>
- ICommandHandler<in TCommand, TResponse>
- IStreamCommandHandler<in TCommand, out TResponse>
- IQueryHandler<in TQuery, TResponse>
- IStreamQueryHandler<in TQuery, out TResponse>
- INotificationHandler<in TNotification>
These types are used in conjunction with the message types above.
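To make the naming distinction concrete, here is a minimal sketch of a command and a query with their handlers. The message and handler names (CreateUser, GetUserName and so on) are hypothetical and only for illustration, and the sketch assumes the command and query handler interfaces expose the same Handle shape as the IRequestHandler shown elsewhere in this README. It needs a reference to Mediator.Abstractions to compile.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Mediator;

// Hypothetical message types for illustration; the names are not part of the library.
public sealed record CreateUser(string Name) : ICommand<Guid>;
public sealed record GetUserName(Guid Id) : IQuery<string>;

public sealed class CreateUserHandler : ICommandHandler<CreateUser, Guid>
{
    // Assumed signature: mirrors IRequestHandler<TRequest, TResponse>.Handle.
    public ValueTask<Guid> Handle(CreateUser command, CancellationToken cancellationToken)
    {
        // Completes synchronously, so no Task is allocated.
        return new ValueTask<Guid>(Guid.NewGuid());
    }
}

public sealed class GetUserNameHandler : IQueryHandler<GetUserName, string>
{
    // Assumed signature: mirrors IRequestHandler<TRequest, TResponse>.Handle.
    public ValueTask<string> Handle(GetUserName query, CancellationToken cancellationToken)
    {
        return new ValueTask<string>("user-" + query.Id.ToString("N"));
    }
}
```

Functionally these behave like an IRequest<TResponse> with an IRequestHandler; the command/query names only document intent.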
- IPipelineBehavior<TMessage, TResponse>
- IStreamPipelineBehavior<TMessage, TResponse>
- MessagePreProcessor<TMessage, TResponse>
- MessagePostProcessor<TMessage, TResponse>
- StreamMessagePreProcessor<TMessage, TResponse>
- StreamMessagePostProcessor<TMessage, TResponse>
- MessageExceptionHandler<TMessage, TResponse, TException>
// As a normal pipeline behavior
public sealed class MessageValidatorBehaviour<TMessage, TResponse> : IPipelineBehavior<TMessage, TResponse>
    where TMessage : IValidate
{
    public ValueTask<TResponse> Handle(
        TMessage message,
        MessageHandlerDelegate<TMessage, TResponse> next,
        CancellationToken cancellationToken
    )
    {
        if (!message.IsValid(out var validationError))
            throw new ValidationException(validationError);

        return next(message, cancellationToken);
    }
}

// Or as a pre-processor
public sealed class MessageValidatorBehaviour<TMessage, TResponse> : MessagePreProcessor<TMessage, TResponse>
    where TMessage : IValidate
{
    protected override ValueTask Handle(TMessage message, CancellationToken cancellationToken)
    {
        if (!message.IsValid(out var validationError))
            throw new ValidationException(validationError);

        return default;
    }
}

// Register as IPipelineBehavior<,> in either case
// NOTE: for NativeAOT, you should use the pipeline configuration on `MediatorOptions` instead (during `AddMediator`)
services.AddSingleton(typeof(IPipelineBehavior<,>), typeof(MessageValidatorBehaviour<,>));
// As a normal pipeline behavior
public sealed class ErrorLoggingBehaviour<TMessage, TResponse> : IPipelineBehavior<TMessage, TResponse>
    where TMessage : IMessage
{
    private readonly ILogger<ErrorLoggingBehaviour<TMessage, TResponse>> _logger;

    public ErrorLoggingBehaviour(ILogger<ErrorLoggingBehaviour<TMessage, TResponse>> logger)
    {
        _logger = logger;
    }

    public async ValueTask<TResponse> Handle(
        TMessage message,
        MessageHandlerDelegate<TMessage, TResponse> next,
        CancellationToken cancellationToken
    )
    {
        try
        {
            return await next(message, cancellationToken);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error handling message of type {messageType}", message.GetType().Name);
            throw;
        }
    }
}

// Or as an exception handler
public sealed class ErrorLoggingBehaviour<TMessage, TResponse> : MessageExceptionHandler<TMessage, TResponse>
    where TMessage : notnull, IMessage
{
    private readonly ILogger<ErrorLoggingBehaviour<TMessage, TResponse>> _logger;

    public ErrorLoggingBehaviour(ILogger<ErrorLoggingBehaviour<TMessage, TResponse>> logger)
    {
        _logger = logger;
    }

    protected override ValueTask<ExceptionHandlingResult<TResponse>> Handle(
        TMessage message,
        Exception exception,
        CancellationToken cancellationToken
    )
    {
        _logger.LogError(exception, "Error handling message of type {messageType}", message.GetType().Name);

        // Let the exception bubble up by using the base class helper NotHandled:
        return NotHandled;

        // Or, if the exception is properly handled, you can return your own response instead,
        // using the base class helper Handled().
        // This requires you to know something about TResponse,
        // so TResponse needs to be constrained to something,
        // typically with a static abstract member acting as a constructor on an interface or abstract class.
        // return Handled(null!);
    }
}

// Register as IPipelineBehavior<,> in either case
services.AddSingleton(typeof(IPipelineBehavior<,>), typeof(ErrorLoggingBehaviour<,>));
For streaming messages (e.g., IStreamRequest<TResponse>, IStreamCommand<TResponse>, IStreamQuery<TResponse>), you can use StreamMessagePreProcessor and StreamMessagePostProcessor to handle pre/post-processing logic.
NOTE: StreamMessagePostProcessor implementations will need to buffer all responses so that they can be passed as an IReadOnlyList<> to the Handle method.
// Stream message pre-processor - runs once before stream starts
public sealed class StreamLoggingPreProcessor<TMessage, TResponse> : StreamMessagePreProcessor<TMessage, TResponse>
    where TMessage : notnull, IStreamMessage
{
    private readonly ILogger<StreamLoggingPreProcessor<TMessage, TResponse>> _logger;

    public StreamLoggingPreProcessor(ILogger<StreamLoggingPreProcessor<TMessage, TResponse>> logger)
    {
        _logger = logger;
    }

    protected override ValueTask Handle(TMessage message, CancellationToken cancellationToken)
    {
        _logger.LogInformation("Starting stream processing for {MessageType}", typeof(TMessage).Name);
        return default;
    }
}

// Stream message post-processor - runs once after stream completes
public sealed class StreamLoggingPostProcessor<TMessage, TResponse> : StreamMessagePostProcessor<TMessage, TResponse>
    where TMessage : notnull, IStreamMessage
{
    private readonly ILogger<StreamLoggingPostProcessor<TMessage, TResponse>> _logger;

    public StreamLoggingPostProcessor(ILogger<StreamLoggingPostProcessor<TMessage, TResponse>> logger)
    {
        _logger = logger;
    }

    protected override ValueTask Handle(TMessage message, IReadOnlyList<TResponse> responses, CancellationToken cancellationToken)
    {
        _logger.LogInformation("Stream completed with {Count} items", responses.Count);
        return default;
    }
}

// Register as IStreamPipelineBehavior<,>
// NOTE: for NativeAOT, you should use the pipeline configuration on `MediatorOptions` instead (during `AddMediator`)
services.AddSingleton(typeof(IStreamPipelineBehavior<,>), typeof(StreamLoggingPreProcessor<,>));
services.AddSingleton(typeof(IStreamPipelineBehavior<,>), typeof(StreamLoggingPostProcessor<,>));
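The buffering note for StreamMessagePostProcessor has a practical consequence: memory use grows with the number of streamed responses. The following is a library-independent sketch of that idea, not Mediator's actual implementation. The StreamBuffering class and its WithPostProcessing helper are hypothetical names, and whether the library forwards items before or after buffering is not something this sketch claims.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class StreamBuffering
{
    // Hypothetical helper, not part of Mediator: forwards each item to the caller,
    // keeps a copy of it, and invokes the callback once the source stream has completed.
    public static async IAsyncEnumerable<T> WithPostProcessing<T>(
        IAsyncEnumerable<T> source,
        Func<IReadOnlyList<T>, CancellationToken, ValueTask> onCompleted,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var buffer = new List<T>(); // grows with the stream: this is the cost of post-processing
        await foreach (var item in source.WithCancellation(cancellationToken))
        {
            buffer.Add(item);
            yield return item;
        }

        await onCompleted(buffer, cancellationToken);
    }

    // Small demonstration: streams three numbers and reports how many items
    // the consumer received and how many the callback saw in the buffer.
    public static async Task<(int Received, int Buffered)> DemoAsync()
    {
        static async IAsyncEnumerable<int> Numbers()
        {
            for (var i = 1; i <= 3; i++)
            {
                await Task.Yield();
                yield return i;
            }
        }

        var received = 0;
        var buffered = 0;
        var stream = WithPostProcessing(Numbers(), (items, ct) =>
        {
            buffered = items.Count;
            return default;
        });

        await foreach (var item in stream)
            received++;

        return (received, buffered);
    }
}
```

For long or unbounded streams, a pre-processor or a plain IStreamPipelineBehavior may be a better fit than a post-processor, since neither needs the full list of responses.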
There are two ways to configure Mediator. Configuration values are needed at compile time since this is a source generator:
- The assembly-level attribute MediatorOptionsAttribute
- The options delegate passed to the AddMediator function.
services.AddMediator((MediatorOptions options) =>
{
    options.Namespace = "SimpleConsole.Mediator";
    options.ServiceLifetime = ServiceLifetime.Singleton;
    // Only available from v3:
    options.GenerateTypesAsInternal = true;
    options.NotificationPublisherType = typeof(Mediator.ForeachAwaitPublisher);
    options.Assemblies = [typeof(...)];
    options.Types = [typeof(IModuleMarker)];
    options.PipelineBehaviors = [];
    options.StreamPipelineBehaviors = [];
    // Only available from v3.1:
    options.CachingMode = CachingMode.Eager;
});

// or
[assembly: MediatorOptions(Namespace = "SimpleConsole.Mediator", ServiceLifetime = ServiceLifetime.Singleton, CachingMode = CachingMode.Eager)]
- Namespace - where the IMediator implementation is generated
- ServiceLifetime - the DI service lifetime
  - Singleton - (default value) everything registered as singletons, minimal allocations
  - Transient - mediator and handlers registered as transient
  - Scoped - mediator and handlers registered as scoped
- GenerateTypesAsInternal - makes all generated types internal as opposed to public, which may be necessary depending on project setup (e.g. for Blazor sample)
- NotificationPublisherType - the type used for publishing notifications (ForeachAwaitPublisher and TaskWhenAllPublisher are built in)
- Assemblies - which assemblies the source generator should scan for messages and handlers. When not used, the source generator will scan all referenced assemblies (same behavior as v2)
- Types - which message types to include for generation (requests/commands/queries/stream variants/notifications)
  - typeof(...) expressions in the AddMediator configuration
- PipelineBehaviors/StreamPipelineBehaviors - ordered array of types used for the pipeline
- CachingMode - controls when Mediator initialization occurs
  - Eager (default) - all handler wrappers and lookups are initialized on first Mediator access, best for long-running applications where startup cost is amortized
  - Lazy - handler wrappers are initialized on-demand as messages are processed, best for cold start scenarios (serverless, Native AOT) where minimal initialization is preferred
- Telemetry - configures telemetry emitted by the generated mediator implementation
  - EnableMetrics - enables metrics using System.Diagnostics.Metrics (requires .NET 8+)
  - MeterName - meter name used when metrics are enabled
  - EnableTracing - enables tracing using System.Diagnostics.ActivitySource (requires System.Diagnostics.DiagnosticSource APIs to be available on the target compilation)
  - ActivitySourceName - activity source name used when tracing is enabled
  - HistogramBuckets - optional custom bucket boundaries for messaging.process.duration (only configurable through MediatorOptions, not assembly attribute)
For assembly attribute configuration (MediatorOptionsAttribute), telemetry is configured with:
- TelemetryEnableMetrics
- TelemetryMeterName
- TelemetryEnableTracing
- TelemetryActivitySourceName
Note that since parsing of these options is done during compilation/source generation, all values must be compile-time constants.
In addition, since some types are not valid attribute parameter types (such as arrays/lists), some configuration is only available through AddMediator/MediatorOptions and not the assembly attribute.
Singleton lifetime is highly recommended as it yields the best performance. Every application is different, but it is likely that a lot of your message handlers don't keep state and have no need for transient or scoped lifetime. In a lot of cases those lifetimes only allocate lots of memory for no particular reason.
Mediator can emit telemetry for requests, streams and notifications using BCL APIs (System.Diagnostics.Metrics and System.Diagnostics.ActivitySource).
services.AddMediator((MediatorOptions options) =>
{
    options.Telemetry.EnableMetrics = true;
    options.Telemetry.EnableTracing = true;
    options.Telemetry.MeterName = "<optional-custom-name>";
    options.Telemetry.ActivitySourceName = "<optional-custom-name>";
});
Telemetry emitted by Mediator follows OpenTelemetry semantic conventions for messaging:
- messaging.process.duration (Histogram<double>, unit s)
- <messaging.operation.name> <messaging.destination.name> - one Activity per message operation, kind Consumer
- messaging.operation.name:
  - send for request/query/command messages
  - createstream for stream request/query/command messages
  - publish for notifications
- messaging.operation.type: process
- messaging.system=mediator
- messaging.destination.name=<message type name>
- error.type=<exception full name> (only when an operation fails)
- messaging.mediator.message.kind=command|query|request|streamcommand|streamquery|streamrequest|notification
To connect this with OpenTelemetry, wire Meter/ActivitySource in your app:
builder.Services
    .AddOpenTelemetry()
    .WithMetrics(metrics => metrics.AddMeter(Mediator.Mediator.MeterName))
    .WithTracing(tracing => tracing.AddSource(Mediator.Mediator.ActivitySourceName));
Mediator does not automatically call OpenTelemetry SDK configuration extension methods for you.
If you configured a custom mediator namespace, use that generated concrete mediator type instead of Mediator.Mediator.
See ASP.NET Core sample for example OpenTelemetry configuration using Mediator telemetry.
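If you only want to check that metrics are flowing, for example in a test, the BCL MeterListener can observe the histogram without the OpenTelemetry SDK. The sketch below is an illustration under assumptions: DurationProbe is a hypothetical helper, and its Demo method records into a stand-in meter named "Demo.Meter" so the example runs on its own. In a real application you would pass the generated mediator's meter name (Mediator.Mediator.MeterName in the snippet above) and let Mediator do the recording.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;

public static class DurationProbe
{
    // Subscribes to the messaging.process.duration histogram of the given meter and
    // appends every recorded value (in seconds) to the sink. Dispose the result to stop.
    public static MeterListener Listen(string meterName, List<double> sink)
    {
        var listener = new MeterListener();
        listener.InstrumentPublished = (instrument, l) =>
        {
            if (instrument.Meter.Name == meterName && instrument.Name == "messaging.process.duration")
                l.EnableMeasurementEvents(instrument);
        };
        listener.SetMeasurementEventCallback<double>((instrument, value, tags, state) =>
        {
            lock (sink)
                sink.Add(value);
        });
        listener.Start();
        return listener;
    }

    // Self-contained demonstration with a stand-in meter; returns the number of captured values.
    public static int Demo()
    {
        var sink = new List<double>();
        using var listener = Listen("Demo.Meter", sink);

        // Stand-in for the meter and histogram that Mediator would create when metrics are enabled.
        using var meter = new Meter("Demo.Meter");
        var histogram = meter.CreateHistogram<double>("messaging.process.duration", unit: "s");
        histogram.Record(0.012);

        return sink.Count;
    }
}
```

The measurement callback is invoked synchronously on the recording thread, so it should stay cheap; the lock is only there because several threads may record at once.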
In this section we will get started with Mediator and go through a sample illustrating the various ways the Mediator pattern can be used in an application.
See the full runnable sample code in the Showcase sample.
dotnet add package Mediator.SourceGenerator --version 3.0.*
dotnet add package Mediator.Abstractions --version 3.0.*
or
<PackageReference Include="Mediator.SourceGenerator" Version="3.0.*">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets>
</PackageReference>
<PackageReference Include="Mediator.Abstractions" Version="3.0.*" />
In ConfigureServices or equivalent, call AddMediator (unless MediatorOptions is configured, default namespace is Mediator).
This registers your handler below.
using Mediator;
using Microsoft.Extensions.DependencyInjection;
using System;

// Most likely IServiceCollection comes from IHostBuilder/Generic host abstraction in Microsoft.Extensions.Hosting
var services = new ServiceCollection();
services.AddMediator();
using var serviceProvider = services.BuildServiceProvider();
IRequest<> type
services.AddMediator((MediatorOptions options) => options.Assemblies = [typeof(Ping)]);
using var serviceProvider = services.BuildServiceProvider();
var mediator = serviceProvider.GetRequiredService<IMediator>();
var ping = new Ping(Guid.NewGuid());
var pong = await mediator.Send(ping);
Debug.Assert(ping.Id == pong.Id);

// ...

public sealed record Ping(Guid Id) : IRequest<Pong>;

public sealed record Pong(Guid Id);

public sealed class PingHandler : IRequestHandler<Ping, Pong>
{
    public ValueTask<Pong> Handle(Ping request, CancellationToken cancellationToken)
    {
        return new ValueTask<Pong>(new Pong(request.Id));
    }
}
As soon as you code up message types, the source generator will add DI registrations automatically (inside AddMediator).
P.S - You can inspect the code yourself - open Mediator.g.cs in VS from Project -> Dependencies -> Analyzers -> Mediator.SourceGenerator -> Mediator.SourceGenerator.MediatorGenerator,
or just F12 through the code.
The pipeline behavior below validates all incoming Ping messages.
Pipeline behaviors currently must be added manually.
services.AddMediator((MediatorOptions options) =>
{
    options.Assemblies = [typeof(Ping)];
    options.PipelineBehaviors = [typeof(PingValidator)];
});

public sealed class PingValidator : IPipelineBehavior<Ping, Pong>
{
    public ValueTask<Pong> Handle(Ping request, MessageHandlerDelegate<Ping, Pong> next, CancellationToken cancellationToken)
    {
        if (request is null || request.Id == default)
            throw new ArgumentException("Invalid input");

        return next(request, cancellationToken);
    }
}
IPipelineBehavior<,> message with open generics
Add an open generic handler to process all or a subset of messages passing through Mediator.
This handler will log any error that is thrown from message handlers (IRequest, ICommand, IQuery).
It also publishes a notification allowing notification handlers to react to errors.
Message pre- and post-processors along with the exception handlers can also constrain the generic type parameters in the same way.
services.AddMediator((MediatorOptions options) =>
{
    options.Assemblies = [typeof(ErrorMessage)];
    options.PipelineBehaviors = [typeof(ErrorLoggerHandler<,>)];
});

public sealed record ErrorMessage(Exception Exception) : INotification;
public sealed record SuccessfulMessage() : INotification;

public sealed class ErrorLoggerHandler<TMessage, TResponse> : IPipelineBehavior<TMessage, TResponse>
    where TMessage : IMessage // Constrained to IMessage, or constrain to IBaseCommand or any custom interface you've implemented
{
    private readonly ILogger<ErrorLoggerHandler<TMessage, TResponse>> _logger;
    private readonly IMediator _mediator;

    public ErrorLoggerHandler(ILogger<ErrorLoggerHandler<TMessage, TResponse>> logger, IMediator mediator)
    {
        _logger = logger;
        _mediator = mediator;
    }

    public async ValueTask<TResponse> Handle(TMessage message, MessageHandlerDelegate<TMessage, TResponse> next, CancellationToken cancellationToken)
    {
        try
        {
            var response = await next(message, cancellationToken);
            return response;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error handling message");
            await _mediator.Publish(new ErrorMessage(ex), cancellationToken);
            throw;
        }
    }
}
We can define a notification handler to catch errors from the above pipeline behavior.
// Notification handlers are automatically added to DI container
public sealed class ErrorNotificationHandler : INotificationHandler<ErrorMessage>
{
    public ValueTask Handle(ErrorMessage error, CancellationToken cancellationToken)
    {
        // Could log to application insights or something...
        return default;
    }
}
We can also define a notification handler that receives all notifications. Note that polymorphic dispatch does not work with struct notifications.
public sealed class StatsNotificationHandler : INotificationHandler<INotification> // or any other interface deriving from INotification
{
    private long _messageCount;
    private long _messageErrorCount;

    public (long MessageCount, long MessageErrorCount) Stats => (_messageCount, _messageErrorCount);

    public ValueTask Handle(INotification notification, CancellationToken cancellationToken)
    {
        Interlocked.Increment(ref _messageCount);
        if (notification is ErrorMessage)
            Interlocked.Increment(ref _messageErrorCount);

        return default;
    }
}

public sealed class GenericNotificationHandler<TNotification> : INotificationHandler<TNotification>
    where TNotification : INotification // Generic notification handlers will be registered as open constrained types automatically
{
    public ValueTask Handle(TNotification notification, CancellationToken cancellationToken)
    {
        return default;
    }
}
Notification publishers are responsible for dispatching notifications to a collection of handlers. There are two built-in implementations:
- ForeachAwaitPublisher - the default, dispatches the notifications to handlers in order 1-by-1
- TaskWhenAllPublisher - dispatches notifications in parallel
Both of these try to be efficient by handling a number of special cases (early exit on sync completion, single-handler, array of handlers).
Below we implement a custom one by simply using Task.WhenAll.
services.AddMediator((MediatorOptions options) =>
{
    options.NotificationPublisherType = typeof(FireAndForgetNotificationPublisher);
});

public sealed class FireAndForgetNotificationPublisher : INotificationPublisher
{
    public async ValueTask Publish<TNotification>(
        NotificationHandlers<TNotification> handlers,
        TNotification notification,
        CancellationToken cancellationToken
    )
        where TNotification : INotification
    {
        try
        {
            await Task.WhenAll(handlers.Select(handler => handler.Handle(notification, cancellationToken).AsTask()));
        }
        catch (Exception ex)
        {
            // Notifications should be fire-and-forget, we just need to log it!
            // This way we don't have to worry about exceptions bubbling up when publishing notifications
            Console.Error.WriteLine(ex);
            // NOTE: not necessarily saying this is a good idea!
        }
    }
}
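The difference between dispatching handlers one by one and dispatching them in parallel can be shown without the library. The sketch below is a simplified illustration under assumptions, not the code of ForeachAwaitPublisher or TaskWhenAllPublisher: PublishStrategies is a hypothetical class, and handlers are modeled as plain Func<ValueTask> delegates.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class PublishStrategies
{
    // Sequential: each handler is awaited before the next one starts.
    public static async ValueTask ForeachAwait(IReadOnlyList<Func<ValueTask>> handlers)
    {
        foreach (var handler in handlers)
            await handler();
    }

    // Parallel: every handler is started first, then the pending ones are awaited together.
    public static async ValueTask WhenAll(IReadOnlyList<Func<ValueTask>> handlers)
    {
        var tasks = new List<Task>(handlers.Count);
        foreach (var handler in handlers)
        {
            var pending = handler();
            if (pending.IsCompletedSuccessfully)
            {
                // Already finished synchronously: consume the result, no Task allocation needed.
                pending.GetAwaiter().GetResult();
            }
            else
            {
                tasks.Add(pending.AsTask());
            }
        }

        await Task.WhenAll(tasks);
    }
}
```

With a slow first handler and a synchronous second handler, the sequential variant finishes them in registration order, while the parallel variant lets the second handler finish before the first. That ordering difference is the main thing to keep in mind when handlers have side effects that depend on each other.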
Since version 1.* of this library there is support for streaming using IAsyncEnumerable.
var mediator = serviceProvider.GetRequiredService<IMediator>();
var ping = new StreamPing(Guid.NewGuid());

await foreach (var pong in mediator.CreateStream(ping))
{
    Debug.Assert(ping.Id == pong.Id);
    Console.WriteLine("Received pong!"); // Should log 5 times
}

// ...

public sealed record StreamPing(Guid Id) : IStreamRequest<Pong>;

public sealed record Pong(Guid Id);

public sealed class PingHandler : IStreamRequestHandler<StreamPing, Pong>
{
    public async IAsyncEnumerable<Pong> Handle(StreamPing request, [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        for (int i = 0; i < 5; i++)
        {
            await Task.Delay(1000, cancellationToken);
            yield return new Pong(request.Id);
        }
    }
}
Since this is a source generator, diagnostics are also included. Examples below


This is a work-in-progress list of the differences between this library and MediatR.
- RequestHandlerDelegate<TResponse>() -> MessageHandlerDelegate<TMessage, TResponse>(TMessage message, CancellationToken cancellationToken)
- ServiceFactory
  - Microsoft.Extensions.DependencyInjection, so it only works with DI containers that integrate with those abstractions.
- MediatR.Extensions.Microsoft.DependencyInjection does transient service registration by default, which leads to a lot of allocations. Even if it is configured for singleton lifetime, IMediator and ServiceFactory services are registered as transient (not configurable).
- ValueTask<T> instead of Task<T>, to allow for fewer allocations (for example if the handler completes synchronously, or using async method builder pooling/PoolingAsyncValueTaskMethodBuilder<T>)
For versioning this library I try to follow semver 2.0 as best as I can, meaning
There are various options for Mediator implementations in the .NET ecosystem. Here are some good ones that you might consider: