Flink task exit process and failover mechanism

2022-07-07 03:17:00 Direction_ Wind

1 Task exit logic on the TaskExecutor side

Task.doRun() is the core method that drives the initialization and execution of a Task.
It constructs and instantiates the Task's executable object, AbstractInvokable invokable,
and then calls AbstractInvokable.invoke() to start the computation logic contained in the Task.

When AbstractInvokable.invoke() returns, the Task acts according to how it exited:

  1. Normal completion: flush the buffered data of the ResultPartition, close the buffers, and mark the Task as FINISHED;
  2. Exit caused by cancellation: mark the Task as CANCELED and close the user code;
  3. AbstractInvokable.invoke() threw an exception: mark the Task as FAILED, close the user code, and record the exception;
  4. The JVM threw an error while AbstractInvokable.invoke() was running: forcibly terminate the virtual machine and exit the current process.

The Task then releases its network, memory, and file system resources. Finally, the Task's final state is reported to the leader JobMaster thread through the Task -> TaskManager -> JobMaster chain:

Task.notifyFinalState() -> TaskManagerActions.updateTaskExecutionState(TaskExecutionState) -> JobMasterGateway.updateTaskExecutionState(TaskExecutionState)
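
The four exit cases above can be sketched in plain Java. This is only an illustrative model, not Flink's Task class: TaskExitSketch, Invokable, FinalState and runAndClassify are hypothetical names, VirtualMachineError stands in for "a fatal JVM error", and the sketch rethrows that error where the real Task terminates the process. Treating a normal return after a cancel request as CANCELED is also an assumption of this sketch.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical, simplified model of how a task's exit type maps to its final state. */
final class TaskExitSketch {

    enum FinalState { FINISHED, CANCELED, FAILED }

    /** Stand-in for AbstractInvokable.invoke(); not Flink's real type. */
    interface Invokable {
        void invoke() throws Exception;
    }

    static FinalState runAndClassify(Invokable invokable, BooleanSupplier cancelRequested) {
        try {
            invokable.invoke();
            // Case 1: normal exit (the real Task first flushes and closes its result partitions).
            // Assumption: a cancel request that arrived meanwhile still wins.
            return cancelRequested.getAsBoolean() ? FinalState.CANCELED : FinalState.FINISHED;
        } catch (VirtualMachineError fatal) {
            // Case 4: the real Task terminates the JVM; this sketch only rethrows.
            throw fatal;
        } catch (Throwable t) {
            // Case 2 vs. case 3: an exception seen while canceling counts as CANCELED.
            return cancelRequested.getAsBoolean() ? FinalState.CANCELED : FinalState.FAILED;
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndClassify(() -> { }, () -> false));
        System.out.println(
                runAndClassify(() -> { throw new IllegalStateException("boom"); }, () -> false));
        System.out.println(
                runAndClassify(() -> { throw new InterruptedException(); }, () -> true));
    }
}
```

Whatever the outcome, the resulting state is what travels along the notification chain shown above.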

  • Key information carried by TaskExecutionState:
TaskExecutionState {
    JobID               // job ID
    ExecutionAttemptID  // unique ID of one execution attempt of the Task
    ExecutionState      // enum value: the Task's execution state
    SerializedThrowable // if the Task threw an exception, this field holds its stack trace
    ...
}
  • Task execution state transitions:
   CREATED  -> SCHEDULED -> DEPLOYING -> RUNNING -> FINISHED
      |            |            |          |
      |            |            |   +------+
      |            |            V   V
      |            |         CANCELING  -----+----> CANCELED
      |            |                         |
      |            +-------------------------+
      |
      |                                   ... -> FAILED
      V
  RECONCILING  -> RUNNING | FINISHED | CANCELED | FAILED
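
As a rough illustration, the transitions drawn in the diagram can be encoded as a lookup table. ExecutionStateSketch and canTransition are hypothetical names rather than part of Flink, and reading "... -> FAILED" as "any non-terminal state may fail" is an assumption of this sketch.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical table of the state transitions drawn in the diagram above. */
final class ExecutionStateSketch {

    enum State {
        CREATED, SCHEDULED, DEPLOYING, RUNNING, FINISHED,
        CANCELING, CANCELED, FAILED, RECONCILING;

        boolean isTerminal() {
            return this == FINISHED || this == CANCELED || this == FAILED;
        }
    }

    private static final Map<State, Set<State>> ALLOWED = new EnumMap<>(State.class);

    static {
        for (State s : State.values()) {
            // Assumption: "... -> FAILED" means every non-terminal state may fail.
            ALLOWED.put(s, s.isTerminal() ? EnumSet.noneOf(State.class) : EnumSet.of(State.FAILED));
        }
        allow(State.CREATED, State.SCHEDULED, State.RECONCILING);
        allow(State.SCHEDULED, State.DEPLOYING, State.CANCELED);
        allow(State.DEPLOYING, State.RUNNING, State.CANCELING);
        allow(State.RUNNING, State.FINISHED, State.CANCELING);
        allow(State.CANCELING, State.CANCELED);
        allow(State.RECONCILING, State.RUNNING, State.FINISHED, State.CANCELED);
    }

    private static void allow(State from, State... targets) {
        for (State target : targets) {
            ALLOWED.get(from).add(target);
        }
    }

    /** Returns true if the diagram contains an edge from {@code from} to {@code to}. */
    static boolean canTransition(State from, State to) {
        return ALLOWED.get(from).contains(to);
    }

    public static void main(String[] args) {
        System.out.println(canTransition(State.RUNNING, State.FINISHED));
        System.out.println(canTransition(State.DEPLOYING, State.CANCELED));
    }
}
```

In this table a deployed task cannot jump straight to CANCELED; it has to pass through CANCELING first, which matches the diagram.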

2 Failover process on the JobMaster side

2.1 Handling Task execution state

When the JobMaster receives a task execution state change sent by a TaskManager over RPC, it hands it to the current Flink job scheduler (SchedulerNG). Because all of these calls run on the same thread, later reads and writes of stateful objects such as the ExecutionGraph (the runtime execution plan) and the failover count are free of thread-safety problems.

The core processing logic on the JobMaster side is in SchedulerBase.updateTaskExecutionState(TaskExecutionStateTransition) (TaskExecutionStateTransition is mainly a more readable wrapper around TaskExecutionState).
Processing logic: try to apply the received Task execution state to the ExecutionGraph. If the update succeeds and the target state is FINISHED, the concrete SchedulingStrategy selects the result partitions that have become consumable and schedules the corresponding consumer Tasks; if the update succeeds and the target state is FAILED, the failover process described in section 2.2 begins.

  • SchedulerBase.updateTaskExecutionState(TaskExecutionStateTransition):
    public final boolean updateTaskExecutionState(
            final TaskExecutionStateTransition taskExecutionState) {
        final Optional<ExecutionVertexID> executionVertexId =
                getExecutionVertexId(taskExecutionState.getID());

        boolean updateSuccess = executionGraph.updateState(taskExecutionState);

        if (updateSuccess) {
            checkState(executionVertexId.isPresent());
            if (isNotifiable(executionVertexId.get(), taskExecutionState)) {
                updateTaskExecutionStateInternal(executionVertexId.get(), taskExecutionState);
            }
            return true;
        } else {
            return false;
        }
    }

  • ExecutionGraph.updateState(TaskExecutionStateTransition): the update fails when the target ExecutionAttemptID cannot be found in the current physical execution topology. Note that this ID uniquely identifies an Execution, and an Execution represents one execution attempt of an ExecutionVertex (the plan for one subtask of a topology vertex); an ExecutionVertex can be executed many times. This means that once a subtask has been rerun, currentExecutions no longer holds the ID of the previous attempt.
    /**
     * Updates the state of one of the ExecutionVertex's Execution attempts. If the new status is
     * "FINISHED", this also updates the accumulators.
     *
     * @param state The state update.
     * @return True, if the task update was properly applied, false, if the execution attempt was
     *     not found.
     */
    public boolean updateState(TaskExecutionStateTransition state) {
        assertRunningInJobMasterMainThread();
        final Execution attempt = currentExecutions.get(state.getID());
        if (attempt != null) {
            try {
                final boolean stateUpdated = updateStateInternal(state, attempt);
                maybeReleasePartitions(attempt);
                return stateUpdated;
            } catch (Throwable t) {
                ......
                return false;
            }
        } else {
            return false;
        }
    }
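
The effect of keying updates by attempt ID can be shown with a small stand-alone model. AttemptRegistrySketch is a hypothetical class that uses strings in place of ExecutionAttemptID and ExecutionState; it only mirrors the lookup-then-update shape of the method above and is not Flink code.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of currentExecutions: attempt ID -> last reported state. */
final class AttemptRegistrySketch {

    private final Map<String, String> currentExecutions = new HashMap<>();

    /** Called when a new execution attempt is created for a subtask. */
    void register(String attemptId) {
        currentExecutions.put(attemptId, "CREATED");
    }

    /** Called when an attempt has reached a final state and is dropped from the topology. */
    void deregister(String attemptId) {
        currentExecutions.remove(attemptId);
    }

    /** Returns false when the attempt is unknown, e.g. because the subtask was already rerun. */
    boolean updateState(String attemptId, String newState) {
        if (!currentExecutions.containsKey(attemptId)) {
            return false;
        }
        currentExecutions.put(attemptId, newState);
        return true;
    }

    public static void main(String[] args) {
        AttemptRegistrySketch registry = new AttemptRegistrySketch();
        registry.register("attempt-1");
        registry.deregister("attempt-1"); // the first attempt was canceled
        registry.register("attempt-2");   // the subtask is rerun under a new attempt ID
        System.out.println(registry.updateState("attempt-1", "FAILED"));  // late message, ignored
        System.out.println(registry.updateState("attempt-2", "RUNNING"));
    }
}
```

A late message from an old attempt is therefore dropped instead of disturbing the state of the rerun subtask.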

  • JobMaster: the central class responsible for one job topology; it covers job scheduling, resource management, external communication, and so on.

  • SchedulerNG: responsible for scheduling the job topology. All calls to methods of this type are triggered through the ComponentMainThreadExecutor, so there are no concurrent calls.

  • ExecutionGraph: the central data structure of the current execution topology. It coordinates the Executions, describes every subtask of the job and its partition data, and keeps in communication with them.
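
The single-threaded access pattern described above can be illustrated with a plain single-thread executor. This is a minimal sketch under stated assumptions: MainThreadSketch and its methods are hypothetical, and a JDK ExecutorService stands in for Flink's ComponentMainThreadExecutor.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical illustration of "all state is touched from one main thread". */
final class MainThreadSketch {

    private volatile Thread mainThread;
    private int failureCount; // plain field: only ever touched on the main thread

    private final ExecutorService mainThreadExecutor =
            Executors.newSingleThreadExecutor(runnable -> {
                Thread t = new Thread(runnable, "main-thread-sketch");
                t.setDaemon(true);
                mainThread = t;
                return t;
            });

    void assertRunningInMainThread() {
        if (Thread.currentThread() != mainThread) {
            throw new IllegalStateException("Not running in the main thread");
        }
    }

    /** May be called from any thread; the update itself runs on the main thread. */
    void reportFailure() {
        mainThreadExecutor.execute(() -> {
            assertRunningInMainThread();
            failureCount++;
        });
    }

    /** Reads the counter on the main thread, after all previously submitted updates. */
    int getFailureCount() {
        try {
            return mainThreadExecutor.submit(() -> {
                assertRunningInMainThread();
                return failureCount;
            }).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    /** Several reporter threads submit updates concurrently; returns the final count. */
    static int runDemo(int threads, int reportsPerThread) {
        MainThreadSketch sketch = new MainThreadSketch();
        Thread[] reporters = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            reporters[i] = new Thread(() -> {
                for (int j = 0; j < reportsPerThread; j++) {
                    sketch.reportFailure();
                }
            });
            reporters[i].start();
        }
        try {
            for (Thread reporter : reporters) {
                reporter.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sketch.getFailureCount();
    }

    public static void main(String[] args) {
        System.out.println(runDemo(4, 1000));
    }
}
```

Because every increment runs on the same thread, the counter needs neither locks nor atomics, which is the property the scheduler relies on for the ExecutionGraph and the failover count.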

2.2 Job Failover

2.2.1 Handling Task failure

  • The main flow for a Task failure is DefaultScheduler.handleTaskFailure(ExecutionVertexID, Throwable): the RestartBackoffTimeStrategy decides whether to restart or to fail the job; the FailoverStrategy selects the subtasks that need to be restarted; finally, the job's current SchedulingStrategy reschedules and restarts those subtasks.
    private void handleTaskFailure(
            final ExecutionVertexID executionVertexId, @Nullable final Throwable error) {
        // Record the failure cause for the job
        setGlobalFailureCause(error);
        // If the affected operator (source, sink) has a coordinator, notify it so that it can react
        notifyCoordinatorsAboutTaskFailure(executionVertexId, error);
        // Apply the current restart strategy and get the FailureHandlingResult
        final FailureHandlingResult failureHandlingResult =
                executionFailureHandler.getFailureHandlingResult(executionVertexId, error);
        // Depending on the result, restart the Tasks or fail the job
        maybeRestartTasks(failureHandlingResult);
    }


    public class FailureHandlingResult {
        // All subtasks that need to be restarted
        Set<ExecutionVertexID> verticesToRestart;
        // Restart delay in milliseconds
        long restartDelayMS;
        // Root cause of the failure
        Throwable error;
        // Whether this is a global failure
        boolean globalFailure;
    }

  • ExecutionFailureHandler: handles the failure and returns a failure-handling result according to the strategies currently in effect.
    public FailureHandlingResult getFailureHandlingResult(
            ExecutionVertexID failedTask, Throwable cause) {
        return handleFailure(
                cause,
                // Select the subtasks that need to be restarted
                failoverStrategy.getTasksNeedingRestart(failedTask, cause),
                false);
    }

    private FailureHandlingResult handleFailure(
            final Throwable cause,
            final Set<ExecutionVertexID> verticesToRestart,
            final boolean globalFailure) {

        if (isUnrecoverableError(cause)) {
            return FailureHandlingResult.unrecoverable(
                    new JobException("The failure is not recoverable", cause), globalFailure);
        }

        restartBackoffTimeStrategy.notifyFailure(cause);
        if (restartBackoffTimeStrategy.canRestart()) {
            numberOfRestarts++;

            return FailureHandlingResult.restartable(
                    verticesToRestart, restartBackoffTimeStrategy.getBackoffTime(), globalFailure);
        } else {
            return FailureHandlingResult.unrecoverable(
                    new JobException(
                            "Recovery is suppressed by " + restartBackoffTimeStrategy, cause),
                    globalFailure);
        }
    }

  • FailoverStrategy: the failover strategy.

    • RestartAllFailoverStrategy: when a failure occurs, the entire job is restarted, that is, all subtasks are restarted.
    • RestartPipelinedRegionFailoverStrategy: when a failure occurs, the Region containing the failed subtask is restarted.
  • RestartBackoffTimeStrategy: when a Task fails, decides whether to restart and how long to wait before restarting.

    • FixedDelayRestartBackoffTimeStrategy: allows the job to restart a fixed number of times, with a specified delay.
    • FailureRateRestartBackoffTimeStrategy: allows restarts with a fixed delay as long as the failure rate stays within a fixed limit.
    • NoRestartBackoffTimeStrategy: never restarts.
  • SchedulingStrategy: the scheduling strategy for Task execution instances.

    • EagerSchedulingStrategy: eager scheduling; schedules all Tasks at the same time.
    • LazyFromSourcesSchedulingStrategy: starts scheduling downstream Tasks once the partition data they consume is ready; used for batch jobs.
    • PipelinedRegionSchedulingStrategy: treats the Tasks connected by pipelined links as one Region and uses the Region as the scheduling granularity.
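
To make the restart decision concrete, here is a minimal sketch of a fixed-delay and a failure-rate strategy. The BackoffStrategy interface is a simplified, hypothetical stand-in: it keeps the notifyFailure / canRestart / getBackoffTime shape used by handleFailure above, but takes a timestamp instead of a Throwable so that the example stays deterministic. The class names and constructor parameters are assumptions, not Flink's.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical, simplified restart back-off strategies. */
final class RestartBackoffSketch {

    interface BackoffStrategy {
        void notifyFailure(long nowMillis);
        boolean canRestart();
        long getBackoffTime();
    }

    /** Allows at most maxAttempts restarts, each after the same delay. */
    static final class FixedDelay implements BackoffStrategy {
        private final int maxAttempts;
        private final long delayMillis;
        private int failures;

        FixedDelay(int maxAttempts, long delayMillis) {
            this.maxAttempts = maxAttempts;
            this.delayMillis = delayMillis;
        }

        @Override public void notifyFailure(long nowMillis) { failures++; }
        @Override public boolean canRestart() { return failures <= maxAttempts; }
        @Override public long getBackoffTime() { return delayMillis; }
    }

    /** Allows restarts while at most maxFailures failures fall inside the sliding interval. */
    static final class FailureRate implements BackoffStrategy {
        private final int maxFailures;
        private final long intervalMillis;
        private final long delayMillis;
        private final Deque<Long> failureTimestamps = new ArrayDeque<>();

        FailureRate(int maxFailures, long intervalMillis, long delayMillis) {
            this.maxFailures = maxFailures;
            this.intervalMillis = intervalMillis;
            this.delayMillis = delayMillis;
        }

        @Override public void notifyFailure(long nowMillis) {
            failureTimestamps.addLast(nowMillis);
            // Drop failures that have fallen out of the sliding interval.
            while (nowMillis - failureTimestamps.peekFirst() > intervalMillis) {
                failureTimestamps.removeFirst();
            }
        }

        @Override public boolean canRestart() { return failureTimestamps.size() <= maxFailures; }
        @Override public long getBackoffTime() { return delayMillis; }
    }

    public static void main(String[] args) {
        BackoffStrategy fixed = new FixedDelay(2, 1_000L);
        for (int i = 1; i <= 3; i++) {
            fixed.notifyFailure(i * 100L);
            System.out.println("failure " + i + ", canRestart=" + fixed.canRestart());
        }
    }
}
```

With FixedDelay(2, 1000), the first two failures are restartable and the third is not, which corresponds to the "Recovery is suppressed by ..." branch in handleFailure.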

2.2.2 Restart Task


    private void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) {
        if (failureHandlingResult.canRestart()) {
            restartTasksWithDelay(failureHandlingResult);
        } else {
            failJob(failureHandlingResult.getError());
        }
    }

    private void restartTasksWithDelay(final FailureHandlingResult failureHandlingResult) {
        final Set<ExecutionVertexID> verticesToRestart =
                failureHandlingResult.getVerticesToRestart();

        final Set<ExecutionVertexVersion> executionVertexVersions =
                new HashSet<>(
                        executionVertexVersioner
                                .recordVertexModifications(verticesToRestart)
                                .values());
        final boolean globalRecovery = failureHandlingResult.isGlobalFailure();

        addVerticesToRestartPending(verticesToRestart);

        // Cancel all subtasks that need to be restarted
        final CompletableFuture<?> cancelFuture = cancelTasksAsync(verticesToRestart);

        delayExecutor.schedule(
                () ->
                        FutureUtils.assertNoException(
                                // Restart only after cancellation has completed
                                cancelFuture.thenRunAsync(
                                        restartTasks(executionVertexVersions, globalRecovery),
                                        getMainThreadExecutor())),
                failureHandlingResult.getRestartDelayMS(),
                TimeUnit.MILLISECONDS);
    }
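
The "cancel now, restart after the delay and only once cancellation has finished" pattern can be reproduced with JDK classes alone. In this sketch DelayedRestartSketch, run and the event strings are hypothetical, and a ScheduledExecutorService plays the role of both the delay executor and the main thread executor.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration of chaining a delayed restart onto an asynchronous cancel. */
final class DelayedRestartSketch {

    /** Returns the recorded events in the order in which they happened. */
    static List<String> run(long restartDelayMillis) {
        List<String> events = new CopyOnWriteArrayList<>();
        ScheduledExecutorService delayExecutor = Executors.newSingleThreadScheduledExecutor();
        try {
            // Cancellation starts immediately and completes asynchronously.
            CompletableFuture<Void> cancelFuture =
                    CompletableFuture.runAsync(() -> events.add("canceled"));

            CompletableFuture<Void> restarted = new CompletableFuture<>();
            delayExecutor.schedule(
                    () -> {
                        // After the delay, run the restart once the cancel future is done.
                        cancelFuture.thenRunAsync(
                                () -> {
                                    events.add("restarted");
                                    restarted.complete(null);
                                },
                                delayExecutor);
                    },
                    restartDelayMillis,
                    TimeUnit.MILLISECONDS);

            restarted.get(10, TimeUnit.SECONDS);
            return events;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            delayExecutor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(50));
    }
}
```

The restart can never overtake the cancellation: if cancellation takes longer than the delay, thenRunAsync simply waits for it.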


2.2.3 Cancel Task

  • Cancels subtasks that are still waiting for a Slot to be allocated; for subtasks that are already deploying or running, the TaskExecutor must be notified to stop them, and the scheduler waits for that operation to complete.
    private CompletableFuture<?> cancelTasksAsync(final Set<ExecutionVertexID> verticesToRestart) {
        // clean up all the related pending requests to avoid that immediately returned slot
        // is used to fulfill the pending requests of these tasks
        // (this cancels subtasks that may still be waiting for a Slot)
        verticesToRestart.stream().forEach(executionSlotAllocator::cancel);

        final List<CompletableFuture<?>> cancelFutures =
                verticesToRestart.stream()
                        .map(this::cancelExecutionVertex) // start stopping the subtask
                        .collect(Collectors.toList());

        return FutureUtils.combineAll(cancelFutures);
    }

    public void cancel() {
        // Loop so that the state change is retried if it fails
        while (true) {
            ExecutionState current = this.state;
            if (current == CANCELING || current == CANCELED) {
                // already taken care of, no need to cancel again
                return;
            } else if (current == RUNNING || current == DEPLOYING) {
                // Set the state to CANCELING and send an RPC asking the TaskExecutor to stop the subtask
                if (startCancelling(NUM_CANCEL_CALL_TRIES)) {
                    return;
                }
            } else if (current == FINISHED) {
                // The task has finished, but its results can no longer be consumed: release the result partitions
                sendReleaseIntermediateResultPartitionsRpcCall();
                return;
            } else if (current == FAILED) {
                return;
            } else if (current == CREATED || current == SCHEDULED) {
                // from here, we can directly switch to cancelled, because no task has been deployed
                if (cancelAtomically()) {
                    return;
                }
            } else {
                throw new IllegalStateException(current.name());
            }
        }
    }

  • Once the stop operation completes, the Task exit process notifies the ExecutionGraph, which updates its data accordingly: ExecutionGraph.updateState(TaskExecutionStateTransition) -> ExecutionGraph.updateStateInternal(TaskExecutionStateTransition, Execution) -> Execution.completeCancelling(…) -> Execution.finishCancellation(boolean) -> ExecutionGraph.deregisterExecution(Execution). deregisterExecution removes the stopped Execution from currentExecutions.
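
The retry loop in cancel() can be modeled with a compare-and-set on the state. This is a sketch under the assumption that a state change may fail because another thread changed the state first; CancelLoopSketch is hypothetical, and the RPC to the TaskExecutor and the partition release are left out.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical model of the cancel() loop: re-read the state and retry on a lost race. */
final class CancelLoopSketch {

    enum State { CREATED, SCHEDULED, DEPLOYING, RUNNING, FINISHED, CANCELING, CANCELED, FAILED }

    private final AtomicReference<State> state;

    CancelLoopSketch(State initial) {
        this.state = new AtomicReference<>(initial);
    }

    /** Returns the state the execution is in once the cancel request has been handled. */
    State cancel() {
        while (true) {
            State current = state.get();
            switch (current) {
                case RUNNING:
                case DEPLOYING:
                    // Deployed work must be stopped remotely, so it first becomes CANCELING.
                    if (state.compareAndSet(current, State.CANCELING)) {
                        return State.CANCELING;
                    }
                    break; // lost a race: re-read the state and try again
                case CREATED:
                case SCHEDULED:
                    // Nothing has been deployed yet, so switch to CANCELED directly.
                    if (state.compareAndSet(current, State.CANCELED)) {
                        return State.CANCELED;
                    }
                    break;
                default:
                    // CANCELING, CANCELED, FINISHED, FAILED: nothing left to cancel.
                    return current;
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(new CancelLoopSketch(State.RUNNING).cancel());
        System.out.println(new CancelLoopSketch(State.CREATED).cancel());
        System.out.println(new CancelLoopSketch(State.FINISHED).cancel());
    }
}
```

A RUNNING execution only reaches CANCELED later, when the TaskExecutor reports back through the update chain described above.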

2.2.4 Start Task

    private Runnable restartTasks(
            final Set<ExecutionVertexVersion> executionVertexVersions,
            final boolean isGlobalRecovery) {
        return () -> {
            final Set<ExecutionVertexID> verticesToRestart =
                    executionVertexVersioner.getUnmodifiedExecutionVertices(
                            executionVertexVersions);

            removeVerticesFromRestartPending(verticesToRestart);
            // Create a new execution instance (Execution) for each subtask
            resetForNewExecutions(verticesToRestart);

            try {
                // Restore state
                restoreState(verticesToRestart, isGlobalRecovery);
            } catch (Throwable t) {
                handleGlobalFailure(t);
                return;
            }
            // Start scheduling: request Slots and deploy
            schedulingStrategy.restartTasks(verticesToRestart);
        };
    }
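
The version check at the start of restartTasks keeps a delayed restart from acting on subtasks that another failure has since claimed. The sketch below shows one way such a check can work, assuming a per-vertex counter that is bumped on every recorded modification; VertexVersionerSketch and its methods are hypothetical and only loosely follow the recordVertexModifications / getUnmodifiedExecutionVertices calls used in the code above.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical per-vertex version counter used to detect outdated restart requests. */
final class VertexVersionerSketch {

    private final Map<String, Long> currentVersions = new HashMap<>();

    /** Bumps the version of each vertex and returns the versions that were just assigned. */
    Map<String, Long> recordModifications(Set<String> vertices) {
        Map<String, Long> recorded = new HashMap<>();
        for (String vertex : vertices) {
            recorded.put(vertex, currentVersions.merge(vertex, 1L, Long::sum));
        }
        return recorded;
    }

    /** Keeps only the vertices whose version has not changed since it was recorded. */
    Set<String> getUnmodified(Map<String, Long> recorded) {
        Set<String> unmodified = new HashSet<>();
        for (Map.Entry<String, Long> entry : recorded.entrySet()) {
            if (entry.getValue().equals(currentVersions.get(entry.getKey()))) {
                unmodified.add(entry.getKey());
            }
        }
        return unmodified;
    }

    public static void main(String[] args) {
        VertexVersionerSketch versioner = new VertexVersionerSketch();
        Map<String, Long> firstFailure =
                versioner.recordModifications(new HashSet<>(Arrays.asList("a", "b")));
        // A second failure touches "b" before the first delayed restart runs.
        Map<String, Long> secondFailure =
                versioner.recordModifications(new HashSet<>(Arrays.asList("b")));
        System.out.println(versioner.getUnmodified(firstFailure));
        System.out.println(versioner.getUnmodified(secondFailure));
    }
}
```

In this model the first restart request only restarts "a", while the later request remains responsible for "b".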

Copyright notice
This article was written by [Direction_ Wind]. Please include a link to the original when reposting. Thanks.
https://yzsam.com/2022/188/202207061954332122.html