Apache DolphinScheduler source code analysis (super detailed)
2022-07-06 06:44:00, by Yang Linwei
01 Introduction
Official documentation for Apache DolphinScheduler: https://dolphinscheduler.apache.org/zh-cn/index.html
Apache DolphinScheduler is a distributed, decentralized, extensible visual DAG workflow task scheduling platform. It is dedicated to solving the complex dependencies in data processing pipelines, making the scheduling system work out of the box for data processing workflows.
The schematic diagram is as follows:
Next, this article walks through its source code in detail, step by step.
02 DolphinScheduler Project structure
2.1 Structure analysis
First, use IDEA to clone the project from GitHub to local. Project address: https://github.com/apache/dolphinscheduler
After importing the project, you can see that its main core modules are as follows:
Module | Description
---|---
dolphinscheduler-alert | Alert module, provides the AlertServer service.
dolphinscheduler-api | Web application module, provides the ApiServer service.
dolphinscheduler-common | Common constants, enums, utility classes, data structures and base classes
dolphinscheduler-dao | Provides database access and related operations.
dolphinscheduler-remote | Netty-based client and server
dolphinscheduler-server | MasterServer and WorkerServer services
dolphinscheduler-service | Service module, containing Quartz, Zookeeper and log client access services, for use by the server and api modules
dolphinscheduler-ui | Front-end module
2.2 Table analysis
In the project directory /dolphinscheduler-dao/src/main/resources/sql/create/release-1.0.0_schema/mysql
there are the database initialization scripts dolphinscheduler_ddl.sql and dolphinscheduler_dml.sql, as well as some upgrade scripts.
After executing them, you can see the following tables in the database:
Table name | Description
---|---
t_ds_access_token | Token for accessing the DS backend
t_ds_alert | Alert information
t_ds_alertgroup | Alert group
t_ds_command | Execution command
t_ds_datasource | Data source
t_ds_error_command (core table) | Error command
t_ds_process_definition (core table) | Process definition
t_ds_process_instance (core table) | Process instance
t_ds_project | Project
t_ds_queue | Queue
t_ds_relation_datasource_user | User-datasource association
t_ds_relation_process_instance | Sub-process
t_ds_relation_project_user | User-project association
t_ds_relation_resources_user | User-resource association
t_ds_relation_udfs_user | User-UDF association
t_ds_relation_user_alertgroup | User-alert group association
t_ds_resources | Resource file
t_ds_schedules (core table) | Process schedule
t_ds_session | User login session
t_ds_task_instance (core table) | Task instance
t_ds_tenant | Tenant
t_ds_udfs | UDF resource
t_ds_user | User
t_ds_version | DS version information
The core tables are described in detail in the appendix at the end of this article.
2.2.1 Class diagram (user / queue / data source)
The diagram is described below:
- One tenant can have multiple users;
- The queue field in t_ds_user stores the queue_name from the queue table, while t_ds_tenant stores the queue_id. When a process definition is executed, the user's queue has the highest priority; if the user's queue is empty, the tenant's queue is used;
- The user_id field in the t_ds_datasource table indicates the user who created the data source; the user_id in t_ds_relation_datasource_user indicates the users who have permission on the data source.
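As a tiny illustration of the queue-selection rule above, the fallback can be expressed as follows (an illustrative helper only, not code from the DolphinScheduler source):

```java
// Illustrative only: the user's queue wins if it is set; otherwise fall back to the tenant's queue.
public static String resolveQueue(String userQueueName, String tenantQueueName) {
    return (userQueueName != null && !userQueueName.isEmpty()) ? userQueueName : tenantQueueName;
}
```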
2.2.2 Class diagram (project / resource / alert)
The diagram is described below:
- A user can have multiple projects; project authorization is done through the t_ds_relation_project_user table, which binds project_id and user_id;
- The user_id in the t_ds_project table indicates the user who created the project;
- The user_id in the t_ds_relation_project_user table indicates users who have permission on the project;
- The user_id in the t_ds_resources table indicates the user who created the resource;
- The user_id in t_ds_relation_resources_user indicates users who have permission on the resource;
- The user_id in the t_ds_udfs table indicates the user who created the UDF;
- The user_id in the t_ds_relation_udfs_user table indicates users who have permission on the UDF.
2.2.3 Class diagram (command / process / task)
The diagram is described below:
- A project has multiple process definitions, a process definition can generate multiple process instances, and a process instance can generate multiple task instances;
- The t_ds_schedules table stores the periodic scheduling information of process definitions;
- The t_ds_relation_process_instance table stores data for the case where a process definition contains sub-processes: parent_process_instance_id is the instance id of the main process that contains sub-processes, process_instance_id is the instance id of the sub-process, and parent_task_instance_id is the task instance id of the sub-process node. The corresponding process instance and task instance tables are t_ds_process_instance and t_ds_task_instance.
03 DolphinScheduler Source code analysis
Before digging into the source code, here is the startup flow chart from the official website:
3.1 ExecutorController
As the flow chart above shows, after we click the "Start" button in the UI layer, the request goes to the ApiServer service in the dolphinscheduler-api module and enters the ExecutorController (full class path: org.apache.dolphinscheduler.api.controller.ExecutorController). Let's see what interface methods ExecutorController provides.
The following is a description of each endpoint:
Endpoint | Description
---|---
/start-process-instance | Start a process instance
/batch-start-process-instance | Batch start process instances
/execute | Operate on a process instance, e.g. pause, stop, rerun, recover from pause, recover from stop
/batch-execute | Batch operate on process instances
/start-check | Check whether the process definition and all its sub-process definitions are online
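To make the /execute endpoint concrete, here is a hypothetical client call. The base URL, the projects/{projectCode}/executors prefix, and the token header are assumptions based on common DolphinScheduler deployments (the default API port is 12345 and API tokens come from t_ds_access_token); verify them against your own version before use.

```java
// Hypothetical call to the /execute endpoint; path, port and auth header are assumptions, not taken from this article.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ExecuteCall {
    public static void main(String[] args) throws Exception {
        String base = "http://localhost:12345/dolphinscheduler";
        long projectCode = 1234567890L;                       // hypothetical project code
        String body = "processInstanceId=100&executeType=STOP";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(base + "/projects/" + projectCode + "/executors/execute"))
                .header("token", "<your-access-token>")       // API token, if your deployment uses token auth
                .header("Content-Type", "application/x-www-form-urlencoded")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());
    }
}
```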
Next, let's take a look at the core execute method:
/**
 * do action to process instance: pause, stop, repeat, recover from pause, recover from stop
 *
 * @param loginUser login user
 * @param projectCode project code
 * @param processInstanceId process instance id
 * @param executeType execute type
 * @return execute result code
 */
@ApiOperation(value = "execute", notes = "EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "executeType", value = "EXECUTE_TYPE", required = true, dataType = "ExecuteType")
})
@PostMapping(value = "/execute")
@ResponseStatus(HttpStatus.OK)
@ApiException(EXECUTE_PROCESS_INSTANCE_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result execute(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam("processInstanceId") Integer processInstanceId,
@RequestParam("executeType") ExecuteType executeType
) {
Map<String, Object> result = execService.execute(loginUser, projectCode, processInstanceId, executeType);
return returnDataList(result);
}
You can see that the execute endpoint simply delegates to execService; let's analyze it.
3.2 ExecService
Let's take a look at its execute method; comments have been added:
/**
 * Operate workflow instances
 *
 * @param loginUser the logged-in user
 * @param projectCode project code
 * @param processInstanceId process instance ID
 * @param executeType execution type (repeat running, resume pause, resume failure, stop, pause)
 * @return execution result
 */
@Override
public Map<String, Object> execute(User loginUser, long projectCode, Integer processInstanceId, ExecuteType executeType) {
/*** Query project information **/
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
/*** Check whether the current user has permission for this operation **/
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode, ApiFuncIdentificationConstant.map.get(executeType));
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
/*** Check whether a Master node exists **/
if (!checkMasterExists(result)) {
return result;
}
/*** Query workflow instance details **/
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId);
if (processInstance == null) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result;
}
/*** Query the process definition by the definition code and version bound to the workflow instance **/
ProcessDefinition processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
if (executeType != ExecuteType.STOP && executeType != ExecuteType.PAUSE) {
/*** Verify whether the process definition can be executed (does it exist? is it online? are all sub-process definitions online?) **/
result = checkProcessDefinitionValid(projectCode, processDefinition, processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
}
/*** Check whether the executeType operation is allowed for the current state of the workflow instance **/
result = checkExecuteType(processInstance, executeType);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
/*** Check whether a suitable tenant has been selected **/
if (!checkTenantSuitable(processDefinition)) {
logger.error("there is not any valid tenant for the process definition: id:{},name:{}, ",
processDefinition.getId(), processDefinition.getName());
putMsg(result, Status.TENANT_NOT_SUITABLE);
}
/*** For the REPEAT_RUNNING execute type, get the start parameters specified by the user **/
Map<String, Object> commandMap = JSONUtils.parseObject(processInstance.getCommandParam(), new TypeReference<Map<String, Object>>() {
});
String startParams = null;
if (MapUtils.isNotEmpty(commandMap) && executeType == ExecuteType.REPEAT_RUNNING) {
Object startParamsJson = commandMap.get(Constants.CMD_PARAM_START_PARAMS);
if (startParamsJson != null) {
startParams = startParamsJson.toString();
}
}
/*** Perform the corresponding operation according to the ExecuteType **/
switch (executeType) {
case REPEAT_RUNNING: // rerun
result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.REPEAT_RUNNING, startParams);
break;
case RECOVER_SUSPENDED_PROCESS: // recover the suspended workflow
result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams);
break;
case START_FAILURE_TASK_PROCESS: // restart from the failed tasks
result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.START_FAILURE_TASK_PROCESS, startParams);
break;
case STOP: // stop
if (processInstance.getState() == ExecutionStatus.READY_STOP) {
putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState());
} else {
result = updateProcessInstancePrepare(processInstance, CommandType.STOP, ExecutionStatus.READY_STOP);
}
break;
case PAUSE: // Pause
if (processInstance.getState() == ExecutionStatus.READY_PAUSE) {
putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState());
} else {
result = updateProcessInstancePrepare(processInstance, CommandType.PAUSE, ExecutionStatus.READY_PAUSE);
}
break;
default:
logger.error("unknown execute type : {}", executeType);
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "unknown execute type");
break;
}
return result;
}
As you can see, the first half of the code above is mainly validation, and the second half performs different operations according to the execute type. The operations fall into two categories: insertCommand and updateProcessInstancePrepare.
3.2.1 insertCommand
The method is shown below; its main job is to insert the generated command into t_ds_command (the command table). Comments have been added:
/**
 * Insert command (rerun, recover from pause / failure)
 *
 * @param loginUser the logged-in user
 * @param instanceId workflow instance id
 * @param processDefinitionCode workflow definition code
 * @param processVersion workflow definition version
 * @param commandType command type
 * @param startParams start parameters specified by the user
 * @return operation result
 */
private Map<String, Object> insertCommand(User loginUser, Integer instanceId, long processDefinitionCode, int processVersion, CommandType commandType, String startParams) {
Map<String, Object> result = new HashMap<>();
/*** Encapsulate startup parameters **/
Map<String, Object> cmdParam = new HashMap<>();
cmdParam.put(CMD_PARAM_RECOVER_PROCESS_ID_STRING, instanceId);
if (!StringUtils.isEmpty(startParams)) {
cmdParam.put(CMD_PARAM_START_PARAMS, startParams);
}
Command command = new Command();
command.setCommandType(commandType);
command.setProcessDefinitionCode(processDefinitionCode);
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
command.setExecutorId(loginUser.getId());
command.setProcessDefinitionVersion(processVersion);
command.setProcessInstanceId(instanceId);
/*** Determine whether the workflow instance is executing **/
if (!processService.verifyIsNeedCreateCommand(command)) {
putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, String.valueOf(processDefinitionCode));
return result;
}
/*** Save command **/
int create = processService.createCommand(command);
if (create > 0) {
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR);
}
return result;
}
3.2.2 updateProcessInstancePrepare
The method code is as follows; comments have been added:
/**
 * Prepare to update the command type and state of the workflow instance
 *
 * @param processInstance workflow instance
 * @param commandType command type
 * @param executionStatus execution status
 * @return update result
 */
private Map<String, Object> updateProcessInstancePrepare(ProcessInstance processInstance, CommandType commandType, ExecutionStatus executionStatus) {
Map<String, Object> result = new HashMap<>();
processInstance.setCommandType(commandType);
processInstance.addHistoryCmd(commandType);
processInstance.setState(executionStatus);
int update = processService.updateProcessInstance(processInstance);
// check whether the update succeeded
if (update > 0) {
StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(
processInstance.getId(), 0, processInstance.getState(), processInstance.getId(), 0
);
Host host = new Host(processInstance.getHost());
stateEventCallbackService.sendResult(host, stateEventChangeCommand.convert2Command());
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR);
}
return result;
}
According to the flow chart, the code in the red box below has now been executed; in other words, our command has been persisted to the DB. Next we need to look at the Master code.
3.3 MasterServer
MasterServer is located in the project under /dolphinscheduler-dev/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java. It corresponds to this part of the architecture diagram:
The code and comments are as follows:
@SpringBootApplication
@ComponentScan("org.apache.dolphinscheduler")
@EnableTransactionManagement
@EnableCaching
public class MasterServer implements IStoppable {
private static final Logger logger = LoggerFactory.getLogger(MasterServer.class);
@Autowired
private SpringApplicationContext springApplicationContext;
@Autowired
private MasterRegistryClient masterRegistryClient;
@Autowired
private TaskPluginManager taskPluginManager;
@Autowired
private MasterSchedulerService masterSchedulerService;
@Autowired
private SchedulerApi schedulerApi;
@Autowired
private EventExecuteService eventExecuteService;
@Autowired
private FailoverExecuteThread failoverExecuteThread;
@Autowired
private MasterRPCServer masterRPCServer;
public static void main(String[] args) {
Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER);
SpringApplication.run(MasterServer.class);
}
/**
 * start master server
 */
@PostConstruct
public void run() throws SchedulerException {
// initialize the RPC service
this.masterRPCServer.start();
// Install the task plug-in
this.taskPluginManager.installPlugin();
/*** The MasterServer registry client, used to connect to the registry and handle registry events.
 * When the master node starts, it registers itself in the registry and schedules a {@link HeartBeatTask}
 * to update its metadata in the registry. **/
this.masterRegistryClient.init();
this.masterRegistryClient.start();
this.masterRegistryClient.setRegistryStoppable(this);
// The main scheduler thread: it consumes commands from the database and triggers the execution of process instances.
this.masterSchedulerService.init();
this.masterSchedulerService.start();
this.eventExecuteService.start();
this.failoverExecuteThread.start();
// The scheduler interface, containing methods for operating scheduled tasks.
this.schedulerApi.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (Stopper.isRunning()) {
close("MasterServer shutdownHook");
}
}));
}
/**
 * Gracefully close the master server
 *
 * @param cause the reason for closing
 */
public void close(String cause) {
try {
// set stop signal is true
// execute only once
if (!Stopper.stop()) {
logger.warn("MasterServer is already stopped, current cause: {}", cause);
return;
}
logger.info("Master server is stopping, current cause : {}", cause);
// thread sleep 3 seconds for thread quietly stop
ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
// close
this.schedulerApi.close();
this.masterSchedulerService.close();
this.masterRPCServer.close();
this.masterRegistryClient.closeRegistry();
// close spring Context and will invoke method with @PreDestroy annotation to destroy beans.
// like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
springApplicationContext.close();
logger.info("MasterServer stopped, current cause: {}", cause);
} catch (Exception e) {
logger.error("MasterServer stop failed, current cause: {}", cause, e);
}
}
@Override
public void stop(String cause) {
close(cause);
}
}
Inside the run method, you can see that it mainly does the following, in order:
- ① MasterRPCServer.start(): start the master's RPC service;
- ② TaskPluginManager.installPlugin(): install the task plugins;
- ③ MasterRegistryClient.start(): register the MasterServer with Zookeeper;
- ④ MasterSchedulerService.start(): the main scheduler thread, which consumes commands from the database and triggers the execution of process instances;
- ⑤ EventExecuteService.start(): handle workflow instance execution events;
- ⑥ FailoverExecuteThread.start(): failover detection;
- ⑦ SchedulerApi.start(): the scheduler interface for operating scheduled task instances.
3.3.1 MasterRPCServer
The Master RPC Server is mainly used to send requests to and receive requests from other services.
Its initialization method is as follows:
@PostConstruct
private void init() {
// Initialize the remote service
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(masterConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_RECALL, taskRecallProcessor);
// The log service
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.start();
}
3.3.2 TaskPluginManager
TaskPluginManager is the plugin installation manager. It loads task plugins through the SPI mechanism; the plugins live in the following directory:
Each task plugin only needs to implement the specified factory, for example Spark:
The core code is shown below: the loadTaskChannel() method caches each factory's channel into a Map<String, TaskChannel>:
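Since the original shows this code as a screenshot, here is a minimal sketch of what loadTaskChannel() conceptually does, assuming the plugin factories are discovered via Java's ServiceLoader SPI; the class, method and field names below are illustrative stand-ins rather than copies of the source:

```java
// Simplified sketch of SPI-based task plugin loading (names are illustrative).
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap;

public class TaskPluginManagerSketch {

    private final Map<String, TaskChannel> taskChannelMap = new ConcurrentHashMap<>();

    // Discover every TaskChannelFactory on the classpath and cache the channel it creates,
    // keyed by the factory name (e.g. "SPARK", "SHELL").
    public void loadTaskChannel() {
        ServiceLoader<TaskChannelFactory> factories = ServiceLoader.load(TaskChannelFactory.class);
        for (TaskChannelFactory factory : factories) {
            taskChannelMap.put(factory.getName(), factory.create());
        }
    }

    public Map<String, TaskChannel> getTaskChannelMap() {
        return taskChannelMap;
    }

    // Minimal stand-ins for the real plugin interfaces.
    public interface TaskChannel { }

    public interface TaskChannelFactory {
        String getName();
        TaskChannel create();
    }
}
```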
3.3.3 MasterRegistryClient
We know that DolphinScheduler follows a decentralized design, so the main job of MasterRegistryClient is to act as the MasterServer's registry client: it connects to the registry and handles registry events. When a Master node starts, it registers itself in the registry and schedules a HeartBeatTask to update its metadata there.
Going into the subscribe method of RegistryClient, you can see that there are two kinds of registries, one backed by MySQL and the other by Zookeeper; ZookeeperRegistry should be used by default here.
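As a rough sketch of the register-plus-heartbeat idea described above (simplified and with illustrative names; the real client goes through the Registry abstraction and also handles sessions, locks and failover):

```java
// Conceptual sketch only: register the master under a registry path and keep refreshing a heartbeat.
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class MasterRegistrySketch {

    interface Registry {
        void put(String path, String value, boolean ephemeral);
    }

    private final Registry registry;
    private final ScheduledExecutorService heartbeatExecutor = Executors.newSingleThreadScheduledExecutor();

    public MasterRegistrySketch(Registry registry) {
        this.registry = registry;
    }

    public void start(String masterAddress, long heartbeatIntervalSeconds) {
        String nodePath = "/nodes/master/" + masterAddress;
        // register as an ephemeral node so the entry disappears if the master dies
        registry.put(nodePath, heartbeatInfo(), true);
        // periodically refresh the heartbeat metadata (roughly what HeartBeatTask does)
        heartbeatExecutor.scheduleAtFixedRate(
                () -> registry.put(nodePath, heartbeatInfo(), true),
                heartbeatIntervalSeconds, heartbeatIntervalSeconds, TimeUnit.SECONDS);
    }

    private String heartbeatInfo() {
        // the real heartbeat carries cpu/memory/load metrics; a timestamp is enough for the sketch
        return String.valueOf(System.currentTimeMillis());
    }
}
```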
3.3.4 MasterSchedulerService
Its init and run methods are shown below. init mainly initializes a queue for workflow instances:
Looking at the actual work inside the run method, you can see an endless loop that keeps calling scheduleWorkflow():
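The run body appears only as a screenshot in the original post; a minimal sketch of that loop, as it would sit inside MasterSchedulerService (simplified, with error handling reduced), looks roughly like this:

```java
// Simplified sketch of the scheduler loop: keep calling scheduleWorkflow() while the server is running.
@Override
public void run() {
    while (Stopper.isRunning()) {
        try {
            scheduleWorkflow();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            break;
        } catch (Exception e) {
            // log and back off briefly so a persistent error does not spin the loop
            logger.error("Master scheduler loop error", e);
            ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
        }
    }
}
```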
Now look at the scheduleWorkflow() method itself; comments have been added:
/**
 * Query commands by slot from the database, convert them to workflow instances,
 * and submit them to workflowExecuteThreadPool.
 */
private void scheduleWorkflow() throws InterruptedException, MasterException {
// Query commands by slot from the database
List<Command> commands = findCommands();
if (CollectionUtils.isEmpty(commands)) {
// indicate that no command ,sleep for 1s
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
return;
}
// Convert to workflow instance
List<ProcessInstance> processInstances = command2ProcessInstance(commands);
if (CollectionUtils.isEmpty(processInstances)) {
// indicate that the command transform to processInstance error, sleep for 1s
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
return;
}
MasterServerMetrics.incMasterConsumeCommand(commands.size());
for (ProcessInstance processInstance : processInstances) {
// Submit to workflowExecuteThreadPool
submitProcessInstance(processInstance);
}
}
The method that submits workflow instances is as follows; note the submission to workflowExecuteThreadPool:
/**
 * Submit a workflow instance to workflowExecuteThreadPool
 *
 * @param processInstance workflow instance
 */
private void submitProcessInstance(@NonNull ProcessInstance processInstance) {
try {
LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
logger.info("Master schedule service starting workflow instance");
// Wrap the workflow instance into a WorkflowExecuteRunnable
final WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable(
processInstance
, processService
, nettyExecutorManager
, processAlertManager
, masterConfig
, stateWheelExecuteThread
, curingGlobalParamsService);
this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable);
if (processInstance.getTimeout() > 0) {
stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
}
ProcessInstanceMetrics.incProcessInstanceSubmit();
// Submit the encapsulated workflow instance Runnable to workflowExecuteThreadPool
CompletableFuture<WorkflowSubmitStatue> workflowSubmitFuture = CompletableFuture.supplyAsync(
workflowExecuteRunnable::call, workflowExecuteThreadPool);
workflowSubmitFuture.thenAccept(workflowSubmitStatue -> {
if (WorkflowSubmitStatue.FAILED == workflowSubmitStatue) {
// submit failed
processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
submitFailedProcessInstances.add(processInstance);
}
});
logger.info("Master schedule service started workflow instance");
} catch (Exception ex) {
processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
logger.info("Master submit workflow to thread pool failed, will remove workflow runnable from cache manager", ex);
} finally {
LoggerUtils.removeWorkflowInstanceIdMDC();
}
}
3.3.5 EventExecuteService
EventExecuteService has no class-level comment; presumably it is used to dispatch the execution events of each workflow instance.
3.3.6 FailoverExecuteThread
FailoverExecuteThread is the failover detection thread. It performs a check every 10 seconds; the core methods are failoverMasterWithLock() and failoverMaster() inside FailoverService:
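The original shows this only as a screenshot; conceptually the thread body looks something like the sketch below (the call name checkMasterFailover is illustrative, and the real work happens in FailoverService's failoverMasterWithLock()/failoverMaster()):

```java
// Conceptual sketch of the failover detection loop (names simplified, not the actual source).
@Override
public void run() {
    while (Stopper.isRunning()) {
        try {
            // delegate to FailoverService, which takes over workflow instances of dead masters
            // under a distributed lock (failoverMasterWithLock() -> failoverMaster())
            failoverService.checkMasterFailover();   // illustrative call, not the exact signature
        } catch (Exception e) {
            logger.error("Master failover check error", e);
        }
        // the thread checks once every 10 seconds
        ThreadUtils.sleep(10_000L);
    }
}
```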
3.3.7 SchedulerApi
SchedulerApi is the interface for operating scheduled task instances. Its main functions are: start the scheduler, insert or update a scheduled task, delete a scheduled task, and close the scheduler and release its resources.
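Those functions map naturally onto a small interface; the sketch below is indicative of its shape rather than a copy of the actual SchedulerApi source:

```java
// Indicative shape of a scheduler API covering the operations described above (not the actual interface).
import org.apache.dolphinscheduler.dao.entity.Schedule;

public interface SchedulerApiSketch extends AutoCloseable {

    /** Start the underlying scheduler (Quartz in DolphinScheduler). */
    void start();

    /** Create or update the periodic schedule of a process definition. */
    void insertOrUpdateScheduleTask(int projectId, Schedule schedule);

    /** Remove a periodic schedule. */
    void deleteScheduleTask(int projectId, int scheduleId);

    /** Shut down the scheduler and release its resources. */
    @Override
    void close();
}
```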
3.3.8 TaskPriorityQueueConsumer
According to the flow chart, we know that TaskConsumer takes the split tasks from the priority queue and forwards them to a Worker for execution.
Looking at the core code of TaskPriorityQueueConsumer, you can see the batch dispatch method:
Going into the batchDispatch method, you can see that tasks are taken from the queue and dispatched:
Finally, the task is dispatched to a worker:
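The dispatch code appears only as screenshots in the original; a simplified sketch of the consume-and-dispatch idea (names and signatures are approximate) is:

```java
// Simplified sketch of batch dispatch (approximate, not the actual source): drain up to
// fetchTaskNum tasks from the in-memory priority queue and dispatch each one to a worker.
private void batchDispatch(int fetchTaskNum) throws InterruptedException {
    for (int i = 0; i < fetchTaskNum; i++) {
        // take one task, waiting briefly if the queue is empty
        TaskPriority taskPriority = taskPriorityQueue.poll(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS);
        if (taskPriority == null) {
            continue;
        }
        // build the execution context and send it to a selected worker host;
        // if no worker can accept it, put the task back and retry on the next round
        boolean dispatched = dispatchTask(taskPriority);
        if (!dispatched) {
            taskPriorityQueue.put(taskPriority);
        }
    }
}
```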
OK, at this point we can move on to the Worker side of the code.
3.4 WorkerServer
WorkerServer is located in the project under /dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java. Its main program code is almost the same as MasterServer's; the code and comments are as follows:
@PostConstruct
public void run() {
// worker rpc service
this.workerRpcServer.start();
// Task plug-in installation
this.taskPluginManager.installPlugin();
// register the client with Zookeeper
this.workerRegistryClient.registry();
this.workerRegistryClient.setRegistryStoppable(this);
Set<String> workerZkPaths = this.workerRegistryClient.getWorkerZkPaths();
this.workerRegistryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP);
// start the worker manager thread
this.workerManagerThread.start();
// Reporting status thread
this.retryReportTaskStatusThread.start();
/* * registry hooks, which are called before the process exits */
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (Stopper.isRunning()) {
close("WorkerServer shutdown hook");
}
}));
}
According to the flow chart provided on the official website, what we need to pay attention to is TaskExecuteProcessor:
3.4.1 TaskExecuteProcessor
TaskExecuteProcessor is located under /dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java. Without a doubt, its core method is:
@Counted(value = "ds.task.execution.count", description = "task execute total count")
@Timed(value = "ds.task.execution.duration", percentiles = {
0.5, 0.75, 0.95, 0.99}, histogram = true)
@Override
public void process(Channel channel, Command command) {
// code ...
}
Inside it, the key piece of code wraps the task into a TaskExecuteThread and submits it to the worker manager:
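That key code is shown as a screenshot in the article. In essence it deserializes the task context from the received command, wraps it in a TaskExecuteThread, and offers it to the worker manager's queue; the sketch below is approximate (the command/DTO class names vary between versions, and parseTaskExecutionContext is a hypothetical helper):

```java
// Approximate sketch: build the execution thread for the received task and hand it to the worker manager.
TaskExecutionContext taskExecutionContext = parseTaskExecutionContext(command); // hypothetical helper

TaskExecuteThread taskExecuteThread = new TaskExecuteThread(
        taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager);

// the manager keeps a bounded queue of pending tasks; a full queue means the submission fails
if (!workerManager.offer(taskExecuteThread)) {
    logger.error("submit task to manager error, queue is full, task instance id: {}",
            taskExecutionContext.getTaskInstanceId());
}
```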
So let's look at the code of TaskExecuteThread.
3.4.2 TaskExecuteThread
TaskExecuteThread is the code that finally executes the task. Its run method is shown below, with comments added:
@Override
public void run() {
// dry-run (preview) mode
if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.SUCCESS);
taskExecutionContext.setStartTime(new Date());
taskExecutionContext.setEndTime(new Date());
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
logger.info("[WorkflowInstance-{}][TaskInstance-{}] Task dry run success",
taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
return;
}
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
logger.info("script path : {}", taskExecutionContext.getExecutePath());
if (taskExecutionContext.getStartTime() == null) {
taskExecutionContext.setStartTime(new Date());
}
logger.info("the task begins to execute. task instance id: {}", taskExecutionContext.getTaskInstanceId());
// report the running status back to the master via callback
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
taskCallbackService.sendTaskExecuteRunningCommand(taskExecutionContext);
// copy files from HDFS/MinIO to local
List<Pair<String, String>> fileDownloads = downloadCheck(taskExecutionContext.getExecutePath(), taskExecutionContext.getResources());
if (!fileDownloads.isEmpty()) {
downloadResource(taskExecutionContext.getExecutePath(), logger, fileDownloads);
}
taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
taskExecutionContext.setTaskAppId(String.format("%s_%s",
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()));
TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType());
if (null == taskChannel) {
throw new ServiceException(String.format("%s Task Plugin Not Found,Please Check Config File.", taskExecutionContext.getTaskType()));
}
String taskLogName = LoggerUtils.buildTaskId(taskExecutionContext.getFirstSubmitTime(),
taskExecutionContext.getProcessDefineCode(),
taskExecutionContext.getProcessDefineVersion(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setTaskLogName(taskLogName);
// Set the name of the current thread
Thread.currentThread().setName(taskLogName);
task = taskChannel.createTask(taskExecutionContext);
// Execute task plug-in method - init
this.task.init();
//init varPool
this.task.getParameters().setVarPool(taskExecutionContext.getVarPool());
// Execute task plug-in method - handle
this.task.handle();
// Determine whether to send an alarm
if (this.task.getNeedAlert()) {
sendAlert(this.task.getTaskAlertInfo(), this.task.getExitStatus().getCode());
}
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.of(this.task.getExitStatus().getCode()));
taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
taskExecutionContext.setProcessId(this.task.getProcessId());
taskExecutionContext.setAppIds(this.task.getAppIds());
taskExecutionContext.setVarPool(JSONUtils.toJsonString(this.task.getParameters().getVarPool()));
logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), this.task.getExitStatus());
} catch (Throwable e) {
logger.error("task scheduler failure", e);
kill();
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
taskExecutionContext.setProcessId(this.task.getProcessId());
taskExecutionContext.setAppIds(this.task.getAppIds());
} finally {
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
clearTaskExecPath();
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
04 Appendix
4.1 Core tables
① t_ds_process_definition (process definition table):
Field | Type | Notes
---|---|---
id | int | Primary key
name | varchar | Process definition name
version | int | Process definition version
release_state | tinyint | Release state of the process definition: 0 offline, 1 online
project_id | int | Project id
user_id | int | Id of the user who created the process definition
process_definition_json | longtext | Process definition JSON string
description | text | Process definition description
global_params | text | Global parameters
flag | tinyint | Whether the process is available: 0 unavailable, 1 available
locations | text | Node coordinate information
connects | text | Node connection information
receivers | text | Recipients
receivers_cc | text | CC recipients
create_time | datetime | Creation time
timeout | int | Timeout
tenant_id | int | Tenant id
update_time | datetime | Update time
modify_by | varchar | Modified by user
resource_ids | varchar | Resource id set
② t_ds_process_instance (process instance table):
Field | Type | Notes
---|---|---
id | int | Primary key
name | varchar | Process instance name
process_definition_id | int | Process definition id
state | tinyint | Process instance state: 0 submitted successfully, 1 running, 2 ready to pause, 3 paused, 4 ready to stop, 5 stopped, 6 failed, 7 succeeded, 8 need fault tolerance, 9 killed, 10 waiting for thread, 11 waiting for dependency to complete
recovery | tinyint | Process instance fault-tolerance flag: 0 normal, 1 needs to be restarted by fault tolerance
start_time | datetime | Process instance start time
end_time | datetime | Process instance end time
run_times | int | Number of times the process instance has run
host | varchar | Machine where the process instance runs
command_type | tinyint | Command type: 0 start workflow, 1 start from the current node, 2 recover the fault-tolerant workflow, 3 recover the suspended process, 4 start from the failed node, 5 complement (backfill) data, 6 schedule, 7 rerun, 8 pause, 9 stop, 10 recover waiting thread
command_param | text | Command parameters (JSON format)
task_depend_type | tinyint | Node dependency type: 0 current node only, 1 run forward, 2 run backward
max_try_times | tinyint | Maximum number of retries
failure_strategy | tinyint | Failure strategy: 0 end when a task fails, 1 continue when a task fails
warning_type | tinyint | Alert type: 0 do not send, 1 send on process success, 2 send on process failure, 3 send on success or failure
warning_group_id | int | Alert group id
schedule_time | datetime | Expected run time
command_start_time | datetime | Command start time
global_params | text | Global parameters (snapshot of the parameters defined by the process)
process_instance_json | longtext | Process instance JSON (copy of the process definition JSON)
flag | tinyint | Whether available: 1 available, 0 unavailable
update_time | timestamp | Update time
is_sub_process | int | Whether it is a sub-workflow: 1 yes, 0 no
executor_id | int | User who executed the command
locations | text | Node coordinate information
connects | text | Node connection information
history_cmd | text | Command history, recording all operations on the process instance
dependence_schedule_times | text | Expected times of dependent nodes
process_instance_priority | int | Process instance priority: 0 Highest, 1 High, 2 Medium, 3 Low, 4 Lowest
worker_group | varchar | Worker group on which the task runs
timeout | int | Timeout
tenant_id | int | Tenant id
③ t_ds_task_instance (task instance table):
Field | Type | Notes
---|---|---
id | int | Primary key
name | varchar | Task name
task_type | varchar | Task type
process_definition_id | int | Process definition id
process_instance_id | int | Process instance id
task_json | longtext | Task node JSON
state | tinyint | Task instance state: 0 submitted successfully, 1 running, 2 ready to pause, 3 paused, 4 ready to stop, 5 stopped, 6 failed, 7 succeeded, 8 need fault tolerance, 9 killed, 10 waiting for thread, 11 waiting for dependency to complete
submit_time | datetime | Task submission time
start_time | datetime | Task start time
end_time | datetime | Task end time
host | varchar | Machine executing the task
execute_path | varchar | Task execution path
log_path | varchar | Task log path
alert_flag | tinyint | Whether to alert
retry_times | int | Number of retries
pid | int | Process pid
app_link | varchar | Yarn app id
flag | tinyint | Whether available: 0 unavailable, 1 available
retry_interval | int | Retry interval
max_retry_times | int | Maximum number of retries
task_instance_priority | int | Task instance priority: 0 Highest, 1 High, 2 Medium, 3 Low, 4 Lowest
worker_group | varchar | Worker group on which the task runs
④ t_ds_schedules (process schedule table):
Field | Type | Notes
---|---|---
id | int | Primary key
process_definition_id | int | Process definition id
start_time | datetime | Schedule start time
end_time | datetime | Schedule end time
crontab | varchar | Crontab expression
failure_strategy | tinyint | Failure strategy: 0 end, 1 continue
user_id | int | User id
release_state | tinyint | Release state: 0 offline, 1 online
warning_type | tinyint | Alert type: 0 do not send, 1 send on process success, 2 send on process failure, 3 send on success or failure
warning_group_id | int | Alert group id
process_instance_priority | int | Process instance priority: 0 Highest, 1 High, 2 Medium, 3 Low, 4 Lowest
worker_group | varchar | Worker group on which the task runs
create_time | datetime | Creation time
update_time | datetime | Update time
⑤ t_ds_command (command table):
Field | Type | Notes
---|---|---
id | int | Primary key
command_type | tinyint | Command type: 0 start workflow, 1 start from the current node, 2 recover the fault-tolerant workflow, 3 recover the suspended process, 4 start from the failed node, 5 complement (backfill) data, 6 schedule, 7 rerun, 8 pause, 9 stop, 10 recover waiting thread
process_definition_id | int | Process definition id
command_param | text | Command parameters (JSON format)
task_depend_type | tinyint | Node dependency type: 0 current node only, 1 run forward, 2 run backward
failure_strategy | tinyint | Failure strategy: 0 end, 1 continue
warning_type | tinyint | Alert type: 0 do not send, 1 send on process success, 2 send on process failure, 3 send on success or failure
warning_group_id | int | Alert group
schedule_time | datetime | Expected run time
start_time | datetime | Start time
executor_id | int | Executing user id
dependence | varchar | Dependency field
update_time | datetime | Update time
process_instance_priority | int | Process instance priority: 0 Highest, 1 High, 2 Medium, 3 Low, 4 Lowest
worker_group | varchar | Worker group on which the task runs
05 Conclusion
This article records some personal insights from reading the DolphinScheduler source code, and may be updated later. The DolphinScheduler code does not feel strongly connected on its own; it needs to be read alongside the flow chart to be understood. Finally, the flow chart is attached again for review.
The next article will talk about its configuration.