CQRS command and query handling with strongly-typed request/response patterns. Part of Zonit.Messaging - a lightweight library for event-driven and CQRS architectures with full AOT/Trimming support.
A lightweight, high-performance .NET library for building event-driven and CQRS architectures with full AOT/Trimming support.
| Package | Description |
|---|---|
| Zonit.Messaging.Commands | CQRS Commands & Queries |
| Zonit.Messaging.Commands.Abstractions | Command interfaces |
| Zonit.Messaging.Events | Pub/Sub Events |
| Zonit.Messaging.Events.Abstractions | Event interfaces |
| Zonit.Messaging.Tasks | Background Jobs |
| Zonit.Messaging.Tasks.Abstractions | Task interfaces |
| Zonit.Messaging.Schedules | Recurring Jobs |
| Zonit.Messaging.Schedules.Abstractions | Schedule interfaces |

Legacy packages:

| Package | Status |
|---|---|
| Zonit.Services.EventMessage | :warning: Deprecated |
| Zonit.Services.EventMessage.Abstractions | :warning: Deprecated |
# Install current packages
dotnet add package Zonit.Messaging.Commands
dotnet add package Zonit.Messaging.Events
dotnet add package Zonit.Messaging.Tasks
dotnet add package Zonit.Messaging.Schedules
# Or via NuGet Package Manager
Install-Package Zonit.Messaging.Commands
Install-Package Zonit.Messaging.Events
Install-Package Zonit.Messaging.Tasks
Install-Package Zonit.Messaging.Schedules
Just call the registration methods - they work with or without handlers:
using Zonit.Messaging.Commands;
using Zonit.Messaging.Events;
using Zonit.Messaging.Tasks;
using Zonit.Messaging.Schedules;
// In your DI configuration (Program.cs or plugin registration)
services.AddCommandHandlers(); // Registers Commands (CQRS)
services.AddEventHandlers(); // Registers Events (Pub/Sub)
services.AddTaskHandlers(); // Registers Tasks (Background Jobs)
services.AddScheduleServices(); // Registers Schedules (Recurring Jobs)
That's it! These methods use TryAdd to prevent duplicate registrations, so calling them from several places is safe.
For modular applications, call the methods in each plugin:
// In Kemavo.Plugins.Catalogs.Application
using Microsoft.Extensions.DependencyInjection;
namespace Kemavo.Plugins;
public static class ServiceCollectionExtensions
{
    public static IServiceCollection AddCatalogsPlugin(this IServiceCollection services)
    {
        // These methods exist in Zonit.Messaging.* libraries
        // Source Generator automatically adds handlers from this assembly
        services.AddCommandHandlers();
        services.AddEventHandlers();
        services.AddTaskHandlers();
        services.AddScheduleServices();
        return services;
    }
}
// In Program.cs
services.AddCatalogsPlugin();
services.AddWalletsPlugin();
services.AddAIPlugin();
For fine-grained control, you can register handlers explicitly (100% AOT-safe):
// Commands - specify handler, request, and response types
services.AddCommand<CreateUserHandler, CreateUserCommand, Guid>();
// Events - specify handler and event types
services.AddEvent<UserCreatedHandler, UserCreatedEvent>();
// Tasks - specify handler and task types
services.AddTask<SendEmailHandler, SendEmailTask>();
// Schedules - specify handler and data types
services.AddScheduleHandler<CleanupHandler, CleanupJobData>();
Request/Response pattern - send a request, get a typed response.
public record CreateUserCommand(string Name, string Email) : IRequest<Guid>;
public class CreateUserHandler : IRequestHandler<CreateUserCommand, Guid>
{
    public async Task<Guid> HandleAsync(CreateUserCommand request, CancellationToken ct = default)
    {
        var userId = Guid.NewGuid();
        // Save to database...
        return userId;
    }
}
var commandProvider = serviceProvider.GetRequiredService<ICommandProvider>();
var userId = await commandProvider.SendAsync(new CreateUserCommand("John", "john@example.com"));
Publish events to multiple subscribers asynchronously.
Create a class implementing IEventHandler<T> for automatic registration:
public record UserCreatedEvent(string Name, string Email);
public class UserCreatedEventHandler : IEventHandler<UserCreatedEvent>
{
    private readonly ILogger<UserCreatedEventHandler> _logger;
    public UserCreatedEventHandler(ILogger<UserCreatedEventHandler> logger)
    {
        _logger = logger;
    }
    public async Task HandleAsync(UserCreatedEvent data, CancellationToken cancellationToken)
    {
        _logger.LogInformation("User created: {Name}", data.Name);
        await SendWelcomeEmailAsync(data.Email, cancellationToken);
    }
}
var eventManager = serviceProvider.GetRequiredService<IEventManager>();
eventManager.Subscribe<UserCreatedEvent>(async (data, cancellationToken) =>
{
    Console.WriteLine($"User created: {data.Name}");
    await Task.CompletedTask;
});
var eventProvider = serviceProvider.GetRequiredService<IEventProvider>();
eventProvider.Publish(new UserCreatedEvent("John", "john@example.com"));
Group events to be processed sequentially:
using (var transaction = eventProvider.CreateTransaction())
{
    eventProvider.Publish(new Event1());
    eventProvider.Publish(new Event2());
    // Events are queued until the transaction is completed
}
// Events are processed after the transaction is disposed
Wait for all events in a transaction to be processed:
using (var transaction = eventProvider.CreateTransaction())
{
    eventProvider.Publish(new OrderCreatedEvent());
    eventProvider.Publish(new InventoryUpdatedEvent());
    // Wait for all events to be processed before continuing
    await transaction.WaitForCompletionAsync();
}
Long-running operations with retry support and real-time progress tracking.
var taskManager = serviceProvider.GetRequiredService<ITaskManager>();
taskManager.Subscribe<SendEmailTask>(async payload =>
{
    await SendEmailAsync(payload.Data.To, payload.Data.Subject);
}, new TaskSubscriptionOptions
{
    WorkerCount = 5,
    MaxRetries = 3,
    RetryDelay = TimeSpan.FromSeconds(10)
});
Create handlers with automatic progress reporting based on estimated step durations:
public class ImportDataHandler : TaskHandler<ImportDataTask>
{
    public override int WorkerCount => 2;
    public override TimeSpan Timeout => TimeSpan.FromMinutes(10);
    // Optional: Display title and description in UI
    public override string? Title => "Import Data";
    public override string? Description => "Importing data from external source";
    // Define steps with estimated durations for smooth progress calculation
    public override TaskProgressStep[]? ProgressSteps =>
    [
        new(TimeSpan.FromSeconds(5), "Connecting to source..."),
        new(TimeSpan.FromSeconds(10), "Downloading data..."),
        new(TimeSpan.FromSeconds(15), "Processing records..."),
        new(TimeSpan.FromSeconds(5), "Saving to database...")
    ];
    protected override async Task HandleAsync(
        ImportDataTask data,
        ITaskProgressContext progress,
        CancellationToken cancellationToken)
    {
        // Step 1: Connect (0% -> 14%)
        await progress.NextAsync();
        await ConnectAsync(cancellationToken);
        // Step 2: Download (14% -> 43%)
        await progress.NextAsync();
        // Source is the same property the monitoring examples read via state.Data.Source
        await DownloadAsync(data.Source, cancellationToken);
        // Step 3: Process (43% -> 86%)
        await progress.NextAsync();
        for (int i = 0; i < data.RecordCount; i++)
        {
            await ProcessRecordAsync(i, cancellationToken);
            // Update message without changing step
            await progress.SetMessageAsync($"Processing {i + 1}/{data.RecordCount}...");
        }
        // Step 4: Save (86% -> 100%)
        await progress.NextAsync();
        await SaveAsync(cancellationToken);
    }
}
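The percentages in the handler comments (14%, 43%, 86%, 100%) are consistent with weighting each step by its share of the total estimated duration. The sketch below reproduces that arithmetic under that assumption. StepProgress and CumulativePercentages are hypothetical names used only for illustration, not part of Zonit.Messaging, and the library's actual rounding and interpolation may differ.

```csharp
using System;
using System.Linq;

// Hypothetical helper, not part of Zonit.Messaging.
public static class StepProgress
{
    // Returns the percentage reached at the end of each step, assuming every
    // step contributes in proportion to its estimated duration.
    public static int[] CumulativePercentages(params TimeSpan[] estimates)
    {
        double total = estimates.Sum(e => e.TotalSeconds);
        if (total <= 0)
            throw new ArgumentException("Estimated durations must sum to a positive value.", nameof(estimates));

        var result = new int[estimates.Length];
        double elapsed = 0;
        for (int i = 0; i < estimates.Length; i++)
        {
            elapsed += estimates[i].TotalSeconds;
            // Round to the nearest whole percent.
            result[i] = (int)Math.Round(100 * elapsed / total, MidpointRounding.AwayFromZero);
        }
        return result;
    }
}
```

Calling it with the four estimates above (5, 10, 15, and 5 seconds, 35 seconds in total) yields 14, 43, 86, and 100, which matches the step boundaries noted in the handler comments.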
With Source Generators, registration is automatic:
// In Program.cs - handlers are auto-discovered and registered
services.AddTaskHandlers();
var taskProvider = serviceProvider.GetRequiredService<ITaskProvider>();
// Simple publish
taskProvider.Publish(new SendEmailTask { To = "user@example.com", Subject = "Welcome!" });
// Publish with ExtensionId for filtering
var organizationId = Guid.NewGuid();
taskProvider.Publish(new ImportDataTask("data.csv", 1000), organizationId);
Subscribe to real-time progress updates:
var taskManager = serviceProvider.GetRequiredService<ITaskManager>();
// Monitor all tasks
taskManager.OnChange(state =>
{
    Console.WriteLine($"Task {state.TaskType}: {state.Progress}% - {state.Message}");
    Console.WriteLine($"Duration: {state.Duration}");
});
// Monitor specific task type with typed data access
taskManager.OnChange<ImportDataTask>(state =>
{
    Console.WriteLine($"Import from {state.Data.Source}: {state.Progress}%");
    Console.WriteLine($"Step {state.CurrentStep}/{state.TotalSteps}: {state.Message}");
    Console.WriteLine($"Running for: {state.Duration?.TotalSeconds:F1}s");
});
// Monitor multiple task types at once (efficient filtering)
taskManager.OnChange<ImportDataTask, ExportDataTask>(state =>
{
    // Receives updates only for ImportDataTask or ExportDataTask
    UpdateProgress(state.TaskType, state.Progress);
});
// Monitor up to 4 types simultaneously
taskManager.OnChange<Task1, Task2, Task3, Task4>(state =>
{
    // Efficient server-side filtering
    NotifyUI(state);
});
// Monitor tasks for specific ExtensionId (e.g., organization)
taskManager.OnChange(organizationId, state =>
{
    // Only tasks published with this ExtensionId
    UpdateProgressBar(state.Progress ?? 0);
});
// Monitor specific type for specific ExtensionId
taskManager.OnChange<ImportDataTask>(organizationId, state =>
{
    // Typed access + filtered by ExtensionId
    UpdateUI(state.Data, state.Progress);
});
| Property | Type | Description |
|---|---|---|
| TaskId | Guid | Unique task identifier |
| ExtensionId | Guid? | Optional identifier for filtering (e.g., user/organization ID) |
| TaskType | string | Full type name of the task |
| Title | string? | Optional display title for the task (null = uses TaskType) |
| Description | string? | Optional description of what the task does |
| Status | TaskStatus | Current status (Pending, Processing, Completed, Failed, Cancelled) |
| Progress | int? | Progress 0-100 (null if not tracked) |
| CurrentStep | int? | Current step number (1-based) |
| TotalSteps | int? | Total number of steps |
| Message | string? | Current status message |
| CreatedAt | DateTimeOffset | When task was created |
| StartedAt | DateTimeOffset? | When processing started |
| CompletedAt | DateTimeOffset? | When processing finished |
| Duration | TimeSpan? | Time elapsed since start |
// Get all active tasks
var activeTasks = taskManager.GetActiveTasks();
// Get active tasks for specific ExtensionId
var orgTasks = taskManager.GetActiveTasks(organizationId);
// Get active tasks of specific type with typed data access
var importTasks = taskManager.GetActiveTasks<ImportDataTask>();
foreach (var task in importTasks)
{
    Console.WriteLine($"Import from {task.Data.Source}: {task.Progress}% - {task.Duration}");
}
// Get active tasks of multiple types
var dataTasks = taskManager.GetActiveTasks<ImportDataTask, ExportDataTask>();
var allProcessingTasks = taskManager.GetActiveTasks<Task1, Task2, Task3>();
// Get tasks filtered by type AND ExtensionId
var orgImports = taskManager.GetActiveTasks<ImportDataTask>(organizationId);
foreach (var task in activeTasks)
{
    Console.WriteLine($"{task.TaskType}: {task.Status} ({task.Progress}%) - {task.Duration}");
}
// Get specific task state
var state = taskManager.GetTaskState(taskId);
Tasks go through various states during their lifecycle; the Status property reports one of Pending, Processing, Completed, Failed, or Cancelled.
- The Duration property reports the time elapsed since processing started.
- Use OnChange<T> to get typed access to task data.
- Filter by ExtensionId at the system level for better performance.

| Method | Returns | Description |
|---|---|---|
| GetActiveTasks() | IReadOnlyCollection<TaskState> | All active tasks |
| GetActiveTasks(extensionId) | IReadOnlyCollection<TaskState> | Active tasks for specific ExtensionId |
| GetActiveTasks<TTask>() | IReadOnlyCollection<TaskState<TTask>> | Active tasks of specific type with typed data |
| GetActiveTasks<TTask>(extensionId) | IReadOnlyCollection<TaskState<TTask>> | Active tasks of type for ExtensionId |
| GetActiveTasks<T1, T2>() | IReadOnlyCollection<TaskState> | Active tasks of 2 types |
| GetActiveTasks<T1, T2, T3>() | IReadOnlyCollection<TaskState> | Active tasks of 3 types |
| GetActiveTasks<T1, T2, T3, T4>() | IReadOnlyCollection<TaskState> | Active tasks of 4 types |
| GetTaskState(taskId) | TaskState? | Specific task by ID |

| Method | Parameters | Description |
|---|---|---|
| OnChange(handler) | Action<TaskState> | Monitor all task changes |
| OnChange(extensionId, handler) | Guid, Action<TaskState> | Monitor tasks for ExtensionId |
| OnChange<TTask>(handler) | Action<TaskState<TTask>> | Monitor specific task type with typed data |
| OnChange<TTask>(extensionId, handler) | Guid, Action<TaskState<TTask>> | Monitor type for ExtensionId |
| OnChange<T1, T2>(handler) | Action<TaskState> | Monitor 2 task types |
| OnChange<T1, T2, T3>(handler) | Action<TaskState> | Monitor 3 task types |
| OnChange<T1, T2, T3, T4>(handler) | Action<TaskState> | Monitor 4 task types |

All OnChange methods return IDisposable for unsubscribing.
Example: Monitoring Multiple Types
// Efficient server-side filtering - only 2 types are monitored
var subscription = taskManager.OnChange<ImportDataTask, ExportDataTask>(state =>
{
    // This handler only receives ImportDataTask and ExportDataTask updates
    // Much more efficient than filtering in the handler
    Console.WriteLine($"{state.TaskType}: {state.Progress}%");
});
// Unsubscribe when done
subscription.Dispose();
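For readers unfamiliar with subscriptions that are cancelled through IDisposable, the following is a generic, self-contained sketch of the pattern. ChangeNotifier<T> and its members are hypothetical stand-ins written for this illustration. They are not Zonit.Messaging types and say nothing about how ITaskManager is implemented internally.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for a notifier; not part of Zonit.Messaging.
public sealed class ChangeNotifier<T>
{
    private readonly object _gate = new();
    private readonly List<Action<T>> _handlers = new();

    // Registers a handler and returns a token that removes it when disposed.
    public IDisposable OnChange(Action<T> handler)
    {
        ArgumentNullException.ThrowIfNull(handler);
        lock (_gate) _handlers.Add(handler);
        return new Subscription(this, handler);
    }

    // Invokes every handler that is subscribed at the time of the call.
    public void Notify(T value)
    {
        Action<T>[] snapshot;
        lock (_gate) snapshot = _handlers.ToArray();
        foreach (var handler in snapshot) handler(value);
    }

    private sealed class Subscription : IDisposable
    {
        private ChangeNotifier<T>? _owner;
        private readonly Action<T> _handler;

        public Subscription(ChangeNotifier<T> owner, Action<T> handler)
        {
            _owner = owner;
            _handler = handler;
        }

        // Safe to call more than once; only the first call removes the handler.
        public void Dispose()
        {
            var owner = Interlocked.Exchange(ref _owner, null);
            if (owner is null) return;
            lock (owner._gate) owner._handlers.Remove(_handler);
        }
    }
}
```

A handler registered through OnChange receives every Notify call until its token is disposed. After that it is no longer invoked, and disposing the token a second time has no effect.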
Execute jobs on a recurring schedule using the strongly-typed Schedule value object (cron-like, but with compile-time safety).
public record CleanupJobData(string Directory, int RetentionDays);
public class CleanupHandler : IScheduleHandler<CleanupJobData>
{
    private readonly ILogger<CleanupHandler> _logger;
    public CleanupHandler(ILogger<CleanupHandler> logger)
    {
        _logger = logger;
    }
    public async Task HandleAsync(CleanupJobData data, CancellationToken cancellationToken)
    {
        _logger.LogInformation("Cleanup: {Dir}, retention: {Days} days",
            data.Directory, data.RetentionDays);
        // Perform cleanup...
        await Task.CompletedTask;
    }
}
using Zonit.Messaging.Schedules;
// Option A: Automatic registration via Source Generator (Recommended)
services.AddScheduleServices(); // Source Generator auto-discovers IScheduleHandler<T> implementations
// Option B: Manual registration (100% AOT-safe)
services.AddScheduleHandler<CleanupHandler, CleanupJobData>();
public class MyService
{
    private readonly IScheduleProvider _scheduleProvider;
    public MyService(IScheduleProvider scheduleProvider)
    {
        _scheduleProvider = scheduleProvider;
    }
    public void StartJobs()
    {
        // Every 5 minutes
        var id1 = _scheduleProvider.Start(
            new CleanupJobData("/tmp", 7),
            Schedule.EveryMinutes(5)
        );
        // Daily at 3:00 AM
        var id2 = _scheduleProvider.Start(
            new CleanupJobData("/logs", 30),
            Schedule.EveryDay(3, 0)
        );
        // Multiple schedules (8:00 AM and 6:00 PM)
        var id3 = _scheduleProvider.Start(
            new CleanupJobData("/cache", 1),
            Schedule.EveryDay(8, 0),
            Schedule.EveryDay(18, 0)
        );
        // With options
        var id4 = _scheduleProvider.Start(
            new CleanupJobData("/data", 14),
            options =>
            {
                options.Name = "DataCleanup";
                options.Schedules = [Schedule.EveryHour(atMinute: 30)];
                options.ExecuteOnStartup = true;
            }
        );
    }
}
// Pause/Resume
_scheduleProvider.Pause(id);
_scheduleProvider.Resume(id);
// Stop permanently
_scheduleProvider.Stop(id);
// Trigger immediate execution
_scheduleProvider.TriggerNow(id);
// Get state
var state = _scheduleProvider.GetState(id);
Console.WriteLine($"Status: {state?.Status}");
Console.WriteLine($"Last run: {state?.LastExecutionAt}");
Console.WriteLine($"Next run: {state?.NextExecutionAt}");
Console.WriteLine($"Executions: {state?.ExecutionCount}");
// Get all schedules
var all = _scheduleProvider.GetAllSchedules();
var active = _scheduleProvider.GetActiveSchedules();
// Subscribe to all schedule changes
_scheduleProvider.OnChange(state =>
{
    Console.WriteLine($"{state.Name}: {state.Status} ({state.ExecutionCount} runs)");
});
// Subscribe to specific schedule
_scheduleProvider.OnChange(scheduleId, state =>
{
    if (state.Status == ScheduleStatus.Failed)
        AlertAdmin(state.LastError);
});
| Method | Description | Example |
|---|---|---|
| EverySeconds(n) | Every N seconds | Schedule.EverySeconds(30) |
| EveryMinutes(n) | Every N minutes | Schedule.EveryMinutes(5) |
| EveryHours(n) | Every N hours | Schedule.EveryHours(2) |
| EveryDays(n) | Every N days | Schedule.EveryDays(1) |
| EveryMinute() | Every minute, at second :00 | Schedule.EveryMinute() |
| EveryHour(min) | Every hour at the given minute | Schedule.EveryHour(30) |
| EveryDay(h, m) | Daily at the given time | Schedule.EveryDay(15, 0) |
| EveryWeek(day, h, m) | Weekly on the given day | Schedule.EveryWeek(DayOfWeek.Monday, 9, 0) |
| EveryMonth(day, h, m) | Monthly on the given day | Schedule.EveryMonth(1, 0, 0) |
| EveryYear(mo, d, h, m) | Yearly on the given date | Schedule.EveryYear(1, 1, 0, 0) |
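
To make the "daily at time" rule concrete, here is a minimal sketch of how the next run of a rule such as EveryDay(h, m) could be computed. DailyRule and NextOccurrence are hypothetical names invented for this example. The sketch assumes a fixed UTC offset and ignores daylight-saving changes, and the library's own calculation of NextExecutionAt may differ.

```csharp
using System;

// Hypothetical helper, not part of Zonit.Messaging.
public static class DailyRule
{
    // Returns the first occurrence of hour:minute strictly after 'now',
    // using the same UTC offset as 'now'.
    public static DateTimeOffset NextOccurrence(DateTimeOffset now, int hour, int minute)
    {
        if (hour is < 0 or > 23) throw new ArgumentOutOfRangeException(nameof(hour));
        if (minute is < 0 or > 59) throw new ArgumentOutOfRangeException(nameof(minute));

        // Today's slot; if it is not in the future, move to tomorrow's slot.
        var candidate = new DateTimeOffset(now.Year, now.Month, now.Day, hour, minute, 0, now.Offset);
        return candidate > now ? candidate : candidate.AddDays(1);
    }
}
```

Under these assumptions, a 3:00 rule evaluated at 02:00 resolves to 03:00 the same day, while the same rule evaluated at exactly 03:00 or later resolves to 03:00 the next day.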

| Property | Type | Description |
|---|---|---|
| Id | ScheduleId | Unique schedule identifier |
| Name | string | Schedule name |
| Status | ScheduleStatus | Pending, Running, Paused, Stopped, Completed, Failed |
| Schedules | Schedule[] | Schedule rules |
| CreatedAt | DateTimeOffset | When schedule was created |
| LastExecutionAt | DateTimeOffset? | When last execution occurred |
| NextExecutionAt | DateTimeOffset? | When next execution is scheduled |
| ExecutionCount | int | Total number of executions |
| ConsecutiveFailures | int | Number of consecutive failures |
| LastError | string? | Last error message |
| LastExecutionDuration | TimeSpan? | Duration of last execution |

If upgrading from Zonit.Services.EventMessage:
| Legacy (deprecated) | New |
|---|---|
| using Zonit.Services.EventMessage; | using Zonit.Messaging.Events; |
| services.AddEventMessageService() | services.AddEventHandlers() |
| EventBase<T> | IEventHandler<T> |
| TaskBase<T> | TaskHandler<T> |
| PayloadModel<T> | Direct TEvent data parameter |
| payload.Data / payload.CancellationToken | (data, cancellationToken) parameters |

Before (Legacy):
public class UserHandler : EventBase<UserCreatedEvent>
{
    protected override async Task HandleAsync(UserCreatedEvent data, CancellationToken ct)
    {
        // Handle event
    }
}
After (New API):
public class UserHandler : IEventHandler<UserCreatedEvent>
{
    public async Task HandleAsync(UserCreatedEvent data, CancellationToken cancellationToken)
    {
        // Handle event - same signature, cleaner interface!
    }
}
Legacy code continues to work but shows deprecation warnings.
Found a bug or have a feature request? Open an issue on GitHub!