Package Description
Published: Jan 27, 2026
$ dotnet add package RmToolkit.MessageBus
A compact guide for using the RmToolkit message bus and saga orchestration features.
Modern modular monolith applications often need clear boundaries between functional modules (e.g., User, Order, Inventory) without fully distributing services.
The RmToolkit Message Bus provides:
This enables modular architectures to evolve toward microservices later without major redesign, while preserving the simplicity and performance of a single-deployable monolith.
This repository contains a lightweight message-bus abstraction with support for:
RmToolkit.Result (included)
RmToolkit.MessageBus.Contracts
RmToolkit.MessageBus
Provides the message bus, DI extensions, transports, saga orchestration, filters, and runtime components.
RmToolkit.MessageBus.Contracts
Shared message contracts and interfaces (NuGet). Useful for external producers/consumers.
RmToolkit.Result
Included automatically for Result-pattern support.
Install the contracts package if needed:
dotnet add package RmToolkit.MessageBus.Contracts
Register the bus during application startup:
builder.Services.AddMessageBus(conf =>
{
    conf.UsingInMemory(opts =>
    {
        opts.ChannelCapacity = 1_000;
    });
    conf.ConsumerAssembliesToRegister = [typeof(Program).Assembly];
    conf.AddProducerFilter(typeof(CommonProduceFilter<>), b => b.ForMessage<IntegrationEvent1>());
    conf.AddPublishFilter(typeof(CommonPublishFilter<>));
    conf.AddSendFilter(typeof(CommonSendFilter<>));
    conf.AddConsumerFilter(typeof(ModuleAInboxFilter<>),
        b => b.ForMessage<ModuleA.IntegrationEvent1>(order: 2));
    conf.AddSagaStateMachine<ThreeStepSagaOrchestrator, ThreeStepState>()
        .UseInMemory();
});
Notes
ConsumerAssembliesToRegister controls consumer discovery.
… (InMemory, MemoryCache, DistributedCache, EF Core).
Filters allow cross-cutting behavior around message flow.
public sealed class LoggingConsumeFilter<T> : IConsumeFilter<T>
{
    private readonly ILogger<LoggingConsumeFilter<T>> _logger;

    public LoggingConsumeFilter(ILogger<LoggingConsumeFilter<T>> logger)
    {
        _logger = logger;
    }

    public async Task Invoke(IConsumeContext<T> ctx, IPipe<T> next)
    {
        _logger.LogInformation("Handling {Message}", typeof(T).Name);
        await next.Invoke(ctx);
    }
}
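The way a chain of such filters wraps a handler can be sketched without any RmToolkit types. The example below is a minimal illustration of the general filter (middleware) pattern, not the library's implementation: `ISketchFilter<T>`, `RecordingFilter<T>`, and `FilterPipelineSketch` are hypothetical names introduced here, and the real `IPipe<T>` may behave differently.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for a filter contract; not an RmToolkit type.
public interface ISketchFilter<T>
{
    Task Invoke(T message, Func<T, Task> next);
}

// Records when it runs, before and after the rest of the chain.
public sealed class RecordingFilter<T> : ISketchFilter<T>
{
    private readonly string _name;
    private readonly List<string> _log;

    public RecordingFilter(string name, List<string> log)
    {
        _name = name;
        _log = log;
    }

    public async Task Invoke(T message, Func<T, Task> next)
    {
        _log.Add($"{_name}:before");
        await next(message); // hand off to the next filter or the handler
        _log.Add($"{_name}:after");
    }
}

public static class FilterPipelineSketch
{
    // Wraps the handler so that the first filter in the list runs outermost.
    public static Func<T, Task> Build<T>(
        IReadOnlyList<ISketchFilter<T>> filters, Func<T, Task> handler)
    {
        Func<T, Task> pipeline = handler;
        for (int i = filters.Count - 1; i >= 0; i--)
        {
            var filter = filters[i];
            var next = pipeline;
            pipeline = message => filter.Invoke(message, next);
        }
        return pipeline;
    }

    // Runs a two-filter chain over one message and returns the call order.
    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();
        var filters = new List<ISketchFilter<string>>
        {
            new RecordingFilter<string>("outer", log),
            new RecordingFilter<string>("inner", log),
        };
        var pipeline = Build<string>(filters, message =>
        {
            log.Add($"handler:{message}");
            return Task.CompletedTask;
        });
        await pipeline("UserCreated");
        return log;
    }
}
```

Calling `await FilterPipelineSketch.RunAsync()` yields the entries `outer:before`, `inner:before`, `handler:UserCreated`, `inner:after`, `outer:after`, which shows why code placed after the `next` call in a filter runs once the handler has finished.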
Common uses
Consumers implement IConsumer<TMessage>.
internal sealed class UserCreatedEventHandler
    : IConsumer<UserCreated>
{
    private readonly ILogger<UserCreatedEventHandler> _logger;

    public UserCreatedEventHandler(ILogger<UserCreatedEventHandler> logger)
    {
        _logger = logger;
    }

    public async Task Handle(IConsumeContext<UserCreated> context)
    {
        await Task.Delay(500);
        _logger.LogInformation("UserCreated processed: {UserId}", context.Message.UserId);
    }
}

public sealed class CompensateStepA
    : IConsumer<Undo<DoStepA>>
{
    public async Task Handle(IConsumeContext<Undo<DoStepA>> context)
    {
        await context.PublishAsync(
            new Compensated<DoStepA> { CorrelationId = context.Message.CorrelationId }
        );
    }
}

public sealed class ThreeStepState : ISaga
{
    public Guid CorrelationId { get; init; }
    public State CurrentState { get; set; } = State.None;
    public string? ErrorMessage { get; set; }
}
public record StartWorkflow : EventMessage;
public record DoStepA : CommandMessage;
public record DoStepB : CommandMessage;
public record DoStepC : CommandMessage;
public record StepACompleted : EventMessage;
public record StepBCompleted : EventMessage;
public record StepCCompleted : EventMessage;
Initially
    .When(StartWorkflowEvent)
    .Send(ctx => new DoStepA { CorrelationId = ctx.Message.CorrelationId })
    .TransitionTo(StepAInProgress);
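To make the flow of a three-step saga concrete, the transitions can be modeled as a plain lookup table. This is a sketch under assumptions, not the RmToolkit state machine: only `StepAInProgress` and the `Failed` state appear in this guide, while `SketchState`, `SketchEvent`, `StepFaulted`, and the remaining state names are hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins for illustration; these are not RmToolkit types.
public enum SketchState { None, StepAInProgress, StepBInProgress, StepCInProgress, Completed, Failed }
public enum SketchEvent { StartWorkflow, StepACompleted, StepBCompleted, StepCCompleted, StepFaulted }

public static class SagaTransitionSketch
{
    // (current state, incoming event) -> next state for the happy path.
    private static readonly Dictionary<(SketchState, SketchEvent), SketchState> HappyPath =
        new Dictionary<(SketchState, SketchEvent), SketchState>
        {
            [(SketchState.None, SketchEvent.StartWorkflow)] = SketchState.StepAInProgress,
            [(SketchState.StepAInProgress, SketchEvent.StepACompleted)] = SketchState.StepBInProgress,
            [(SketchState.StepBInProgress, SketchEvent.StepBCompleted)] = SketchState.StepCInProgress,
            [(SketchState.StepCInProgress, SketchEvent.StepCCompleted)] = SketchState.Completed,
        };

    // Returns the next state; events that do not apply leave the state unchanged.
    public static SketchState Apply(SketchState current, SketchEvent evt)
    {
        bool inProgress = current == SketchState.StepAInProgress
            || current == SketchState.StepBInProgress
            || current == SketchState.StepCInProgress;

        // Assumption: a fault during any in-progress step moves the saga to Failed.
        if (evt == SketchEvent.StepFaulted)
        {
            return inProgress ? SketchState.Failed : current;
        }

        return HappyPath.TryGetValue((current, evt), out var next) ? next : current;
    }

    // Folds a sequence of events over the initial state.
    public static SketchState Replay(IEnumerable<SketchEvent> events)
    {
        var state = SketchState.None;
        foreach (var evt in events)
        {
            state = Apply(state, evt);
        }
        return state;
    }
}
```

Replaying `StartWorkflow`, `StepACompleted`, `StepBCompleted`, `StepCCompleted` ends in `Completed`, while a `StepFaulted` event in the middle of the sequence ends in `Failed`. A real orchestrator would also send commands and compensation messages on each transition, which this table deliberately leaves out.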
Fault<T> messages on failure
Undo<T> compensation messages
Failed state
await bus.SendAsync(new CreateUserCommand(1, "ARMIN"));
var response = await bus.RequestAsync<GetUserQuery, UserDto>(
    new GetUserQuery(1),
    timeout: TimeSpan.FromMinutes(1),
    cancellationToken: cancellationToken
);

public sealed class GetUserHandler : IConsumer<GetUserQuery>
{
    public async Task Handle(IConsumeContext<GetUserQuery> context)
    {
        await context.RespondAsync(new UserDto(1, "ARMIN"));
    }
}
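One common way to build request/response with a timeout on top of one-way messaging is to park a `TaskCompletionSource` under a request id until the reply arrives. The sketch below illustrates that general technique only; it is not how RmToolkit implements `RequestAsync`. `RequestClientSketch` and its members are hypothetical, and `Task.WaitAsync` requires .NET 6 or later.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical illustration of request/response correlation; not an RmToolkit type.
public sealed class RequestClientSketch
{
    // Requests that are still waiting for a reply, keyed by request id.
    private readonly ConcurrentDictionary<Guid, TaskCompletionSource<object>> _pending =
        new ConcurrentDictionary<Guid, TaskCompletionSource<object>>();

    // Sends a request through the supplied delegate and waits for the matching reply.
    // Throws TimeoutException if no reply arrives within the timeout.
    public async Task<TResponse> RequestAsync<TResponse>(
        Func<Guid, Task> send, TimeSpan timeout, CancellationToken cancellationToken = default)
    {
        var requestId = Guid.NewGuid();
        var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
        _pending[requestId] = tcs;
        try
        {
            await send(requestId);
            var reply = await tcs.Task.WaitAsync(timeout, cancellationToken);
            return (TResponse)reply;
        }
        finally
        {
            // Drop the entry whether the request completed, timed out, or was cancelled.
            _pending.TryRemove(requestId, out _);
        }
    }

    // Called by the responding side; returns false if nobody is waiting for this id.
    public bool Respond(Guid requestId, object response)
    {
        return _pending.TryRemove(requestId, out var tcs) && tcs.TrySetResult(response);
    }
}
```

In this sketch a handler that never calls `Respond` causes the caller to see a `TimeoutException` once the timeout elapses, which is the situation a generous `timeout` argument, such as the one-minute value in the request example, is meant to bound.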
CorrelationId for sagas
CorrelationId and CorrelateById
ConsumerAssembliesToRegister
MIT License