当前位置:网站首页>Floating source code comment (38) parallel job processor
Floating source code comment (38) parallel job processor
2022-07-03 19:08:00 【jinyangjie0】
Flowable Source code address :https://github.com/flowable/flowable-engine
Flowable-6.7.2 Source code comment address :https://github.com/solojin/flowable-6.7.2-annotated
Package path :org.flowable.engine.impl.jobexecutor
ParallelMultiInstanceWithNoWaitStatesAsyncLeaveJobHandler Parallel multi instance asynchronous leave job processor without waiting state
/** * Parallel multi instance asynchronous leave job processor without waiting state * * @author Joram Barrez */
public class ParallelMultiInstanceWithNoWaitStatesAsyncLeaveJobHandler implements JobHandler {
public static final String TYPE = "parallel-multi-instance-no-waits-async-leave";
@Override
public String getType() {
return TYPE;
}
@Override
public void execute(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext) {
ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration(commandContext);
ExecutionEntityManager executionEntityManager = processEngineConfiguration.getExecutionEntityManager();
ExecutionEntity execution = executionEntityManager.findById(job.getExecutionId());
if (execution != null) {
FlowElement currentFlowElement = execution.getCurrentFlowElement();
if (currentFlowElement instanceof Activity) {
Object behavior = ((Activity) currentFlowElement).getBehavior();
if (behavior instanceof ParallelMultiInstanceBehavior) {
ParallelMultiInstanceBehavior parallelMultiInstanceBehavior = (ParallelMultiInstanceBehavior) behavior;
DelegateExecution multiInstanceRootExecution = ExecutionGraphUtil.getMultiInstanceRootExecution(execution);
if (multiInstanceRootExecution != null) {
// Optimize the activity count here : If there is still activity execution in the database , There is no need to do anything :
// No need to get any variables , If completed 、 Activities of the nr etc. .
// This work can be simply rearranged , And when things are not finished , It will try the same logic again .
long activeChildExecutionCount = executionEntityManager.countActiveExecutionsByParentId(multiInstanceRootExecution.getId());
if (activeChildExecutionCount > 0) {
List<String> boundaryEventActivityIds = ExecutionGraphUtil.getBoundaryEventActivityIds(multiInstanceRootExecution);
if (boundaryEventActivityIds.isEmpty()) {
reCreateJob(processEngineConfiguration, execution);
} else {
// If all remaining executions are boundary event executions , You can keep multiple instances .
List<ExecutionEntity> boundaryEventChildExecutions = executionEntityManager
.findExecutionsByParentExecutionAndActivityIds(multiInstanceRootExecution.getId(), boundaryEventActivityIds);
if (activeChildExecutionCount <= boundaryEventChildExecutions.size()) {
leaveMultiInstance(processEngineConfiguration, execution, parallelMultiInstanceBehavior);
} else {
reCreateJob(processEngineConfiguration, execution);
}
}
} else {
leaveMultiInstance(processEngineConfiguration, execution, parallelMultiInstanceBehavior);
}
}
}
}
}
}
protected void reCreateJob(ProcessEngineConfigurationImpl processEngineConfiguration, ExecutionEntity execution) {
// This is not a common way to create jobs , Because we especially don't want asynchronous actuators to trigger .
// This operation should extract , To avoid continuous circulation .
JobService jobService = processEngineConfiguration.getJobServiceConfiguration().getJobService();
JobEntity newJob = JobUtil.createJob(execution, TYPE, processEngineConfiguration);
jobService.createAsyncJobNoTriggerAsyncExecutor(newJob, true);
jobService.insertJob(newJob);
}
protected void leaveMultiInstance(ProcessEngineConfigurationImpl processEngineConfiguration, ExecutionEntity execution,
ParallelMultiInstanceBehavior parallelMultiInstanceBehavior) {
// ParallelMultiInstanceBehavior The implementation of considers sub execution .
// So choose random sub execution instead of passing multiple instance root execution
boolean multiInstanceCompleted = parallelMultiInstanceBehavior.leaveAsync(execution);
if (!multiInstanceCompleted) {
reCreateJob(processEngineConfiguration, execution);
}
}
}
ParallelMultiInstanceActivityCompletionJobHandler Parallel multi instance activity completion job processor
/** * Parallel multi instance activity completion job processor * * @author Filip Hrisafov */
public class ParallelMultiInstanceActivityCompletionJobHandler implements JobHandler {
public static final String TYPE = "parallel-multi-instance-complete";
@Override
public String getType() {
return TYPE;
}
@Override
public void execute(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext) {
ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration(commandContext);
ExecutionEntity completingExecution = processEngineConfiguration.getExecutionEntityManager().findById(job.getExecutionId());
if (completingExecution != null) {
// It is possible that the execution completed (through another thread). In that case we ignore it.
FlowElement currentFlowElement = completingExecution.getCurrentFlowElement();
if (currentFlowElement instanceof Activity) {
Object behavior = ((Activity) currentFlowElement).getBehavior();
if (behavior instanceof ParallelMultiInstanceBehavior) {
ParallelMultiInstanceBehavior parallelMultiInstanceBehavior = (ParallelMultiInstanceBehavior) behavior;
parallelMultiInstanceBehavior.leaveAsync(completingExecution);
}
}
}
}
}
边栏推荐
- EGO Planner代码解析bspline_optimizer部分(2)
- Latex image rotates with title
- Web3 credential network project galaxy is better than nym?
- High concurrency Architecture - separate databases and tables
- High concurrency architecture cache
- OSPF - detailed explanation of stub area and full stub area
- Day-27 database
- Zhengda futures news: soaring oil prices may continue to push up global inflation
- Why should the gradient be manually cleared before back propagation in pytorch?
- Record: pymysql is used in pycharm to connect to the database
猜你喜欢

KINGS

The online customer service system developed by PHP is fully open source without encryption, and supports wechat customer service docking

Pytorch introduction to deep learning practice notes 13- advanced chapter of cyclic neural network - Classification

Recommend a simple browser tab

2020 intermediate financial management (escort class)

FBI警告:有人利用AI换脸冒充他人身份进行远程面试

为什么要做特征的归一化/标准化?

leetcode:556. 下一个更大元素 III【模拟 + 尽可能少变更】

Smart wax therapy machine based on STM32 and smart cloud
![[free sharing] kotalog diary2022 plan electronic manual ledger](/img/ca/1ffbfcc16e3019261f70274a89c16f.jpg)
[free sharing] kotalog diary2022 plan electronic manual ledger
随机推荐
Nous avons fait une plateforme intelligente de règlement de détail
flask 生成swagger文档
达梦数据库的物理备份和还原简解
【数学建模】基于matlab船舶三自由度MMG模型【含Matlab源码 1925期】
Using the visualization results, click to appear the corresponding sentence
C enum contains value - C enum contains value
Record: writing MySQL commands
[leetcode weekly race] game 300 - 6110 Number of incremental paths in the grid graph - difficult
How does if ($variable) work? [repeat] - how exactly does if ($variable) work? [duplicate]
Flutter网络和数据存储框架搭建 -b1
Hard disk monitoring and analysis tool: smartctl
平淡的生活里除了有扎破皮肤的刺,还有那些原本让你魂牵梦绕的诗与远方
[optics] vortex generation based on MATLAB [including Matlab source code 1927]
application
Record: install MySQL on ubuntu18.04
Scrape crawler framework
math_泰勒公式
leetcode:11. Container with the most water [double pointer + greed + remove the shortest board]
Briefly describe the quantitative analysis system of services
[free sharing] kotalog diary2022 plan electronic manual ledger