An activity is defined as an item on an external queue of work that a workflow can wait for.
In this example the workflow will wait for `activity-1` before proceeding. It also passes the value of `data.Value1` to the activity, and then maps the result of the activity to `data.Value2`.
Then we create a worker to process the queue of activity items. It uses the `GetPendingActivity` method to get an activity and the data that a workflow is waiting for.
public class ActivityWorkflow : IWorkflow<MyData>
{
    public void Build(IWorkflowBuilder<MyData> builder)
    {
        builder
            .StartWith<HelloWorld>()
            .Activity("activity-1", (data) => data.Value1)
                .Output(data => data.Value2, step => step.Result)
            .Then<PrintMessage>()
                .Input(step => step.Message, data => data.Value2);
    }
}
...
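// Ask for a pending "activity-1" item on behalf of "worker1", waiting up to one minute.
var activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;
if (activity != null)
{
    // activity.Parameters holds the value the workflow passed in (data.Value1).
    Console.WriteLine(activity.Parameters);
    // Report success and block until the submission completes; the response is mapped to data.Value2.
    host.SubmitActivitySuccess(activity.Token, "Some response data").Wait();
}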
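The worker above handles a single activity and blocks on `.Result`. A longer-running worker might look like the sketch below, which polls in a loop, awaits the calls instead of blocking, and reports errors with `SubmitActivityFailure`. The `ActivityWorker` class and the `ProcessAsync` helper are hypothetical names introduced for illustration; the sketch also assumes that `GetPendingActivity` returns `null` when nothing is pending within the timeout, as the null check above suggests.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using WorkflowCore.Interface;

public static class ActivityWorker
{
    // Polls the "activity-1" queue until cancellation is requested.
    public static async Task RunAsync(IWorkflowHost host, CancellationToken stop)
    {
        while (!stop.IsCancellationRequested)
        {
            var activity = await host.GetPendingActivity(
                "activity-1", "worker1", TimeSpan.FromMinutes(1));

            if (activity == null)
                continue; // nothing pending within the timeout; poll again

            try
            {
                // Hypothetical placeholder for the real work of this activity.
                var result = await ProcessAsync(activity.Parameters);
                await host.SubmitActivitySuccess(activity.Token, result);
            }
            catch (Exception ex)
            {
                // Tell the waiting workflow that the activity failed.
                await host.SubmitActivityFailure(activity.Token, ex.Message);
            }
        }
    }

    // Hypothetical helper: echoes the parameters back as the activity result.
    private static Task<object> ProcessAsync(object parameters)
    {
        return Task.FromResult<object>($"Processed: {parameters}");
    }
}
```

The token returned with each pending activity identifies that specific claim, so both the success and the failure submission pass `activity.Token` rather than the activity name.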
The JSON representation of this step would look like this:
{
  "Id": "activity-step",
  "StepType": "WorkflowCore.Primitives.Activity, WorkflowCore",
  "Inputs": {
    "ActivityName": "\"activity-1\"",
    "Parameters": "data.Value1"
  },
  "Outputs": { "Value2": "step.Result" }
}
JSON / YAML API
The `Activity` step can be configured using inputs as follows:
Field | Description |
---|---|
CancelCondition | Optional expression to specify a cancel condition |
Inputs.ActivityName | Expression to specify the activity name |
Inputs.Parameters | Expression to specify the parameters to pass the activity worker |
Inputs.EffectiveDate | Optional expression to specify the effective date |
{
  "Id": "MyActivityStep",
  "StepType": "WorkflowCore.Primitives.Activity, WorkflowCore",
  "NextStepId": "...",
  "CancelCondition": "...",
  "Inputs": {
    "ActivityName": "\"my-activity\"",
    "Parameters": "data.SomeValue"
  }
}
Id: MyActivityStep
StepType: WorkflowCore.Primitives.Activity, WorkflowCore
NextStepId: "..."
CancelCondition: "..."
Inputs:
  ActivityName: '"my-activity"'
  Parameters: data.SomeValue
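Activity source-code logic:
1. Execution enters the Activity step.
2. A subscription is created.
3. A worker fetches the subscription, and an operation token is set on it.
4. `SubmitActivitySuccess` is called to confirm that the subscription is complete and to publish an event, which creates a record in the event table. Unlike `PublishEvent`, this call includes a token-validation step.
5. The workflow proceeds to the next step.
6. The event consumer picks up the event and processes it, deleting the subscription at the same time.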
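WaitFor source-code logic:
1. Execution enters the WaitFor step.
2. An event subscription is created.
3. `PublishEvent` is called.
4. The workflow proceeds to the next step.
5. The event consumer picks up the event and processes it, deleting the subscription at the same time.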
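For comparison with the Activity example, the WaitFor flow can be sketched from the caller's side as follows. The caller publishes an event by name and key, with no pending item to claim and no token. The event name `MyEvent`, the key `Key1`, the `WaitForWorkflow` and `EventPublisher` classes, and the shape of `MyData` are all assumptions made for illustration.

```csharp
using System.Threading.Tasks;
using WorkflowCore.Interface;
using WorkflowCore.Models;

// Assumed data class, mirroring the fields used in the Activity example.
public class MyData
{
    public string Value1 { get; set; }
    public string Value2 { get; set; }
}

public class WaitForWorkflow : IWorkflow<MyData>
{
    public string Id => "wait-for-workflow"; // hypothetical workflow id
    public int Version => 1;

    public void Build(IWorkflowBuilder<MyData> builder)
    {
        builder
            .StartWith(context => ExecutionResult.Next())
            // Creates an event subscription for ("MyEvent", "Key1") and waits.
            .WaitFor("MyEvent", data => "Key1")
                .Output(data => data.Value2, step => step.EventData)
            .Then(context => ExecutionResult.Next());
    }
}

public static class EventPublisher
{
    // Publishes the event the workflow is waiting for; no token is involved.
    public static Task PublishAsync(IWorkflowHost host)
    {
        return host.PublishEvent("MyEvent", "Key1", "Some event data");
    }
}
```

With an activity, a worker first claims the pending item through `GetPendingActivity` and must present the returned token when submitting the result. With `WaitFor`, any caller that knows the event name and key can publish.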