Flink task exit process and failover mechanism
2022-07-07 03:17:00 【Direction_ Wind】
1 Task exit logic on the TaskExecutor side
Task.doRun() is the core method that drives a Task's initialization and execution. It constructs and instantiates the Task's executable object, AbstractInvokable invokable, and then calls AbstractInvokable.invoke() to start the computation logic the Task contains.
When AbstractInvokable.invoke() exits, the Task acts according to how it exited:
- Normal completion: flush the buffered data of the output ResultPartitions, close the buffers, and mark the Task as FINISHED;
- Exit caused by cancellation: mark the Task as CANCELED and close the user code;
- AbstractInvokable.invoke() throws an exception: mark the Task as FAILED, close the user code, and record the exception;
- The JVM throws a fatal error while AbstractInvokable.invoke() is running: forcibly terminate the JVM, exiting the current process.
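The exit handling above boils down to a small decision on how invoke() returned. The following is a hypothetical, heavily simplified sketch: TaskExitSketch and FinalState are invented for illustration and are not Flink's Task API. Partition flushing, resource release, and the real process termination on fatal JVM errors are only indicated in comments.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical final states; Flink's real ExecutionState enum has more values.
enum FinalState { FINISHED, CANCELED, FAILED }

// Hypothetical, heavily simplified model of the exit handling in Task.doRun().
final class TaskExitSketch {
    private final AtomicBoolean cancelRequested = new AtomicBoolean(false);
    private volatile Throwable failureCause; // only set when the task ends FAILED

    void cancel() {
        cancelRequested.set(true);
    }

    Throwable getFailureCause() {
        return failureCause;
    }

    FinalState run(Runnable invokable) {
        try {
            invokable.run(); // stands in for AbstractInvokable.invoke()
            // Flink would flush and close the output ResultPartitions at this point.
            return cancelRequested.get() ? FinalState.CANCELED : FinalState.FINISHED;
        } catch (VirtualMachineError fatal) {
            // Flink terminates the process on JVM-fatal errors; this sketch only rethrows.
            throw fatal;
        } catch (Throwable t) {
            if (cancelRequested.get()) {
                // An exception raised while cancelling is treated as part of the cancellation.
                return FinalState.CANCELED;
            }
            failureCause = t; // recorded so it can be reported to the JobMaster
            return FinalState.FAILED;
        }
    }
}
```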
Then the network, memory, and file-system resources held by the Task are released. Finally, the Task's final state is passed along the Task -> TaskManager -> JobMaster chain to the main thread of the leader JobMaster:
Task.notifyFinalState() -> TaskManagerActions.updateTaskExecutionState(TaskExecutionState) -> JobMasterGateway.updateTaskExecutionState(TaskExecutionState)
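To make the notification chain concrete, here is a minimal sketch in which each hop is a plain Java interface call. TaskExecutionState, TaskManagerActions and JobMasterGateway reuse the names from the chain above, but their shapes here are simplified assumptions; NotifyingTask and ForwardingTaskManagerActions are hypothetical helpers, and in Flink the last hop is an RPC rather than a local call.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.UUID;

// Hypothetical, simplified stand-ins: the names follow the article, the shapes do not match Flink.
enum ExecutionState { RUNNING, FINISHED, CANCELED, FAILED }

final class TaskExecutionState {
    final String jobId;
    final UUID executionAttemptId;      // unique per execution attempt
    final ExecutionState executionState;
    final String serializedThrowable;   // stack trace as text, or null if there was no error

    TaskExecutionState(String jobId, UUID executionAttemptId, ExecutionState executionState, Throwable error) {
        this.jobId = jobId;
        this.executionAttemptId = executionAttemptId;
        this.executionState = executionState;
        this.serializedThrowable = error == null ? null : stackTraceOf(error);
    }

    private static String stackTraceOf(Throwable error) {
        StringWriter out = new StringWriter();
        error.printStackTrace(new PrintWriter(out));
        return out.toString();
    }
}

// In Flink this hop is an RPC to the leader JobMaster.
interface JobMasterGateway {
    void updateTaskExecutionState(TaskExecutionState state);
}

interface TaskManagerActions {
    void updateTaskExecutionState(TaskExecutionState state);
}

// The TaskExecutor side simply forwards the state to the JobMaster.
final class ForwardingTaskManagerActions implements TaskManagerActions {
    private final JobMasterGateway jobMaster;

    ForwardingTaskManagerActions(JobMasterGateway jobMaster) {
        this.jobMaster = jobMaster;
    }

    @Override
    public void updateTaskExecutionState(TaskExecutionState state) {
        jobMaster.updateTaskExecutionState(state);
    }
}

final class NotifyingTask {
    private final String jobId;
    private final UUID attemptId;
    private final TaskManagerActions actions;

    NotifyingTask(String jobId, UUID attemptId, TaskManagerActions actions) {
        this.jobId = jobId;
        this.attemptId = attemptId;
        this.actions = actions;
    }

    // Called once the task has reached its final state and released its resources.
    void notifyFinalState(ExecutionState finalState, Throwable error) {
        actions.updateTaskExecutionState(new TaskExecutionState(jobId, attemptId, finalState, error));
    }
}
```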
- Key information carried by TaskExecutionState:
TaskExecutionState {
    JobID                 // job ID
    ExecutionAttemptID    // unique ID of one Task execution; every execution attempt gets a new one
    ExecutionState        // enum value: the Task's execution state
    SerializedThrowable   // if the Task threw an exception, the serialized exception with its stack trace
    ...
}
- Task execution state transitions:
    CREATED  -> SCHEDULED -> DEPLOYING -> RUNNING -> FINISHED
       |            |            |           |
       |            |            |   +-------+
       |            |            V   V
       |            |         CANCELLING -----+----> CANCELED
       |            |                         |
       |            +-------------------------+
       |
       |                                      ... -> FAILED
       V
   RECONCILING  -> RUNNING | FINISHED | CANCELED | FAILED
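The diagram can be read as a transition table. The sketch below encodes it under two stated assumptions: "... -> FAILED" is read as "any non-terminal state may fail", and the direct CREATED -> CANCELED edge is taken from the cancel() logic in section 2.2.3. ExecutionStateMachine is a hypothetical helper, not part of Flink, and it uses Flink's enum spelling CANCELING.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Flink spells the enum constant CANCELING; the diagram writes CANCELLING.
enum ExecutionState {
    CREATED, SCHEDULED, DEPLOYING, RUNNING, FINISHED,
    CANCELING, CANCELED, FAILED, RECONCILING
}

// Hypothetical encoding of the transition diagram (not Flink's own code).
final class ExecutionStateMachine {
    private static final Set<ExecutionState> TERMINAL =
            EnumSet.of(ExecutionState.FINISHED, ExecutionState.CANCELED, ExecutionState.FAILED);
    private static final Map<ExecutionState, Set<ExecutionState>> ALLOWED =
            new EnumMap<>(ExecutionState.class);

    static {
        allow(ExecutionState.CREATED,
                ExecutionState.SCHEDULED, ExecutionState.CANCELED, ExecutionState.RECONCILING);
        allow(ExecutionState.SCHEDULED, ExecutionState.DEPLOYING, ExecutionState.CANCELED);
        allow(ExecutionState.DEPLOYING, ExecutionState.RUNNING, ExecutionState.CANCELING);
        allow(ExecutionState.RUNNING, ExecutionState.FINISHED, ExecutionState.CANCELING);
        allow(ExecutionState.CANCELING, ExecutionState.CANCELED);
        allow(ExecutionState.RECONCILING,
                ExecutionState.RUNNING, ExecutionState.FINISHED, ExecutionState.CANCELED);
    }

    private static void allow(ExecutionState from, ExecutionState... targets) {
        Set<ExecutionState> set = EnumSet.noneOf(ExecutionState.class);
        for (ExecutionState target : targets) {
            set.add(target);
        }
        ALLOWED.put(from, set);
    }

    static boolean canTransition(ExecutionState from, ExecutionState to) {
        if (TERMINAL.contains(from)) {
            return false; // FINISHED, CANCELED and FAILED never change again
        }
        if (to == ExecutionState.FAILED) {
            return true; // "... -> FAILED": any non-terminal state may fail
        }
        return ALLOWED.getOrDefault(from, EnumSet.noneOf(ExecutionState.class)).contains(to);
    }
}
```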
2 Failover process on the JobMaster side
2.1 Handling Task execution state updates
When the JobMaster receives a Task execution-state change that a TaskManager sent over RPC, it hands the change to the current Flink job scheduler (SchedulerNG). Because all of these calls run on the same thread (the JobMaster main thread), the subsequent reads and writes of stateful objects such as the ExecutionGraph (the runtime execution plan) and the failover counters are free of thread-safety issues.
The core processing logic on the JobMaster side lives in SchedulerBase.updateTaskExecutionState(TaskExecutionStateTransition); TaskExecutionStateTransition is mainly a more readable wrapper around TaskExecutionState.
Processing logic: try to apply the received Task execution state to the ExecutionGraph. If the update succeeds and the target state is FINISHED, select, according to the concrete SchedulingStrategy, the result partitions that have become consumable and schedule their consumer Tasks. If the update succeeds and the target state is FAILED, enter the failover process.
- SchedulerBase.updateTaskExecutionState(TaskExecutionStateTransition):
public final boolean updateTaskExecutionState(
        final TaskExecutionStateTransition taskExecutionState) {
    // Map the execution attempt ID to the ExecutionVertex (subtask) it belongs to
    final Optional<ExecutionVertexID> executionVertexId =
            getExecutionVertexId(taskExecutionState.getID());
    // Apply the change to the ExecutionGraph; false if the attempt is unknown
    boolean updateSuccess = executionGraph.updateState(taskExecutionState);
    if (updateSuccess) {
        checkState(executionVertexId.isPresent());
        // Let the scheduler react (e.g. schedule consumers on FINISHED, fail over on FAILED)
        if (isNotifiable(executionVertexId.get(), taskExecutionState)) {
            updateTaskExecutionStateInternal(executionVertexId.get(), taskExecutionState);
        }
        return true;
    } else {
        return false;
    }
}
- ExecutionGraph.updateState(TaskExecutionStateTransition): the update fails when the target ExecutionAttemptID cannot be found in the current physical execution topology. Note that this ID uniquely identifies an Execution, and an Execution is one execution attempt of an ExecutionVertex (a vertex of the topology, i.e. the plan for one subtask); an ExecutionVertex may be executed many times. This means that once a subtask is rerun, currentExecutions no longer holds the ID of the previous attempt, so late state updates from that attempt are rejected.
/**
 * Updates the state of one of the ExecutionVertex's Execution attempts. If the new status is
 * "FINISHED", this also updates the accumulators.
 *
 * @param state The state update.
 * @return True, if the task update was properly applied, false, if the execution attempt was
 *     not found.
 */
public boolean updateState(TaskExecutionStateTransition state) {
    assertRunningInJobMasterMainThread();
    // Look the attempt up among the currently registered executions
    final Execution attempt = currentExecutions.get(state.getID());
    if (attempt != null) {
        try {
            final boolean stateUpdated = updateStateInternal(state, attempt);
            maybeReleasePartitions(attempt);
            return stateUpdated;
        } catch (Throwable t) {
            ......
            return false;
        }
    } else {
        // Unknown (e.g. stale) attempt: the update is ignored
        return false;
    }
}
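The effect of keying currentExecutions by attempt ID can be shown with a small model. ExecutionGraphSketch is hypothetical: it uses UUIDs in place of ExecutionAttemptID and deregisters an attempt as soon as it leaves RUNNING, which simplifies Flink's behavior. A late update from an attempt that has already been replaced is simply rejected.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical, simplified model of ExecutionGraph.currentExecutions (not Flink's code).
final class ExecutionGraphSketch {
    enum State { RUNNING, FINISHED, FAILED, CANCELED }

    static final class Execution {
        final UUID attemptId = UUID.randomUUID(); // stands in for ExecutionAttemptID
        State state = State.RUNNING;
    }

    // Only attempts that have not yet reached a terminal state are registered here.
    private final Map<UUID, Execution> currentExecutions = new HashMap<>();
    // The latest attempt of each vertex (subtask).
    private final Map<String, Execution> latestAttempt = new HashMap<>();

    UUID startNewAttempt(String vertexId) {
        Execution attempt = new Execution();
        currentExecutions.put(attempt.attemptId, attempt);
        latestAttempt.put(vertexId, attempt);
        return attempt.attemptId;
    }

    // Same contract as updateState: false if the attempt is not registered.
    boolean updateState(UUID attemptId, State newState) {
        Execution attempt = currentExecutions.get(attemptId);
        if (attempt == null) {
            return false; // unknown or stale attempt: the update is ignored
        }
        attempt.state = newState;
        if (newState != State.RUNNING) {
            currentExecutions.remove(attemptId); // terminal state, like deregisterExecution
        }
        return true;
    }

    State stateOf(String vertexId) {
        return latestAttempt.get(vertexId).state;
    }
}
```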
JobMaster: the central class responsible for one job topology, covering job scheduling, resource management, external communication, and more.
SchedulerNG: responsible for scheduling the job topology. All calls to its methods are triggered through the ComponentMainThreadExecutor, so it is never called concurrently.
ExecutionGraph: the central data structure of the current execution topology, which coordinates the distributed Executions. It describes every subtask of the job and its partition data, and keeps in communication with them.
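The thread-safety argument for SchedulerNG rests on confining every mutation to one thread. A minimal sketch of that idea, assuming a single-thread ExecutorService as a stand-in for ComponentMainThreadExecutor (MainThreadSketch is hypothetical): callers on many threads submit work, yet the counter needs no lock because only the main thread ever touches it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of the "single main thread" idea; not ComponentMainThreadExecutor itself.
final class MainThreadSketch {
    // A plain int with no locking: safe only because it is touched exclusively on mainThread.
    private int failoverCount = 0;

    private final ExecutorService mainThread = Executors.newSingleThreadExecutor(runnable -> {
        Thread thread = new Thread(runnable, "jobmaster-main-thread");
        thread.setDaemon(true);
        return thread;
    });

    // May be called from any thread (e.g. an RPC handler); the mutation runs on the main thread.
    CompletableFuture<Void> onTaskFailed() {
        return CompletableFuture.runAsync(() -> failoverCount++, mainThread);
    }

    int readFailoverCount() {
        return CompletableFuture.supplyAsync(() -> failoverCount, mainThread).join();
    }

    void shutdown() {
        mainThread.shutdown();
    }
}
```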
2.2 Job Failover
2.2.1 Handling Task failures
- The main entry point for Task failures is DefaultScheduler.handleTaskFailure(ExecutionVertexID, Throwable). The RestartBackoffTimeStrategy decides whether to restart or to fail the job; the FailoverStrategy selects the subtasks that need to be restarted; finally, the job's SchedulingStrategy reschedules and restarts those subtasks.
private void handleTaskFailure(
        final ExecutionVertexID executionVertexId, @Nullable final Throwable error) {
    // Record the current failure cause of the job
    setGlobalFailureCause(error);
    // If the affected operators (e.g. sources, sinks) have a coordinator, notify it so it can react
    notifyCoordinatorsAboutTaskFailure(executionVertexId, error);
    // Apply the current restart strategy and obtain the FailureHandlingResult
    final FailureHandlingResult failureHandlingResult =
            executionFailureHandler.getFailureHandlingResult(executionVertexId, error);
    // Restart the Tasks or fail the job, depending on the result
    maybeRestartTasks(failureHandlingResult);
}
public class FailureHandlingResult {
    // All subtasks that must be restarted to recover
    Set<ExecutionVertexID> verticesToRestart;
    // Restart delay
    long restartDelayMS;
    // Root cause of the failure
    Throwable error;
    // Whether this is a global failure
    boolean globalFailure;
}
- ExecutionFailureHandler: processes the failure and returns a handling result according to the currently configured strategies.
public FailureHandlingResult getFailureHandlingResult(
        ExecutionVertexID failedTask, Throwable cause) {
    return handleFailure(
            cause,
            failoverStrategy.getTasksNeedingRestart(failedTask, cause), // Select the subtasks that need to be restarted
            false);
}
private FailureHandlingResult handleFailure(
        final Throwable cause,
        final Set<ExecutionVertexID> verticesToRestart,
        final boolean globalFailure) {
    // Errors classified as unrecoverable fail the job without consulting the restart strategy
    if (isUnrecoverableError(cause)) {
        return FailureHandlingResult.unrecoverable(
                new JobException("The failure is not recoverable", cause), globalFailure);
    }
    // Let the restart strategy record the failure, then ask whether a restart is still allowed
    restartBackoffTimeStrategy.notifyFailure(cause);
    if (restartBackoffTimeStrategy.canRestart()) {
        numberOfRestarts++;
        return FailureHandlingResult.restartable(
                verticesToRestart, restartBackoffTimeStrategy.getBackoffTime(), globalFailure);
    } else {
        return FailureHandlingResult.unrecoverable(
                new JobException(
                        "Recovery is suppressed by " + restartBackoffTimeStrategy, cause),
                globalFailure);
    }
}
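handleFailure only talks to the restart strategy through notifyFailure, canRestart and getBackoffTime. The sketch below re-declares that contract as a hypothetical BackoffStrategy interface and implements a fixed-delay policy and a failure-rate policy following my reading of their behavior: with maxAttempts = 2 the fixed-delay policy allows two restarts and refuses the third, and the failure-rate policy refuses once the last maxFailures failures all fall within intervalMs. The class names and the injectable clock are assumptions for illustration, not Flink's classes.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.LongSupplier;

// Simplified re-declaration for illustration only; it mirrors the three calls made in
// handleFailure but is not Flink's RestartBackoffTimeStrategy interface.
interface BackoffStrategy {
    void notifyFailure(Throwable cause);
    boolean canRestart();
    long getBackoffTime();
}

// Fixed delay: at most maxAttempts restarts, each after the same delay.
final class FixedDelayStrategy implements BackoffStrategy {
    private final int maxAttempts;
    private final long delayMs;
    private int attempts = 0;

    FixedDelayStrategy(int maxAttempts, long delayMs) {
        this.maxAttempts = maxAttempts;
        this.delayMs = delayMs;
    }

    @Override public void notifyFailure(Throwable cause) { attempts++; }
    @Override public boolean canRestart() { return attempts <= maxAttempts; }
    @Override public long getBackoffTime() { return delayMs; }
}

// Failure rate: refuse to restart once maxFailures failures fall within one interval.
final class FailureRateStrategy implements BackoffStrategy {
    private final int maxFailures;
    private final long intervalMs;
    private final long delayMs;
    private final LongSupplier clockMs; // injectable clock so the sketch is testable
    private final Deque<Long> failureTimestamps = new ArrayDeque<>();

    FailureRateStrategy(int maxFailures, long intervalMs, long delayMs, LongSupplier clockMs) {
        this.maxFailures = maxFailures;
        this.intervalMs = intervalMs;
        this.delayMs = delayMs;
        this.clockMs = clockMs;
    }

    @Override
    public void notifyFailure(Throwable cause) {
        if (failureTimestamps.size() >= maxFailures) {
            failureTimestamps.removeFirst(); // keep only the most recent maxFailures timestamps
        }
        failureTimestamps.addLast(clockMs.getAsLong());
    }

    @Override
    public boolean canRestart() {
        if (failureTimestamps.size() < maxFailures) {
            return true;
        }
        // Window is full: restart only if the oldest recorded failure is outside the interval.
        return clockMs.getAsLong() - failureTimestamps.peekFirst() > intervalMs;
    }

    @Override public long getBackoffTime() { return delayMs; }
}
```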
FailoverStrategy: the failover strategy.
- RestartAllFailoverStrategy: when a failure occurs, the entire job is restarted, i.e. all subtasks are restarted.
- RestartPipelinedRegionFailoverStrategy: when a failure occurs, only the region containing the failed subtask is restarted (together with any regions that must be re-run to regenerate its inputs or that consume its outputs).
RestartBackoffTimeStrategy: when a Task fails, decides whether to restart and how long to wait before restarting.
- FixedDelayRestartBackoffTimeStrategy: allows the job to restart a fixed number of times, each after a specified delay.
- FailureRateRestartBackoffTimeStrategy: allows restarts with a fixed delay as long as the failure rate stays within a configured limit (a maximum number of failures per time interval).
- NoRestartBackoffTimeStrategy: never restarts.
SchedulingStrategy: the scheduling strategy for Task execution instances.
- EagerSchedulingStrategy: eager scheduling; schedules all Tasks at the same time.
- LazyFromSourcesSchedulingStrategy: starts scheduling downstream Tasks once the partition data they consume is ready; used for batch jobs.
- PipelinedRegionSchedulingStrategy: uses a region of Tasks connected by pipelined edges as its scheduling unit.
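Pipelined regions can be computed with a union-find over pipelined edges. This is a hypothetical sketch (RegionFailoverSketch and its Edge type are invented): it restarts the failed vertex's region plus every region that consumes from it, and deliberately omits the extra rule in the region strategy that also restarts producer regions whose result partitions are no longer available.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of region-based failover (not Flink's RestartPipelinedRegionFailoverStrategy).
final class RegionFailoverSketch {
    // A producer -> consumer edge; pipelined edges glue both ends into the same region.
    static final class Edge {
        final int from;
        final int to;
        final boolean pipelined;

        Edge(int from, int to, boolean pipelined) {
            this.from = from;
            this.to = to;
            this.pipelined = pipelined;
        }
    }

    private final int[] parent; // union-find forest over vertex indices
    private final List<Edge> edges;

    RegionFailoverSketch(int numVertices, List<Edge> edges) {
        this.parent = new int[numVertices];
        for (int v = 0; v < numVertices; v++) {
            parent[v] = v;
        }
        this.edges = edges;
        for (Edge e : edges) {
            if (e.pipelined) {
                parent[find(e.from)] = find(e.to); // union the two regions
            }
        }
    }

    private int find(int v) {
        while (parent[v] != v) {
            parent[v] = parent[parent[v]]; // path halving
            v = parent[v];
        }
        return v;
    }

    // The failed vertex's region plus every region downstream of it (its consumers).
    Set<Integer> verticesToRestart(int failedVertex) {
        Set<Integer> regions = new HashSet<>();
        Deque<Integer> queue = new ArrayDeque<>();
        regions.add(find(failedVertex));
        queue.add(find(failedVertex));
        while (!queue.isEmpty()) {
            int region = queue.poll();
            for (Edge e : edges) {
                if (find(e.from) == region && regions.add(find(e.to))) {
                    queue.add(find(e.to));
                }
            }
        }
        Set<Integer> result = new TreeSet<>();
        for (int v = 0; v < parent.length; v++) {
            if (regions.contains(find(v))) {
                result.add(v);
            }
        }
        return result;
    }
}
```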
2.2.2 Restart Task
private void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) {
    if (failureHandlingResult.canRestart()) {
        restartTasksWithDelay(failureHandlingResult);
    } else {
        failJob(failureHandlingResult.getError());
    }
}
private void restartTasksWithDelay(final FailureHandlingResult failureHandlingResult) {
    final Set<ExecutionVertexID> verticesToRestart =
            failureHandlingResult.getVerticesToRestart();
    // Record a new version for each vertex so that a later failover can supersede this restart
    final Set<ExecutionVertexVersion> executionVertexVersions =
            new HashSet<>(
                    executionVertexVersioner
                            .recordVertexModifications(verticesToRestart)
                            .values());
    final boolean globalRecovery = failureHandlingResult.isGlobalFailure();
    addVerticesToRestartPending(verticesToRestart);
    // Cancel all subtasks that need to be restarted
    final CompletableFuture<?> cancelFuture = cancelTasksAsync(verticesToRestart);
    // After the restart delay, wait for cancellation to finish, then restart on the main thread
    delayExecutor.schedule(
            () ->
                    FutureUtils.assertNoException(
                            cancelFuture.thenRunAsync(
                                    restartTasks(executionVertexVersions, globalRecovery),
                                    getMainThreadExecutor())),
            failureHandlingResult.getRestartDelayMS(),
            TimeUnit.MILLISECONDS);
}
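restartTasksWithDelay combines two conditions: the restart delay must elapse, and all cancellations must finish; whichever happens later triggers the restart on the main thread. A minimal sketch with plain java.util.concurrent types, where CompletableFuture.allOf plays the role of FutureUtils.combineAll (DelayedRestartSketch is hypothetical):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the cancel-then-delayed-restart pattern in restartTasksWithDelay.
final class DelayedRestartSketch {
    private final ScheduledExecutorService delayExecutor;
    private final Executor mainThread;

    DelayedRestartSketch(ScheduledExecutorService delayExecutor, Executor mainThread) {
        this.delayExecutor = delayExecutor;
        this.mainThread = mainThread;
    }

    // The restart runs on mainThread once the delay has elapsed AND all cancellations are done.
    // The returned future completes after the restart action has run.
    CompletableFuture<Void> restartWithDelay(
            List<? extends CompletableFuture<?>> cancelFutures, Runnable restart, long delayMs) {
        // Plays the role of FutureUtils.combineAll: completes when every cancellation completes.
        CompletableFuture<Void> allCancelled =
                CompletableFuture.allOf(cancelFutures.toArray(new CompletableFuture<?>[0]));
        CompletableFuture<Void> restarted = new CompletableFuture<>();
        delayExecutor.schedule(
                () -> {
                    allCancelled
                            .thenRunAsync(restart, mainThread)
                            .whenComplete((ignored, error) -> {
                                if (error != null) {
                                    restarted.completeExceptionally(error);
                                } else {
                                    restarted.complete(null);
                                }
                            });
                },
                delayMs,
                TimeUnit.MILLISECONDS);
        return restarted;
    }
}
```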
2.2.3 Cancel Task
- Cancel subtasks that are still waiting for slot allocation; for subtasks already in the DEPLOYING or RUNNING state, the TaskExecutor must be notified to stop them, and the scheduler waits for the stop to complete.
private CompletableFuture<?> cancelTasksAsync(final Set<ExecutionVertexID> verticesToRestart) {
    // clean up all the related pending requests to avoid that immediately returned slot
    // is used to fulfill the pending requests of these tasks
    verticesToRestart.stream().forEach(executionSlotAllocator::cancel); // Cancel subtasks that may still be waiting for a slot
    final List<CompletableFuture<?>> cancelFutures =
            verticesToRestart.stream()
                    .map(this::cancelExecutionVertex) // Start cancelling each subtask
                    .collect(Collectors.toList());
    return FutureUtils.combineAll(cancelFutures);
}
- Execution.cancel(), which cancelExecutionVertex eventually invokes on each vertex's current Execution:
public void cancel() {
    // Loop until a state transition succeeds; retry if the state changed concurrently
    while (true) {
        ExecutionState current = this.state;
        if (current == CANCELING || current == CANCELED) {
            // already taken care of, no need to cancel again
            return;
        } else if (current == RUNNING || current == DEPLOYING) {
            // Set the state to CANCELING and send an RPC asking the TaskExecutor to cancel the subtask
            if (startCancelling(NUM_CANCEL_CALL_TRIES)) {
                return;
            }
        } else if (current == FINISHED) {
            // Already finished, but its results can no longer be consumed: release the result partitions
            sendReleaseIntermediateResultPartitionsRpcCall();
            return;
        } else if (current == FAILED) {
            return;
        } else if (current == CREATED || current == SCHEDULED) {
            // from here, we can directly switch to cancelled, because no task has been deployed
            if (cancelAtomically()) {
                return;
            }
        } else {
            throw new IllegalStateException(current.name());
        }
    }
}
- Once cancellation completes, the Task exit process from section 1 notifies the ExecutionGraph, which updates its data along the chain ExecutionGraph.updateState(TaskExecutionStateTransition) -> ExecutionGraph.updateStateInternal(TaskExecutionStateTransition, Execution) -> Execution.completeCancelling(…) -> Execution.finishCancellation(boolean) -> ExecutionGraph.deregisterExecution(Execution). deregisterExecution removes the stopped Execution from currentExecutions.
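The while (true) loop in Execution.cancel() is a compare-and-set retry loop: read the current state, attempt a transition, and start over if another thread changed the state in between. The sketch below reproduces just that pattern with AtomicReference; CancellableExecution and its counters are hypothetical stand-ins for the real RPC calls.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

enum ExecutionState { CREATED, SCHEDULED, DEPLOYING, RUNNING, FINISHED, CANCELING, CANCELED, FAILED }

// Hypothetical Execution that models only the compare-and-set retry loop of cancel().
final class CancellableExecution {
    private final AtomicReference<ExecutionState> state;
    final AtomicInteger cancelRpcsSent = new AtomicInteger();           // stands in for the cancel RPC
    final AtomicInteger partitionReleaseRpcsSent = new AtomicInteger(); // stands in for the release RPC

    CancellableExecution(ExecutionState initial) {
        this.state = new AtomicReference<>(initial);
    }

    ExecutionState getState() {
        return state.get();
    }

    void cancel() {
        while (true) { // retry until no concurrent state change interferes
            ExecutionState current = state.get();
            switch (current) {
                case CANCELING:
                case CANCELED:
                case FAILED:
                    return; // already being handled or terminal: nothing to do
                case FINISHED:
                    partitionReleaseRpcsSent.incrementAndGet(); // results can no longer be consumed
                    return;
                case RUNNING:
                case DEPLOYING:
                    if (state.compareAndSet(current, ExecutionState.CANCELING)) {
                        cancelRpcsSent.incrementAndGet(); // ask the TaskExecutor to stop the task
                        return;
                    }
                    break; // lost a race with another transition: re-read and retry
                case CREATED:
                case SCHEDULED:
                    // Nothing is deployed yet, so the execution can be CANCELED directly.
                    if (state.compareAndSet(current, ExecutionState.CANCELED)) {
                        return;
                    }
                    break;
                default:
                    throw new IllegalStateException(current.name());
            }
        }
    }
}
```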
2.2.4 Start Task
private Runnable restartTasks(
        final Set<ExecutionVertexVersion> executionVertexVersions,
        final boolean isGlobalRecovery) {
    return () -> {
        // Skip vertices that another failover has modified in the meantime
        final Set<ExecutionVertexID> verticesToRestart =
                executionVertexVersioner.getUnmodifiedExecutionVertices(
                        executionVertexVersions);
        removeVerticesFromRestartPending(verticesToRestart);
        // Create new execution instances (Execution) for these subtasks
        resetForNewExecutions(verticesToRestart);
        try {
            // Restore state (e.g. from the latest checkpoint)
            restoreState(verticesToRestart, isGlobalRecovery);
        } catch (Throwable t) {
            handleGlobalFailure(t);
            return;
        }
        // Start scheduling: request slots and deploy
        schedulingStrategy.restartTasks(verticesToRestart);
    };
}
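restartTasks only restarts the vertices returned by executionVertexVersioner.getUnmodifiedExecutionVertices. My understanding of the purpose: if another failover touches the same vertices while this restart is still waiting for its delay, the newer failover owns them and the older restart must skip them. A hypothetical sketch of that versioning idea (VertexVersionerSketch is not Flink's class):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of the versioning idea behind executionVertexVersioner (not Flink's class).
final class VertexVersionerSketch {
    // An immutable snapshot of one vertex's version at the time a failover was recorded.
    static final class VertexVersion {
        final String vertexId;
        final long version;

        VertexVersion(String vertexId, long version) {
            this.vertexId = vertexId;
            this.version = version;
        }
    }

    private final Map<String, Long> currentVersions = new HashMap<>();

    // Bump the version of every vertex touched by a failover and return the snapshot.
    Set<VertexVersion> recordVertexModifications(Set<String> vertexIds) {
        Set<VertexVersion> snapshot = new HashSet<>();
        for (String id : vertexIds) {
            long next = currentVersions.getOrDefault(id, 0L) + 1;
            currentVersions.put(id, next);
            snapshot.add(new VertexVersion(id, next));
        }
        return snapshot;
    }

    // Keep only vertices whose version has not changed since the snapshot was taken.
    Set<String> getUnmodifiedExecutionVertices(Set<VertexVersion> snapshot) {
        Set<String> unmodified = new TreeSet<>();
        for (VertexVersion vv : snapshot) {
            if (currentVersions.getOrDefault(vv.vertexId, 0L) == vv.version) {
                unmodified.add(vv.vertexId);
            }
        }
        return unmodified;
    }
}
```

In the usage below, a second failover that touches "b" before the first one's delayed restart runs leaves only "a" for the first restart to handle.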