This CleanCodeJN package for Service Bus simplifies the development of asynchronous microservices by providing a framework that leverages the power of MediatR and IOSP to consume service bus events from topics and execute commands to process these events.
$ dotnet add package CleanCodeJN.GenericApis.ServiceBusConsumerThis CleanCodeJN package for Service Bus simplifies the development of asynchronous microservices by providing a framework that leverages the power of MediatR and IOSP to consume service bus events from topics and execute commands to process these events.
Implement the full IServiceBusConsumerConfigurationService
public class SampleServiceBusConsumerConfigurationService(
IOptionsMonitor<SampleConfiguration> configuration,
ILogger<SampleServiceBusConsumerConfigurationService> logger) : IServiceBusConsumerConfigurationService
{
public virtual bool IsLocalEnvironment() => Environment.GetEnvironmentVariable("IS_LOCAL")?.Equals("true") ?? false;
public virtual void PrintLogoForDebugging() => StringExtensions.PrintLogo();
public virtual string PrintStartTextForDebugging() => "Please add the event as JSON and press ENTER twice.";
public virtual string GetServiceBusConnectionString() => configuration.CurrentValue.ServiceBus.ConnectionString;
public virtual ServiceBusConfiguration GetServiceBusTopicConfiguration() => configuration.CurrentValue.ServiceBus;
public virtual void LogIncomingEvent(string name, string body) => logger.LogDebug($"EventRequest_{name.Replace(" ", string.Empty)}", body);
public virtual string MaxRetryMessage(ProcessMessageEventArgs args) => "Max Retry reached";
public virtual void LogMaxRetryReached(ProcessMessageEventArgs args) => logger.LogCritical(message: MaxRetryMessage(args));
public virtual List<Assembly> GetCommandAssemblies() => [typeof(UpdateInvoiceEventRequest).Assembly];
public virtual Task LogAndHandleException(Exception exception, string message)
{
logger.LogCritical(exception, message);
return Task.CompletedTask;
}
public virtual void LogExecutionResponse(string body, Response response, Exception exception = null) =>
logger.LogDebug($"EventResponse_{JsonSerializer.Deserialize<JsonElement>(body).GetProperty("Name").GetString().Replace(" ", string.Empty)}_{(response.Succeeded ? "Success" : "Failure")}", new Dictionary<string, string>
{
{ nameof(response.Succeeded), response.Succeeded.ToString() },
{ nameof(response.Message), response.Message ?? exception?.Message },
{ nameof(response.Info), response.Info },
{ nameof(exception), exception?.StackTrace },
{ "data", body }
});
}
Or derive from ServiceBusConsumerConfigurationServiceBase for using reasonable defaults and only override the following methods
public class SampleServiceBusConsumerConfigurationService(
IOptionsMonitor<SampleConfiguration> configuration,
ILogger<SampleServiceBusConsumerConfigurationService> logger) : ServiceBusConsumerConfigurationServiceBase(logger)
{
public override ServiceBusConfiguration GetServiceBusTopicConfiguration() => configuration.CurrentValue.ServiceBus;
public override List<Assembly> GetCommandAssemblies() => [typeof(UpdateInvoiceEventRequest).Assembly];
}Add RegisterServiceBusConsumer() to your Program.cs
builder.Services.RegisterServiceBusConsumer<SampleServiceBusConsumerConfigurationService>(
builder.Configuration["ServiceBus:ConnectionString"],
[typeof(UpdateInvoiceEventRequest).Assembly]);Add Service Bus Configuration to your appsettings.json and your Configuration classes
{
"ServiceBus": {
"MaxRetryCount": 30,
"RetryDelayInMinutes": 10,
"ConnectionString": "",
"TopicConfigurations": [
{
"Name": "topicA",
"SubscriptionName": "general",
"MaxAutoLockRenewalDurationInMinutes": 15,
"PrefetchCount": 10,
"MaxConcurrentCalls": 5
},
{
"Name": "topicB",
"SubscriptionName": "general",
"MaxAutoLockRenewalDurationInMinutes": 15,
"PrefetchCount": 10,
"MaxConcurrentCalls": 5
}
]
}
}Consume Events by just posting events to the Service Bus
{
"InstanceId": "<Guid>",
"Name": "Event Name",
"Type": "CleanCodeJN.GenericApis.ServiceBusConsumer.Sample.Commands.UpdateInvoiceEventRequest",
"Environment": "Production",
"CreatedAt": "2024-03-25 16:05:50",
"CreatedFrom": "Event Producer",
"RequestId": "Businnss process Id",
"RetryCount": 0,
"Topic": "invoices",
"Data":
{
// Payload
}
}If you want to use the Generic Apis Commands and Repositories together with the Service Bus Consumer, than register everything as below
var builder = Host.CreateApplicationBuilder(args);
List<Assembly> assemblies = [
typeof(CleanCodeJN.GenericApis.Sample.Business.AssemblyRegistration).Assembly,
typeof(CleanCodeJN.GenericApis.Sample.Core.AssemblyRegistration).Assembly,
typeof(CleanCodeJN.GenericApis.Sample.Domain.AssemblyRegistration).Assembly,
Assembly.GetExecutingAssembly(),
];
builder.Services.Configure<SampleConfiguration>(builder.Configuration);
builder.Services
.RegisterValidatorsFromAssembly(typeof(CleanCodeJN.GenericApis.Sample.Core.AssemblyRegistration).Assembly)
.RegisterGenericCommands(assemblies)
.RegisterAutomapper(assemblies)
.RegisterServiceBusConsumer<SampleServiceBusConsumerConfigurationService>(builder.Configuration["ServiceBus:ConnectionString"], assemblies)
.RegisterDbContextAndRepositories<MyDbContext>();
await builder.Build().RunAsync();