工作流系列--完成任务命令分析

Posted by 张伟真 on 2024-11-04
Estimated Reading Time 7 Minutes
Words 1.5k In Total
Viewed Times

工作流系列–完成任务命令分析

上一篇讲到activiti使用命令模式,那究竟activiti是怎么通过命令去完成任务的,做了什么,我们一起看看
既然是跟任务相关,前面initServices初始化的时候初始了6个service,跟任务相关的,我们可以看看TaskService,定义了任务相关操作的入口,看实现最终commandExecutor去执行对应的CompleteTaskCmd命令
完成任务
完成任务
这里有不记得的可能疑惑commandExecutor是怎么来的, 可以看下initService初始化做的事情,里面就将初始化好的commandExecutor设置到各个service里面
完成任务
不管什么命令最终都是由executor去执行,既然命令交给了CommandExecutor执行,那么接下来看下它是如何执行的.
new了CompleteTaskCmd之后交由CommandExecutorImpl去执行,为什么是CommandExecutorImpl呢,因为在初始化的时候initCommandExecutor()指定的,并且CommandExecutorImpl持有默认的命令配置,以及拦截器链中的第一个拦截器
完成任务
完成任务
从源码中可以看到 CommandExecutorImpl的execute() 直接从拦截器链中的第一个拦截器开始调用。可以知道的是,它肯定会经过CommandContextInterceptor,于是在当前请求线程的局部变量中就会有一个栈(Stack),在栈的顶部放了一个CommandContext,在这个CommandContext中有待执行的Command,有processEngineConfiguration,还有agenda。
值得注意的是CommandContextInterceptor在执行的时候,通过Context.getCommandContext()去获取CommandContext,并且被设计成是每个线程私有的,就是每个线程都有自己的一个CommandContext
完成任务
初始化介绍的时候在拦截器链的最后一个拦截器是CommandInvoker,我们来看看做了啥

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
public class CommandInvoker extends AbstractCommandInterceptor {

private static final Logger logger = LoggerFactory.getLogger(CommandInvoker.class);

@Override
@SuppressWarnings("unchecked")
public <T> T execute(final CommandConfig config, final Command<T> command) {
// 1. 从线程中拿到上下文
final CommandContext commandContext = Context.getCommandContext();
// 2. 获取对应的agenda,要执行的线程操作加入计划任务链表
// Execute the command.
// This will produce operations that will be put on the agenda. 将命令操作放入agenda
commandContext.getAgenda().planOperation(new Runnable() {
@Override
public void run() {
commandContext.setResult(command.execute(commandContext));
}
});

// Run loop for agenda 循环执行
executeOperations(commandContext);
// 执行结束后,调用execution相关的变更事件,如果涉及的话
// At the end, call the execution tree change listeners.
// TODO: optimization: only do this when the tree has actually changed (ie check dbSqlSession).
if (commandContext.hasInvolvedExecutions()) {
Context.getAgenda().planExecuteInactiveBehaviorsOperation();
executeOperations(commandContext);
}
// 最终返回执行结果
return (T) commandContext.getResult();
}
/**
* 循环在agenda中获取是否有需要执行的命令,有则执行
*/
protected void executeOperations(final CommandContext commandContext) {
while (!commandContext.getAgenda().isEmpty()) {
Runnable runnable = commandContext.getAgenda().getNextOperation();
executeOperation(runnable);
}
}
//线程的真正启动
public void executeOperation(Runnable runnable) {
if (runnable instanceof AbstractOperation) {
AbstractOperation operation = (AbstractOperation) runnable;

// Execute the operation if the operation has no execution (i.e. it's an operation not working on a process instance)
// or the operation has an execution and it is not ended
if (operation.getExecution() == null || !operation.getExecution().isEnded()) {

if (logger.isDebugEnabled()) {
logger.debug("Executing operation {} ", operation.getClass());
}

runnable.run();

}

} else {
runnable.run();
}
}

@Override
public CommandInterceptor getNext() {
return null;
}
// 限制了CommandInvoker 必须是最后一个拦截器
@Override
public void setNext(CommandInterceptor next) {
throw new UnsupportedOperationException("CommandInvoker must be the last interceptor in the chain");
}

}

关于agenda 后续专门开一篇文章去讲,agenda是流程引擎一个很重要的东西,可以理解是一个任务项或者任务池, 每一个命令或者操作是就开启一个任务池,然后命令要做的事情都会拆解,然后放入agenda里面,最后按顺序执行,直到任务都做完

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
protected Void execute(CommandContext commandContext, TaskEntity task) {
if (variables != null) {
if (localScope) {
task.setVariablesLocal(variables);
} else if (task.getExecutionId() != null) {
task.setExecutionVariables(variables);
} else {
task.setVariables(variables);
}
}

if (transientVariables != null) {
if (localScope) {
task.setTransientVariablesLocal(transientVariables);
} else {
task.setTransientVariables(transientVariables);
}
}

setTaskVariables(task.getVariablesLocal());


executeTaskComplete(commandContext, task, variables, localScope);
return null;
}

看任务完成命令源码主要进行了:
1、做了变量的绑定(更新变量缓存,入库ACT_HI_VARINST历史变量实例表)
2、执行完成任务executeTaskComplete, 可以知道最终离开由agenda去驱动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
protected void executeTaskComplete(CommandContext commandContext, TaskEntity taskEntity, Map<String, Object> variables, boolean localScope) {
// Task complete logic

if (taskEntity.getDelegationState() != null && taskEntity.getDelegationState().equals(DelegationState.PENDING)) {
throw new ActivitiException("A delegated task cannot be completed, but should be resolved instead.");
}
//1.触发complete监听器
commandContext.getProcessEngineConfiguration().getListenerNotificationHelper().executeTaskListeners(taskEntity, TaskListener.EVENTNAME_COMPLETE);
if (Authentication.getAuthenticatedUserId() != null && taskEntity.getProcessInstanceId() != null) {
ExecutionEntity processInstanceEntity = commandContext.getExecutionEntityManager().findById(taskEntity.getProcessInstanceId());
//设置为参与者入库ACT_RU_IDENTITYLINK表(若该用户未与流程实例关联)
commandContext.getIdentityLinkEntityManager().involveUser(processInstanceEntity, Authentication.getAuthenticatedUserId(),IdentityLinkType.PARTICIPANT);
}
//2.分发器触发发布"任务完成"事件
ActivitiEventDispatcher eventDispatcher = Context.getProcessEngineConfiguration().getEventDispatcher();
if (eventDispatcher.isEnabled()) {
if (variables != null) {
eventDispatcher.dispatchEvent(ActivitiEventBuilder.createEntityWithVariablesEvent(ActivitiEventType.TASK_COMPLETED, taskEntity, variables, localScope));
} else {
eventDispatcher.dispatchEvent(ActivitiEventBuilder.createEntityEvent(ActivitiEventType.TASK_COMPLETED, taskEntity));
}
}
//3.删除任务
//删除该task下的子任务、人员、变量;更新历史任务的完成事件耗时等
commandContext.getTaskEntityManager().deleteTask(taskEntity, null, false, false);

// Continue process (if not a standalone task)
if (taskEntity.getExecutionId() != null) {
ExecutionEntity executionEntity = commandContext.getExecutionEntityManager().findById(taskEntity.getExecutionId());
//4.触发执行实例,通过agenda执行离开任务操作
Context.getAgenda().planTriggerExecutionOperation(executionEntity);
}
}

在步奏三中删除了任务,源码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@Override
public void deleteTask(TaskEntity task, String deleteReason, boolean cascade, boolean cancel) {
// 当任务未被删除则执行
if (!task.isDeleted()) {
//1、触发绑定在task上delete事件类型的监听器
getProcessEngineConfiguration().getListenerNotificationHelper()
.executeTaskListeners(task, TaskListener.EVENTNAME_DELETE);
task.setDeleted(true);

String taskId = task.getId();
//2、去ACT_RU_TASK查是否有子任务,如果有则递归删除
List<Task> subTasks = findTasksByParentTaskId(taskId);
for (Task subTask : subTasks) {
deleteTask((TaskEntity) subTask, deleteReason, cascade, cancel);
}
//3、去ACT_RU_IDENTITYLINK查该task下的所有人员并且遍历删除
getIdentityLinkEntityManager().deleteIdentityLinksByTaskId(taskId);
//4、删除该task下所有ACT_RU_VARIABLE变量数据
getVariableInstanceEntityManager().deleteVariableInstanceByTask(task);

if (cascade) {
// 如果设置了级联删除,则删除对应的历史级联
getHistoricTaskInstanceEntityManager().delete(taskId);
} else {
//获取ACT_HI_TASKINST 历史task实例设置删除原因、结束事件、耗时
getHistoryManager().recordTaskEnd(taskId, deleteReason);
}
//删除ACT_RU_TASK表记录
delete(task, false);

if (getEventDispatcher().isEnabled()) {
//取消任务时,创建并触发userTask任务取消事件
if (cancel && !task.isCanceled()) {
task.setCanceled(true);
getEventDispatcher().dispatchEvent(
ActivitiEventBuilder.createActivityCancelledEvent(task.getExecution() != null ? task.getExecution().getActivityId() : null,
task.getName(),
//temporary fix for standalone tasks
task.getExecutionId() != null ? task.getExecutionId() : task.getId(),
task.getProcessInstanceId(),
task.getProcessDefinitionId(),
"userTask",
deleteReason));
}
getEventDispatcher().dispatchEvent(ActivitiEventBuilder.createEntityEvent(ActivitiEventType.ENTITY_DELETED, task));
}
}
}

如果您喜欢此博客或发现它对您有用,则欢迎对此发表评论。 也欢迎您共享此博客,以便更多人可以参与。 如果博客中使用的图像侵犯了您的版权,请与作者联系以将其删除。 谢谢 !