Inbox/outbox messaging pattern for modular applications.
$ dotnet add package Fluens.MessagingInbox/outbox messaging pattern for modular applications.
dotnet add package Fluens.Messaging
// Host setup
fluensBuilder.AddMessaging();
// Module setup (inside IModuleSetup.RegisterServices)
// Consumers are auto-discovered from the module assembly
module.AddMessaging<OrdersDbContext>(msg =>
{
msg.Publishes<OrderCreatedMessage>();
});
Register the outbox interceptor and messaging entity configurations in your module DbContext:
// In AddDbContext registration
services.AddDbContext<OrdersDbContext>((sp, options) =>
{
options.AddOutboxInterceptor(sp); // Enables atomic outbox writes
});
// In DbContext.OnModelCreating
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
modelBuilder.HasDefaultSchema("orders");
modelBuilder.ApplyConfigurationsFromAssembly(typeof(OrdersDbContext).Assembly);
modelBuilder.ApplyMessagingConfigurations("orders"); // Registers OutboxMessage, InboxMessage, DeadLetter
}
public record OrderCreatedMessage(Guid OrderId) : IMessage;
// Publish is synchronous — message is stored in outbox during SaveChangesAsync
publisher.Publish(new OrderCreatedMessage(order.ID));
await dbContext.SaveChangesAsync(); // Outbox interceptor writes message
public class OrderCreatedConsumer : IMessageConsumer<OrderCreatedMessage>
{
public async Task ConsumeAsync(
OrderCreatedMessage message,
MessageContext context,
CancellationToken ct)
{
// Process the message
}
}
Architecture: Publish (sync, in-memory) -> Outbox (EF SaveChanges interceptor) -> Transport (background worker) -> Inbox (per-module processor) -> Consumer. Failed messages are moved to a dead_letters table.
MessagingOptions is bound from the "Fluens:Messaging" configuration section:
| Property | Default | Description |
|---|---|---|
OutboxBatchSize | 100 | Max outbox messages fetched per transport cycle |
InboxBatchSize | 100 | Max inbox messages fetched per processing cycle |
InboxMaxRetries | 3 | Max retries before a message is moved to dead letters |
InboxProcessorIntervalSeconds | 30 | Polling interval in seconds for the inbox processor |
TransportProcessorIntervalSeconds | 60 | Polling interval in seconds for the outbox transport processor |
{
"Fluens": {
"Messaging": {
"OutboxBatchSize": 100,
"InboxBatchSize": 100,
"InboxMaxRetries": 3,
"InboxProcessorIntervalSeconds": 30,
"TransportProcessorIntervalSeconds": 60
}
}
}
Both MessageTransportProcessor and InboxProcessor support cooperative cancellation:
OperationCanceledException during shutdown is not treated as a processing failure — no retry count increment, no dead letter movesThis project is licensed under the MIT License.