A message driven distributed asynchronous workflow framework. 消息驱动的分布式异步工作流程处理框架。
$ dotnet add package FluentWorkflow.CoreA message driven distributed asynchronous workflow framework. 消息驱动的分布式异步工作流程处理框架。
基于消息驱动的分布式异步工作流程处理框架,使用 SourceGenerator 简化开发中的重复工作。
在典型的消息驱动处理流程中,阶段的开始消息与结束消息、各个消息的触发都需要手动定义,这些多数属于重复工作,FluentWorkflow是为了减少这些重复劳动而诞生的
2.0 完全不兼容之前的代码
消息、处理器、相关命名空间 等,都需要按新的命名规则调整FluentWorkflow.RabbitMQ 启用 RabbitMQ.Client 的 7.* 版本支持,6.* 版本支持使用包 FluentWorkflow.RabbitMQ.Legacypartial/继承);Diagnostic支持;net8.0+;尽可能的全链路更新,避免导致的未知问题;WorkflowContext 核心为 字符串字典 其属性在赋值时进行序列化存放,对象后续的修改不会反应到上下文中;2.0 已调整相关逻辑,引用类型对象的修改将会正常反馈到后续流程中Workflow 中重写各个阶段的触发事件方法时,方法内不能往外抛出异常,会导致该阶段消息重新进入队列,再次执行;FluentWorkflow.RabbitMQ 依赖 交换机 和 队列 进行消息分发,当存在多套环境需要隔离时,确保 交换机 和 队列 都不相同,否则将会出现消息重复消费;FluentWorkflow.RabbitMQ 在 绑定信息(交换机、队列)变更时不能完全自动调整,需要人工修正,如手动移除队列错误的交换机绑定和RoutingKey绑定,否则将会出现消息重复消费;消息队列中间件异常的情况下可能会出现流程中断、重复消费等情况;FluentWorkflow.Core 包<ItemGroup>
<PackageReference Include="FluentWorkflow.Core" Version="2.1.1" />
</ItemGroup>
工作流程声明public partial class SampleWorkflowDeclaration : IWorkflowDeclaration
{
}
partial;IWorkflowDeclaration;public partial class SampleWorkflowDeclaration : IWorkflowDeclaration
{
internal override void DeclareContext(IWorkflowContextDeclarator declarator)
{
declarator.Property<int>("Count");
}
internal override void DeclareWorkflow(IWorkflowDeclarator declarator)
{
declarator.Name("Sample") //声明工作流程名称
.Begin() //开始定义流程
.Then("SampleStage1") //声明阶段 SampleStage1
.Then("SampleStage2") //声明阶段 SampleStage2
.Then("SampleStage3") //声明阶段 SampleStage3
.Completion(); //完成声明
}
}
到此一个 工作流程声明 就定义完成了,该工作流程名为Sample,包含三个阶段 SampleStage1 -> SampleStage2 -> SampleStage3,上下文中包含一个名为Count的int类型属性
工作流程声明在DeclareWorkflow方法中使用参数declarator定义,必须链式调用:
Name("{WorkflowName}")定义名称Begin()开始定义流程Then("{StageName}")声明每个阶段,声明顺序即为阶段顺序,阶段名称必须满足C#标识符命名规则和约定Completion()结束定义工作流程上下文声明在DeclareContext方法中使用参数declarator定义,必须链式调用:
Property<T>("{PropertyName}", "{Comment}")定义上下文的属性及其类型与备注partial 类进行属性添加相同,此逻辑更方便分发工作流程 的工作代码GenerateWorkflowCodesAttribute 声明要生成的工作流程[assembly: GenerateWorkflowCodes<SampleWorkflowDeclaration>]
工作流程声明 的相同命名空间下声明 {WorkflowName}Workflow 的 partial 类[assembly: GenerateWorkflowCodes<SampleWorkflowDeclaration>]
namespace SampleWorkflowNamespace; //应当与目标声明相同,即与 SampleWorkflowDeclaration 的命名空间相同
public partial class SampleWorkflow
{
public SampleWorkflow(SampleWorkflowContext context, IServiceProvider serviceProvider) : base(context, serviceProvider)
{
}
}
SampleWorkflowContext (模板:{WorkflowName}Context)Stage{StageName}Message)Stage{StageName}CompletedMessage)StageSampleStage1HandlerBase、StageSampleStage2HandlerBase、StageSampleStage3HandlerBase (模板:Stage{StageName}HandlerBase)SampleWorkflowDeclaration 的命名空间及其子命名空间下阶段处理器基类,并实现各个阶段处理逻辑// SampleStage2 与 SampleStage3 同理
public class StageSampleStage1Handler : StageSampleStage1HandlerBase
{
public StageSampleStage1Handler(IServiceProvider serviceProvider) : base(serviceProvider)
{
}
protected override Task ProcessAsync(ProcessContext processContext, StageSampleStage1Message stageMessage, CancellationToken cancellationToken)
{
//TODO 阶段业务逻辑
return Task.CompletedTask;
}
}
控制服务services.AddFluentWorkflow()
.AddSampleWorkflow(configuration => //添加工作流程
{
configuration.AddScheduler() //添加工作流程调度器
.AddResultObserver(); //添加结果观察器
})
.UseInMemoryWorkflowMessageDispatcher(); //配置使用的消息分发器,这里使用基于内存的分发器来示范
阶段处理服务services.AddFluentWorkflow()
.AddSampleWorkflow(configuration => //添加工作流程
{
configuration.StageSampleStage1Handler<StageSampleStage1Handler>(); //添加对应阶段的处理器, SampleStage2 与 SampleStage3 同理
})
.<>()
.UseInMemoryWorkflowMessageDispatcher(); //配置使用的消息分发器,这里使用基于内存的分发器来示范
FluentWorkflow正常工作的必要条件:
服务使用同一套消息分发器;WorkflowScheduler;StageHandler,各个阶段的阶段处理器有且仅有一个(单个服务,可多实例);子工作流程时必须配置子工作流程结果观察器 - ResultObserver;子工作流程时,必须使用支持等待多个子工作流程的 IWorkflowAwaitProcessor; (默认实现了基于redis的多流程等待处理器,配置时使用UseRedisWorkflowAwaitProcessor方法以启用)//从DI容器中获取工作流程构建器
var workflowBuilder = ServiceProvider.GetRequiredService<IWorkflowBuilder<SampleWorkflow>>();
//创建工作流程上下文
var context = new SampleWorkflowContext();
//构建工作流程
var workflow = workflowBuilder.Build(context);
//启动工作流程,框架会自动触发各个阶段处理器后完成
await workflow.StartAsync(default);
WorkflowScheduler的服务,但需要接入消息分发器并在配置时使用 Add****Workflow() 添加对应的工作流程构造器;partial的,可以声明partial类进行拓展,不可使用partial类拓展的功能基本上都可以继承后重写,在配置服务时替换默认实现即可;Workflow 类会添加生命周期各个阶段的触发事件方法,可以继承后重写其逻辑以在各个阶段执行相关的逻辑(注意每次触发可能不在同一个服务实例中。重写后应当捕获并处理所有异常,不要抛出);WorkflowContext 核心为字符串/对象混合字典,对其修改理论上只对后续可见并在整个执行周期可用,可以将执行参数、结果、中间值等存放其中;IWorkflowMessageDispatcher控制(默认提供了基于CAP、Abp以及基础的FluentWorkflow.RabbitMQ可选);StageHandler 出现异常则认为工作流程失败,不会将异常抛给上层 IWorkflowMessageDispatcher(消息分发的重试不会触发),可以重写 StageHandler 的 OnException 方法来将异常向上抛出;修改/删除了既有的阶段定义,会导致还在处理过程中工作流程无法正常运行(但添加不会影响);部分功能为源码接入的方式,默认不生成,在项目中指定需要的功能后自动生成
<PropertyGroup>
<FluentWorkflowGeneratorAdditional>AbpFoundation,CAPFoundation,AbpMessageDispatcher,CAPMessageDispatcher,RedisAwaitProcessor</FluentWorkflowGeneratorAdditional>
</PropertyGroup>
| 名称 | 功能 |
|---|---|
| AbpFoundation | Abp的基础功能支持 |
| CAPFoundation | CAP的基础功能支持 |
| AbpMessageDispatcher | Abp的消息分发器 |
| CAPMessageDispatcher | CAP的消息分发器 |
| RedisAwaitProcessor | 基于StackExchange.Redis的子流程等待处理器 |
FluentWorkflow.GenericExtension.{工作流程命名空间} 下,如配置拓展方法等;FluentWorkflow.RabbitMQFluentWorkflow.RabbitMQ 包<ItemGroup>
<PackageReference Include="FluentWorkflow.RabbitMQ" Version="2.1.1" />
</ItemGroup>
services.AddFluentWorkflow()
.UseRabbitMQMessageDispatcher(options =>
{
//配置RabbitMQ
});
配置单个消息的消费速率,其它消息不受限
services.Configure<RabbitMQOptions>(options =>
{
//配置阶段Stage1的消息 - StageSampleStage1Message 的消费速率,即当前服务实例同时只会有一个阶段Stage1在处理
options.MessageGroup("Group1", builder =>
{
builder.Add<StageSampleStage1Message>()
.WithQosChannelInitialization(1);
});
});
RabbitMQ消息的消费ack超时时间默认为30分钟,进行长时间处理时可能会出现意外情况,可参照 acknowledgement-timeout 进行调整
x-consumer-timeout 为 1 小时(如果RabbitMQ版本支持的话);RabbitMQOptions.QueueArgumentsSetup 对队列的 x-consumer-timeout 参数进行调整;默认情况下,当抛出的异常继承接口 IBusyConsumer 时,流程不会立即失败,而是将消息重新返回消息队列
在阶段处理器中实现子工作流程等待逻辑
internal class StageSampleStage1Handler : StageSampleStage1HandlerBase
{
public StageSampleStage1Handler(IServiceProvider serviceProvider) : base(serviceProvider)
{
}
protected async override Task ProcessAsync(ProcessContext processContext, StageSampleStage1Message stageMessage, CancellationToken cancellationToken)
{
//构建子工作流程
var workflowBuilder = ServiceProvider.GetRequiredService<IWorkflowBuilder<SampleWorkflow>>();
var workflow = workflowBuilder.Build(new SampleWorkflowContext());
//Other workflow setting
//将未启动的子工作流程传递给当前阶段处理上下文,并命名为 - 'taskName'
processContext.AwaitChildWorkflow("taskName", workflow);
// Other logic
//当前阶段将等待子工作流程处理完成后再完成
}
protected override async Task OnAwaitFinishedAsync(SampleWorkflowContext context, IReadOnlyDictionary<string, IWorkflowContext?> childWorkflowContexts, CancellationToken cancellationToken)
{
//从等待的子工作流程上下文字典中取出 - 'taskName'
var workflowContext = (SampleWorkflowContext)childWorkflowContexts["taskName"];
//处理子工作流程结果,如将 workflowContext 内的结果赋值给 context,以便在当前工作流程的后续阶段中使用等
await base.OnAwaitFinishedAsync(context, childWorkflowContexts, cancellationToken);
//当前阶段将完成
}
}
Diagnostic支持services.AddFluentWorkFlow().EnableDiagnostic();
IWorkflowDebugRunner进行调试运行services.AddFluentWorkflow().AddDebugRunner();
IWorkflowDebugRunner进行消息的执行var transmissionModelRawData = """
//MessageJson
"""u8.ToArray();
var debugRunner = ServiceProvider.GetRequiredService<IWorkflowDebugRunner>();
debugRunner.RunAsync(transmissionModelRawData);
在 WorkFlow 的 On{StageName}Async 和 On{StageName}CompletedAsync 中不执行参数委托 fireMessage,则后续流程不再执行
在 WorkFlow 的 On{StageName}Async 和 On{StageName}CompletedAsync 中不执行参数委托 fireMessage,中止流程,在此基础上调用 SerializeContext 方法将上下文序列化后存放
// 存放 contextData 以用于流程恢复
var contextData = SerializeContext(message.Context);
调用具体 WorkFlow 的静态方法 ResumeAsync 使用挂起的流程数据进行恢复执行
// contextData 为序列化的上下文数据
await XXXXWorkflow.ResumeAsync(contextData, serviceProvider, cancellationToken)
恢复流程将会再次调用序列化上下文时的方法,需要注意,小心再次被挂起
更多信息参见源码内的测试代码