Message broker connector abstractions for WorkflowFramework — RabbitMQ, Azure Service Bus, and Kafka configuration models and interfaces.
$ dotnet add package WorkflowFramework.Extensions.Connectors.Messaging

A fluent, extensible workflow/pipeline engine for .NET with async-first design, middleware, branching, parallel execution, saga/compensation, and rich extensibility.
- If/Then/Else
- Parallel()
- OnStepStarted, OnStepCompleted, OnWorkflowFailed, etc.
- ForEach, While, DoWhile, Retry for iteration patterns
- Try/Catch/Finally blocks in your workflow
- IPipelineStep<TIn, TOut> with chained input/output types
- [StepName], [StepTimeout], [StepRetry], [StepOrder]
- IWorkflowValidator / IStepValidator for pre-execution validation
- IApprovalService
- AuditMiddleware with IAuditStore
- WorkflowTestHarness, FakeStep, StepTestBuilder

dotnet add package WorkflowFramework

public class ValidateInput : IStep
{
public string Name => "ValidateInput";
public Task ExecuteAsync(IWorkflowContext context)
{
// validation logic
return Task.CompletedTask;
}
}
public class ProcessData : IStep
{
public string Name => "ProcessData";
public Task ExecuteAsync(IWorkflowContext context)
{
// processing logic
return Task.CompletedTask;
}
}var workflow = Workflow.Create("MyWorkflow")
.Step<ValidateInput>()
.Step<ProcessData>()
.Step("SaveResult", ctx =>
{
Console.WriteLine("Saved!");
return Task.CompletedTask;
})
.Build();
var result = await workflow.ExecuteAsync(new WorkflowContext());
Console.WriteLine(result.Status); // Completed

public class OrderData
{
public string OrderId { get; set; } = "";
public decimal Total { get; set; }
public bool IsValid { get; set; }
}
/// <summary>
/// Sample typed step that marks an order as valid when its total is positive.
/// </summary>
public class ValidateOrder : IStep<OrderData>
{
    /// <summary>Name under which this step is identified.</summary>
    public string Name => "ValidateOrder";

    /// <summary>
    /// Sets <c>IsValid</c> on the context's order data to whether <c>Total</c> is greater than zero.
    /// </summary>
    /// <param name="context">Typed workflow context carrying the order being validated.</param>
    /// <returns>An already-completed task; the step does no asynchronous work.</returns>
    public Task ExecuteAsync(IWorkflowContext<OrderData> context)
    {
        var order = context.Data;
        order.IsValid = order.Total > 0;
        return Task.CompletedTask;
    }
}
var workflow = Workflow.Create<OrderData>("OrderPipeline")
.Step(new ValidateOrder())
.If(ctx => ctx.Data.IsValid)
.Then(new ProcessOrder())
.Else(new RejectOrder())
.Build();
var result = await workflow.ExecuteAsync(
new WorkflowContext<OrderData>(new OrderData { OrderId = "ORD-1", Total = 99.99m }));
Console.WriteLine(result.Data.IsValid); // true

Workflow.Create()
.If(ctx => someCondition)
.Then<ProcessStep>()
.Else<RejectStep>()
.Step<FinalStep>()
.Build();Workflow.Create()
.Parallel(p => p
.Step<SendEmail>()
.Step<SendSms>()
.Step<UpdateDashboard>())
.Build();public class LoggingMiddleware : IWorkflowMiddleware
{
public async Task InvokeAsync(IWorkflowContext context, IStep step, StepDelegate next)
{
Console.WriteLine($"Starting: {step.Name}");
await next(context);
Console.WriteLine($"Completed: {step.Name}");
}
}
Workflow.Create()
.Use<LoggingMiddleware>()
.Step<MyStep>()
.Build();public class DebitAccount : ICompensatingStep
{
public string Name => "DebitAccount";
public Task ExecuteAsync(IWorkflowContext context) { /* debit */ return Task.CompletedTask; }
public Task CompensateAsync(IWorkflowContext context) { /* credit back */ return Task.CompletedTask; }
}
Workflow.Create()
.WithCompensation()
.Step(new DebitAccount())
.Step(new CreditAccount())
.Build();Workflow.Create()
.WithEvents(new MyEventHandler())
.Step<MyStep>()
.Build();
public class MyEventHandler : WorkflowEventsBase
{
public override Task OnStepStartedAsync(IWorkflowContext ctx, IStep step)
{
Console.WriteLine($"Step started: {step.Name}");
return Task.CompletedTask;
}
}

graph TD
A[Workflow.Create] --> B[IWorkflowBuilder]
B --> C[Add Steps]
B --> D[Add Middleware]
B --> E[Add Events]
B --> F[Build]
F --> G[WorkflowEngine]
G --> H{Execute Steps}
H --> I[Middleware Pipeline]
I --> J[Step.ExecuteAsync]
J --> K[Context / Properties]
H --> L{Compensation?}
L -->|Yes| M[Reverse Compensate]
H --> N[WorkflowResult]

| Package | Description |
|---|---|
| WorkflowFramework | Core abstractions + fluent builder |
| WorkflowFramework.Extensions.DependencyInjection | Microsoft DI integration |
| WorkflowFramework.Extensions.Polly | Polly resilience policies |
| WorkflowFramework.Extensions.Persistence | Checkpoint/state persistence abstractions |
| WorkflowFramework.Extensions.Persistence.InMemory | In-memory state store |
| WorkflowFramework.Extensions.Diagnostics | OpenTelemetry tracing + timing |
| WorkflowFramework.Generators | Source generators for step discovery |
| WorkflowFramework.Extensions.Configuration | JSON/YAML workflow definitions |
| WorkflowFramework.Extensions.Scheduling | Cron scheduling, approvals, delayed execution |
| WorkflowFramework.Extensions.Visualization | Mermaid + DOT diagram export |
| WorkflowFramework.Extensions.Reactive | Async streams / IAsyncEnumerable support |
| WorkflowFramework.Extensions.Persistence.Sqlite | SQLite state store |
| WorkflowFramework.Testing | Test harness, fake steps, event capture |
| WorkflowFramework.Extensions.Persistence.EntityFramework | EF Core state store |
| WorkflowFramework.Extensions.Distributed | Distributed locking and queuing abstractions |
| WorkflowFramework.Extensions.Distributed.Redis | Redis lock and queue implementations |
| WorkflowFramework.Extensions.Hosting | ASP.NET Core hosting integration + health checks |
| WorkflowFramework.Extensions.Http | HTTP request steps with fluent builder |
| WorkflowFramework.Analyzers | Roslyn analyzers for common mistakes |

using WorkflowFramework.Extensions.Polly;
Workflow.Create()
.UseResilience(builder => builder
.AddRetry(new RetryStrategyOptions { MaxRetryAttempts = 3 }))
.Step<UnreliableStep>()
.Build();using WorkflowFramework.Extensions.Diagnostics;
Workflow.Create()
.Use<TracingMiddleware>() // OpenTelemetry spans
.Use<TimingMiddleware>() // Step timing
.Step<MyStep>()
.Build();using WorkflowFramework.Extensions.Persistence;
using WorkflowFramework.Extensions.Persistence.InMemory;
var store = new InMemoryWorkflowStateStore();
Workflow.Create()
.Use(new CheckpointMiddleware(store))
.Step<LongRunningStep>()
.Build();using WorkflowFramework.Extensions.DependencyInjection;
services.AddWorkflowFramework();
services.AddStep<ValidateInput>();
services.AddWorkflowMiddleware<LoggingMiddleware>();using WorkflowFramework.Builder;
Workflow.Create()
.ForEach<string>(
ctx => (List<string>)ctx.Properties["Items"]!,
body => body.Step("Process", ctx =>
{
var item = (string)ctx.Properties["ForEach.Current"]!;
Console.WriteLine($"Processing: {item}");
return Task.CompletedTask;
}))
.Build();
// While loop
Workflow.Create()
.While(ctx => (int)ctx.Properties["Count"]! < 10,
body => body.Step("Increment", ctx =>
{
ctx.Properties["Count"] = (int)ctx.Properties["Count"]! + 1;
return Task.CompletedTask;
}))
.Build();
// Retry group
Workflow.Create()
.Retry(body => body.Step<FlakyStep>(), maxAttempts: 3)
.Build();Workflow.Create()
.Try(body => body.Step<RiskyStep>())
.Catch<InvalidOperationException>((ctx, ex) =>
{
Console.WriteLine($"Caught: {ex.Message}");
return Task.CompletedTask;
})
.Finally(body => body.Step("Cleanup", _ => { /* cleanup */ return Task.CompletedTask; }))
.Build();var validation = Workflow.Create("Validation")
.Step<ValidateInput>()
.Step<ValidatePermissions>()
.Build();
var main = Workflow.Create("Main")
.SubWorkflow(validation)
.Step<ProcessData>()
.Build();using WorkflowFramework.Pipeline;
var pipeline = Pipeline.Create<string>()
.Pipe<int>((s, ct) => Task.FromResult(int.Parse(s)))
.Pipe<string>((n, ct) => Task.FromResult($"Value: {n * 2}"))
.Build();
var result = await pipeline("21", CancellationToken.None); // "Value: 42"

using WorkflowFramework.Registry;
using WorkflowFramework.Versioning;
var registry = new WorkflowRegistry();
registry.Register("OrderProcessing", () => BuildOrderWorkflow());
var runner = new WorkflowRunner(registry);
var result = await runner.RunAsync("OrderProcessing", context);
// Versioned workflows
var versionedRegistry = new VersionedWorkflowRegistry();
versionedRegistry.Register("OrderProcessing", 1, () => BuildV1());
versionedRegistry.Register("OrderProcessing", 2, () => BuildV2());
var latest = versionedRegistry.Resolve("OrderProcessing"); // Returns v2

using WorkflowFramework.Extensions.Visualization;
var workflow = Workflow.Create("MyWorkflow")
.Step<StepA>()
.Step<StepB>()
.Build();
Console.WriteLine(workflow.ToMermaid()); // Mermaid diagram
Console.WriteLine(workflow.ToDot()); // Graphviz DOT format

using WorkflowFramework.Extensions.Configuration;
var loader = new JsonWorkflowDefinitionLoader();
var definition = loader.LoadFromFile("workflow.json");
var stepRegistry = new StepRegistry();
stepRegistry.Register<ValidateOrder>();
stepRegistry.Register<ChargePayment>();
var builder = new WorkflowDefinitionBuilder(stepRegistry);
var workflow = builder.Build(definition);using WorkflowFramework.Testing;
// Override specific steps in tests
var harness = new WorkflowTestHarness()
.OverrideStep("ChargePayment", ctx =>
{
ctx.Properties["PaymentCharged"] = true;
return Task.CompletedTask;
});
var result = await harness.ExecuteAsync(workflow, new WorkflowContext());
// Capture events for assertions
var events = new InMemoryWorkflowEvents();
var workflow = Workflow.Create()
.WithEvents(events)
.Step<MyStep>()
.Build();
await workflow.ExecuteAsync(new WorkflowContext());
Assert.Single(events.StepCompleted);

Run benchmarks with:
dotnet run -c Release --project benchmarks/WorkflowFramework.Benchmarks

Benchmarks cover workflow execution, middleware pipeline overhead, and typed pipeline throughput.
dotnet build WorkflowFramework.slnx
dotnet test WorkflowFramework.slnx

MIT © JDH Productions LLC.