扩展rabbitmq,并添加到mongodb记录
$ dotnet add package Asa.RabbitMq
一个功能强大的RabbitMQ消息队列库,支持依赖注入、MongoDB记录、智能重试策略、消息去重和延迟消息。
dotnet add package Asa.RabbitMq
// 在 Program.cs 或 Startup.cs 中
var builder = WebApplication.CreateBuilder(args);
// 注册 RabbitMQ 服务(需要传入 IConfiguration)
builder.Services.AddRabbitMqConfigureServices(builder.Configuration);
// 或者使用传统方式
// services.AddRabbitMqConfigureServices(configuration);
builder.Host.ConfigureContainer<ContainerBuilder>(containerBuilder =>
{
// 注册 RabbitMQ 服务(需要传入 IConfiguration)
containerBuilder.AddAutofacRabbitMqConfigureServices(builder.Configuration);
// 注册其他依赖项
containerBuilder.RegisterType<MyService>().As<IMyService>();
});
⚠️ 注意: 标准DI和Autofac配置只能选择一个!
在 appsettings.json 中添加连接字符串和重试策略配置:
{
"ConnectionStrings": {
"RabbitMQConnection": "amqp://username:password@hostname:port/virtual_host",
"MongodbMqRecordConnection": "mongodb://username:password@hostname:port/?authSource=admin"
},
"RabbitMq": {
"RetryPolicy": {
"MaxRetryCount": 3,
"InitialRetryInterval": 1,
"MaxRetryInterval": 30,
"BackoffMultiplier": 2,
"UseExponentialBackoff": true,
"AddJitter": true
}
}
}
💡 提示: 如果不配置重试策略,系统将使用默认值:重试3次,初始间隔1秒,使用指数退避和随机抖动。
RabbitMQ连接字符串格式(推荐使用标准 AMQP URI 格式):
# 单节点
amqp://username:password@hostname:port/virtual_host
# 集群模式(多个主机)
amqp://username:password@host1:port1,host2:port2,host3:port3/virtual_host
# 使用默认端口(5672)
amqp://username:password@hostname/virtual_host
旧格式兼容(向后兼容,建议迁移到新格式):
//username:password@hostname:port/virtual_host
MongoDB连接字符串格式:
mongodb://username:password@hostname:port/?authSource=admin
| 配置项 | 类型 | 描述 | 默认值 |
|---|---|---|---|
MaxRetryCount | int | 最大重试次数 | 3 |
InitialRetryInterval | double | 初始重试间隔(秒) | 1 |
MaxRetryInterval | double | 最大重试间隔(秒) | 30 |
BackoffMultiplier | double | 退避倍数 | 2 |
UseExponentialBackoff | bool | 是否使用指数退避 | true |
AddJitter | bool | 是否添加随机抖动 | true |
如果需要自定义重试策略,可以显式注册:
// 使用默认配置
builder.Services.AddRabbitMqRetryPolicy();
// 或从配置文件读取
builder.Services.AddRabbitMqRetryPolicy(builder.Configuration);
// 或使用代码配置
builder.Services.AddRabbitMqRetryPolicy(options =>
{
options.MaxRetryCount = 5;
options.InitialRetryInterval = 2;
options.MaxRetryInterval = 60;
});
⚠️ 注意: 如果使用
AddRabbitMqConfigureServices,重试策略会自动注册,无需手动添加。
public class OrderConsumer : IConsumer
{
private readonly ILogger<OrderConsumer> _logger;
public OrderConsumer(ILogger<OrderConsumer> logger)
{
_logger = logger;
}
public void Configure(IConsumerConfiguration consumerConfiguration)
{
consumerConfiguration
.WithExchangeName("order.exchange")
.WithRoutingKey("order.created")
.WithQueueName("order.queue")
.WithExchangeType(ExchangeTypeEnum.Topic)
.WithPrefetchCount(20)
.WithRetryCount(3)
.WithAutoAck(false); // 推荐手动确认
}
// 异步处理方法(推荐)
public async Task InvokeAsync(Order order)
{
try
{
_logger.LogInformation($"开始处理订单: {order.Id}");
// 处理订单逻辑
await ProcessOrderAsync(order);
_logger.LogInformation($"订单处理完成: {order.Id}");
}
catch (Exception ex)
{
_logger.LogError(ex, "处理订单时发生错误");
throw; // 框架会自动重试
}
}
// 或者使用同步版本
public void Invoke(Order order)
{
// 同步处理逻辑
_logger.LogInformation($"同步处理订单: {order.Id}");
ProcessOrder(order);
}
}
| 配置方法 | 类型 | 描述 | 默认值 | 注意事项 |
|---|---|---|---|---|
WithExchangeName | string | 交换机名称 | 必填 | - |
WithRoutingKey | string | 路由键 | 必填 | - |
WithQueueName | string | 队列名称 | 必填 | - |
WithExchangeType | ExchangeTypeEnum | 交换机类型 | Topic | Direct, Topic, Fanout, Headers |
WithPrefetchCount | int | 预取数量 | 10 | 控制消费者并发处理能力 |
WithRetryCount | int | 重试次数 | 3 | 消费级别的重试配置 |
WithAutoAck | bool | 自动确认 | false | 推荐设置为false确保消息可靠性 |
WithIsDelayQueue | bool | 延迟队列 | false | 需要RabbitMQ延迟插件 |
WithAutoDelete | bool | 自动删除 | false | 队列在没有消费者时自动删除 |
public class AdvancedOrderConsumer : IConsumer
{
private readonly ILogger<AdvancedOrderConsumer> _logger;
private readonly IOrderService _orderService;
public AdvancedOrderConsumer(
ILogger<AdvancedOrderConsumer> logger,
IOrderService orderService)
{
_logger = logger;
_orderService = orderService;
}
public void Configure(IConsumerConfiguration consumerConfiguration)
{
consumerConfiguration
.WithExchangeName("advanced.order.exchange")
.WithRoutingKey("order.*")
.WithQueueName("advanced.order.queue")
.WithExchangeType(ExchangeTypeEnum.Topic)
.WithPrefetchCount(50) // 高并发处理
.WithRetryCount(5) // 更多次的重试
.WithAutoAck(false) // 手动确认
.WithAutoDelete(false); // 持久化队列
}
public async Task InvokeAsync(Order order)
{
try
{
_logger.LogInformation($"开始处理高级订单: {order.Id},金额: {order.Amount}");
// 业务逻辑处理
var result = await _orderService.ProcessOrderAsync(order);
_logger.LogInformation($"高级订单处理成功: {order.Id},结果: {result.Status}");
}
catch (BusinessException ex)
{
// 业务异常不重试
_logger.LogWarning(ex, $"订单业务异常,跳过重试: {order.Id}");
throw new NonRetryableException(ex.Message);
}
catch (Exception ex)
{
_logger.LogError(ex, $"处理高级订单失败,将重试: {order.Id}");
throw; // 其他异常会自动重试
}
}
}
public class OrderService
{
private readonly IPublisher _publisher;
private readonly ILogger<OrderService> _logger;
public OrderService(IPublisher publisher, ILogger<OrderService> logger)
{
_publisher = publisher;
_logger = logger;
}
public async Task CreateOrderAsync(Order order)
{
try
{
_logger.LogInformation($"开始发布订单消息: {order.Id}");
// 直接发布对象,框架会自动序列化
await _publisher.PublishAsync(order, option =>
option
.WithExchangeName("order.exchange")
.WithRoutingKey("order.created")
.WithQueueName("order.queue")
.WithExchangeType(ExchangeTypeEnum.Topic));
_logger.LogInformation($"订单消息发布成功: {order.Id}");
}
catch (RabbitMqPublishException ex)
{
_logger.LogError(ex, $"发布订单消息失败: {order.Id}, Exchange: {ex.ExchangeName}, RoutingKey: {ex.RoutingKey}");
throw;
}
catch (Exception ex)
{
_logger.LogError(ex, $"发布订单消息时发生未知错误: {order.Id}");
throw;
}
}
}
| 配置方法 | 类型 | 描述 | 默认值 | 注意事项 |
|---|---|---|---|---|
WithExchangeName | string | 交换机名称 | 必填 | - |
WithRoutingKey | string | 路由键 | 必填 | - |
WithQueueName | string | 队列名称 | 必填 | - |
WithExchangeType | ExchangeTypeEnum | 交换机类型 | Topic | Direct, Topic, Fanout, Headers |
WithDeliverTime | int | 延迟投递时间(秒) | 0 | 0表示立即投递 |
WithAutoDelete | bool | 自动删除 | false | 队列在没有消费者时自动删除 |
WithArguments | Dictionary<string, object> | 自定义参数 | null | 可以添加额外的队列参数 |
public class AdvancedOrderService
{
private readonly IPublisher _publisher;
private readonly ILogger<AdvancedOrderService> _logger;
public AdvancedOrderService(IPublisher publisher, ILogger<AdvancedOrderService> logger)
{
_publisher = publisher;
_logger = logger;
}
public async Task CreateOrderWithDelayAsync(Order order, int delaySeconds = 0)
{
try
{
_logger.LogInformation($"开始发布{ (delaySeconds > 0 ? "延迟" : "") }订单消息: {order.Id}");
var customHeaders = new Dictionary<string, object>
{
{ "order-type", "advanced" },
{ "priority", order.Priority },
{ "source", "web-api" }
};
await _publisher.PublishAsync(order, option =>
option
.WithExchangeName("advanced.order.exchange")
.WithRoutingKey($"order.{order.Type.ToLower()}")
.WithQueueName("advanced.order.queue")
.WithExchangeType(ExchangeTypeEnum.Topic)
.WithDeliverTime(delaySeconds) // 延迟投递
.WithAutoDelete(false)
.WithArguments(customHeaders));
_logger.LogInformation($"{ (delaySeconds > 0 ? "延迟" : "") }订单消息发布成功: {order.Id}");
}
catch (RabbitMqConnectionException ex)
{
_logger.LogError(ex, $"RabbitMQ连接失败: {order.Id}");
// 可以在这里添加重连逻辑
throw;
}
catch (RabbitMqPublishException ex)
{
_logger.LogError(ex, $"发布订单消息失败: {order.Id}, MessageId: {ex.MessageId}");
throw;
}
catch (Exception ex)
{
_logger.LogError(ex, $"发布订单消息时发生未知错误: {order.Id}");
throw;
}
}
// 发布字符串消息
public async Task PublishRawMessageAsync(string content, string routingKey)
{
await _publisher.PublishAsync(content, option =>
option
.WithExchangeName("raw.message.exchange")
.WithRoutingKey(routingKey)
.WithQueueName("raw.message.queue"));
}
}
public class DelayedOrderConsumer : IConsumer
{
public void Configure(IConsumerConfiguration consumerConfiguration)
{
consumerConfiguration
.WithExchangeName("delayed.order.exchange")
.WithRoutingKey("delayed.order.created")
.WithQueueName("delayed.order.queue")
.WithExchangeType(ExchangeTypeEnum.Topic)
.WithIsDelayQueue(true) // 标记为延迟队列
.WithRetryCount(3);
}
public async Task InvokeAsync(Message message)
{
// 处理延迟消息
}
}
await _publisher.PublishAsync(message, option =>
option
.WithExchangeName("delayed.order.exchange")
.WithRoutingKey("delayed.order.created")
.WithDeliverTime(30) // 30秒后投递
.WithQueueName("delayed.order.queue"));
⚠️ 注意: 延迟消息需要RabbitMQ安装
rabbitmq_delayed_message_exchange插件
Asa.RabbitMq内置了强大的消息去重机制,可以有效防止消息重复处理:
双重检查机制:
去重标识:
x-message-id 作为唯一标识缓存管理:
消息去重是自动启用的,无需额外配置。系统会:
// 发布消息时会自动添加唯一ID
await _publisher.PublishAsync(order, option =>
option.WithExchangeName("order.exchange")
.WithRoutingKey("order.created"));
// 如果相同消息ID的消息再次到达,会被自动跳过
RabbitMqException (基础异常)
├── RabbitMqConnectionException (连接异常)
├── RabbitMqPublishException (发布异常)
├── RabbitMqConsumeException (消费异常)
├── RabbitMqConfigurationException (配置异常)
├── RabbitMqSerializationException (序列化异常)
└── RabbitMqTimeoutException (超时异常)
try
{
await _publisher.PublishAsync(message, config);
}
catch (RabbitMqConnectionException ex)
{
_logger.LogError(ex, "RabbitMQ连接失败");
// 可以尝试重连或使用备用方案
await HandleConnectionFailureAsync();
}
try
{
await _publisher.PublishAsync(message, config);
}
catch (RabbitMqPublishException ex)
{
_logger.LogError(ex, $"消息发布失败: {ex.MessageId}, Exchange: {ex.ExchangeName}");
// 可以记录失败消息并后续重试
await RecordFailedMessageAsync(ex.MessageId, message);
}
public async Task InvokeAsync(Order order)
{
try
{
await ProcessOrderAsync(order);
}
catch (BusinessException ex)
{
// 业务异常不重试
_logger.LogWarning(ex, $"订单业务异常: {order.Id}");
throw new NonRetryableException(ex.Message);
}
catch (TimeoutException ex)
{
// 超时异常可以重试
_logger.LogWarning(ex, $"订单处理超时: {order.Id}");
throw; // 框架会自动重试
}
catch (Exception ex)
{
_logger.LogError(ex, $"订单处理失败: {order.Id}");
throw; // 其他异常会根据重试策略处理
}
}
使用 NonRetryableException 标记不需要重试的异常:
throw new NonRetryableException("数据验证失败");
系统使用智能的指数退避算法:
重试次数: 1, 等待时间: 1秒
重试次数: 2, 等待时间: 2秒 (1 * 2^1)
重试次数: 3, 等待时间: 4秒 (1 * 2^2)
重试次数: 4, 等待时间: 8秒 (1 * 2^3)
...
最大等待时间: 30秒
为避免惊群效应,系统会添加随机抖动:
基础等待时间: 4秒
随机抖动: 0-50%
实际等待时间: 4-6秒
默认情况下,以下异常类型会触发重试:
TimeoutExceptionSystem.IO.IOExceptionSystem.Net.Sockets.SocketException可以通过配置文件自定义可重试异常类型。
注入 IRabbitMqRecord 服务来查询消息记录:
public class MessageQueryService
{
private readonly IRabbitMqRecord _rabbitMqRecord;
public MessageQueryService(IRabbitMqRecord rabbitMqRecord)
{
_rabbitMqRecord = rabbitMqRecord;
}
public async Task<List<MessageRecord>> GetPublishRecordsAsync()
{
var result = await _rabbitMqRecord.GetPublishPageListAsync(new PageQuery
{
PageIndex = 1,
PageSize = 20,
StartTime = DateTime.Now.AddDays(-7),
EndTime = DateTime.Now
});
return result.Data;
}
public async Task<List<MessageRecord>> GetReceivedRecordsAsync()
{
var result = await _rabbitMqRecord.GetReceivedPageListAsync(new PageQuery
{
PageIndex = 1,
PageSize = 20
});
return result.Data;
}
public async Task RePublishMessageAsync(string messageId)
{
await _rabbitMqRecord.RePublishAsync(messageId);
}
}
| 配置方法 | 类型 | 必填 | 默认值 | 描述 |
|---|---|---|---|---|
WithExchangeName | string | ✅ | - | 交换机名称 |
WithRoutingKey | string | ✅ | - | 路由键 |
WithQueueName | string | ✅ | - | 队列名称 |
WithExchangeType | ExchangeTypeEnum | ❌ | Topic | 交换机类型 |
WithPrefetchCount | int | ❌ | 10 | 预取数量 |
WithRetryCount | int | ❌ | 3 | 重试次数 |
WithAutoAck | bool | ❌ | false | 自动确认 |
WithIsDelayQueue | bool | ❌ | false | 是否为延迟队列 |
WithAutoDelete | bool | ❌ | false | 自动删除 |
| 配置方法 | 类型 | 必填 | 默认值 | 描述 |
|---|---|---|---|---|
WithExchangeName | string | ✅ | - | 交换机名称 |
WithRoutingKey | string | ✅ | - | 路由键 |
WithQueueName | string | ✅ | - | 队列名称 |
WithExchangeType | ExchangeTypeEnum | ❌ | Topic | 交换机类型 |
WithDeliverTime | int | ❌ | 0 | 延迟投递时间(秒) |
WithAutoDelete | bool | ❌ | false | 自动删除 |
WithArguments | Dictionary<string, object> | ❌ | null | 自定义参数 |
| 类型 | 描述 | 使用场景 |
|---|---|---|
Direct | 直接交换 | 精确路由匹配 |
Topic | 主题交换 | 模式匹配路由 |
Fanout | 扇出交换 | 广播到所有队列 |
Headers | 头交换 | 基于消息头路由 |
系统会自动配置以下MongoDB连接池参数:
| 参数 | 值 | 描述 |
|---|---|---|
MaxConnectionPoolSize | 200 | 最大连接池大小 |
ConnectTimeout | 30秒 | 连接超时时间 |
SocketTimeout | 60秒 | Socket超时时间 |
HeartbeatTimeout | 10秒 | 心跳超时时间 |
WaitQueueTimeout | 10秒 | 等待队列超时时间 |
系统会自动配置以下RabbitMQ连接参数:
| 参数 | 值 | 描述 |
|---|---|---|
AutomaticRecoveryEnabled | true | 自动恢复连接 |
TopologyRecoveryEnabled | true | 自动恢复拓扑 |
NetworkRecoveryInterval | 10秒 | 网络恢复间隔 |
RequestedHeartbeat | 30秒 | 心跳间隔 |
ClientProvidedName | "Asa.RabbitMq" | 客户端标识名称 |
自动重连机制:
资源管理:
IDisposable 接口,确保资源正确释放按需连接:
EnsureConnectedAsync)现象:抛出 RabbitMqConnectionException
解决方案:
# 检查RabbitMQ服务状态
sudo systemctl status rabbitmq-server
# 检查端口是否开放
telnet hostname 5672
# 查看RabbitMQ日志
sudo journalctl -u rabbitmq-server
检查项:
自动重连:
现象:消息已发布但消费者未处理
解决方案:
// 检查消费者是否正确注册
public class MyConsumer : IConsumer
{
public void Configure(IConsumerConfiguration config)
{
// 确保配置正确
config.WithExchangeName("my.exchange")
.WithRoutingKey("my.routing.key")
.WithQueueName("my.queue");
}
public Task InvokeAsync(MyMessage message)
{
// 处理逻辑
return Task.CompletedTask;
}
}
检查项:
IConsumer 接口现象:同一消息被处理多次
解决方案:
// 检查消息去重是否正常工作
// 系统会自动使用 x-message-id 进行去重
// 确保消息包含唯一的消息ID
检查项:
现象:异常发生时没有重试
解决方案:
// 检查异常类型
public Task InvokeAsync(MyMessage message)
{
try
{
// 业务逻辑
}
catch (BusinessException ex)
{
// 业务异常不会重试
throw new NonRetryableException(ex.Message);
}
catch (Exception ex)
{
// 其他异常会自动重试
throw;
}
}
检查项:
NonRetryableException现象:延迟消息立即投递或未投递
解决方案:
# 检查延迟插件是否安装
rabbitmq-plugins list | grep delayed
# 启用延迟插件
sudo rabbitmq-plugins enable rabbitmq_delayed_message_exchange
检查项:
x-delayed-message{
"ConnectionStrings": {
"MongodbMqRecordConnection": "mongodb://user:pass@host/?maxPoolSize=200&waitQueueMultiple=5"
}
}
public void Configure(IConsumerConfiguration config)
{
config.WithPrefetchCount(50); // 根据服务器性能调整
}
{
"RabbitMq": {
"RetryPolicy": {
"MaxRetryCount": 5,
"InitialRetryInterval": 2,
"MaxRetryInterval": 60,
"BackoffMultiplier": 2
}
}
}
{
"Logging": {
"LogLevel": {
"Asa.RabbitMq": "Information",
"Asa.RabbitMq.Impls.Publisher": "Debug",
"Asa.RabbitMq.Impls.Subscriber": "Debug"
}
}
}
// 检查连接状态
public async Task<bool> CheckRabbitMqHealthAsync()
{
try
{
await _publisher.PublishAsync(new HealthCheckMessage(), config =>
config.WithExchangeName("health.check")
.WithRoutingKey("ping"));
return true;
}
catch
{
return false;
}
}
// 查询消息处理统计
var stats = await _rabbitMqRecord.GetPublishPageListAsync(
routingKey: "order.*",
startDate: DateTime.Now.AddDays(-1),
endDate: DateTime.Now);
访问 http://localhost:15672 查看消息队列状态
// 查看发布记录
db.Publish.find({}).sort({Added: -1}).limit(10)
// 查看消费记录
db.Received.find({Status: "Success"}).sort({Added: -1}).limit(10)
// 统计失败消息
db.Received.find({Status: "Fail"}).count()
主要改进:
IDisposable,资源自动清理配置变更:
{
"ConnectionStrings": {
// 推荐使用新的标准 URI 格式
"RabbitMQConnection": "amqp://username:password@hostname:port/virtual_host"
},
"RabbitMq": {
"RetryPolicy": {
// 可选配置,不配置将使用默认值
"MaxRetryCount": 3,
"InitialRetryInterval": 1
}
}
}
代码变更:
// 代码无需修改,直接升级 NuGet 包即可
// dotnet add package Asa.RabbitMq --version 1.0.14