A lightweight .NET 8 message bus library built on RabbitMQ for ASP.NET Core applications. Provides simple publish-subscribe patterns, typed consumers, broadcasting, automatic retries, and JWT integration. Visit GitHub for full documentation and examples.
$ dotnet add package SimplyWorks.Bus
A lightweight .NET 8 message bus library built on top of RabbitMQ, designed to simplify publish-subscribe patterns and event-driven architectures in ASP.NET Core applications.
Typed consumers via IConsume<T>, and broadcast listeners via IListen<T> for cross-application communication.
Install the package:
dotnet add package SimplyWorks.Bus
Add RabbitMQ connection string to your appsettings.json:
{
"ConnectionStrings": {
"RabbitMQ": "amqp://guest:guest@localhost:5672/"
}
}
In your Startup.cs or Program.cs:
// Core bus registration: name the application and, optionally, wire up JWT settings.
services.AddBus(bus =>
{
    bus.ApplicationName = "MyApp";

    // JWT configuration is optional; values are read from the "Token:*" configuration keys.
    bus.Token.Key = Configuration["Token:Key"];
    bus.Token.Issuer = Configuration["Token:Issuer"];
    bus.Token.Audience = Configuration["Token:Audience"];
});

// Register this when the application publishes messages.
services.AddBusPublish();

// Register this when the application consumes messages.
services.AddBusConsume();

// Register this when the application only listens to broadcasts.
services.AddBusListen();
/// <summary>
/// Sample controller showing how to publish a message and broadcast a notification
/// from an HTTP POST action.
/// </summary>
public class OrderController : ControllerBase
{
    private readonly IPublish _publisher;
    private readonly IBroadcast _broadcaster;

    /// <summary>Receives the publisher and broadcaster through constructor injection.</summary>
    public OrderController(IPublish publisher, IBroadcast broadcaster)
        => (_publisher, _broadcaster) = (publisher, broadcaster);

    /// <summary>
    /// Builds an <c>OrderCreated</c> message from the request, publishes it, then
    /// broadcasts an <c>OrderNotification</c>.
    /// </summary>
    /// <param name="request">Supplies the customer id and amount for the new order.</param>
    /// <returns>The result of <c>Ok()</c> once both sends have been awaited.</returns>
    [HttpPost]
    public async Task<IActionResult> CreateOrder(CreateOrderRequest request)
    {
        var orderCreated = new OrderCreated
        {
            OrderId = Guid.NewGuid(),
            CustomerId = request.CustomerId,
            Amount = request.Amount
        };

        // Published messages go to specific consumers.
        await _publisher.Publish(orderCreated);

        // Broadcasts go to all listening applications.
        var notification = new OrderNotification
        {
            Message = "New order created"
        };
        await _broadcaster.Broadcast(notification);

        return Ok();
    }
}
Create typed consumers by implementing IConsume<T>:
/// <summary>
/// Typed consumer for <c>OrderCreated</c> messages: logs the order id and hands the
/// message to the order service.
/// </summary>
public class OrderCreatedConsumer : IConsume<OrderCreated>
{
    private readonly ILogger<OrderCreatedConsumer> _logger;
    private readonly IOrderService _orderService;

    /// <summary>Receives the logger and order service through constructor injection.</summary>
    public OrderCreatedConsumer(ILogger<OrderCreatedConsumer> logger, IOrderService orderService)
        => (_logger, _orderService) = (logger, orderService);

    /// <summary>Logs the incoming order id, then awaits the order service.</summary>
    /// <param name="message">The <c>OrderCreated</c> message to process.</param>
    public async Task Process(OrderCreated message)
    {
        _logger.LogInformation("Processing order {OrderId}", message.OrderId);

        await _orderService.ProcessOrder(message);
    }
}
Create broadcast listeners by implementing IListen<T>:
/// <summary>
/// Broadcast listener for <c>OrderNotification</c> messages; forwards the message text
/// to the notification service.
/// </summary>
public class OrderNotificationListener : IListen<OrderNotification>
{
    private readonly INotificationService _notificationService;

    /// <summary>Receives the notification service through constructor injection.</summary>
    public OrderNotificationListener(INotificationService notificationService)
        => _notificationService = notificationService;

    /// <summary>Sends the notification's <c>Message</c> text via the notification service.</summary>
    /// <param name="message">The broadcast notification that was received.</param>
    public async Task Process(OrderNotification message)
        => await _notificationService.SendNotification(message.Message);
}
For dynamic message types, implement IConsume:
/// <summary>
/// Untyped consumer for dynamic message types: declares the message type names it
/// handles and receives each message as a raw string.
/// </summary>
public class GenericConsumer : IConsume
{
    /// <summary>Returns the message type names this consumer handles.</summary>
    /// <returns>A completed task holding the two dynamic message type names.</returns>
    public Task<IEnumerable<string>> GetMessageTypeNames()
    {
        // No asynchronous work happens here, so return a completed task rather than
        // marking the method async (which would raise CS1998 and build a state machine).
        return Task.FromResult<IEnumerable<string>>(new[] { "DynamicMessage1", "DynamicMessage2" });
    }

    /// <summary>Writes the received message type name and raw message to the console.</summary>
    /// <param name="messageTypeName">Name of the message type that was received.</param>
    /// <param name="message">Raw message content.</param>
    /// <returns>An already-completed task.</returns>
    public Task Process(string messageTypeName, string message)
    {
        Console.WriteLine($"Received {messageTypeName}: {message}");

        // Synchronous body: signal completion without async/await overhead.
        return Task.CompletedTask;
    }
}
services.AddBus(bus =>
{
    // Required: your application name.
    bus.ApplicationName = "MyApplication";

    // Messages to prefetch (default: 1).
    bus.DefaultQueuePrefetch = 1;

    // Retry attempts (default: 3).
    bus.DefaultRetryCount = 3;

    // Retry delay in ms (default: 30s).
    bus.DefaultRetryAfter = 30000;

    // RabbitMQ heartbeat timeout.
    bus.HeartBeatTimeOut = 60;

    // Listener retry attempts.
    bus.ListenRetryCount = 3;

    // Listener retry delay.
    bus.ListenRetryAfter = 30;

    // Optional JWT token configuration. The values below are placeholders.
    bus.Token.Key = "your-secret-key";
    bus.Token.Issuer = "your-issuer";
    bus.Token.Audience = "your-audience";
});
Customize queue behavior for specific message types:
services.AddBus(bus =>
{
    // Queue settings that apply only to the "MyMessageType" message type.
    var myMessageTypeOptions = new QueueOptions
    {
        RetryCount = 5,
        RetryAfter = 60000,
        QueuePrefetch = 10
    };

    bus.Options["MyMessageType"] = myMessageTypeOptions;
});
For unit testing, use the mock publisher:
// In your test setup: register the mock publisher for unit tests.
services.AddBusPublishMock();
// The mock publisher will log publish calls instead of sending to RabbitMQ
Consumers can implement error handling methods:
/// <summary>
/// Template consumer for <c>OrderCreated</c> messages showing the optional
/// failure-handling hook next to the main processing method.
/// </summary>
public class OrderConsumer : IConsume<OrderCreated>
{
    /// <summary>Entry point for handling an <c>OrderCreated</c> message.</summary>
    /// <param name="message">The message to process.</param>
    public async Task Process(OrderCreated message)
    {
        // Put the main processing logic here.
    }

    /// <summary>Optional hook for handling processing failures.</summary>
    /// <param name="exception">The exception describing the failure.</param>
    /// <param name="rawMessage">The raw message associated with the failure.</param>
    public async Task OnFail(Exception exception, string rawMessage)
    {
        // Put failure handling here.
        // Implementing this method is optional.
    }
}
Access request context and user information in consumers:
/// <summary>
/// Consumer for <c>SecureMessage</c> that reads the user and correlation id from the
/// injected <c>RequestContext</c>.
/// </summary>
public class AuthenticatedConsumer : IConsume<SecureMessage>
{
    private readonly RequestContext _requestContext;

    /// <summary>Receives the request context through constructor injection.</summary>
    public AuthenticatedConsumer(RequestContext requestContext)
        => _requestContext = requestContext;

    /// <summary>Reads the user and correlation id from the request context.</summary>
    /// <param name="message">The secure message being processed.</param>
    public async Task Process(SecureMessage message)
    {
        // The authenticated user.
        var currentUser = _requestContext.User;

        // The correlation id, used to trace requests.
        var traceId = _requestContext.CorrelationId;
    }
}
Refresh consumers dynamically at runtime:
// Refresh consumers dynamically at runtime.
await _broadcaster.RefreshConsumers();
SW.Bus uses RabbitMQ exchanges and queues with the following pattern:
This project is licensed under the MIT License - see the LICENSE file for details.
For issues and questions:
- See the SW.Bus.SampleWeb project for usage examples