工作流系列--ContinueProcessOperation流转源码解析

Posted by 张伟真 on 2024-11-11
Estimated Reading Time 8 Minutes
Words 1.7k In Total
Viewed Times

工作流系列–ContinueProcessOperation流转源码解析

分析

工作流节点之间怎么流转呢? 今天来详细讲讲工作流节点之间的流转是怎么流转的,涉及到的ContinueProcessOperation流转类
省流总结直接看最后总结

揭秘ContinueProcessOperation的神秘面纱

执行方法如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public void run() {
// 获取当前元素
FlowElement currentFlowElement = getCurrentFlowElement(execution);
if (currentFlowElement instanceof FlowNode) {
// 当前是节点则执行continueThroughFlowNode
continueThroughFlowNode((FlowNode) currentFlowElement);
} else if (currentFlowElement instanceof SequenceFlow) {
// 当前如果是线则执行continueThroughSequenceFlow
continueThroughSequenceFlow((SequenceFlow) currentFlowElement);
} else {
throw new ActivitiException("Programmatic error: no current flow element found or invalid type: " + currentFlowElement + ". Halting.");
}
}

做的事情比较简单主要就是根据元素类型执行不同的操作

看方法continueThroughFlowNode

流转分析
这里主要做了3件事情:

  1. 如果是起始节点,触发开始节点启动执行监听器
  2. 如果是子流程:创建子执行实例execution入库并作为子流程返回。
  3. 多实例则执行多实例同步操作;强制同步操作或者非异步(默认)则执行同步操作;否则异步操作。

第一件事情

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
protected void executeProcessStartExecutionListeners() {
Process process = ProcessDefinitionUtil.getProcess(execution.getProcessDefinitionId());
executeExecutionListeners(process,
execution.getParent(),
ExecutionListener.EVENTNAME_START);
}

/**
* Executes the execution listeners defined on the given element, with the given event type,
* and passing the provided execution to the {@link ExecutionListener} instances.
*/
protected void executeExecutionListeners(HasExecutionListeners elementWithExecutionListeners,
ExecutionEntity executionEntity, String eventType) {
commandContext.getProcessEngineConfiguration().getListenerNotificationHelper()
.executeExecutionListeners(elementWithExecutionListeners, executionEntity, eventType);
}

层层调用最终调用
getListenerNotificationHelper().executeExecutionListeners(elementWithExecutionListeners, executionEntity, eventType);
这了的getListenerNotificationHelper的executeExecutionListeners,可以理解是做了监听器的通知后续会单独另开一篇细讲,觉得是是个重要的知识点
简单的说就是找到对应的监听器并且执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public void executeExecutionListeners(HasExecutionListeners elementWithExecutionListeners, DelegateExecution execution, String eventType) {
List<ActivitiListener> listeners = elementWithExecutionListeners.getExecutionListeners();
if (listeners != null && listeners.size() > 0) {
ListenerFactory listenerFactory = Context.getProcessEngineConfiguration().getListenerFactory();
for (ActivitiListener activitiListener : listeners) {

if (eventType.equals(activitiListener.getEvent())) {

BaseExecutionListener executionListener = null;

if (ImplementationType.IMPLEMENTATION_TYPE_CLASS.equalsIgnoreCase(activitiListener.getImplementationType())) {
executionListener = listenerFactory.createClassDelegateExecutionListener(activitiListener);
} else if (ImplementationType.IMPLEMENTATION_TYPE_EXPRESSION.equalsIgnoreCase(activitiListener.getImplementationType())) {
executionListener = listenerFactory.createExpressionExecutionListener(activitiListener);
} else if (ImplementationType.IMPLEMENTATION_TYPE_DELEGATEEXPRESSION.equalsIgnoreCase(activitiListener.getImplementationType())) {
if (activitiListener.getOnTransaction() != null) {
executionListener = listenerFactory.createTransactionDependentDelegateExpressionExecutionListener(activitiListener);
} else {
executionListener = listenerFactory.createDelegateExpressionExecutionListener(activitiListener);
}
} else if (ImplementationType.IMPLEMENTATION_TYPE_INSTANCE.equalsIgnoreCase(activitiListener.getImplementationType())) {
executionListener = (ExecutionListener) activitiListener.getInstance();
}

if (executionListener != null) {
if (activitiListener.getOnTransaction() != null) {
planTransactionDependentExecutionListener(listenerFactory, execution, (TransactionDependentExecutionListener) executionListener, activitiListener);
} else {
execution.setEventName(eventType); // eventName is used to differentiate the event when reusing an execution listener for various events
execution.setCurrentActivitiListener(activitiListener);
((ExecutionListener) executionListener).notify(execution);
execution.setEventName(null);
execution.setCurrentActivitiListener(null);
}
}
}
}
}
}

第二件事情

若是子流程,则通过当前实例创建子流程的执行execution入库ACT_RU_EXECUTION返回并把scope设置为true(子流程);删除执行实例相关的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected void createChildExecutionForSubProcess(SubProcess subProcess) {
//找父实例,也就是isScope=true的第一个实例
ExecutionEntity parentScopeExecution = findFirstParentScopeExecution(execution);

// Create the sub process execution that can be used to set variables
// We create a new execution and delete the incoming one to have a proper scope that
// does not conflict anything with any existing scopes
// 创建可用于设置变量的子流程, 执行我们创建一个新的执行并删除传入的执行以具有适当的范围,该范围不会与任何现有范围发生冲突
ExecutionEntity subProcessExecution = commandContext.getExecutionEntityManager().createChildExecution(parentScopeExecution);
subProcessExecution.setCurrentFlowElement(subProcess);
subProcessExecution.setScope(true);

commandContext.getExecutionEntityManager().deleteExecutionAndRelatedData(execution, null, false);
execution = subProcessExecution;
}

第三件事情

  1. 如果是多实例则执行executeMultiInstanceSynchronous
    a. 先执行了start的执行监听器
    b. 非补偿任务且是活动节点,则执行对应的边界事件,创建子执行相关信息
    c. 执行对应的多实例行为
    多实例行为相关的信息可查看 工作流系列–行为Behavior分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
protected void executeMultiInstanceSynchronous(FlowNode flowNode) {
// Execution listener: event 'start' 如果节点存在执行监听器则先执行
if (CollectionUtil.isNotEmpty(flowNode.getExecutionListeners())) {
executeExecutionListeners(flowNode, ExecutionListener.EVENTNAME_START);
}

// Execute any boundary events, sub process boundary events will be executed from the activity behavior
// 执行任何边界事件,子流程边界事件将从活动行为中执行(只有活动(排他网关和事件)可以有边界事件)
if (!inCompensation && flowNode instanceof Activity) { // Only activities can have boundary events
List<BoundaryEvent> boundaryEvents = ((Activity) flowNode).getBoundaryEvents();
if (CollectionUtil.isNotEmpty(boundaryEvents)) {
executeBoundaryEvents(boundaryEvents, execution);
}
}
// Execute the multi instance behavior
// 执行多实例的行为操作
ActivityBehavior activityBehavior = (ActivityBehavior) flowNode.getBehavior();

if (activityBehavior != null) {
executeActivityBehavior(activityBehavior, flowNode);
} else {
throw new ActivitiException("Expected an activity behavior in flow node " + flowNode.getId());
}
}
  1. 同步操作默认执行executeSynchronous,执行内容和1有点像,不一样的点在d多了一步
    a. 入库ACT_HI_ACTINST 记录历史节点行为实例
    b. 执行start类型事件监听器
    c. 执行边界事件
    d. 行为不为null则执行行为类(行为里最后出线),否则执行出线操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
protected void executeSynchronous(FlowNode flowNode) {
// 入库ACT_HI_ACTINST 记录历史节点行为实例
commandContext.getHistoryManager().recordActivityStart(execution);

// Execution listener: event 'start' 执行开始事件监听器
if (CollectionUtil.isNotEmpty(flowNode.getExecutionListeners())) {
executeExecutionListeners(flowNode,ExecutionListener.EVENTNAME_START);
}
// Execute any boundary events, sub process boundary events will be executed from the activity behavior
// 和上面一样执行边界事件
if (!inCompensation && flowNode instanceof Activity) { // Only activities can have boundary events
List<BoundaryEvent> boundaryEvents = ((Activity) flowNode).getBoundaryEvents();
if (CollectionUtil.isNotEmpty(boundaryEvents)) {
executeBoundaryEvents(boundaryEvents,execution);
}
}

// Execute actual behavior
ActivityBehavior activityBehavior = (ActivityBehavior) flowNode.getBehavior();

if (activityBehavior != null) {
executeActivityBehavior(activityBehavior, flowNode);
} else {
logger.debug("No activityBehavior on activity '{}' with execution {}", flowNode.getId(), execution.getId());
Context.getAgenda().planTakeOutgoingSequenceFlowsOperation(execution, true);
}
}
  1. 异步的情况则执行executeAsynchronous
    异步操作比较简单,创建异步job入库(ACT_RU_JOB)并进行调度。
1
2
3
4
protected void executeAsynchronous(FlowNode flowNode) {
JobEntity job = commandContext.getJobManager().createAsyncJob(execution, flowNode.isExclusive());
commandContext.getJobManager().scheduleAsyncJob(job);
}

如果是线则执行continueThroughSequenceFlow

话不多说,上代码看看
主要执行以下操作:
a. 执行所有的线上的监听器:执行start\take\end监听器
b. 创建并发布一个线条流转事件:表明已从源活动到目标活动的sequenceflow已经执行完
c. 获取target节点设置到execution,并继续进行流转Context.getAgenda().planContinueProcessOperation(execution);//也就是当前分析的类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
protected void continueThroughSequenceFlow(SequenceFlow sequenceFlow) {
// Execution listener. Sequenceflow only 'take' makes sense ... but we've supported all three since the beginning
// 执行所有的线上的监听器,包括开始,过程,结束. 3种状态
if (CollectionUtil.isNotEmpty(sequenceFlow.getExecutionListeners())) {
executeExecutionListeners(sequenceFlow, ExecutionListener.EVENTNAME_START);
executeExecutionListeners(sequenceFlow, ExecutionListener.EVENTNAME_TAKE);
executeExecutionListeners(sequenceFlow, ExecutionListener.EVENTNAME_END);
}

// Firing event that transition is being taken
if (Context.getProcessEngineConfiguration() != null && Context.getProcessEngineConfiguration().getEventDispatcher().isEnabled()) {
FlowElement sourceFlowElement = sequenceFlow.getSourceFlowElement();
FlowElement targetFlowElement = sequenceFlow.getTargetFlowElement();
// 发布一个线已执行的事件
Context.getProcessEngineConfiguration().getEventDispatcher().dispatchEvent(
ActivitiEventBuilder.createSequenceFlowTakenEvent(
(ExecutionEntity) execution,
ActivitiEventType.SEQUENCEFLOW_TAKEN,
sequenceFlow.getId(),
sourceFlowElement != null ? sourceFlowElement.getId() : null,
sourceFlowElement != null ? (String) sourceFlowElement.getName() : null,
sourceFlowElement != null ? sourceFlowElement.getClass().getName() : null,
sourceFlowElement != null ? ((FlowNode) sourceFlowElement).getBehavior() : null,
targetFlowElement != null ? targetFlowElement.getId() : null,
targetFlowElement != null ? targetFlowElement.getName() : null,
targetFlowElement != null ? targetFlowElement.getClass().getName() : null,
targetFlowElement != null ? ((FlowNode) targetFlowElement).getBehavior() : null));
}

FlowElement targetFlowElement = sequenceFlow.getTargetFlowElement();
execution.setCurrentFlowElement(targetFlowElement);

logger.debug("Sequence flow '{}' encountered. Continuing process by following it using execution {}",
sequenceFlow.getId(), execution.getId());
Context.getAgenda().planContinueProcessOperation(execution);
}

总结

1、当执行为节点时:出口为行为类(最终也是执行出线planTakeOutgoingSequenceFlowsOperation(节点)或者直接planTakeOutgoingSequenceFlowsOperation(节点)
2、当执行为线条时:出口为planContinueProcessOperation(节点)
3、在子流程行为类(SubProcessActivityBehavior)里会自动找startEvent节点进行planContinueProcessOperation(startEvent)开启子流程的流转
4、在startEvent节点行为类(NoneStartEventActivityBehavior通过父类去leave)会自动找出线leave(planTakeOutgoingSequenceFlowsOperation)


如果您喜欢此博客或发现它对您有用,则欢迎对此发表评论。 也欢迎您共享此博客,以便更多人可以参与。 如果博客中使用的图像侵犯了您的版权,请与作者联系以将其删除。 谢谢 !