Activiti系列一之delegate拦截器

公司内部的工作流引擎用的是Activiti5, 所以这半年一直在研究这个开源项目,打算针对这个项目做一个系列,说一说使用心得。今天就先做系列一,先说使用场景。

基本原理

Activiti可以很好的与Spring结合,只需要使用SpringProcessEngineConfiguration配置就可以利用Spring管理Bean,所以在BPMN的标准中Activiti的扩展属性都是可以使用Spring Bean的。

例如Service Task,

1
2
<serviceTask id="N0db890f126c2" name="Service Task" activiti:delegateExpression="#{serviceTaskDelegate}">
</serviceTask>

activiti:delegateExpression中使用的就是一个Spring Bean,这个Bean实际上是一个JavaDelegate的实现。

1
2
3
4
5
6
7
8
9
10
11
12
@Service
public class ServiceTaskDelegate implements JavaDelegate {

@Autowired
private ContactGroupService contactGroupService;

@Override
public void executeDelegate(DelegateExecution execution, ContactDTO contactDTO) {
LOGGER.info("start execute action...");
LOGGER.info("end execute add group action...");
}
}

这个Bean和其他Spring Bean没有区别,可以注入其他Service,自己也会在Spring上下文中加载。

场景

接下来深入的一步讨论就是如何处理复杂的事务,一个工作流中包含若干个上述的Service Task,那么究竟是一个Task失败,整个流程就回滚还是一个失败之后,流程停止在失败的地方然后重试呢?

这些都需要根据具体的业务场景来处理,Activiti默认采取第一种办法直接全部回滚。在我们的场景中做了一些改动,当出现重试可解决的异常时全部回滚,整个流程等待若干秒后再次重试,当出现重试也无法解决的异常时,例如超出API调用次数之类的,流程直接失败并记录状态。

另外在单个ServiceTask的运行过程中,我们采取了新开一个事务的办法,避免与Activiti自身事务互相影响,另一方面也可以复用我们系统多租户的拦截器来处理复杂的数据库查询(非框架程序员不用处理租户相关的代码)。

解决方法

仔细阅读Activiti源码,发现在配置类中有一个processEngineConfiguration.setDelegateInterceptor方法,这个拦截器是在具体的Delegate启动之前调用的,所以就给了我们一个时机切入到业务逻辑之前。

不含糊,直接贴代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class CustomDelegateInterceptor implements DelegateInterceptor {

private static Logger LOGGER = LoggerFactory.getLogger(ActivitiWorkflowManager.class);

private final TenantResolver tenantResolver;

public CustomDelegateInterceptor(TenantResolver tenantResolver) {
this.tenantResolver = tenantResolver;
}

@Override
public void handleInvocation(DelegateInvocation invocation) throws Exception {
Object target = invocation.getTarget();
if (target instanceof JavaDelegate) {
Field executionField = ReflectionUtils.findField(JavaDelegateInvocation.class, "execution", DelegateExecution.class);
ReflectionUtils.makeAccessible(executionField);
DelegateExecution execution = (DelegateExecution) ReflectionUtils.getField(executionField, invocation);
// 异步任务需要设置TenantId
LOGGER.info("Executing activiti service task tenantId [{}]", execution.getTenantId());
if (StringUtils.isBlank(execution.getTenantId())) {
LOGGER.error("do not have tenantId, skipped");
return;
}
try {
tenantResolver.setCurrentTenant(Long.valueOf(execution.getTenantId()));
String traceId = execution.getProcessBusinessKey() + ":" + execution.getProcessInstanceId();
LogTraceUtils.beginTrace(traceId);
invocation.proceed();
} finally {
tenantResolver.clear();
LogTraceUtils.endTrace();
}
} else {
invocation.proceed();
}

}
}

在每个Service Task执行前,都会获取当前这个流程的租户ID并且写入到一个专门管理租户ID的ThreadLocal中。并且切入了一段日志逻辑方便排错。

总结

有些时候就是这样一个不经意的小函数就可以优雅的实现一个类似AOP的拦截。往往这种小函数在文档中是只字不提的,可见对于开源项目源码阅读的重要性。