
Apache DolphinScheduler Source Code Analysis (Super Detailed)

2022-07-06 06:44:00 Yang Linwei

01 Introduction

Official Apache DolphinScheduler documentation: https://dolphinscheduler.apache.org/zh-cn/index.html

Apache DolphinScheduler is a distributed, decentralized, extensible visual DAG workflow task scheduling platform. It is dedicated to solving the complex dependencies in data processing pipelines, so that the scheduling system works out of the box.

The schematic diagram is as follows:
(figure: DolphinScheduler architecture overview)

Next, this article walks through its source code step by step.

02 DolphinScheduler Project Structure

2.1 Structure Analysis

First, use IDEA to clone the project from GitHub to your machine. Project address: https://github.com/apache/dolphinscheduler
(screenshot: the cloned project in IDEA)


After importing the project, you can see its main core modules:

| Module | Description |
| --- | --- |
| dolphinscheduler-alert | Alert module, provides the AlertServer service |
| dolphinscheduler-api | Web application module, provides the ApiServer service |
| dolphinscheduler-common | Common constants, enums, utility classes, data structures and base classes |
| dolphinscheduler-dao | Database access and related operations |
| dolphinscheduler-remote | Netty-based client and server |
| dolphinscheduler-server | MasterServer and WorkerServer services |
| dolphinscheduler-service | Service module, containing Quartz, Zookeeper and log client access services, for the server and api modules to call |
| dolphinscheduler-ui | Front-end module |

2.2 Table Analysis

Under the project directory /dolphinscheduler-dao/src/main/resources/sql/create/release-1.0.0_schema/mysql there are the database initialization scripts dolphinscheduler_ddl.sql and dolphinscheduler_dml.sql, along with some upgrade scripts:

(screenshot: the SQL script directory)
After executing them, you can see the following tables in the database:

| Table name | Description |
| --- | --- |
| t_ds_access_token | Access tokens for the ds backend |
| t_ds_alert | Alert information |
| t_ds_alertgroup | Alert groups |
| t_ds_command | Commands to execute |
| t_ds_datasource | Data sources |
| t_ds_error_command (core table) | Failed commands |
| t_ds_process_definition (core table) | Process definitions |
| t_ds_process_instance (core table) | Process instances |
| t_ds_project | Projects |
| t_ds_queue | Queues |
| t_ds_relation_datasource_user | User-datasource associations |
| t_ds_relation_process_instance | Sub-processes |
| t_ds_relation_project_user | User-project associations |
| t_ds_relation_resources_user | User-resource associations |
| t_ds_relation_udfs_user | User-UDF function associations |
| t_ds_relation_user_alertgroup | User-alert-group associations |
| t_ds_resources | Resource files |
| t_ds_schedules (core table) | Process schedules |
| t_ds_session | User login sessions |
| t_ds_task_instance (core table) | Task instances |
| t_ds_tenant | Tenants |
| t_ds_udfs | UDF resources |
| t_ds_user | Users |
| t_ds_version | ds version information |

The core tables are described in detail in the appendix at the end of this article.

2.2.1 Class Diagram (User / Queue / Data Source)

(figure: user / queue / data source class diagram)
Notes:

  • A tenant can have multiple users;
  • The queue field in t_ds_user stores the queue_name;
  • t_ds_tenant stores the queue_id; when a process definition is executed, the user's queue has the highest priority, and if the user's queue is empty the tenant's queue is used (see the sketch after this list);
  • The user_id field in the t_ds_datasource table indicates the user who created the data source;
  • user_id in t_ds_relation_datasource_user indicates a user who has permission on the data source.
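To make the queue rule concrete, here is a minimal sketch (hypothetical types, not DolphinScheduler source) of how the effective queue would be resolved:

// hypothetical sketch of the queue-resolution rule described above
public class QueueResolver {

    static class User   { String queueName; }  // t_ds_user.queue stores the queue_name
    static class Tenant { String queueName; }  // resolved via t_ds_tenant.queue_id -> t_ds_queue

    /** The user's queue wins; if it is empty, fall back to the tenant's queue. */
    static String effectiveQueue(User user, Tenant tenant) {
        if (user.queueName != null && !user.queueName.isEmpty()) {
            return user.queueName;
        }
        return tenant.queueName;
    }

    public static void main(String[] args) {
        User u = new User();      // a user with no queue of their own
        Tenant t = new Tenant();
        t.queueName = "default";
        System.out.println(effectiveQueue(u, t)); // prints "default"
    }
}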

2.2.2 Class Diagram (Project / Resource / Alert)

(figure: project / resource / alert class diagram)
Notes:

  • A user can have multiple projects; project authorization binds project_id and user_id through the t_ds_relation_project_user table;
  • user_id in the t_ds_project table indicates the user who created the project;
  • user_id in the t_ds_relation_project_user table indicates a user who has permission on the project;
  • user_id in the t_ds_resources table indicates the user who created the resource;
  • user_id in t_ds_relation_resources_user indicates a user who has permission on the resource;
  • user_id in the t_ds_udfs table indicates the user who created the UDF;
  • user_id in the t_ds_relation_udfs_user table indicates a user who has permission on the UDF.

2.2.3 Class Diagram (Command / Process / Task)

(figures: command / process / task class diagrams)

Notes:

  • A project has multiple process definitions, a process definition can generate multiple process instances, and a process instance can generate multiple task instances;
  • The t_ds_schedules table stores the timed scheduling information of process definitions;
  • The data in the t_ds_relation_process_instance table handles the case where a process definition contains sub-processes: parent_process_instance_id is the id of the main process instance that contains sub-processes, process_instance_id is the id of the sub-process instance, and parent_task_instance_id is the id of the sub-process node's task instance. The process instance table and the task instance table are t_ds_process_instance and t_ds_task_instance respectively.

03 DolphinScheduler Source Code Analysis

Before diving into the source code, here is the startup flow chart from the official website:
(figure: startup flow chart)

3.1 ExecutorController

According to the flow chart above, after clicking the "Start" button in the UI layer, the request reaches the ApiServer service in the dolphinscheduler-api module and enters ExecutorController (full class path: org.apache.dolphinscheduler.api.controller.ExecutorController). Let's see what interface methods ExecutorController provides.

(screenshot: the interface methods of ExecutorController)

Each interface is described below:

| Interface | Description |
| --- | --- |
| /start-process-instance | Start a process instance |
| /batch-start-process-instance | Start process instances in batch |
| /execute | Operate on a process instance: pause, stop, rerun, recover from pause, recover from stop |
| /batch-execute | Operate on process instances in batch |
| /start-check | Check whether the process definition and all of its sub-process definitions are online |
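Incidentally, these endpoints can be exercised over plain HTTP. Below is a minimal sketch, assuming the ApiServer defaults (port 12345, context path /dolphinscheduler) and the controller's projects/{projectCode}/executors mapping; the project code, instance id, token header name and token value are all placeholders to replace with your own:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ExecuteActionCall {
    public static void main(String[] args) throws Exception {
        // placeholder project code / instance id / execute type
        String url = "http://localhost:12345/dolphinscheduler/projects/1234567890/executors/execute"
                + "?processInstanceId=100&executeType=STOP";
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(url))
                .header("token", "<your-access-token>") // access token, see t_ds_access_token
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body()); // Result JSON with code / msg / data
    }
}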

Next, let's take a look at the core execute method:

/**
 * do action to process instance: pause, stop, repeat, recover from pause, recover from stop
 *
 * @param loginUser login user
 * @param projectCode project code
 * @param processInstanceId process instance id
 * @param executeType execute type
 * @return execute result code
 */
@ApiOperation(value = "execute", notes = "EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES")
@ApiImplicitParams({
        @ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100"),
        @ApiImplicitParam(name = "executeType", value = "EXECUTE_TYPE", required = true, dataType = "ExecuteType")
})
@PostMapping(value = "/execute")
@ResponseStatus(HttpStatus.OK)
@ApiException(EXECUTE_PROCESS_INSTANCE_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result execute(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
                      @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
                      @RequestParam("processInstanceId") Integer processInstanceId,
                      @RequestParam("executeType") ExecuteType executeType) {
    Map<String, Object> result = execService.execute(loginUser, projectCode, processInstanceId, executeType);
    return returnDataList(result);
}

As you can see, the execute interface delegates directly to ExecService. Let's analyze it.

3.2 ExecService

Let's take a look at its execute method; comments have been added:

/**
 * operate a workflow instance
 *
 * @param loginUser login user
 * @param projectCode project code
 * @param processInstanceId process instance id
 * @param executeType execute type (repeat running, resume pause, resume failure, stop, pause)
 * @return execute result
 */
@Override
public Map<String, Object> execute(User loginUser, long projectCode, Integer processInstanceId, ExecuteType executeType) {

    /*** query the project information ***/
    Project project = projectMapper.queryByCode(projectCode);
    //check user access for project

    /*** check whether the current user has permission to operate on the project ***/
    Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode, ApiFuncIdentificationConstant.map.get(executeType));
    if (result.get(Constants.STATUS) != Status.SUCCESS) {
        return result;
    }

    /*** check that a Master node exists ***/
    if (!checkMasterExists(result)) {
        return result;
    }

    /*** query the workflow instance details ***/
    ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId);
    if (processInstance == null) {
        putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
        return result;
    }

    /*** query the process definition bound to the workflow instance by its code and version ***/
    ProcessDefinition processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
            processInstance.getProcessDefinitionVersion());
    if (executeType != ExecuteType.STOP && executeType != ExecuteType.PAUSE) {
        /*** verify that the process definition is executable (does it exist? is it online? are any sub-process definitions offline?) ***/
        result = checkProcessDefinitionValid(projectCode, processDefinition, processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
        if (result.get(Constants.STATUS) != Status.SUCCESS) {
            return result;
        }
    }

    /*** check that the requested executeType is valid for the current state of the workflow instance ***/
    result = checkExecuteType(processInstance, executeType);
    if (result.get(Constants.STATUS) != Status.SUCCESS) {
        return result;
    }

    /*** check that a suitable tenant has been selected ***/
    if (!checkTenantSuitable(processDefinition)) {
        logger.error("there is not any valid tenant for the process definition: id:{},name:{}, ",
                processDefinition.getId(), processDefinition.getName());
        putMsg(result, Status.TENANT_NOT_SUITABLE);
    }

    /*** for repeat running, pick up the start parameters specified by the user ***/
    Map<String, Object> commandMap = JSONUtils.parseObject(processInstance.getCommandParam(), new TypeReference<Map<String, Object>>() {
    });
    String startParams = null;
    if (MapUtils.isNotEmpty(commandMap) && executeType == ExecuteType.REPEAT_RUNNING) {
        Object startParamsJson = commandMap.get(Constants.CMD_PARAM_START_PARAMS);
        if (startParamsJson != null) {
            startParams = startParamsJson.toString();
        }
    }

    /*** perform the corresponding operation according to the ExecuteType ***/
    switch (executeType) {
        case REPEAT_RUNNING: // rerun
            result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.REPEAT_RUNNING, startParams);
            break;
        case RECOVER_SUSPENDED_PROCESS: // recover the suspended workflow
            result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams);
            break;
        case START_FAILURE_TASK_PROCESS: // restart the failed workflow
            result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.START_FAILURE_TASK_PROCESS, startParams);
            break;
        case STOP: // stop
            if (processInstance.getState() == ExecutionStatus.READY_STOP) {
                putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState());
            } else {
                result = updateProcessInstancePrepare(processInstance, CommandType.STOP, ExecutionStatus.READY_STOP);
            }
            break;
        case PAUSE: // pause
            if (processInstance.getState() == ExecutionStatus.READY_PAUSE) {
                putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState());
            } else {
                result = updateProcessInstancePrepare(processInstance, CommandType.PAUSE, ExecutionStatus.READY_PAUSE);
            }
            break;
        default:
            logger.error("unknown execute type : {}", executeType);
            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "unknown execute type");
            break;
    }
    return result;
}

As you can see, the first half of the code above is mainly validation, while the second half performs different operations according to the execution type. The operations fall into two parts: insertCommand and updateProcessInstancePrepare.

3.2.1 insertCommand

The method code follows. Essentially it inserts the generated command into t_ds_command (the command table); comments have been added:

/**
 * insert a command (rerun, recover (pause / failure) execution)
 *
 * @param loginUser login user
 * @param instanceId workflow instance id
 * @param processDefinitionCode workflow definition code
 * @param processVersion workflow version
 * @param commandType command type
 * @return operation result
 */
private Map<String, Object> insertCommand(User loginUser, Integer instanceId, long processDefinitionCode, int processVersion, CommandType commandType, String startParams) {
    Map<String, Object> result = new HashMap<>();

    /*** wrap up the start parameters ***/
    Map<String, Object> cmdParam = new HashMap<>();
    cmdParam.put(CMD_PARAM_RECOVER_PROCESS_ID_STRING, instanceId);
    if (!StringUtils.isEmpty(startParams)) {
        cmdParam.put(CMD_PARAM_START_PARAMS, startParams);
    }

    Command command = new Command();
    command.setCommandType(commandType);
    command.setProcessDefinitionCode(processDefinitionCode);
    command.setCommandParam(JSONUtils.toJsonString(cmdParam));
    command.setExecutorId(loginUser.getId());
    command.setProcessDefinitionVersion(processVersion);
    command.setProcessInstanceId(instanceId);

    /*** check whether the workflow instance is already executing ***/
    if (!processService.verifyIsNeedCreateCommand(command)) {
        putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, String.valueOf(processDefinitionCode));
        return result;
    }

    /*** save the command ***/
    int create = processService.createCommand(command);

    if (create > 0) {
        putMsg(result, Status.SUCCESS);
    } else {
        putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR);
    }

    return result;
}

3.2.2 updateProcessInstancePrepare

The method code is as follows; comments have been added:

/**
 * prepare to update the workflow instance's command type and status
 *
 * @param processInstance workflow instance
 * @param commandType command type
 * @param executionStatus execution status
 * @return update result
 */
private Map<String, Object> updateProcessInstancePrepare(ProcessInstance processInstance, CommandType commandType, ExecutionStatus executionStatus) {
    Map<String, Object> result = new HashMap<>();

    processInstance.setCommandType(commandType);
    processInstance.addHistoryCmd(commandType);
    processInstance.setState(executionStatus);
    int update = processService.updateProcessInstance(processInstance);

    // if the update succeeded, notify the master that hosts the instance of the state change
    if (update > 0) {
        StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(
                processInstance.getId(), 0, processInstance.getState(), processInstance.getId(), 0
        );
        Host host = new Host(processInstance.getHost());
        stateEventCallbackService.sendResult(host, stateEventChangeCommand.convert2Command());
        putMsg(result, Status.SUCCESS);
    } else {
        putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR);
    }
    return result;
}

According to the flow chart, the code in the red box below has now run; that is, our command has been written to the DB. Next we need to look at the Master code.
(figure: startup flow chart with the command-to-DB step highlighted)

3.3 MasterServer

MasterServer lives at /dolphinscheduler-dev/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java in the project. It corresponds to this part of the architecture diagram:
(figure: MasterServer in the architecture diagram)
The code and comments are as follows:

@SpringBootApplication
@ComponentScan("org.apache.dolphinscheduler")
@EnableTransactionManagement
@EnableCaching
public class MasterServer implements IStoppable {

    private static final Logger logger = LoggerFactory.getLogger(MasterServer.class);

    @Autowired
    private SpringApplicationContext springApplicationContext;

    @Autowired
    private MasterRegistryClient masterRegistryClient;

    @Autowired
    private TaskPluginManager taskPluginManager;

    @Autowired
    private MasterSchedulerService masterSchedulerService;

    @Autowired
    private SchedulerApi schedulerApi;

    @Autowired
    private EventExecuteService eventExecuteService;

    @Autowired
    private FailoverExecuteThread failoverExecuteThread;

    @Autowired
    private MasterRPCServer masterRPCServer;

    public static void main(String[] args) {
        Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER);
        SpringApplication.run(MasterServer.class);
    }

    /**
     * start the master server
     */
    @PostConstruct
    public void run() throws SchedulerException {
        // initialize the RPC service
        this.masterRPCServer.start();

        // install task plugins
        this.taskPluginManager.installPlugin();

        /**
         * MasterServer registry client, used to connect to the registry and relay registry events.
         * When the master node starts, it registers itself in the registry and dispatches
         * a {@link HeartBeatTask} to keep its metadata in the registry up to date.
         */
        this.masterRegistryClient.init();
        this.masterRegistryClient.start();
        this.masterRegistryClient.setRegistryStoppable(this);

        // main scheduler thread: consumes commands from the database and triggers processInstance execution
        this.masterSchedulerService.init();
        this.masterSchedulerService.start();

        this.eventExecuteService.start();
        this.failoverExecuteThread.start();

        // the scheduler interface, containing methods for operating scheduled tasks
        this.schedulerApi.start();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            if (Stopper.isRunning()) {
                close("MasterServer shutdownHook");
            }
        }));
    }

    /**
     * gracefully close the server
     *
     * @param cause close cause
     */
    public void close(String cause) {
        try {
            // set stop signal to true; execute only once
            if (!Stopper.stop()) {
                logger.warn("MasterServer is already stopped, current cause: {}", cause);
                return;
            }

            logger.info("Master server is stopping, current cause : {}", cause);

            // sleep 3 seconds so that threads can stop quietly
            ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
            // close
            this.schedulerApi.close();
            this.masterSchedulerService.close();
            this.masterRPCServer.close();
            this.masterRegistryClient.closeRegistry();
            // close the Spring context, which invokes @PreDestroy methods to destroy beans
            // like ServerNodeManager, HostManager, TaskResponseService, CuratorZookeeperClient, etc.
            springApplicationContext.close();

            logger.info("MasterServer stopped, current cause: {}", cause);
        } catch (Exception e) {
            logger.error("MasterServer stop failed, current cause: {}", cause, e);
        }
    }

    @Override
    public void stop(String cause) {
        close(cause);
    }
}

Inside the run method you can see that it mainly does the following, in order:

  • ① MasterRPCServer.start(): start the master's RPC service;
  • ② TaskPluginManager.installPlugin(): install the task plugins;
  • ③ MasterRegistryClient.start(): register the MasterServer with Zookeeper;
  • ④ MasterSchedulerService.start(): the main scheduler thread, which consumes commands from the database and triggers process instance execution;
  • ⑤ EventExecuteService.start(): workflow instance event handling;
  • ⑥ FailoverExecuteThread.start(): failover detection;
  • ⑦ SchedulerApi.start(): the scheduler interface for operating scheduled tasks.

3.3.1 MasterRPCServer

The Master RPC server is mainly used to send requests to, and receive requests from, other services.

Its initialization method is as follows:

@PostConstruct
private void init() {
    // initialize the remoting server
    NettyServerConfig serverConfig = new NettyServerConfig();
    serverConfig.setListenPort(masterConfig.getListenPort());
    this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_RECALL, taskRecallProcessor);

    // logger service
    this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);

    this.nettyRemotingServer.start();
}

3.3.2 TaskPluginManager

TaskPluginManager is the plugin installation manager. It loads task plugins through the SPI mechanism; the plugins live in the following directory:
(screenshot: the task plugin directory)
Each task plugin only needs to implement the specified factory, e.g. Spark:
(screenshot: the Spark task plugin factory)
The core code is shown below: through the loadTaskChannel() method, the factories are cached into a Map<String, TaskChannel> collection:
(screenshot: loadTaskChannel() caching the factories)
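DolphinScheduler ships its own SPI loading infrastructure; the following is only a minimal java.util.ServiceLoader-based sketch of the same idea, with simplified stand-ins for the real TaskChannel and TaskChannelFactory interfaces:

import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;

public class TaskPluginLoaderSketch {

    // minimal stand-ins for the real interfaces
    interface TaskChannel { }
    interface TaskChannelFactory {
        String getName();    // task type, e.g. "SPARK"
        TaskChannel create();
    }

    private final Map<String, TaskChannel> taskChannelMap = new HashMap<>();

    /** Discover every factory on the classpath via SPI and cache one channel per task type. */
    public void installPlugin() {
        for (TaskChannelFactory factory : ServiceLoader.load(TaskChannelFactory.class)) {
            taskChannelMap.put(factory.getName(), factory.create());
        }
    }

    public TaskChannel getChannel(String taskType) {
        return taskChannelMap.get(taskType);
    }
}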

3.3.3 MasterRegistryClient

As we know, DolphinScheduler is decentralized by design, so the main job of MasterRegistryClient is to act as the MasterServer's registry client, connecting to the registry and relaying registry events. When a master node starts, it registers itself in the registry and dispatches a HeartBeatTask to keep its metadata in the registry up to date.
(screenshot: MasterRegistryClient registration code)
Stepping into the subscribe method of RegistryClient, you can see there are two kinds of registries, one backed by MySQL and the other by Zookeeper; ZookeeperRegistry should be the default here.
(screenshot: RegistryClient.subscribe)
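To make the registration-plus-heartbeat pattern concrete, here is a minimal sketch with a hypothetical Registry interface (the real client goes through RegistryClient and HeartBeatTask):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class MasterRegistrySketch {

    // stand-in for the registry (ZookeeperRegistry in the default setup)
    interface Registry {
        void putEphemeral(String path, String value); // node disappears if the master dies
    }

    private final ScheduledExecutorService heartBeatExecutor =
            Executors.newSingleThreadScheduledExecutor();

    /** Register this master and keep refreshing its metadata, as described above. */
    public void start(Registry registry, String masterAddress, long heartbeatSeconds) {
        String path = "/nodes/master/" + masterAddress;
        registry.putEphemeral(path, "starting");
        // HeartBeatTask equivalent: periodically rewrite load/time metadata under the node
        heartBeatExecutor.scheduleAtFixedRate(
                () -> registry.putEphemeral(path, "heartbeat=" + System.currentTimeMillis()),
                0, heartbeatSeconds, TimeUnit.SECONDS);
    }
}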

3.3.4 MasterSchedulerService

Its init and run methods are shown below; init mainly initializes the workflow instance queue:

(screenshot: MasterSchedulerService init and run)

Looking at the actual work inside the run method, you can see an endless loop that keeps calling the scheduleWorkflow() method:
(screenshot: the run loop calling scheduleWorkflow())
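The loop itself boils down to roughly this shape (paraphrased from the screenshot, not the verbatim source):

// paraphrased shape of MasterSchedulerService.run()
@Override
public void run() {
    while (Stopper.isRunning()) {
        try {
            scheduleWorkflow(); // shown below
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            break;
        } catch (Exception e) {
            logger.error("master schedule workflow error", e);
        }
    }
}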
Inside the scheduleWorkflow() method, comments have been added:

/**
 * query commands by slot from the database, convert them to workflow instances,
 * and submit them to the workflowExecuteThreadPool.
 */
private void scheduleWorkflow() throws InterruptedException, MasterException {
    // query commands by slot from the database
    List<Command> commands = findCommands();
    if (CollectionUtils.isEmpty(commands)) {
        // indicate that no command, sleep for 1s
        Thread.sleep(Constants.SLEEP_TIME_MILLIS);
        return;
    }

    // convert to workflow instances
    List<ProcessInstance> processInstances = command2ProcessInstance(commands);
    if (CollectionUtils.isEmpty(processInstances)) {
        // indicate that the command transform to processInstance error, sleep for 1s
        Thread.sleep(Constants.SLEEP_TIME_MILLIS);
        return;
    }
    MasterServerMetrics.incMasterConsumeCommand(commands.size());

    for (ProcessInstance processInstance : processInstances) {
        // submit to workflowExecuteThreadPool
        submitProcessInstance(processInstance);
    }
}
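findCommands() deserves a note: since DolphinScheduler is decentralized, several masters poll the same t_ds_command table, and each one only consumes its own slice. A hedged sketch of the idea follows; the mapper method, config accessor and field names here are illustrative and may differ from the actual source:

// illustrative sketch of slot-based command fetching
private List<Command> findCommands() {
    int masterCount = serverNodeManager.getMasterSize(); // number of live masters from the registry
    int thisMasterSlot = serverNodeManager.getSlot();    // this master's index among them
    if (masterCount <= 0) {
        return Collections.emptyList(); // registry not ready yet
    }
    // conceptually: SELECT ... FROM t_ds_command WHERE id % masterCount = thisMasterSlot LIMIT fetchSize,
    // so masters split the command stream by id without competing for rows
    return commandMapper.queryCommandPageBySlot(commandFetchSize, masterCount, thisMasterSlot);
}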

The method that submits workflow instances is as follows; note the submission to workflowExecuteThreadPool:

/**
 * submit a workflow instance to the workflowExecuteThreadPool
 *
 * @param processInstance workflow instance
 */
private void submitProcessInstance(@NonNull ProcessInstance processInstance) {
    try {
        LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
        logger.info("Master schedule service starting workflow instance");

        // wrap the workflow instance in a Runnable
        final WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable(
                processInstance
                , processService
                , nettyExecutorManager
                , processAlertManager
                , masterConfig
                , stateWheelExecuteThread
                , curingGlobalParamsService);

        this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable);
        if (processInstance.getTimeout() > 0) {
            stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
        }
        ProcessInstanceMetrics.incProcessInstanceSubmit();

        // submit the wrapped workflow instance Runnable to the workflowExecuteThreadPool
        CompletableFuture<WorkflowSubmitStatue> workflowSubmitFuture = CompletableFuture.supplyAsync(
                workflowExecuteRunnable::call, workflowExecuteThreadPool);
        workflowSubmitFuture.thenAccept(workflowSubmitStatue -> {
            if (WorkflowSubmitStatue.FAILED == workflowSubmitStatue) {
                // submit failed
                processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
                stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
                submitFailedProcessInstances.add(processInstance);
            }
        });
        logger.info("Master schedule service started workflow instance");

    } catch (Exception ex) {
        processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
        stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
        logger.info("Master submit workflow to thread pool failed, will remove workflow runnable from cache manager", ex);
    } finally {
        LoggerUtils.removeWorkflowInstanceIdMDC();
    }
}

3.3.5 EventExecuteService

EventExecuteService has no class comment; presumably it is used to relay the execution events of each workflow instance.
(screenshot: EventExecuteService)

3.3.6 FailoverExecuteThread

FailoverExecuteThread is the failover detection thread. It runs a check every 10 seconds; the core methods are failoverMasterWithLock() and failoverMaster() in FailoverService:

(screenshot: FailoverExecuteThread)
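Paraphrased, the detection loop looks roughly like this (the host-listing helper is illustrative, not the actual method name):

// paraphrased shape of FailoverExecuteThread.run()
@Override
public void run() {
    while (Stopper.isRunning()) {
        try {
            // find masters recorded in the DB but no longer present in the registry,
            // then take over their workflow instances under a distributed lock
            for (String host : failoverService.getNeedFailoverMasterHosts()) { // illustrative helper
                failoverService.failoverMasterWithLock(host);
            }
        } catch (Exception e) {
            logger.error("master failover check error", e);
        } finally {
            ThreadUtils.sleep(10_000L); // the 10-second interval mentioned above
        }
    }
}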

3.3.7 SchedulerApi

SchedulerApi is the interface for operating scheduled tasks. Its main functions are: starting the scheduler, inserting or updating scheduled tasks, deleting scheduled tasks, and closing the scheduler to release its resources.
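Paraphrased, those four responsibilities map onto an interface of roughly this shape (see org.apache.dolphinscheduler.scheduler.api.SchedulerApi for the authoritative signatures):

// paraphrased sketch of the SchedulerApi responsibilities
public interface SchedulerApi {
    void start();                                                      // start the underlying scheduler (Quartz by default)
    void insertOrUpdateScheduleTask(int projectId, Schedule schedule); // register or refresh a cron trigger
    void deleteScheduleTask(int projectId, int scheduleId);            // remove a trigger
    void close();                                                      // shut down and release resources
}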

3.3.8 TaskPriorityQueueConsumer

According to the flow chart, the TaskConsumer takes the split tasks from the queue and forwards them to a Worker for execution.
(figure: task dispatch in the flow chart)

Looking at the core code of TaskPriorityQueueConsumer, you can see the batch dispatch method:
(screenshot: the batch dispatch method)
Inside the batchDispatch method, tasks are taken from the queue and dispatched:
(screenshot: taking tasks from the queue)
Finally, the task is dispatched to a worker:
(screenshot: dispatching the task to a worker)
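Put together, the consume-and-dispatch loop is roughly the following (paraphrased from the screenshots; fetchTaskNum and the re-queue handling are simplified):

// paraphrased shape of TaskPriorityQueueConsumer.batchDispatch(fetchTaskNum)
List<TaskPriority> failedDispatchTasks = new ArrayList<>();
for (int i = 0; i < fetchTaskNum; i++) {
    // take the highest-priority task, waiting briefly if the queue is empty
    TaskPriority taskPriority = taskPriorityQueue.poll(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS);
    if (taskPriority == null) {
        break; // queue drained
    }
    // pick a worker host via the host manager and send the task over Netty
    if (!dispatchTask(taskPriority)) {
        failedDispatchTasks.add(taskPriority); // re-queued later with a delay
    }
}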

OK, at this point we can move on to the worker side of the code.

3.4 WorkerServer

WorkerServer lives at /dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java in the project. Its main program is almost identical to MasterServer's; the code and comments are as follows:

@PostConstruct
public void run() {
    // worker RPC service
    this.workerRpcServer.start();

    // install task plugins
    this.taskPluginManager.installPlugin();

    // register the client with Zookeeper
    this.workerRegistryClient.registry();
    this.workerRegistryClient.setRegistryStoppable(this);
    Set<String> workerZkPaths = this.workerRegistryClient.getWorkerZkPaths();
    this.workerRegistryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP);

    // worker management thread
    this.workerManagerThread.start();

    // task status reporting thread
    this.retryReportTaskStatusThread.start();

    /*
     * registry hooks, which are called before the process exits
     */
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        if (Stopper.isRunning()) {
            close("WorkerServer shutdown hook");
        }
    }));
}

According to the flow chart provided on the official website, what we need to pay attention to is TaskExecuteProcessor:
(figure: TaskExecuteProcessor in the flow chart)

3.4.1 TaskExecuteProcessor

TaskExecuteProcessor lives at /dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java. Without a doubt, its core method is:

@Counted(value = "ds.task.execution.count", description = "task execute total count")
@Timed(value = "ds.task.execution.duration", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
@Override
public void process(Channel channel, Command command) {
    // code ...
}

The core piece inside is submitting a TaskExecuteThread to the manager:
(screenshot: submitting TaskExecuteThread to the manager)
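The submission amounts to roughly this (paraphrased from the screenshot; the constructor arguments are abridged and the recall helper is illustrative):

// paraphrased: wrap the request in a TaskExecuteThread and offer it to the worker manager
TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext,
        taskCallbackService, alertClientService, taskPluginManager);
if (!workerManager.offer(taskExecuteThread)) {
    // the wait queue is full: reject and let the master re-dispatch the task
    logger.warn("submit task to wait queue failed, queue size is {}", workerManager.getWaitSubmitQueueSize());
    sendTaskRecall(taskExecutionContext, channel); // illustrative helper
}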
So let's look at the TaskExecuteThread code next.

3.4.2 TaskExecuteThread

TaskExecuteThread is where the task is finally executed. Its run method is shown below, with comments added:


@Override
public void run() {
    // dry run: preview mode
    if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
        taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.SUCCESS);
        taskExecutionContext.setStartTime(new Date());
        taskExecutionContext.setEndTime(new Date());
        TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
        taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
        logger.info("[WorkflowInstance-{}][TaskInstance-{}] Task dry run success",
            taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
        return;
    }
    try {
        LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
        logger.info("script path : {}", taskExecutionContext.getExecutePath());
        if (taskExecutionContext.getStartTime() == null) {
            taskExecutionContext.setStartTime(new Date());
        }
        logger.info("the task begins to execute. task instance id: {}", taskExecutionContext.getTaskInstanceId());

        // report the running status back to the master via callback
        taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
        taskCallbackService.sendTaskExecuteRunningCommand(taskExecutionContext);

        // copy hdfs/minio files to the local execute path
        List<Pair<String, String>> fileDownloads = downloadCheck(taskExecutionContext.getExecutePath(), taskExecutionContext.getResources());
        if (!fileDownloads.isEmpty()) {
            downloadResource(taskExecutionContext.getExecutePath(), logger, fileDownloads);
        }

        taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());

        taskExecutionContext.setTaskAppId(String.format("%s_%s",
                taskExecutionContext.getProcessInstanceId(),
                taskExecutionContext.getTaskInstanceId()));

        TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType());
        if (null == taskChannel) {
            throw new ServiceException(String.format("%s Task Plugin Not Found,Please Check Config File.", taskExecutionContext.getTaskType()));
        }
        String taskLogName = LoggerUtils.buildTaskId(taskExecutionContext.getFirstSubmitTime(),
                taskExecutionContext.getProcessDefineCode(),
                taskExecutionContext.getProcessDefineVersion(),
                taskExecutionContext.getProcessInstanceId(),
                taskExecutionContext.getTaskInstanceId());
        taskExecutionContext.setTaskLogName(taskLogName);

        // set the current thread's name
        Thread.currentThread().setName(taskLogName);

        task = taskChannel.createTask(taskExecutionContext);

        // invoke the task plugin's init method
        this.task.init();

        // init varPool
        this.task.getParameters().setVarPool(taskExecutionContext.getVarPool());

        // invoke the task plugin's handle method
        this.task.handle();

        // decide whether to send an alert
        if (this.task.getNeedAlert()) {
            sendAlert(this.task.getTaskAlertInfo(), this.task.getExitStatus().getCode());
        }

        taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.of(this.task.getExitStatus().getCode()));
        taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
        taskExecutionContext.setProcessId(this.task.getProcessId());
        taskExecutionContext.setAppIds(this.task.getAppIds());
        taskExecutionContext.setVarPool(JSONUtils.toJsonString(this.task.getParameters().getVarPool()));
        logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), this.task.getExitStatus());
    } catch (Throwable e) {
        logger.error("task scheduler failure", e);
        kill();
        taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
        taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
        taskExecutionContext.setProcessId(this.task.getProcessId());
        taskExecutionContext.setAppIds(this.task.getAppIds());
    } finally {
        TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
        taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
        clearTaskExecPath();
        LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
    }
}

04 Appendix

4.1 Core Tables

① t_ds_process_definition (process definition table):

| Field | Type | Notes |
| --- | --- | --- |
| id | int | primary key |
| name | varchar | process definition name |
| version | int | process definition version |
| release_state | tinyint | release state of the process definition: 0 offline, 1 online |
| project_id | int | project id |
| user_id | int | id of the user the process definition belongs to |
| process_definition_json | longtext | process definition JSON string |
| description | text | process definition description |
| global_params | text | global parameters |
| flag | tinyint | whether the process is available: 0 unavailable, 1 available |
| locations | text | node coordinate information |
| connects | text | node connection information |
| receivers | text | recipients |
| receivers_cc | text | CC recipients |
| create_time | datetime | creation time |
| timeout | int | timeout |
| tenant_id | int | tenant id |
| update_time | datetime | update time |
| modify_by | varchar | user who modified it |
| resource_ids | varchar | set of resource ids |

② t_ds_process_instance (process instance table):

| Field | Type | Notes |
| --- | --- | --- |
| id | int | primary key |
| name | varchar | process instance name |
| process_definition_id | int | process definition id |
| state | tinyint | process instance state: 0 submitted, 1 running, 2 ready to pause, 3 paused, 4 ready to stop, 5 stopped, 6 failed, 7 succeeded, 8 needs fault tolerance, 9 killed, 10 waiting for thread, 11 waiting for dependency |
| recovery | tinyint | fault-tolerance flag: 0 normal, 1 needs fault-tolerant restart |
| start_time | datetime | process instance start time |
| end_time | datetime | process instance end time |
| run_times | int | number of runs of the process instance |
| host | varchar | machine hosting the process instance |
| command_type | tinyint | command type: 0 start workflow, 1 execute from current node, 2 recover fault-tolerant workflow, 3 recover suspended process, 4 start from failed node, 5 complement (backfill), 6 schedule, 7 rerun, 8 pause, 9 stop, 10 recover waiting thread |
| command_param | text | command parameters (JSON) |
| task_depend_type | tinyint | node dependency type: 0 current node, 1 forward, 2 backward |
| max_try_times | tinyint | max retries |
| failure_strategy | tinyint | failure strategy: 0 end on failure, 1 continue on failure |
| warning_type | tinyint | alert type: 0 none, 1 on success, 2 on failure, 3 on success or failure |
| warning_group_id | int | alert group id |
| schedule_time | datetime | expected run time |
| command_start_time | datetime | command start time |
| global_params | text | global parameters (solidified from the process definition) |
| process_instance_json | longtext | process instance JSON (copy of the process definition JSON) |
| flag | tinyint | availability: 1 available, 0 unavailable |
| update_time | timestamp | update time |
| is_sub_process | int | whether it is a sub-workflow: 1 yes, 0 no |
| executor_id | int | user executing the command |
| locations | text | node coordinate information |
| connects | text | node connection information |
| history_cmd | text | command history, recording all operations on the process instance |
| dependence_schedule_times | text | expected times of dependent nodes |
| process_instance_priority | int | process instance priority: 0 Highest, 1 High, 2 Medium, 3 Low, 4 Lowest |
| worker_group | varchar | worker group the task is assigned to run in |
| timeout | int | timeout |
| tenant_id | int | tenant id |

③ t_ds_task_instance (task instance table):

| Field | Type | Notes |
| --- | --- | --- |
| id | int | primary key |
| name | varchar | task name |
| task_type | varchar | task type |
| process_definition_id | int | process definition id |
| process_instance_id | int | process instance id |
| task_json | longtext | task node JSON |
| state | tinyint | task instance state: 0 submitted, 1 running, 2 ready to pause, 3 paused, 4 ready to stop, 5 stopped, 6 failed, 7 succeeded, 8 needs fault tolerance, 9 killed, 10 waiting for thread, 11 waiting for dependency |
| submit_time | datetime | task submit time |
| start_time | datetime | task start time |
| end_time | datetime | task end time |
| host | varchar | machine executing the task |
| execute_path | varchar | task execution path |
| log_path | varchar | task log path |
| alert_flag | tinyint | whether to alert |
| retry_times | int | retry count |
| pid | int | process pid |
| app_link | varchar | yarn app id |
| flag | tinyint | availability: 0 unavailable, 1 available |
| retry_interval | int | retry interval |
| max_retry_times | int | max retries |
| task_instance_priority | int | task instance priority: 0 Highest, 1 High, 2 Medium, 3 Low, 4 Lowest |
| worker_group | varchar | worker group the task is assigned to run in |

④ t_ds_schedules (process schedule table):

| Field | Type | Notes |
| --- | --- | --- |
| id | int | primary key |
| process_definition_id | int | process definition id |
| start_time | datetime | schedule start time |
| end_time | datetime | schedule end time |
| crontab | varchar | crontab expression |
| failure_strategy | tinyint | failure strategy: 0 end, 1 continue |
| user_id | int | user id |
| release_state | tinyint | state: 0 offline, 1 online |
| warning_type | tinyint | alert type: 0 none, 1 on success, 2 on failure, 3 on success or failure |
| warning_group_id | int | alert group id |
| process_instance_priority | int | process instance priority: 0 Highest, 1 High, 2 Medium, 3 Low, 4 Lowest |
| worker_group | varchar | worker group the task is assigned to run in |
| create_time | datetime | creation time |
| update_time | datetime | update time |

⑤ t_ds_command (command table):

| Field | Type | Notes |
| --- | --- | --- |
| id | int | primary key |
| command_type | tinyint | command type: 0 start workflow, 1 execute from current node, 2 recover fault-tolerant workflow, 3 recover suspended process, 4 start from failed node, 5 complement (backfill), 6 schedule, 7 rerun, 8 pause, 9 stop, 10 recover waiting thread |
| process_definition_id | int | process definition id |
| command_param | text | command parameters (JSON) |
| task_depend_type | tinyint | node dependency type: 0 current node, 1 forward, 2 backward |
| failure_strategy | tinyint | failure strategy: 0 end, 1 continue |
| warning_type | tinyint | alert type: 0 none, 1 on success, 2 on failure, 3 on success or failure |
| warning_group_id | int | alert group |
| schedule_time | datetime | expected run time |
| start_time | datetime | start time |
| executor_id | int | executing user id |
| dependence | varchar | dependency fields |
| update_time | datetime | update time |
| process_instance_priority | int | process instance priority: 0 Highest, 1 High, 2 Medium, 3 Low, 4 Lowest |
| worker_group | varchar | worker group the task is assigned to run in |

05 Postscript

This article records some personal insights from reading the DolphinScheduler source code, and may be updated later. The DolphinScheduler code does not feel strongly connected, so it is best understood together with the flow chart. The flow chart is included once more for review:
(figure: startup flow chart)
The next article will cover its configuration.
