EasyRabbitFlow: a high-performance RabbitMQ client for .NET. Fluent configuration, optional topology generation (queues, exchanges, dead-letter queues), reflection-free consumers, configurable retry with exponential backoff, custom dead-letter routing, temporary batch processing, and queue state helpers. Targets .NET Standard 2.1.
```shell
dotnet add package EasyRabbitFlow
```

**⚠️ Breaking Changes (v5.0.0)**

`IRabbitFlowConsumer<TEvent>.HandleAsync` signature changed. A new `RabbitFlowMessageContext` parameter was added to provide AMQP metadata (MessageId, CorrelationId, headers, delivery info) directly to each consumer:

```diff
- Task HandleAsync(TEvent message, CancellationToken cancellationToken);
+ Task HandleAsync(TEvent message, RabbitFlowMessageContext context, CancellationToken cancellationToken);
```

**How to migrate:** add the `RabbitFlowMessageContext context` parameter to every `HandleAsync` implementation. If you don't need the context, simply ignore it:

```csharp
public Task HandleAsync(MyEvent message, RabbitFlowMessageContext context, CancellationToken ct)
{
    // context is available but not required to use
    return ProcessAsync(message, ct);
}
```

**Other changes in v5.0.0:**

- `PublishAsync` / `PublishBatchAsync` now accept an optional `correlationId` parameter.
- `PublishAsync` returns `PublishResult` (instead of `bool`). Use `result.Success` instead of the raw return value.
- `PublishBatchAsync` is new — publish multiple messages atomically (`Transactional`) or individually confirmed (`Confirm`).
- `ChannelMode` was removed from `PublisherOptions` — it is now a per-call parameter on `PublishBatchAsync` (defaults to `Transactional`). Single-message publishes always use publisher confirms.
- `PublisherOptions.IdempotencyEnabled` is new — auto-assigns a unique `MessageId` to every published message.

| Feature | EasyRabbitFlow |
|---|---|
| Fluent, strongly-typed configuration | ✅ |
| Automatic queue / exchange / dead-letter generation | ✅ |
| Reflection-free per-message processing | ✅ |
| Configurable retry with exponential backoff | ✅ |
| Temporary batch processing with auto-cleanup | ✅ |
| Queue state & purge utilities | ✅ |
| Full DI integration (scoped/transient/singleton) | ✅ |
| Publisher confirms (single) & transactional batch | ✅ |
| Built-in idempotency support (auto MessageId) | ✅ |
| CorrelationId support (end-to-end tracing) | ✅ |
| RabbitFlowMessageContext per-message metadata | ✅ |
| Rich PublishResult / BatchPublishResult types | ✅ |
| Thread-safe channel operations | ✅ |
| .NET Standard 2.1 (works with .NET 6, 7, 8, 9+) | ✅ |
```
┌─────────────────────────────────────────────────────────────────────┐
│                          Your Application                           │
├────────────┬──────────────┬──────────────┬──────────────────────────┤
│ Publisher  │  Consumers   │   State /    │     Temporary Batch      │
│            │   (Hosted)   │    Purger    │        Processing        │
├────────────┴──────────────┴──────────────┴──────────────────────────┤
│                       EasyRabbitFlow Library                        │
│                                                                     │
│  ┌──────────────┐  ┌────────────────┐  ┌─────────────────────────┐  │
│  │ IRabbitFlow  │  │ ConsumerHosted │  │  IRabbitFlowTemporary   │  │
│  │  Publisher   │  │    Service     │  │   (batch processing)    │  │
│  └──────┬───────┘  └───────┬────────┘  └────────────┬────────────┘  │
│         │                  │                        │               │
│  ┌──────┴──────────────────┴────────────────────────┴──────────┐    │
│  │             RabbitMQ.Client (ConnectionFactory)             │    │
│  └──────────────────────────────┬──────────────────────────────┘    │
└─────────────────────────────────┼───────────────────────────────────┘
                                  │
                      ┌───────────┴───────────┐
                      │    RabbitMQ Broker    │
                      │                       │
                      │  ┌─────────────────┐  │
                      │  │    Exchanges    │  │
                      │  └────────┬────────┘  │
                      │           │           │
                      │  ┌────────▼────────┐  │
                      │  │     Queues      │  │
                      │  └────────┬────────┘  │
                      │           │           │
                      │  ┌────────▼────────┐  │
                      │  │   Dead-Letter   │  │
                      │  │     Queues      │  │
                      │  └─────────────────┘  │
                      └───────────────────────┘
```
```
Publisher                RabbitMQ                         Consumer
────────                 ────────                         ────────
PublishAsync() ──────► Exchange ──routing──► Queue ──────► HandleAsync()
                                               │               │
                                               │          (on failure)
                                               │               │
                                               │          Retry Policy
                                               │          (exponential
                                               │            backoff)
                                               │               │
                                               │          (exhausted)
                                               │               │
                                               └──► Dead-Letter Queue
```
```shell
dotnet add package EasyRabbitFlow
```

**1. Define your event model:**

```csharp
public class OrderCreatedEvent
{
    public string OrderId { get; set; } = string.Empty;
    public decimal Total { get; set; }
    public DateTime CreatedAt { get; set; }
}
```

**2. Implement a consumer:**
```csharp
public class OrderConsumer : IRabbitFlowConsumer<OrderCreatedEvent>
{
    private readonly ILogger<OrderConsumer> _logger;

    public OrderConsumer(ILogger<OrderConsumer> logger)
    {
        _logger = logger;
    }

    public async Task HandleAsync(OrderCreatedEvent message, RabbitFlowMessageContext context, CancellationToken cancellationToken)
    {
        _logger.LogInformation("Processing order {OrderId}, total: {Total}, correlationId: {CorrelationId}",
            message.OrderId, message.Total, context.CorrelationId);

        // Your business logic here: save to DB, send email, call API, etc.
        await Task.CompletedTask;
    }
}
```

**3. Configure in Program.cs:**
```csharp
builder.Services
    .AddRabbitFlow(cfg =>
    {
        cfg.ConfigureHost(host =>
        {
            host.Host = "localhost";
            host.Port = 5672;
            host.Username = "guest";
            host.Password = "guest";
        });

        cfg.AddConsumer<OrderConsumer>("orders-queue", c =>
        {
            c.AutoGenerate = true;
            c.PrefetchCount = 10;
            c.Timeout = TimeSpan.FromSeconds(30);

            c.ConfigureRetryPolicy(r =>
            {
                r.MaxRetryCount = 3;
                r.ExponentialBackoff = true;
                r.ExponentialBackoffFactor = 2;
            });
        });
    })
    .UseRabbitFlowConsumers(); // Starts background consumers automatically
```

**4. Publish from an endpoint or service:**
```csharp
app.MapPost("/orders", async (OrderCreatedEvent order, IRabbitFlowPublisher publisher) =>
{
    var result = await publisher.PublishAsync(order, "orders-queue");

    return result.Success
        ? Results.Ok(new { result.Destination, result.MessageId, result.TimestampUtc })
        : Results.Problem(result.Error?.Message);
});
```

That's it — four steps from zero to a working publish/consume pipeline.
All configuration is done through the AddRabbitFlow extension method:
```csharp
builder.Services.AddRabbitFlow(cfg =>
{
    cfg.ConfigureHost(...);                  // Connection settings
    cfg.ConfigureJsonSerializerOptions(...); // Serialization (optional)
    cfg.ConfigurePublisher(...);             // Publisher behavior (optional)
    cfg.AddConsumer<T>(...);                 // Register consumers
});
```

```csharp
cfg.ConfigureHost(host =>
{
    host.Host = "rabbitmq.example.com";
    host.Port = 5672;
    host.Username = "admin";
    host.Password = "secret";
    host.VirtualHost = "/";
    host.AutomaticRecoveryEnabled = true;
    host.NetworkRecoveryInterval = TimeSpan.FromSeconds(10);
    host.RequestedHeartbeat = TimeSpan.FromSeconds(30);
});
```

| Property | Type | Default | Description |
|---|---|---|---|
| Host | string | "localhost" | RabbitMQ server hostname or IP |
| Port | int | 5672 | AMQP port |
| Username | string | "guest" | Authentication username |
| Password | string | "guest" | Authentication password |
| VirtualHost | string | "/" | RabbitMQ virtual host |
| AutomaticRecoveryEnabled | bool | true | Auto-reconnect after failures |
| TopologyRecoveryEnabled | bool | true | Auto-recover queues/exchanges after reconnect |
| NetworkRecoveryInterval | TimeSpan | 10s | Wait time between recovery attempts |
| RequestedHeartbeat | TimeSpan | 30s | Heartbeat interval for connection health |
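Connection settings are usually environment-specific. A minimal sketch of reading them from `IConfiguration` instead of hard-coding — the `"RabbitMq"` section name and its keys are assumptions for illustration, not a library convention:

```csharp
// Hypothetical appsettings.json section:
// { "RabbitMq": { "Host": "rabbitmq.example.com", "Username": "admin", "Password": "secret" } }
builder.Services.AddRabbitFlow(cfg =>
{
    var section = builder.Configuration.GetSection("RabbitMq");

    cfg.ConfigureHost(host =>
    {
        host.Host = section["Host"] ?? "localhost";
        host.Port = int.TryParse(section["Port"], out var port) ? port : 5672;
        host.Username = section["Username"] ?? "guest";
        host.Password = section["Password"] ?? "guest";
    });
});
```

The fallbacks mirror the defaults in the table above, so an empty section still yields a working local setup.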
Optionally customize how messages are serialized/deserialized:

```csharp
cfg.ConfigureJsonSerializerOptions(json =>
{
    json.PropertyNameCaseInsensitive = true;
    json.PropertyNamingPolicy = JsonNamingPolicy.CamelCase;
    json.DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull;
});
```

If not configured, the default `JsonSerializerOptions` are used.
```csharp
cfg.ConfigurePublisher(pub =>
{
    pub.DisposePublisherConnection = false; // Keep connection alive (default)
    pub.IdempotencyEnabled = true;          // Auto-assign MessageId to every message
});
```

| Property | Type | Default | Description |
|---|---|---|---|
| DisposePublisherConnection | bool | false | Dispose connection after each publish |
| IdempotencyEnabled | bool | false | Auto-assign a unique MessageId to every published message for deduplication |
Every consumer implements `IRabbitFlowConsumer<TEvent>`:

```csharp
public interface IRabbitFlowConsumer<TEvent>
{
    Task HandleAsync(TEvent message, RabbitFlowMessageContext context, CancellationToken cancellationToken);
}
```

The `RabbitFlowMessageContext` parameter provides AMQP metadata (MessageId, CorrelationId, headers, etc.) for the message being processed. See Message Context for details.
Consumers support full dependency injection — you can inject any service (scoped, transient, singleton):
```csharp
public class EmailConsumer : IRabbitFlowConsumer<NotificationEvent>
{
    private readonly IEmailService _emailService;
    private readonly ILogger<EmailConsumer> _logger;

    public EmailConsumer(IEmailService emailService, ILogger<EmailConsumer> logger)
    {
        _emailService = emailService;
        _logger = logger;
    }

    public async Task HandleAsync(NotificationEvent message, RabbitFlowMessageContext context, CancellationToken cancellationToken)
    {
        _logger.LogInformation("Sending email to {Recipient}, MessageId={MessageId}", message.Email, context.MessageId);
        await _emailService.SendAsync(message.Email, message.Subject, message.Body, cancellationToken);
    }
}
```

```csharp
cfg.AddConsumer<EmailConsumer>("email-queue", c =>
{
    c.Enable = true;                      // Enable/disable this consumer
    c.PrefetchCount = 5;                  // Messages fetched in parallel
    c.Timeout = TimeSpan.FromSeconds(30); // Per-message processing timeout
    c.AutoAckOnError = false;             // Don't ack failed messages
    c.ExtendDeadletterMessage = true;     // Add error details to DLQ messages
    c.ConsumerId = "email-consumer-1";    // Custom connection identifier
});
```

| Property | Type | Default | Description |
|---|---|---|---|
| Enable | bool | true | Whether this consumer is active |
| QueueName | string | (set in constructor) | Queue to consume from |
| ConsumerId | string? | null | Custom connection ID (falls back to queue name) |
| PrefetchCount | ushort | 1 | How many messages to prefetch |
| Timeout | TimeSpan | 30s | Processing timeout per message |
| AutoAckOnError | bool | false | Auto-acknowledge on error (message lost) |
| AutoGenerate | bool | false | Auto-create queue/exchange/DLQ |
| ExtendDeadletterMessage | bool | false | Enrich dead-letter messages with error details |
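The `Timeout` setting can only cancel work that observes the `CancellationToken`, so handlers should pass the token down to every awaited call. A sketch — `ReportRequestedEvent` and `SlowApiCallAsync` are placeholder names, not part of the library:

```csharp
public class ReportRequestedEvent
{
    public string ReportId { get; set; } = string.Empty;
}

public class ReportConsumer : IRabbitFlowConsumer<ReportRequestedEvent>
{
    public async Task HandleAsync(ReportRequestedEvent message, RabbitFlowMessageContext context, CancellationToken cancellationToken)
    {
        // Pass the token down so the per-message Timeout can actually stop the work.
        await SlowApiCallAsync(message, cancellationToken);

        // For CPU-bound loops, check the token periodically instead.
        cancellationToken.ThrowIfCancellationRequested();
    }

    // Placeholder for an I/O-bound call that honors cancellation.
    private Task SlowApiCallAsync(ReportRequestedEvent message, CancellationToken ct) => Task.Delay(100, ct);
}
```

A handler that ignores the token will keep running past the timeout even though the message's outcome has already been decided.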
When AutoGenerate = true, EasyRabbitFlow creates queues, exchanges, and dead-letter queues automatically:
```csharp
cfg.AddConsumer<OrderConsumer>("orders-queue", c =>
{
    c.AutoGenerate = true;

    c.ConfigureAutoGenerate(ag =>
    {
        ag.ExchangeName = "orders-exchange";   // Custom exchange name
        ag.ExchangeType = ExchangeType.Fanout; // Direct | Fanout | Topic | Headers
        ag.RoutingKey = "orders-routing-key";  // Routing key for binding
        ag.GenerateExchange = true;            // Create the exchange
        ag.GenerateDeadletterQueue = true;     // Create a dead-letter queue
        ag.DurableQueue = true;                // Queue survives broker restart
        ag.DurableExchange = true;             // Exchange survives broker restart
        ag.ExclusiveQueue = false;             // Not limited to one connection
        ag.AutoDeleteQueue = false;            // Don't delete when last consumer disconnects
    });
});
```

Generated topology when `AutoGenerate = true`:
```
      ┌──────────────────────┐
      │   orders-exchange    │
      │       (fanout)       │
      └──────────┬───────────┘
                 │ routing-key
      ┌──────────▼───────────┐
      │     orders-queue     │──── args: x-dead-letter-exchange
      │      (durable)       │           x-dead-letter-routing-key
      └──────────┬───────────┘
                 │ (on failure)
┌────────────────▼─────────────────┐
│ orders-queue-deadletter-exchange │
│             (direct)             │
└────────────────┬─────────────────┘
                 │
┌────────────────▼─────────────────┐
│     orders-queue-deadletter      │
└──────────────────────────────────┘
```
| Property | Type | Default | Description |
|---|---|---|---|
| GenerateExchange | bool | true | Create the exchange |
| GenerateDeadletterQueue | bool | true | Create dead-letter queue and exchange |
| ExchangeType | ExchangeType | Direct | Direct, Fanout, Topic, or Headers |
| ExchangeName | string? | null | Custom name (defaults to {queue}-exchange) |
| RoutingKey | string? | null | Custom routing key (defaults to {queue}-routing-key) |
| DurableExchange | bool | true | Exchange survives broker restart |
| DurableQueue | bool | true | Queue survives broker restart |
| ExclusiveQueue | bool | false | Queue limited to declaring connection |
| AutoDeleteQueue | bool | false | Delete queue when last consumer disconnects |
| Args | IDictionary? | null | Additional RabbitMQ arguments |
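The `Args` dictionary passes raw queue arguments straight through to RabbitMQ at declaration time. A sketch using two standard RabbitMQ x-arguments (whether a given argument is honored depends on your broker version and queue type):

```csharp
cfg.AddConsumer<OrderConsumer>("orders-queue", c =>
{
    c.AutoGenerate = true;

    c.ConfigureAutoGenerate(ag =>
    {
        ag.Args = new Dictionary<string, object>
        {
            ["x-message-ttl"] = 60_000, // drop messages older than 60 seconds
            ["x-max-length"] = 10_000   // cap the queue at 10k messages
        };
    });
});
```

Note that RabbitMQ rejects a redeclaration of an existing queue with different arguments, so changing `Args` for a queue that is already declared requires deleting the queue first.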
Configure how failed messages are retried:
```csharp
c.ConfigureRetryPolicy(r =>
{
    r.MaxRetryCount = 5;            // Number of attempts
    r.RetryInterval = 1000;         // Base delay in ms
    r.ExponentialBackoff = true;    // Enable exponential backoff
    r.ExponentialBackoffFactor = 2; // Multiply delay by this factor
    r.MaxRetryDelay = 30_000;       // Cap delay at 30 seconds
});
```

Example: retry timeline with exponential backoff (factor=2, base=1000ms):
```
Attempt 1 → fail → wait 1000ms
Attempt 2 → fail → wait 2000ms
Attempt 3 → fail → wait 4000ms
Attempt 4 → fail → wait 8000ms
Attempt 5 → fail → sent to dead-letter queue
```
| Property | Type | Default | Description |
|---|---|---|---|
| MaxRetryCount | int | 1 | Total retry attempts |
| RetryInterval | int | 1000 | Base delay between retries (ms) |
| ExponentialBackoff | bool | false | Enable exponential backoff |
| ExponentialBackoffFactor | int | 1 | Multiplier for exponential growth |
| MaxRetryDelay | int | 60000 | Upper bound for delay (ms) |
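Reading the options together, the wait before attempt *n* works out to roughly `min(RetryInterval × Factor^(n-1), MaxRetryDelay)`. A sketch of that formula — an interpretation of the documented options, not the library's internal code:

```csharp
static int DelayForAttemptMs(int attempt, int retryIntervalMs = 1000,
    bool exponential = true, int factor = 2, int maxDelayMs = 30_000)
{
    var delay = exponential
        ? retryIntervalMs * (int)Math.Pow(factor, attempt - 1)
        : retryIntervalMs;

    return Math.Min(delay, maxDelayMs);
}

// With factor = 2 and base = 1000 ms, attempts 1..4 wait 1000, 2000, 4000, 8000 ms,
// matching the timeline above; by attempt 6 the raw delay (32000 ms) is capped at MaxRetryDelay.
```

This makes the trade-off explicit: the factor controls how fast delays grow, and `MaxRetryDelay` keeps a long retry chain from stalling a consumer for minutes.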
Route failed messages to a specific dead-letter queue:
```csharp
c.ConfigureCustomDeadletter(dl =>
{
    dl.DeadletterQueueName = "custom-errors-queue";
});
```

When `ExtendDeadletterMessage = true`, the dead-letter message includes full error details:

```json
{
  "dateUtc": "2026-01-15T10:30:00Z",
  "messageType": "OrderCreatedEvent",
  "messageData": { "orderId": "ORD-123", "total": 99.99 },
  "exceptionType": "TimeoutException",
  "errorMessage": "The operation was canceled.",
  "stackTrace": "...",
  "source": "OrderService",
  "innerExceptions": []
}
```

Inject `IRabbitFlowPublisher` to publish messages. All single-message publishes use publisher confirms — the await only completes after the broker confirms receipt.
Returns a PublishResult with Success, Destination, RoutingKey, MessageId, TimestampUtc, and Error:
```csharp
public class OrderService
{
    private readonly IRabbitFlowPublisher _publisher;

    public OrderService(IRabbitFlowPublisher publisher)
    {
        _publisher = publisher;
    }

    // Publish directly to a queue
    public async Task CreateOrderAsync(OrderCreatedEvent order)
    {
        var result = await _publisher.PublishAsync(order, queueName: "orders-queue");

        if (!result.Success)
            throw new Exception($"Publish failed: {result.Error?.Message}");
    }

    // Publish to an exchange with routing key and correlation ID
    public async Task BroadcastNotificationAsync(NotificationEvent notification, string correlationId)
    {
        var result = await _publisher.PublishAsync(
            notification,
            exchangeName: "notifications",
            routingKey: "user.created",
            correlationId: correlationId);

        Console.WriteLine($"Published to {result.Destination} at {result.TimestampUtc}");
    }
}
```

Method signatures:
```csharp
// Publish to exchange (with routing key) — always uses publisher confirms
Task<PublishResult> PublishAsync<TEvent>(TEvent message, string exchangeName, string routingKey,
    string? correlationId = null, string publisherId = "",
    JsonSerializerOptions? jsonOptions = null,
    CancellationToken cancellationToken = default) where TEvent : class;

// Publish directly to queue — always uses publisher confirms
Task<PublishResult> PublishAsync<TEvent>(TEvent message, string queueName,
    string? correlationId = null, string publisherId = "",
    JsonSerializerOptions? jsonOptions = null,
    CancellationToken cancellationToken = default) where TEvent : class;
```

PublishResult properties:
| Property | Type | Description |
|---|---|---|
| Success | bool | Whether the broker confirmed the message |
| MessageId | string? | Unique ID (when IdempotencyEnabled = true) |
| Destination | string | Target exchange or queue name |
| RoutingKey | string | Routing key used (empty for queue publishes) |
| TimestampUtc | DateTime | When the publish was executed |
| Error | Exception? | The exception if Success is false |
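Because `PublishAsync` reports failure through `result.Success` rather than by throwing, a caller that wants limited retries can simply loop on the result. A sketch of such a wrapper — this helper is not part of the library, and the linear backoff is an arbitrary choice:

```csharp
public static async Task<PublishResult> PublishWithRetryAsync<T>(
    IRabbitFlowPublisher publisher, T message, string queueName, int maxAttempts = 3)
    where T : class
{
    PublishResult result = default!;

    for (var attempt = 1; attempt <= maxAttempts; attempt++)
    {
        result = await publisher.PublishAsync(message, queueName);
        if (result.Success) return result;

        // Simple linear backoff between attempts.
        if (attempt < maxAttempts) await Task.Delay(250 * attempt);
    }

    return result; // last failed result; result.Error holds the broker/connection exception
}
```

Returning the final `PublishResult` (instead of throwing) keeps the wrapper composable with the same `result.Success` checks used elsewhere in this README.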
Use PublishBatchAsync to publish multiple messages in a single operation. The channelMode parameter controls atomicity:
- **Transactional** (default): All-or-nothing — if any message fails, the entire batch is rolled back.
- **Confirm**: Each message is individually confirmed — a mid-batch failure does not roll back previous messages.

```csharp
// Atomic batch (Transactional — default)
var result = await publisher.PublishBatchAsync(
    orders,
    exchangeName: "orders-exchange",
    routingKey: "new-order");

Console.WriteLine($"Batch: {result.MessageCount} messages, success={result.Success}");
```

```csharp
// Non-atomic batch (Confirm mode — higher throughput)
var result = await publisher.PublishBatchAsync(
    orders,
    queueName: "orders-queue",
    channelMode: ChannelMode.Confirm);
```

Method signatures:
```csharp
// Batch to exchange
Task<BatchPublishResult> PublishBatchAsync<TEvent>(IReadOnlyList<TEvent> messages,
    string exchangeName, string routingKey,
    ChannelMode channelMode = ChannelMode.Transactional,
    string? correlationId = null, string publisherId = "",
    JsonSerializerOptions? jsonOptions = null,
    CancellationToken cancellationToken = default) where TEvent : class;

// Batch to queue
Task<BatchPublishResult> PublishBatchAsync<TEvent>(IReadOnlyList<TEvent> messages,
    string queueName,
    ChannelMode channelMode = ChannelMode.Transactional,
    string? correlationId = null, string publisherId = "",
    JsonSerializerOptions? jsonOptions = null,
    CancellationToken cancellationToken = default) where TEvent : class;
```

BatchPublishResult properties:
| Property | Type | Description |
|---|---|---|
| Success | bool | Whether all messages were published |
| MessageCount | int | Number of messages in the batch |
| MessageIds | IReadOnlyList<string> | IDs per message (when IdempotencyEnabled = true) |
| Destination | string | Target exchange or queue name |
| RoutingKey | string | Routing key used |
| ChannelMode | ChannelMode | Mode used (Transactional or Confirm) |
| TimestampUtc | DateTime | When the batch was executed |
| Error | Exception? | The exception if Success is false |
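For very large batches, one all-or-nothing transaction can be a blunt instrument: a single bad message rolls back everything. Splitting into fixed-size chunks limits the blast radius to one chunk. A sketch — the helper and the chunk size are illustrative, not part of the library:

```csharp
using System.Linq;

public static async Task PublishInChunksAsync<T>(
    IRabbitFlowPublisher publisher, IReadOnlyList<T> messages, string queueName, int chunkSize = 100)
    where T : class
{
    for (var i = 0; i < messages.Count; i += chunkSize)
    {
        // Each chunk is its own transactional batch; a failure only affects this chunk.
        var chunk = messages.Skip(i).Take(chunkSize).ToList();
        var result = await publisher.PublishBatchAsync(chunk, queueName);

        if (!result.Success)
            throw new InvalidOperationException($"Chunk starting at index {i} failed", result.Error);
    }
}
```

Smaller chunks trade a little throughput for finer-grained failure isolation; `ChannelMode.Confirm` is the other end of that trade-off.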
Enable automatic MessageId generation so consumers can deduplicate:
```csharp
cfg.ConfigurePublisher(pub => pub.IdempotencyEnabled = true);
```

When enabled, every published message (single or batch) gets a unique MessageId set in `BasicProperties.MessageId`. The ID is also returned in `PublishResult.MessageId` or `BatchPublishResult.MessageIds`.
Pass a correlationId when publishing to trace related messages end-to-end:
```csharp
// Single message with correlation
await publisher.PublishAsync(order, "orders-queue", correlationId: "req-abc-123");

// Exchange publish with correlation ("event" is a C# keyword, hence the @ prefix)
await publisher.PublishAsync(@event, "notifications", routingKey: "new", correlationId: requestId);

// Batch — same correlationId shared across all messages
await publisher.PublishBatchAsync(events, "orders-queue", correlationId: batchId);
```

The correlationId is set on `BasicProperties.CorrelationId` and received by consumers via `RabbitFlowMessageContext.CorrelationId`.
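In ASP.NET Core, a natural choice of correlationId is the incoming request's identifier, so broker-side traces line up with HTTP logs. A sketch using `HttpContext.TraceIdentifier` — any stable per-request ID works, and the route, event type, and queue name here are just examples:

```csharp
app.MapPost("/notifications", async (NotificationEvent evt, IRabbitFlowPublisher publisher, HttpContext http) =>
{
    // Reuse the ASP.NET Core request ID as the message CorrelationId so a consumer's
    // context.CorrelationId can be matched back to the originating HTTP request.
    var result = await publisher.PublishAsync(evt, "email-queue", correlationId: http.TraceIdentifier);

    return result.Success ? Results.Accepted() : Results.Problem(result.Error?.Message);
});
```

If you already propagate a distributed trace ID (e.g. from an incoming header), passing that instead keeps the correlation consistent across service boundaries.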
Every consumer receives a RabbitFlowMessageContext as a parameter of HandleAsync, providing access to AMQP metadata of the message being processed:
```csharp
public class OrderConsumer : IRabbitFlowConsumer<OrderCreatedEvent>
{
    public async Task HandleAsync(OrderCreatedEvent message, RabbitFlowMessageContext context, CancellationToken ct)
    {
        // Idempotency check using MessageId
        if (context.MessageId != null && await _db.ExistsAsync(context.MessageId))
        {
            _logger.LogWarning("Duplicate message {Id}, skipping", context.MessageId);
            return;
        }

        // Trace correlation across services
        _logger.LogInformation("Processing order. CorrelationId={CorrelationId}", context.CorrelationId);

        // Check if this is a redelivery
        if (context.Redelivered)
        {
            _logger.LogWarning("Redelivered message, DeliveryTag={Tag}", context.DeliveryTag);
        }

        await ProcessOrderAsync(message, ct);
    }
}
```

RabbitFlowMessageContext properties:
| Property | Type | Description |
|---|---|---|
| MessageId | string? | Unique ID from BasicProperties.MessageId (set when IdempotencyEnabled = true) |
| CorrelationId | string? | Correlation ID from BasicProperties.CorrelationId (set via correlationId parameter) |
| Exchange | string? | Exchange that delivered the message (empty for direct queue publishes) |
| RoutingKey | string? | Routing key used when the message was published |
| Headers | IDictionary? | Custom AMQP headers from BasicProperties.Headers |
| DeliveryTag | ulong | Broker-assigned delivery tag for this message |
| Redelivered | bool | Whether the broker redelivered this message |
> **Note:** `RabbitFlowMessageContext` is immutable — all properties are read-only and populated automatically from `BasicDeliverEventArgs` before `HandleAsync` is invoked.
Use IRabbitFlowState to query queue metadata at runtime:
```csharp
public class HealthCheckService
{
    private readonly IRabbitFlowState _state;

    public HealthCheckService(IRabbitFlowState state)
    {
        _state = state;
    }

    public async Task<object> GetQueueHealthAsync(string queueName)
    {
        return new
        {
            IsEmpty = await _state.IsEmptyQueueAsync(queueName),
            MessageCount = await _state.GetQueueLengthAsync(queueName),
            ConsumerCount = await _state.GetConsumersCountAsync(queueName),
            HasConsumers = await _state.HasConsumersAsync(queueName)
        };
    }
}
```

| Method | Returns | Description |
|---|---|---|
| IsEmptyQueueAsync(queueName) | Task<bool> | Is the queue empty? |
| GetQueueLengthAsync(queueName) | Task<uint> | Number of messages in the queue |
| GetConsumersCountAsync(queueName) | Task<uint> | Number of active consumers |
| HasConsumersAsync(queueName) | Task<bool> | Does the queue have any consumers? |
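The state helpers also enable a simple backpressure guard: check queue depth before publishing and shed load when the queue is backed up. A sketch — the wrapper class and the threshold are illustrative, not part of the library:

```csharp
public class GuardedPublisher
{
    private readonly IRabbitFlowPublisher _publisher;
    private readonly IRabbitFlowState _state;

    public GuardedPublisher(IRabbitFlowPublisher publisher, IRabbitFlowState state)
    {
        _publisher = publisher;
        _state = state;
    }

    public async Task<bool> TryPublishAsync<T>(T message, string queueName, uint maxDepth = 10_000)
        where T : class
    {
        // Refuse to publish into an already-deep queue; the caller can retry later
        // or surface a 503 instead of silently growing the backlog.
        if (await _state.GetQueueLengthAsync(queueName) >= maxDepth)
            return false;

        var result = await _publisher.PublishAsync(message, queueName);
        return result.Success;
    }
}
```

Note the check-then-publish sequence is not atomic — the depth can change between the two calls — so treat the threshold as a soft limit, not a guarantee.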
Use IRabbitFlowPurger to remove all messages from queues:
```csharp
// Purge a single queue
await purger.PurgeMessagesAsync("orders-queue");

// Purge multiple queues at once
await purger.PurgeMessagesAsync(new[] { "orders-queue", "emails-queue", "notifications-queue" });
```

`IRabbitFlowTemporary` is designed for fire-and-forget batch workflows — process a collection of messages through RabbitMQ with automatic queue creation and cleanup.
Ideal for one-off workloads that should not leave permanent queues or consumers behind:

```csharp
public class InvoiceService
{
    private readonly IRabbitFlowTemporary _temporary;

    public InvoiceService(IRabbitFlowTemporary temporary)
    {
        _temporary = temporary;
    }

    public async Task ProcessInvoiceBatchAsync(List<Invoice> invoices)
    {
        int processed = await _temporary.RunAsync(
            invoices,
            onMessageReceived: async (invoice, ct) =>
            {
                Console.WriteLine($"Processing invoice {invoice.Id}...");
                await Task.Delay(500, ct); // Simulate work
            },
            onCompleted: (total, errors) =>
            {
                Console.WriteLine($"Done! Processed: {total}, Errors: {errors}");
            },
            options: new RunTemporaryOptions
            {
                PrefetchCount = 10,
                Timeout = TimeSpan.FromSeconds(30),
                CorrelationId = Guid.NewGuid().ToString()
            });
    }
}
```

To collect a result per message, use the generic overload:

```csharp
int processed = await _temporary.RunAsync<Invoice, InvoiceResult>(
    invoices,
    onMessageReceived: async (invoice, ct) =>
    {
        var result = await ProcessInvoiceAsync(invoice, ct);
        return new InvoiceResult { InvoiceId = invoice.Id, Status = "Completed" };
    },
    onCompletedAsync: async (count, results) =>
    {
        // results is a ConcurrentQueue<InvoiceResult> with all collected results
        Console.WriteLine($"Processed {count} invoices, collected {results.Count} results");
        await SaveResultsAsync(results);
    });
```

```
 Your Code                 RabbitMQ (Temporary)           Handler
 ─────────                 ────────────────────           ───────
 RunAsync(messages) ─────► Create temp queue ───────────► onMessageReceived()
        │                  Publish all msgs                    │
        │                         │                            │
        │                  Consume & process ◄─────────────────┘
        │                         │
        │                  All processed?
        │                         │ yes
        ◄──────────────────Delete temp queue
        │                  Call onCompleted()
        │
   return count
```
| Option | Type | Default | Description |
|---|---|---|---|
| PrefetchCount | ushort | 1 | Parallel message processing (>0) |
| Timeout | TimeSpan? | null | Per-message timeout |
| QueuePrefixName | string? | null | Custom prefix for the temp queue name |
| CorrelationId | string? | Guid | Correlation ID for tracing/logging |
By default, timeouts and RabbitFlowTransientException trigger retries. Throw RabbitFlowTransientException from your consumer to signal a retryable error:
```csharp
using EasyRabbitFlow.Exceptions;

public class PaymentConsumer : IRabbitFlowConsumer<PaymentEvent>
{
    public async Task HandleAsync(PaymentEvent message, RabbitFlowMessageContext context, CancellationToken cancellationToken)
    {
        try
        {
            await ProcessPaymentAsync(message);
        }
        catch (HttpRequestException ex) when (ex.StatusCode == HttpStatusCode.ServiceUnavailable)
        {
            // This will trigger the retry policy
            throw new RabbitFlowTransientException("Payment gateway temporarily unavailable", ex);
        }

        // Any other exception → no retry, sent to dead-letter queue
    }
}
```

Exception types:
| Exception | Purpose |
|---|---|
| RabbitFlowTransientException | Signals a retryable error — triggers retry policy |
| RabbitFlowException | General library error |
| RabbitFlowOverRetriesException | Thrown internally when all retry attempts are exhausted |
| Interface | Lifetime | Description |
|---|---|---|
| IRabbitFlowPublisher | Singleton | Publish messages to exchanges or queues |
| IRabbitFlowState | Singleton | Query queue metadata |
| IRabbitFlowTemporary | Singleton | Temporary batch processing |
| IRabbitFlowPurger | Singleton | Purge queue messages |
| ConsumerHostedService | Hosted | Background consumer lifecycle (via UseRabbitFlowConsumers) |
```csharp
// Register all EasyRabbitFlow services
IServiceCollection AddRabbitFlow(this IServiceCollection services,
    Action<RabbitFlowConfigurator>? configurator = null);

// Start background consumer processing
IServiceCollection UseRabbitFlowConsumers(this IServiceCollection services);
```

| Method | Description |
|---|---|
| ConfigureHost(Action<HostSettings>) | Set RabbitMQ connection details |
| ConfigureJsonSerializerOptions(Action<JsonSerializerOptions>) | Customize JSON serialization |
| ConfigurePublisher(Action<PublisherOptions>?) | Configure publisher behavior |
| AddConsumer<TConsumer>(string queueName, Action<ConsumerSettings<TConsumer>>) | Register a consumer |
EasyRabbitFlow is designed for high-throughput scenarios:
- Tune `PrefetchCount` to balance throughput against memory usage.
- Messages are processed in parallel when `PrefetchCount > 1`.

Recommended settings for high throughput:
```csharp
cfg.AddConsumer<MyConsumer>("high-volume-queue", c =>
{
    c.PrefetchCount = 50;                 // Process 50 messages concurrently
    c.Timeout = TimeSpan.FromSeconds(60); // Generous timeout for heavy processing

    c.ConfigureRetryPolicy(r =>
    {
        r.MaxRetryCount = 3;
        r.RetryInterval = 500;
        r.ExponentialBackoff = true;
        r.ExponentialBackoffFactor = 2;
        r.MaxRetryDelay = 10_000;         // Cap at 10 seconds
    });
});

cfg.ConfigurePublisher(pub =>
{
    pub.DisposePublisherConnection = false; // Reuse connection
    pub.IdempotencyEnabled = true;          // Auto-assign MessageId
});
```

This project is licensed under the MIT License.
});This project is licensed under the MIT License.