基于RabbitMQ实现的消息总线,参考eShop EventBus
$ dotnet add package EasilyNET.RabbitBus.AspNetCoreorder 参数控制顺序执行时的处理器顺序本库已移除对 RabbitMQ 延迟消息交换机(rabbitmq-delayed-message-exchange)插件的支持。
原因:RabbitMQ 官方团队已于 2026 年 1 月 29 日宣布停止维护该插件,主要原因如下:
替代方案:
// Program.cs / Startup
builder.Services.AddRabbitBus(c =>
{
// 1) 连接配置(支持连接串/单点/集群)
c.WithConnection(f => f.Uri = new(builder.Configuration.GetConnectionString("Rabbit")!));
// 2) 消费者默认设置
// dispatchConcurrency: ConsumerDispatchConcurrency(默认 10)
// prefetchCount/prefetchSize/global: QoS(默认 100/0/false)
// consumerChannelLimit: 消费者通道上限(0 不限制)
c.WithConsumerSettings(dispatchConcurrency: 10, prefetchCount: 100, prefetchSize: 0, global: false, consumerChannelLimit: 0);
// 3) 弹性与发布确认
// retryCount/retryIntervalSeconds: 重试次数与后台重试间隔
// publisherConfirms: 发布确认(默认 true)
// maxOutstandingConfirms: 最大未确认发布数(默认 1000)
// batchSize: 批量发布大小(默认 100)
// confirmTimeoutMs: 发布确认超时(默认 30000ms)
c.WithResilience(retryCount: 5, retryIntervalSeconds: 1, publisherConfirms: true, maxOutstandingConfirms: 1000, batchSize: 100, confirmTimeoutMs: 30000);
// 4) 交换机声明/验证
// 注意:若你调用了 WithExchangeSettings() 且未传参,则 validateExchangesOnStartup 将变为 false
c.WithExchangeSettings(skipExchangeDeclare: false, validateExchangesOnStartup: true);
// 5) 重试队列容量(可选)
c.WithRetryQueueSizing(maxSize: null, memoryRatio: 0.02, avgEntryBytes: 2048);
// 6) 应用标识(可选)
c.WithApplication("YourAppName");
// ===== 事件配置 =====
c.AddEvent<TestEvent>(EModel.Routing, exchangeName: "test.exchange", routingKey: "test.key", queueName: "test.queue")
.WithEventQos(prefetchCount: 20)
.WithEventHeaders(new() { ["x-version"] = "v1" })
.WithEventQueueArgs(new() { ["x-max-priority"] = 9 })
.WithEventExchangeArgs(new() { ["alternate-exchange"] = "alt.exchange" })
.WithMiddleware<TestEventMiddleware>() // 可选:中间件(事务/幂等性)
.WithFallbackHandler<TestEventFallbackHandler>() // 可选:回退处理器
.WithHandler<TestEventHandler>()
.WithHandler<TestEventHandlerSecond>()
.And();
// 发布/订阅(Fanout)
c.AddEvent<FanoutEvent>(EModel.PublishSubscribe, "fanout.exchange", queueName: "fanout.queue")
.WithHandler<FanoutEventHandler>();
// Headers 交换机(基于消息头属性匹配路由)
c.AddEvent<HeadersEvent>(EModel.Headers, "headers.exchange", queueName: "headers.queue")
.WithBindingArguments(new() { ["x-match"] = "all", ["format"] = "pdf", ["type"] = "report" })
.WithEventHeaders(new() { ["format"] = "pdf", ["type"] = "report" })
.WithHandler<HeadersEventHandler>()
.And();
// 顺序执行 + 显式排序
c.AddEvent<OrderEvent>(EModel.Routing, "order.exchange", "order.key", "order.queue")
.ConfigureEvent(cfg => cfg.SequentialHandlerExecution = true)
.WithMiddleware<OrderEventMiddleware>()
.WithFallbackHandler<OrderFallbackHandler>()
.WithHandler<OrderValidationHandler>(order: 0)
.WithHandler<OrderProcessingHandler>(order: 10)
.WithHandler<OrderNotificationHandler>(order: 20)
.And();
// 忽略某个处理器
c.IgnoreHandler<TestEvent, TestEventHandlerSecond>();
// 自定义序列化器(可选)
// c.WithSerializer<MsgPackSerializer>();
});
using EasilyNET.RabbitBus.Core;
using EasilyNET.RabbitBus.Core.Abstraction;
public class TestEvent : Event
{
public string Message { get; set; } = default!;
}
public class TestEventHandler(ILogger<TestEventHandler> logger) : IEventHandler<TestEvent>
{
public Task HandleAsync(TestEvent @event)
{
logger.LogInformation("TestEvent: {event} @ {time}", @event.Message, DateTime.Now);
return Task.CompletedTask;
}
}
public class TestEventHandlerSecond(ILogger<TestEventHandlerSecond> logger) : IEventHandler<TestEvent>
{
public Task HandleAsync(TestEvent @event)
{
logger.LogInformation("SecondHandler: {event} @ {time}", @event.Message, DateTime.Now);
return Task.CompletedTask;
}
}
// 构造注入 IBus 后使用
private readonly IBus _bus;
public MyController(IBus bus) => _bus = bus;
[HttpPost("send")]
public async Task Send()
{
// 普通消息(按事件配置路由)
await _bus.Publish(new TestEvent { Message = "normal" });
// 显式指定 routingKey(Topic/多路由场景)
await _bus.Publish(new TestEvent { Message = "topic" }, routingKey: "topic.queue.1");
// 使用优先级(需队列设置 x-max-priority)
await _bus.Publish(new TestEvent { Message = "priority" }, priority: 5);
// 批量发布
var events = Enumerable.Range(1, 100).Select(i => new TestEvent { Message = $"batch-{i}" });
await _bus.PublishBatch(events);
// 批量发布 + 自定义路由
await _bus.PublishBatch(events, routingKey: "batch.topic");
}
public sealed class MsgPackSerializer : IBusSerializer
{
private static readonly MessagePackSerializerOptions standardOptions =
MessagePackSerializerOptions.Standard
.WithResolver(CompositeResolver.Create(NativeDateTimeResolver.Instance, ContractlessStandardResolver.Instance))
.WithSecurity(MessagePackSecurity.UntrustedData);
private static readonly MessagePackSerializerOptions lz4BlockArrayOptions =
standardOptions.WithCompression(MessagePackCompression.Lz4BlockArray);
private static readonly MessagePackSerializerOptions lz4BlockOptions =
standardOptions.WithCompression(MessagePackCompression.Lz4Block);
public byte[] Serialize(object? obj, Type type)
{
var data = MessagePackSerializer.Serialize(type, obj, standardOptions);
var options = data.Length > 8192 ? lz4BlockArrayOptions : lz4BlockOptions;
return MessagePackSerializer.Serialize(type, obj, options);
}
public object? Deserialize(byte[] data, Type type)
{
var options = data.Length > 8192 ? lz4BlockArrayOptions : lz4BlockOptions;
return MessagePackSerializer.Deserialize(type, data, options);
}
}
// 注册
builder.Services.AddRabbitBus(c =>
{
// ...其他配置
c.WithSerializer<MsgPackSerializer>();
});
Headers 交换机根据消息头属性(而非 routing key)进行路由,适用于多维度内容过滤场景。
WithBindingArguments):定义队列绑定时的匹配规则,包含 x-match(all 或 any)和匹配键值对WithEventHeaders):定义发布时携带的头部键值对,用于与绑定参数进行匹配x-match=all:消息头必须包含绑定中所有键值对才匹配x-match=any:消息头只要包含绑定中任意一个键值对即匹配// 定义事件
public class PdfReportEvent : Event
{
public string Content { get; set; } = default!;
}
// 注册事件(x-match=all:必须同时匹配 format=pdf 和 type=report)
c.AddEvent<PdfReportEvent>(EModel.Headers, "report.headers.exchange", queueName: "pdf-reports")
.WithBindingArguments(new() { ["x-match"] = "all", ["format"] = "pdf", ["type"] = "report" })
.WithEventHeaders(new() { ["format"] = "pdf", ["type"] = "report" })
.WithHandler<PdfReportHandler>()
.And();
// 注册事件(x-match=any:匹配 region=asia 或 priority=high 任意一个即可)
c.AddEvent<UrgentEvent>(EModel.Headers, "urgent.headers.exchange", queueName: "urgent-queue")
.WithBindingArguments(new() { ["x-match"] = "any", ["region"] = "asia", ["priority"] = "high" })
.WithEventHeaders(new() { ["region"] = "asia" })
.WithHandler<UrgentEventHandler>()
.And();
注意:Headers 交换机完全忽略 routing key,路由仅依赖消息头与绑定参数的匹配。性能上比 direct/topic 交换机稍差(需逐个匹配 header 键值对),适合多维度过滤场景。
WithHandler<THandler>() 明确注册的处理器才会创建消费者并注入 DI。⚠️ 破坏性变更(Breaking Change):处理器(Handler)、中间件(Middleware)和回退处理器(FallbackHandler)的 DI 生命周期已从 Singleton 变更为 Scoped。如果你的处理器依赖 Singleton 语义(如内部维护可变状态),请改用注入的 Singleton 服务来管理共享状态。此变更是为了正确支持每条消息独立的 DI 作用域,使处理器可以安全注入
DbContext等 Scoped 服务。此外,中间件和回退处理器在 DI 解析失败时将抛出InvalidOperationException而非静默降级,以确保显式配置的组件不会被意外跳过。
SequentialHandlerExecution = true 配置,确保处理器按 order 参数排序后依次执行WithMiddleware<T>() 注册中间件,包裹整个处理器链路。适用于事务、幂等性检查、审计日志等横切关注点。WithFallbackHandler<T>() 注册回退处理器,在所有重试耗尽后决定消息的处置方式(Ack/Nack/Requeue/DeadLetter)。AddSource("EasilyNET.RabbitBus") 即可接入。HandlerThreadCount(每事件消费者数量)以及 ConsumerDispatchConcurrency 以提升并发;ConsumerChannelLimit 可限制通道数量。x-max-priority。EModel.None 表示不显式声明交换机,使用默认交换机;此时 routingKey 默认为队列名。Publish 的 routingKey 参数可覆盖事件配置中的路由键,便于 Topic 多路由。PublishBatch 减少网络往返;根据消息大小调整 BatchSize(默认 100,建议 50-500)。ValidateExchangesOnStartup=true);若外部统一声明交换机,可设 SkipExchangeDeclare=true 跳过声明。MaxOutstandingConfirms 会等待以保护内存。RetryCount 后写入死信存储。可通过实现 IDeadLetterStore 接口自定义死信存储(如数据库、Redis 等)。支持两种处理器执行模式,通过事件配置中的 SequentialHandlerExecution 属性控制:
// 顺序执行:处理器按注册顺序依次执行(适用于有执行顺序依赖的场景)
c.AddEvent<OrderEvent>(EModel.Routing, "order.exchange", "order.key", "order.queue")
.ConfigureEvent(cfg => cfg.SequentialHandlerExecution = true)
.WithHandler<OrderValidationHandler>() // 第一步:验证
.WithHandler<OrderProcessingHandler>() // 第二步:处理
.WithHandler<OrderNotificationHandler>() // 第三步:通知
.And();
// 并发执行(默认):处理器并行执行,提高吞吐量
c.AddEvent<LogEvent>(EModel.Routing, "log.exchange", "log.key", "log.queue")
.WithHandler<ConsoleLogHandler>()
.WithHandler<FileLogHandler>()
.WithHandler<DatabaseLogHandler>()
.And();
| 模式 | 配置 | 特点 | 适用场景 |
|---|---|---|---|
| 并发执行 | SequentialHandlerExecution = false(默认) | 处理器并行执行,高吞吐 | 处理器之间无依赖关系 |
| 顺序执行 | SequentialHandlerExecution = true | 按注册顺序依次执行 | 处理器有执行顺序依赖 |
中间件包裹整个处理器执行链路,支持事务、幂等性检查、日志记录等横切关注点。不调用 next() 可短路管道(如幂等性检查命中时直接返回)。
using EasilyNET.RabbitBus.Core.Abstraction;
public class OrderEventMiddleware(DbContext db, ILogger<OrderEventMiddleware> logger) : IEventMiddleware<OrderEvent>
{
public async Task HandleAsync(EventContext<OrderEvent> context, Func<Task> next)
{
// 幂等性检查:如果已处理过该事件,直接返回(短路管道)
if (await db.ProcessedEvents.AnyAsync(e => e.EventId == context.Event.EventId, context.CancellationToken))
{
logger.LogInformation("Event {EventId} already processed, skipping", context.Event.EventId);
return; // 不调用 next(),短路管道
}
// 开启数据库事务包裹所有处理器
await using var tx = await db.Database.BeginTransactionAsync(context.CancellationToken);
try
{
await next(); // 执行所有处理器
// 记录已处理事件
db.ProcessedEvents.Add(new ProcessedEvent { EventId = context.Event.EventId, ProcessedAt = DateTime.UtcNow });
await db.SaveChangesAsync(context.CancellationToken);
await tx.CommitAsync(context.CancellationToken);
}
catch
{
await tx.RollbackAsync(context.CancellationToken);
throw;
}
}
}
c.AddEvent<OrderEvent>(EModel.Routing, "order.exchange", "order.key", "order.queue")
.WithMiddleware<OrderEventMiddleware>() // 注册中间件
.WithHandler<OrderValidationHandler>()
.WithHandler<OrderProcessingHandler>()
.And();
中间件注册为 Scoped 生命周期,可注入 DbContext 等 Scoped 服务。每个事件类型最多配置一个中间件。
当处理器执行过程中发生异常(Polly 重试耗尽后仍然失败)时,回退处理器被调用。返回 ConsumerAction 枚举决定消息的命运。
注意:顺序执行模式下,任一处理器失败即触发回退(后续处理器不再执行);并发执行模式下,任一处理器失败也会触发回退。
using EasilyNET.RabbitBus.Core.Abstraction;
public class OrderFallbackHandler(ILogger<OrderFallbackHandler> logger) : IEventFallbackHandler<OrderEvent>
{
public Task<ConsumerAction> OnFallbackAsync(OrderEvent @event, Exception exception, int retryCount)
{
logger.LogError(exception, "Order event {EventId} failed after {RetryCount} retries", @event.EventId, retryCount);
// 根据异常类型决定消息处置方式
return Task.FromResult(exception switch
{
// 数据验证错误:确认消费(不重试)
ValidationException => ConsumerAction.Ack,
// 临时性错误:重新入队稍后重试
TimeoutException => ConsumerAction.Requeue,
// 其他错误:发送到死信存储
_ => ConsumerAction.DeadLetter
});
}
}
c.AddEvent<OrderEvent>(EModel.Routing, "order.exchange", "order.key", "order.queue")
.WithMiddleware<OrderEventMiddleware>()
.WithFallbackHandler<OrderFallbackHandler>() // 注册回退处理器
.WithHandler<OrderValidationHandler>()
.WithHandler<OrderProcessingHandler>()
.And();
ConsumerAction 枚举:
| 值 | 说明 |
|---|---|
Ack | 确认消息(即使处理失败也标记为已消费) |
Nack | 拒绝消息,不重新入队 |
Requeue | 拒绝消息并重新入队,稍后重新消费 |
DeadLetter | 将消息存入死信存储后确认 |
若未配置回退处理器,处理失败后默认 Nack(与之前行为一致)。
当启用顺序执行时,可通过 order 参数显式控制处理器的执行顺序(值越小越先执行):
c.AddEvent<OrderEvent>(EModel.Routing, "order.exchange", "order.key", "order.queue")
.ConfigureEvent(cfg => cfg.SequentialHandlerExecution = true)
.WithHandler<OrderValidationHandler>(order: 0) // 第一步:验证
.WithHandler<OrderProcessingHandler>(order: 10) // 第二步:处理
.WithHandler<OrderNotificationHandler>(order: 20) // 第三步:通知
.And();
不指定
order时默认为 0,按注册顺序执行(与之前行为一致)。
每个事件可配置独立的 Polly 弹性管道,覆盖全局默认策略:
c.AddEvent<CriticalEvent>(EModel.Routing, "critical.exchange", "critical.key", "critical.queue")
.WithHandlerResilience(builder =>
{
// 关键业务:更多重试、更长超时
builder.AddRetry(new()
{
MaxRetryAttempts = 5,
Delay = TimeSpan.FromSeconds(2),
BackoffType = DelayBackoffType.Exponential,
UseJitter = true
});
builder.AddTimeout(TimeSpan.FromMinutes(2));
})
.WithHandler<CriticalEventHandler>()
.And();
c.AddEvent<LogEvent>(EModel.Routing, "log.exchange", "log.key", "log.queue")
.WithHandlerResilience(builder =>
{
// 日志事件:快速失败,不重试
builder.AddTimeout(TimeSpan.FromSeconds(5));
})
.WithHandler<LogEventHandler>()
.And();
未配置自定义弹性管道时,使用全局默认的 HandlerPipeline。
框架内置 System.Diagnostics.ActivitySource 支持,自动为发布和消费操作创建追踪 span,并通过 RabbitMQ 消息头传播 trace context(traceparent/tracestate)。
builder.Services.AddOpenTelemetry()
.ConfigureResource(resource => resource.AddService("MyApp"))
.WithTracing(tracing =>
{
tracing.AddSource("EasilyNET.RabbitBus") // 添加 RabbitBus ActivitySource
.AddAspNetCoreInstrumentation()
.AddOtlpExporter();
});
发布 span(rabbitmq.publish,ActivityKind.Producer):
| 标签 | 说明 |
|---|---|
messaging.system | rabbitmq |
messaging.destination | 交换机名称 |
messaging.destination_kind | exchange |
messaging.rabbitmq.routing_key | 路由键 |
messaging.message.id | 事件 ID |
消费 span(rabbitmq.consume,ActivityKind.Consumer):
| 标签 | 说明 |
|---|---|
messaging.system | rabbitmq |
messaging.source | 来源交换机 |
messaging.destination | 路由键 |
messaging.consumer_id | 消费者索引 |
发布端自动将
traceparent/tracestate注入消息头,消费端自动提取并关联为父级 span,实现跨进程的完整链路追踪。
IDeadLetterStore 是公开接口,可实现自定义死信存储(如 Redis、数据库等):
// ✅ 可复制使用的 Redis 死信存储示例(同时也是一个模板):
// - 该实现会把死信消息序列化后存入 Redis String
// - 读取时会根据 OriginalEventType 反序列化回具体事件类型(要求事件类型在当前应用可加载)
// - 注意:示例中使用 server.KeysAsync(pattern) 扫描 key,生产环境建议改为 Set/SortedSet + Scan 维护索引
//
// NuGet:
// - StackExchange.Redis
using System.Runtime.CompilerServices;
using System.Text.Json;
using EasilyNET.RabbitBus.AspNetCore.Abstractions;
using EasilyNET.RabbitBus.Core.Abstraction;
using StackExchange.Redis;
/// <summary>
/// Redis 死信存储示例
/// </summary>
public class RedisDeadLetterStore : IDeadLetterStore
{
private readonly IConnectionMultiplexer _redis;
private readonly JsonSerializerOptions _jsonOptions;
public RedisDeadLetterStore(IConnectionMultiplexer redis, JsonSerializerOptions? jsonOptions = null)
{
_redis = redis;
_jsonOptions = jsonOptions ?? new(JsonSerializerDefaults.Web);
}
private const string KeyPrefix = "deadletter:";
private static string BuildKey(string eventType, string eventId) => $"{KeyPrefix}{eventType}:{eventId}";
private sealed record DeadLetterEnvelope(
string EventType,
string EventId,
DateTime CreatedUtc,
int RetryCount,
string OriginalEventType,
string OriginalEventJson);
private sealed class RedisDeadLetterMessage(string eventType, string eventId, DateTime createdUtc, int retryCount, IEvent originalEvent) : IDeadLetterMessage
{
public string EventType { get; } = eventType;
public string EventId { get; } = eventId;
public DateTime CreatedUtc { get; } = createdUtc;
public int RetryCount { get; } = retryCount;
public IEvent OriginalEvent { get; } = originalEvent;
}
public async ValueTask StoreAsync(IDeadLetterMessage message, CancellationToken cancellationToken = default)
{
var db = _redis.GetDatabase();
var originalEventType = message.OriginalEvent.GetType().AssemblyQualifiedName;
if (string.IsNullOrWhiteSpace(originalEventType))
{
throw new InvalidOperationException("OriginalEvent type must have a valid AssemblyQualifiedName.");
}
var envelope = new DeadLetterEnvelope(
message.EventType,
message.EventId,
message.CreatedUtc,
message.RetryCount,
originalEventType,
JsonSerializer.Serialize(message.OriginalEvent, message.OriginalEvent.GetType(), _jsonOptions));
var key = BuildKey(message.EventType, message.EventId);
var value = JsonSerializer.Serialize(envelope, _jsonOptions);
await db.StringSetAsync(key, value).ConfigureAwait(false);
}
public async IAsyncEnumerable<IDeadLetterMessage> GetAllAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var db = _redis.GetDatabase();
var server = _redis.GetServer(_redis.GetEndPoints().First());
await foreach (var key in server.KeysAsync(pattern: $"{KeyPrefix}*").WithCancellation(cancellationToken))
{
if (cancellationToken.IsCancellationRequested) yield break;
var value = await db.StringGetAsync(key);
if (value.HasValue)
{
var envelope = JsonSerializer.Deserialize<DeadLetterEnvelope>(value!, _jsonOptions);
if (envelope is null)
{
continue;
}
var evtType = Type.GetType(envelope.OriginalEventType);
if (evtType is null)
{
continue;
}
var evt = JsonSerializer.Deserialize(envelope.OriginalEventJson, evtType, _jsonOptions) as IEvent;
if (evt is null)
{
continue;
}
yield return new RedisDeadLetterMessage(envelope.EventType, envelope.EventId, envelope.CreatedUtc, envelope.RetryCount, evt);
}
}
}
public async ValueTask ClearAsync(CancellationToken cancellationToken = default)
{
var db = _redis.GetDatabase();
var server = _redis.GetServer(_redis.GetEndPoints().First());
await foreach (var key in server.KeysAsync(pattern: $"{KeyPrefix}*").WithCancellation(cancellationToken))
{
if (cancellationToken.IsCancellationRequested)
{
break;
}
await db.KeyDeleteAsync(key).ConfigureAwait(false);
}
}
}
// 注册自定义死信存储(替换内置的 InMemoryDeadLetterStore)
builder.Services.AddSingleton<IDeadLetterStore, RedisDeadLetterStore>();
IDeadLetterMessage 接口成员:
| 属性 | 类型 | 说明 |
|---|---|---|
EventType | string | 事件类型名称 |
EventId | string | 事件唯一标识 |
CreatedUtc | DateTime | 消息创建时间(UTC) |
RetryCount | int | 进入死信前的重试次数 |
OriginalEvent | IEvent | 原始事件实例 |
// 通过增加 HandlerThreadCount 创建多个消费者通道并行消费
c.AddEvent<OrderEvent>(EModel.Routing, "order.exchange", "order.key", "order.queue")
.WithHandlerThreadCount(4)
.WithHandler<OrderEventHandler>()
.And();
builder.Services.AddRabbitBus(c =>
{
// 高并发消费者设置
c.WithConsumerSettings(dispatchConcurrency: 50, prefetchCount: 200);
// 禁用发布确认以提升性能(生产环境可根据需要启用)
c.WithResilience(retryCount: 3, publisherConfirms: false);
// 增加消费者并发
// 事件级通过 WithHandlerThreadCount 调整(见下方示例)
// 事件配置
c.AddEvent<HighVolumeEvent>(EModel.Routing, "highvol.exchange", "highvol.key", "highvol.queue")
.WithEventQos(prefetchCount: 100)
.WithHandlerThreadCount(8)
.WithHandler<HighVolumeEventHandler>()
.And();
});
builder.Services.AddRabbitBus(c =>
{
// 保守的消费者设置
c.WithConsumerSettings(dispatchConcurrency: 5, prefetchCount: 20);
// 启用发布确认确保消息不丢失
c.WithResilience(retryCount: 10, publisherConfirms: true);
// 事件配置
c.AddEvent<CriticalEvent>(EModel.Routing, "critical.exchange", "critical.key", "critical.queue")
.WithEventQos(prefetchCount: 10)
.WithHandlerThreadCount(1)
.WithHandler<CriticalEventHandler>()
.And();
});
builder.Services.AddRabbitBus(c =>
{
// 集群连接(多个节点)
c.WithConnection(f =>
{
f.HostName = "rabbitmq-cluster";
f.UserName = "user";
f.Password = "password";
f.Port = 5672;
f.VirtualHost = "/";
// 如使用 AmqpTcpEndpoints, 可在 builder 中提供多个节点
});
// 其他配置...
});
健康检查
RabbitBusHealthCheck。若你启用了 ASP.NET Core 健康检查端点,只需在管道中映射:// Program.cs
builder.Services.AddHealthChecks(); // 若外部未调用,库内部也会注册
var app = builder.Build();
app.MapHealthChecks("/health");
指标(基于 System.Diagnostics.Metrics)
Meter 名称: EasilyNET.RabbitBus
rabbitmq.publish.normal.total, rabbitmq.publish.retried.total, rabbitmq.publish.discarded.totalrabbitmq.publish.confirm.ack.total, rabbitmq.publish.confirm.nack.total, rabbitmq.publish.outstanding.confirmsrabbitmq.retry.enqueued.totalrabbitmq.connection.reconnects.total, rabbitmq.connection.active, rabbitmq.channel.active, rabbitmq.connection.staterabbitmq.deadletter.total说明(EN):These are the latest dot-separated metric names. Older versions used underscore-based names (for example:
rabbitmq_published_normal_total). If you previously collected metrics via the old names, please update your dashboards/alerts accordingly. 说明(中文):以上为最新的点分式指标命名规范,已从旧的下划线风格(例如:rabbitmq_published_normal_total)迁移而来。如你已基于旧名称配置监控/告警,请同步更新对应配置。
快速观察(开发):
dotnet-counters monitor --process <your-app-pid> --counters EasilyNET.RabbitBus
OpenTelemetry: 按常规方式接入 OTLP/Prometheus 导出器即可收集上述指标。
分布式追踪: ActivitySource 名称为 EasilyNET.RabbitBus,支持发布/消费 span 自动创建与 trace context 传播。
// 接入 OpenTelemetry 追踪
builder.Services.AddOpenTelemetry()
.WithTracing(tracing => tracing.AddSource("EasilyNET.RabbitBus"));
MaxOutstandingConfirms 为阈值控制未确认发布数量。SkipExchangeDeclare=true 时,框架不会主动声明交换机,仅在需要时进行被动验证或直接发布(取决于场景)。ValidateExchangesOnStartup=true 时,启动阶段会被动(passive)验证交换机是否存在且类型匹配; 若类型不一致,会明确报错并终止启动(避免运行期频繁连接被关闭)。SkipExchangeDeclare 以减少不必要的声明开销。builder.Services.AddRabbitBus(c =>
{
// 集群连接(多个节点)
c.WithConnection(f =>
{
f.HostName = "rabbitmq-cluster";
f.UserName = "user";
f.Password = "password";
f.Port = 5672;
// 集群节点
f.VirtualHost = "/";
});
// 其他配置...
});
连接失败
消息丢失
性能问题
内存泄漏
| 配置方法 | 参数 | 默认值 | 说明 |
|---|---|---|---|
WithConnection | - | - | RabbitMQ 连接配置(主机、端口、认证等) |
WithConsumerSettings | dispatchConcurrency | 10 | 消费者调度并发数,控制同时处理的消息数 |
prefetchCount | 100 | QoS 预取计数,限制未确认消息数量 | |
prefetchSize | 0 | QoS 预取大小 | |
global | false | QoS 是否全局 | |
consumerChannelLimit | 0 | 消费者通道上限(0 表示不限制) | |
WithResilience | retryCount | 5 | 发布失败/确认失败的重试次数 |
retryIntervalSeconds | 1 | 后台重试检查间隔(秒) | |
publisherConfirms | true | 是否启用发布确认模式 | |
maxOutstandingConfirms | 1000 | 最大未确认发布数量 | |
batchSize | 100 | 批量发布大小 | |
confirmTimeoutMs | 30000 | 发布确认超时时间(毫秒) | |
WithExchangeSettings | skipExchangeDeclare | false | 跳过交换机声明(外部已声明时可启用) |
validateExchangesOnStartup | false | 启动验证交换机类型(调用该方法时默认) | |
WithRetryQueueSizing | maxSize | - | 固定最大重试队列长度(>0 生效) |
memoryRatio | 0.02 | 估算队列内存占比(0-0.25) | |
avgEntryBytes | 2048 | 单条重试项估算字节数 | |
WithApplication | appName | MachineName, EasilyNET | 应用标识,用于日志与指标标签 |
WithEventQos | prefetchCount | - | 事件级 QoS 设置(覆盖全局设置) |
prefetchSize | - | 事件级 QoS 预取大小 | |
global | - | 事件级 QoS 是否全局 | |
WithEventHeaders | headers | - | 消息头参数 |
WithEventQueueArgs | args | - | 队列声明参数(x-max-priority 等) |
WithEventExchangeArgs | args | - | 交换机声明参数 |
WithBindingArguments | arguments | - | Headers交换机绑定参数(x-match及匹配键值对) |
WithHandler | THandler | - | 注册事件处理器(必须) |
order | 0 | 处理器执行顺序(值越小越先执行) | |
WithMiddleware | TMiddleware | - | 注册事件中间件(可选,每事件最多一个) |
WithFallbackHandler | TFallback | - | 注册回退处理器(可选,重试耗尽后调用) |
WithHandlerResilience | Action<ResiliencePipelineBuilder> | 全局 HandlerPipeline | 自定义事件级弹性管道 |
WithHandlerThreadCount | threadCount | 1 | 该事件消费者数量(并行度) |
ConfigureEvent | SequentialHandlerExecution | false | 是否按顺序执行处理器 |
Exchange/Queue/Qos/Headers/... | - | 事件高级配置入口 | |
IgnoreHandler | - | - | 忽略指定的处理器 |
WithSerializer | - | System.Text.Json | 自定义消息序列化器 |
| 全局 | SkipExchangeDeclare | false | 跳过交换机声明(外部已声明时可启用) |
| 全局 | ValidateExchangesOnStartup | false | 启动阶段验证交换机类型与存在性 |
以下为高级内容,普通使用者无需关注。
框架内部使用 System.Threading.Channels.Channel<T> 实现高性能的重试消息队列:
ConcurrentQueue<T>,Channel 提供更好的异步消费体验struct RetryMessage 减少 GC 压力框架使用 Polly v8+ 的 ResiliencePipeline 实现弹性策略:
WithHandlerResilience 按事件覆盖)消费侧采用中间件管道模式处理消息:
消息到达 → 反序列化 → [中间件 HandleAsync] → 处理器链路(顺序/并发) → Ack
↓ (异常)
[回退处理器 OnFallbackAsync] → ConsumerAction → Ack/Nack/Requeue/DeadLetter
IEventMiddleware<T>):包裹整个处理器链路,支持事务、幂等性、日志等IEventFallbackHandler<T>):Polly 重试耗尽后调用,决定消息命运HandlerConfiguration.Order 控制顺序执行时的处理器顺序框架使用 System.Diagnostics.ActivitySource(名称:EasilyNET.RabbitBus)实现分布式追踪:
rabbitmq.publish Producer span,将 traceparent/tracestate 注入消息头rabbitmq.consume Consumer span// 内部注册示例(仅供参考)
services.AddResiliencePipeline(Constant.HandlerPipelineName, (builder, context) =>
{
builder.AddRetry(new()
{
ShouldHandle = new PredicateBuilder()
.Handle<BrokerUnreachableException>()
.Handle<SocketException>()
.Handle<TimeoutException>(),
MaxRetryAttempts = 2,
Delay = TimeSpan.FromSeconds(1),
BackoffType = DelayBackoffType.Exponential,
UseJitter = true
});
builder.AddTimeout(TimeSpan.FromSeconds(30));
});
生产环境建议
开发环境建议
性能监控
错误处理