Package Description
License
—
Deps
5
Install Size
—
Vulns
✓ 0
Published
Jan 27, 2026
$ dotnet add package RmToolkit.MessageBusA compact guide for using the RmToolkit message bus and saga orchestration features used in your project.
Modern modular monolith applications often need boundaries between functional modules (e.g., User, Order, Inventory) without fully distributing services.
The RmToolkit Message Bus provides:
This enables modular architectures to evolve toward microservices later without major redesign, while preserving performance and simplicity of a single-deployable monolith.
This repository contains a lightweight message-bus abstraction with support for:
RmToolkit.Result package (already included in the library; no installation required).RmToolkit.MessageBus.Contracts NuGet package.This README explains how to register the bus, configure a saga state machine, create consumers, and add filters.
RmToolkit.MessageBus — . Provides the message bus, dependency injection extension methods (e.g., AddMessageBus), transports, saga orchestration, filters, and runtime pieces. This is the package you add to applications to register and configure the bus via DI.RmToolkit.MessageBus.Contracts — contains shared message contracts and bus interfaces (NuGet). Useful for external consumers/producers that only need contracts.RmToolkit.Result — included by the message-bus library to handle the Result pattern (no extra action required).If you need to add/install the contracts package (for consumers/producers outside the library):
dotnet add package RmToolkit.MessageBus.Contracts
Use AddMessageBus (from the main RmToolkit.MessageBus package) during startup/service registration. Example:
builder.Services.AddMessageBus(conf =>
{
conf.UsingInMemory(opts =>
{
opts.ChannelCapacity = 1_000;
});
conf.ConsumerAssembliesToRegister = [typeof(Program).Assembly];
conf.AddProducerFilter(typeof(CommonProduceFilter<>), b => b.ForMessage<IntegrationEvent1>());
conf.AddPublishFilter(typeof(CommonPublishFilter<>));
conf.AddSendFilter(typeof(CommonSendFilter<>));
conf.AddProducerFilter(typeof(ModuleB.ProducerFilter), b => b.ForMessage<IntegrationEvent2>());
conf.AddConsumerFilter(typeof(ModuleAInboxFilter<>), b => b.ForMessage<ModuleA.IntegrationEvent1>(order:2));
conf.AddConsumerFilter(typeof(ModuleBInboxFilter<>), b => b.ForConsumer<ModuleB.IntegrationEvent1Handler>(order: 10));
conf.AddConsumerFilter(typeof(ModuleCInboxFilter<>));
conf.AddSagaStateMachine<ThreeStepSagaOrchestrator, ThreeStepState>()
.UseInMemory();
//.UseMemoryCache()
//.UseDistributedCache()
//.UseEntityFramework<AppDbContext>();
});Notes
ConsumerAssembliesToRegister tells the bus where to scan for consumers, handlers and related types..UseMemoryCache(), .UseDistributedCache() or .UseEntityFramework<AppDbContext>().UsingInMemory is great for development and testing.Filters are pipeline hooks that run before/after producing/publishing/sending/consuming messages.
Example filter types:
public sealed class LoggingProduceFilter<T> : IPublishFilter<T> where T : class, IRequest
{
public Task Invoke(IProduceContext<T> context, IPipe<T> next)
{
return next.Invoke(context);
}
}
public sealed class LoggingCreateUserCommandFilter : ISendFilter<CreateUserCommand>
{
public Task Invoke(IProduceContext<CreateUserCommand> context, IPipe<CreateUserCommand> next)
{
return next.Invoke(context);
}
}
public sealed class GeneralProduceFilter<T> : IProduceFilter<T> where T : class, IRequest
{
public Task Invoke(IProduceContext<T> context, IPipe<T> next)
{
return next.Invoke(context);
}
}
public class LoggingConsumeFilter<T>(ILogger<LoggingConsumeFilter<T>> _logger) : IConsumeFilter<T> where T : IdempotentCommandMessage
{
public async Task Invoke(IConsumeContext<T> ctx, IPipe<T> next)
{
_logger.LogInformation("Entering {MessageType} Id={Id}", ctx.Envelope.MessageType.Name, ctx.Message.MessageId);
var sw = Stopwatch.StartNew();
try
{
await next.Invoke(ctx).ConfigureAwait(false);
sw.Stop();
_logger.LogInformation("Completed {MessageType} Id={Id} in {Elapsed}ms", ctx.Envelope.MessageType.Name, ctx.Message.MessageId, sw.Elapsed.TotalMilliseconds);
}
catch (Exception ex)
{
sw.Stop();
_logger.LogError(ex, "Handler failed for {MessageType} Id={Id} after {Elapsed}ms", ctx.Envelope.MessageType.Name, ctx.Message.MessageId, sw.Elapsed.TotalMilliseconds);
throw;
}
}
}Common uses
A consumer is any class implementing IConsumer<TMessage>:
internal class UserCreatedEventHandler(ILogger<UserCreatedEventHandler> logger) : IConsumer<UserCreated>
{
public async Task Handle(IConsumeContext<UserCreated> context)
{
await Task.Delay(500);
logger.LogInformation("{Module}: UserCreatedEvent processed. {UserId}", "ModuleA", context.Message.UserId);
}
}Compensation consumer example (undo pattern):
public class CompensateHandleStep1(ILogger<CompensateHandleStep1> logger) : IConsumer<Undo<DoStepA>>
{
public async Task Handle(IConsumeContext<Undo<DoStepA>> context)
{
await context.PublishAsync(new Compensated<DoStepA> { CorrelationId = context.Message.CorrelationId });
await Task.CompletedTask;
}
}The repository contains a three-step orchestrator (ThreeStepSagaOrchestrator) and a state (ThreeStepState).
public class ThreeStepState : ISaga
{
public Guid CorrelationId { get; init; }
public State CurrentState { get; set; } = State.None;
public string? ErrorMessage { get; set; }
}public record DoStepA : CommandMessage;
public record DoStepB : CommandMessage;
public record DoStepC : CommandMessage;
public record StartWorkflow : EventMessage;
public record StepACompleted : EventMessage;
public record StepBCompleted : EventMessage;
public record StepCCompleted : EventMessage;public class ThreeStepSagaOrchestrator : SagaStateMachine<ThreeStepState>
{
public ThreeStepSagaOrchestrator()
{
Event(() => StartWorkflowEvent, e =>
{
e.CorrelateById(msg => msg.CorrelationId);
e.InsertOnInitial(msg => new ThreeStepState { CorrelationId = msg.CorrelationId });
});
Event(() => StepACompleted, e => e.CorrelateById(msg => msg.CorrelationId));
Event(() => StepBCompleted, e => e.CorrelateById(msg => msg.CorrelationId));
Event(() => StepCCompleted, e => e.CorrelateById(msg => msg.CorrelationId));
Event(() => FaultDoStepA, e => e.CorrelateById(msg => msg.CorrelationId));
Event(() => FaultDoStepB, e => e.CorrelateById(msg => msg.CorrelationId));
Event(() => FaultDoStepC, e => e.CorrelateById(msg => msg.CorrelationId));
Initially
.When(StartWorkflowEvent)
.Then(ctx => Console.WriteLine($"[Saga] During {ctx.Instance.CurrentState}: Received {StartWorkflowEvent.Name} ({ctx.Message.CorrelationId}) "))
.Send(ctx => new DoStepA() { CorrelationId = ctx.Message.CorrelationId })
.TransitionTo(StepAInProgress);
During(StepAInProgress)
.When(StepACompleted)
.Then(ctx => Console.WriteLine($"[Saga] During {ctx.Instance.CurrentState}: Received {StepACompleted.Name}({ctx.Message.CorrelationId})"))
.Send(ctx => new DoStepB() { CorrelationId = ctx.Message.CorrelationId })
.TransitionTo(StepBInProgress)
.When(FaultDoStepA)
.Then(ctx => Console.WriteLine($"[Saga] During {ctx.Instance.CurrentState}: Received {FaultDoStepA.Name}({ctx.Message.CorrelationId})"))
.Then(ctx => { ctx.Instance.ErrorMessage = ctx.Message.Exception?.Message; })
.TransitionTo(Failed);
During(StepBInProgress)
.When(StepBCompleted)
.Then(ctx => Console.WriteLine($"[Saga] During {ctx.Instance.CurrentState}: Received {StepBCompleted.Name}({ctx.Message.CorrelationId})"))
.Send(ctx => new DoStepC() { CorrelationId = ctx.Message.CorrelationId })
.TransitionTo(StepCInProgress)
.When(FaultDoStepB)
.Then(ctx => Console.WriteLine($"[Saga] During {ctx.Instance.CurrentState}: Received {FaultDoStepB.Name}({ctx.Message.CorrelationId})"))
.Then(ctx => { ctx.Instance.ErrorMessage = ctx.Message.Exception?.Message; })
.Send(ctx => new Undo<DoStepA>() { CorrelationId = ctx.Message.CorrelationId })
.TransitionTo(Failed);
During(StepCInProgress)
.When(StepCCompleted)
.Then(ctx => Console.WriteLine($"[Saga] During {ctx.Instance.CurrentState}: Received {StepCCompleted.Name}({ctx.Message.CorrelationId}) [FINAL]"))
.TransitionTo(Final)
.When(FaultDoStepC)
.Then(ctx => Console.WriteLine($"[Saga] During {ctx.Instance.CurrentState}: Received{FaultDoStepC.Name}({ctx.Message.CorrelationId})"))
.Then(ctx => { ctx.Instance.ErrorMessage = ctx.Message.Exception?.Message; })
.Send(ctx => new Undo<DoStepB>() { CorrelationId = ctx.Message.CorrelationId })
.Send(ctx => new Undo<DoStepA>() { CorrelationId = ctx.Message.CorrelationId })
.TransitionTo(Failed);
}
}Fault<T> messages when something fails (framework may populate Fault envelope).Fault<T> events and may:
ErrorMessage).Undo<T> compensation messages to roll back prior steps.Failed state for manual remediation.Example Fault consumer:
internal sealed class FaultCreateUserCommandHandler : IConsumer<Fault<CreateUserCommand>>
{
public Task Handle(IConsumeContext<Fault<CreateUserCommand>> context)
{
_ = context.Message.Message?.UserId;
_ = context.Message.Message?.Name;
return Task.CompletedTask;
}
}IdempotentCommandMessage for commands).EntityFramework, DistributedCache) depending on your infra.CorrelationId and the saga Event is configured with CorrelateById.ConsumerAssembliesToRegister contains assemblies where consumers live.AddProducerFilter, AddConsumerFilter, etc) before the bus is started.ChannelCapacity and transport settings.Program.cs / Startup.IConsumer<T>).StartWorkflow { CorrelationId = Guid.NewGuid() }.Use IBus for sending commands and making request/response calls.
Send a command (fire-and-forget):
// inject IBus (e.g., via constructor DI)
await bus.SendAsync(new CreateUserCommand(1, "ARMIN"));Request/response (await reply):
// Request a response for GetUserQuery and expect a UserDto
var resp = await bus.RequestAsync<GetUserQuery, UserDto>(new GetUserQuery(1));Notes
SendAsync is used for sending commands to a specific handler (no immediate response expected).RequestAsync<TRequest, TResponse> sends a request and awaits a typed response (useful for queries).ConsumerAssembliesToRegister.Define messages as records that inherit from the message base types provided by the toolkit.
Use IdempotentCommandMessage for commands that should be idempotent. Queries can use QueryMessage<TResponse>
to express the expected response type. Events typically inherit from EventMessage.
Examples:
// DTO
public record UserDto(int UserId, string Name);
// Commands
public record CreateUserCommand(int UserId, string Name) : IdempotentCommandMessage;
// Generic idempotent command with response (if needed)
public abstract record IdempotentCommandMessage<TResponse> : CommandMessage<TResponse> where TResponse : class;
// Queries (expect a response type)
public sealed record GetUserQuery(int UserId) : QueryMessage<UserDto>;
// Events
public sealed record UserCreated(int UserId) : EventMessage;
// Base idempotent command alias
public abstract record IdempotentCommandMessage : CommandMessage;Notes
IdempotentCommandMessage for commands to help ensure safe retries and deduplication.QueryMessage<TResponse> for request/response style queries so the bus can enforce typed responses.UserDto) small and serializable.CorrelationId is set where needed for tracing and saga correlation.This project is offered under the MIT License.