Flink task exit process and failover mechanism
2022-07-07 03:17:00 [Direction_Wind]
1 Task exit logic on the TaskExecutor side
Task.doRun() is the core method that drives a Task's initialization and the execution of its code:
- it constructs and instantiates the Task's executable object, AbstractInvokable invokable;
- it calls AbstractInvokable.invoke() to start the computation logic contained in the Task.
When AbstractInvokable.invoke() exits, the Task acts according to how it exited:
- Normal completion: flush the buffered data of the ResultPartitions, close the buffers, and mark the Task as FINISHED;
- Exit caused by cancellation: mark the Task as CANCELED and close the user code;
- An exception thrown while AbstractInvokable.invoke() is running: mark the Task as FAILED, close the user code, and record the exception;
- A fatal JVM error thrown while AbstractInvokable.invoke() is running: forcibly terminate the JVM, exiting the current process.
The Task then releases its network, memory, and file-system resources. Finally, it reports its final state to the leader JobMaster along the chain Task -> TaskManager -> JobMaster:
Task.notifyFinalState() -> TaskManagerActions.updateTaskExecutionState(TaskExecutionState) -> JobMasterGateway.updateTaskExecutionState(TaskExecutionState)
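The exit classification above can be condensed into a few lines of Java. This is a hypothetical sketch, not Flink's Task.doRun(): the class `TaskExitSketch`, the method `finalStateFor`, and the rule "every VirtualMachineError is fatal" are assumptions made for illustration (Flink has its own helper for deciding which errors are fatal).

```java
/** Hypothetical model of how a Task's final state follows from the way invoke() ended. */
final class TaskExitSketch {

    enum ExecutionState { FINISHED, CANCELED, FAILED }

    /** Signals that the process must be terminated instead of reporting a final state. */
    static final class FatalJvmError extends RuntimeException {
        FatalJvmError(Throwable cause) {
            super(cause);
        }
    }

    /**
     * @param failure  the Throwable that escaped invoke(), or null if invoke() returned normally
     * @param canceled whether cancellation was requested while invoke() was running
     */
    static ExecutionState finalStateFor(Throwable failure, boolean canceled) {
        if (failure instanceof VirtualMachineError) {
            // Assumption of this sketch: treat every VirtualMachineError (e.g. OutOfMemoryError) as fatal.
            throw new FatalJvmError(failure);
        }
        if (canceled) {
            // An exception raised as a consequence of cancellation is still reported as CANCELED.
            return ExecutionState.CANCELED;
        }
        return failure == null ? ExecutionState.FINISHED : ExecutionState.FAILED;
    }
}
```

The ordering of the checks matters: a fatal error wins over everything, and cancellation wins over an ordinary exception.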
- Key information carried by TaskExecutionState:
TaskExecutionState {
    JobID                 // job ID
    ExecutionAttemptID    // unique ID of this Task execution attempt; every (re)execution gets a new one
    ExecutionState        // enum value: the Task's execution state
    SerializedThrowable   // if the Task threw an exception, records its stack trace
    ...
}
- Task execution state transitions:
     CREATED  -> SCHEDULED -> DEPLOYING -> RUNNING -> FINISHED
        |            |            |          |
        |            |            |   +------+
        |            |            V   V
        |            |         CANCELING ------+----> CANCELED
        |            |                         |
        |            +-------------------------+
        |
        |                                   ... -> FAILED
        V
    RECONCILING  -> RUNNING | FINISHED | CANCELED | FAILED
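The diagram above can be encoded as a transition table. The sketch below is an illustration, not Flink's implementation: the edges are read off the diagram, the CREATED -> CANCELED edge is taken from Execution.cancel() in section 2.2.3, and "... -> FAILED" is interpreted as "any non-terminal state may fail". The class name `ExecutionStateMachine` is hypothetical.

```java
import java.util.Arrays;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical encoding of the ExecutionState diagram; not Flink's own code. */
final class ExecutionStateMachine {

    enum ExecutionState {
        CREATED, SCHEDULED, DEPLOYING, RUNNING, FINISHED, CANCELING, CANCELED, FAILED, RECONCILING;

        boolean isTerminal() {
            return this == FINISHED || this == CANCELED || this == FAILED;
        }
    }

    private static final Map<ExecutionState, Set<ExecutionState>> ALLOWED =
            new EnumMap<>(ExecutionState.class);

    private static void allow(ExecutionState from, ExecutionState... targets) {
        ALLOWED.computeIfAbsent(from, k -> EnumSet.noneOf(ExecutionState.class))
                .addAll(Arrays.asList(targets));
    }

    static {
        // Edges drawn in the diagram.
        allow(ExecutionState.CREATED, ExecutionState.SCHEDULED, ExecutionState.RECONCILING);
        allow(ExecutionState.SCHEDULED, ExecutionState.DEPLOYING, ExecutionState.CANCELED);
        allow(ExecutionState.DEPLOYING, ExecutionState.RUNNING, ExecutionState.CANCELING);
        allow(ExecutionState.RUNNING, ExecutionState.FINISHED, ExecutionState.CANCELING);
        allow(ExecutionState.CANCELING, ExecutionState.CANCELED);
        allow(ExecutionState.RECONCILING, ExecutionState.RUNNING, ExecutionState.FINISHED,
                ExecutionState.CANCELED, ExecutionState.FAILED);
        // Not drawn, but Execution.cancel() moves CREATED directly to CANCELED.
        allow(ExecutionState.CREATED, ExecutionState.CANCELED);
        // "... -> FAILED": any non-terminal state may fail.
        for (ExecutionState s : ExecutionState.values()) {
            if (!s.isTerminal()) {
                allow(s, ExecutionState.FAILED);
            }
        }
    }

    static boolean canTransition(ExecutionState from, ExecutionState to) {
        Set<ExecutionState> targets = ALLOWED.get(from);
        return targets != null && targets.contains(to);
    }
}
```

FINISHED, CANCELED, and FAILED have no outgoing edges, which is what makes them terminal.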
2 Failover process on the JobMaster side
2.1 Handling Task execution state
When the JobMaster receives a Task execution-state change sent by a TaskManager over RPC, it hands the change to the job's scheduler (SchedulerNG). Because all of these calls run on the same thread (the JobMaster main thread), subsequent reads and writes of stateful objects such as the ExecutionGraph (the runtime execution plan) and the failover counters cannot run into thread-safety problems.
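The single-thread argument can be illustrated with a plain single-threaded ExecutorService: if every update is submitted to one thread, an ordinary HashMap and int counter can be mutated without locks. This is a sketch under assumptions; the class and method names are hypothetical, and Flink's real mechanism is the ComponentMainThreadExecutor of the JobMaster's RPC endpoint.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical illustration: all state is confined to one "main thread". */
final class MainThreadConfinedScheduler {
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "main-thread-sketch");
        t.setDaemon(true); // do not keep the JVM alive
        return t;
    });
    // Plain, non-thread-safe state: safe only because mainThread is the sole accessor.
    private final Map<String, String> taskStates = new HashMap<>();
    private int failoverCount = 0;

    /** May be called from any RPC thread; the mutation itself runs on the main thread. */
    CompletableFuture<Void> updateTaskExecutionState(String attemptId, String newState) {
        return CompletableFuture.runAsync(() -> {
            taskStates.put(attemptId, newState);
            if ("FAILED".equals(newState)) {
                failoverCount++;
            }
        }, mainThread);
    }

    /** Reads also go through the main thread, so they see every earlier update. */
    int failoverCount() {
        return CompletableFuture.supplyAsync(() -> failoverCount, mainThread).join();
    }

    void shutdown() {
        mainThread.shutdown();
    }
}
```

Even when many threads call updateTaskExecutionState concurrently, the counter ends up exact, because the increments are serialized on one thread instead of racing.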
The JobMaster's core handling logic lives in SchedulerBase.updateTaskExecutionState(TaskExecutionStateTransition); TaskExecutionStateTransition is essentially a more readable wrapper around TaskExecutionState.
Processing logic: the scheduler tries to apply the received Task execution state to the ExecutionGraph. If the update succeeds and the target state is FINISHED, the concrete SchedulingStrategy selects the result partitions that have become consumable and schedules their consumer Tasks. If the update succeeds and the target state is FAILED, the failover process described below starts.
- SchedulerBase.updateTaskExecutionState(TaskExecutionStateTransition):
public final boolean updateTaskExecutionState(
        final TaskExecutionStateTransition taskExecutionState) {
    final Optional<ExecutionVertexID> executionVertexId =
            getExecutionVertexId(taskExecutionState.getID());
    boolean updateSuccess = executionGraph.updateState(taskExecutionState);
    if (updateSuccess) {
        checkState(executionVertexId.isPresent());
        if (isNotifiable(executionVertexId.get(), taskExecutionState)) {
            updateTaskExecutionStateInternal(executionVertexId.get(), taskExecutionState);
        }
        return true;
    } else {
        return false;
    }
}
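What happens after a successful update can be sketched as a dispatch on the target state. The `SchedulerHooks` interface and its two methods are hypothetical stand-ins: in Flink, the FINISHED branch goes through the SchedulingStrategy and the FAILED branch leads into the failover handling of section 2.2.

```java
/** Hypothetical sketch of the post-update dispatch; not Flink's updateTaskExecutionStateInternal. */
final class StateUpdateDispatch {

    enum ExecutionState { RUNNING, FINISHED, CANCELED, FAILED }

    interface SchedulerHooks {
        /** E.g. let the scheduling strategy schedule consumers of the now-consumable partitions. */
        void onPartitionsConsumable(String vertexId);

        /** E.g. enter the failover process for the failed vertex. */
        void onTaskFailure(String vertexId, Throwable cause);
    }

    static void dispatch(String vertexId, ExecutionState target, Throwable cause, SchedulerHooks hooks) {
        switch (target) {
            case FINISHED:
                hooks.onPartitionsConsumable(vertexId);
                break;
            case FAILED:
                hooks.onTaskFailure(vertexId, cause);
                break;
            default:
                // Other transitions only update the ExecutionGraph; nothing further to trigger here.
                break;
        }
    }
}
```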
- ExecutionGraph.updateState(TaskExecutionStateTransition): the update fails when the target ExecutionAttemptID cannot be found in the current physical execution topology. Note that this ID uniquely identifies one Execution, and an Execution is a single execution attempt of an ExecutionVertex (the execution plan of one subtask, i.e. one vertex of the topology); an ExecutionVertex may be executed many times. Consequently, once a subtask is rerun, currentExecutions no longer holds the ID of its previous attempt, and state updates still arriving for that old attempt are rejected.
/**
 * Updates the state of one of the ExecutionVertex's Execution attempts. If the new status is
 * "FINISHED", this also updates the accumulators.
 *
 * @param state The state update.
 * @return True, if the task update was properly applied, false, if the execution attempt was
 *     not found.
 */
public boolean updateState(TaskExecutionStateTransition state) {
    assertRunningInJobMasterMainThread();
    final Execution attempt = currentExecutions.get(state.getID());
    if (attempt != null) {
        try {
            final boolean stateUpdated = updateStateInternal(state, attempt);
            maybeReleasePartitions(attempt);
            return stateUpdated;
        } catch (Throwable t) {
            ......
            return false;
        }
    } else {
        return false;
    }
}
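Why a late report from an old attempt is simply ignored can be shown with a map keyed by attempt ID. `AttemptRegistry` and its methods are hypothetical; they only mirror the role that currentExecutions plays above, with random UUIDs standing in for ExecutionAttemptID.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical model of currentExecutions: only the latest attempt of each vertex is registered. */
final class AttemptRegistry {
    private final Map<UUID, String> currentExecutions = new HashMap<>(); // attempt ID -> vertex ID
    private final Map<String, UUID> latestAttemptOfVertex = new HashMap<>();
    private final Map<UUID, String> states = new HashMap<>();

    /** Starts a new attempt for a vertex and deregisters the previous one, if any. */
    UUID newAttempt(String vertexId) {
        UUID previous = latestAttemptOfVertex.get(vertexId);
        if (previous != null) {
            currentExecutions.remove(previous); // plays the role of deregisterExecution(...)
        }
        UUID attemptId = UUID.randomUUID();
        currentExecutions.put(attemptId, vertexId);
        latestAttemptOfVertex.put(vertexId, attemptId);
        return attemptId;
    }

    /** Like updateState: returns false when the attempt is no longer registered. */
    boolean updateState(UUID attemptId, String newState) {
        if (!currentExecutions.containsKey(attemptId)) {
            return false;
        }
        states.put(attemptId, newState);
        return true;
    }

    String stateOf(UUID attemptId) {
        return states.get(attemptId);
    }
}
```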
JobMaster: the central component responsible for one job's topology, covering job scheduling, resource management, external communication, and so on.
SchedulerNG: responsible for scheduling the job topology. All calls to its methods are triggered through the ComponentMainThreadExecutor, so they are never invoked concurrently.
ExecutionGraph: the central data structure of the current execution topology, coordinating the distributed Executions. It describes every SubTask of the whole job and its partition data, and stays in communication with them.
2.2 Job Failover
2.2.1 Handling Task failures
- The main entry point for handling a Task failure is DefaultScheduler.handleTaskFailure(ExecutionVertexID, Throwable): the RestartBackoffTimeStrategy decides whether to restart or to fail the job; the FailoverStrategy selects the SubTasks that need to be restarted; finally, the job's SchedulingStrategy performs the corresponding scheduling to restart those SubTasks.
private void handleTaskFailure(
        final ExecutionVertexID executionVertexId, @Nullable final Throwable error) {
    // record the job's current failure cause
    setGlobalFailureCause(error);
    // if the related operators (e.g. source, sink) have an OperatorCoordinator, notify it so it can react
    notifyCoordinatorsAboutTaskFailure(executionVertexId, error);
    // apply the current restart strategy and obtain a FailureHandlingResult
    final FailureHandlingResult failureHandlingResult =
            executionFailureHandler.getFailureHandlingResult(executionVertexId, error);
    // depending on the result, restart the Tasks or fail the job
    maybeRestartTasks(failureHandlingResult);
}
public class FailureHandlingResult {
    // all SubTasks that must be restarted to recover
    Set<ExecutionVertexID> verticesToRestart;
    // restart delay
    long restartDelayMS;
    // root cause of the failure
    Throwable error;
    // whether this is a global failure
    boolean globalFailure;
}
- ExecutionFailureHandler: processes the failure and returns a FailureHandlingResult according to the currently configured strategies.
public FailureHandlingResult getFailureHandlingResult(
        ExecutionVertexID failedTask, Throwable cause) {
    return handleFailure(
            cause,
            failoverStrategy.getTasksNeedingRestart(failedTask, cause), // select the SubTasks that need to be restarted
            false);
}
private FailureHandlingResult handleFailure(
        final Throwable cause,
        final Set<ExecutionVertexID> verticesToRestart,
        final boolean globalFailure) {
    if (isUnrecoverableError(cause)) {
        return FailureHandlingResult.unrecoverable(
                new JobException("The failure is not recoverable", cause), globalFailure);
    }
    restartBackoffTimeStrategy.notifyFailure(cause);
    if (restartBackoffTimeStrategy.canRestart()) {
        numberOfRestarts++;
        return FailureHandlingResult.restartable(
                verticesToRestart, restartBackoffTimeStrategy.getBackoffTime(), globalFailure);
    } else {
        return FailureHandlingResult.unrecoverable(
                new JobException(
                        "Recovery is suppressed by " + restartBackoffTimeStrategy, cause),
                globalFailure);
    }
}
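The decision logic of handleFailure can be modeled in isolation. In this sketch, `BackoffStrategy` mirrors the three calls made above (notifyFailure, canRestart, getBackoffTime), while `Result`, `UnrecoverableException`, and the fixed-delay strategy used for testing are hypothetical simplifications rather than Flink's classes; vertex IDs are plain strings.

```java
import java.util.Collections;
import java.util.Set;

/** Hypothetical, simplified model of ExecutionFailureHandler.handleFailure. */
final class FailureHandlingSketch {

    /** Mirrors the three calls handleFailure makes on a RestartBackoffTimeStrategy. */
    interface BackoffStrategy {
        void notifyFailure(Throwable cause);
        boolean canRestart();
        long getBackoffTime();
    }

    /** Hypothetical marker; Flink classifies unrecoverable errors in its own way. */
    static final class UnrecoverableException extends RuntimeException {
        UnrecoverableException(String message) {
            super(message);
        }
    }

    static final class Result {
        final boolean canRestart;
        final Set<String> verticesToRestart;
        final long restartDelayMs;
        final Throwable error;

        Result(boolean canRestart, Set<String> verticesToRestart, long restartDelayMs, Throwable error) {
            this.canRestart = canRestart;
            this.verticesToRestart = verticesToRestart;
            this.restartDelayMs = restartDelayMs;
            this.error = error;
        }
    }

    /** Allows up to maxRestarts restarts, each after the same delay. */
    static final class FixedDelayStrategy implements BackoffStrategy {
        private final int maxRestarts;
        private final long delayMs;
        private int failures;

        FixedDelayStrategy(int maxRestarts, long delayMs) {
            this.maxRestarts = maxRestarts;
            this.delayMs = delayMs;
        }

        @Override public void notifyFailure(Throwable cause) { failures++; }
        @Override public boolean canRestart() { return failures <= maxRestarts; }
        @Override public long getBackoffTime() { return delayMs; }
    }

    static Result handleFailure(Throwable cause, Set<String> verticesToRestart, BackoffStrategy strategy) {
        if (cause instanceof UnrecoverableException) {
            // Unrecoverable failures bypass the restart strategy and fail the job directly.
            return new Result(false, Collections.emptySet(), 0L, cause);
        }
        strategy.notifyFailure(cause);
        if (strategy.canRestart()) {
            return new Result(true, verticesToRestart, strategy.getBackoffTime(), cause);
        }
        return new Result(false, Collections.emptySet(), 0L,
                new RuntimeException("Recovery is suppressed by " + strategy, cause));
    }
}
```

With maxRestarts = 2, the first two failures yield a restartable result and the third takes the "Recovery is suppressed" branch, which fails the job.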
FailoverStrategy: the failover strategy, which decides which SubTasks must be restarted.
- RestartAllFailoverStrategy: when a failure occurs, the entire job is restarted, i.e. all SubTasks are restarted.
- RestartPipelinedRegionFailoverStrategy: when a failure occurs, the Region containing the failed SubTask is restarted.
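A pipelined region is a set of SubTasks connected by pipelined (rather than blocking) data exchanges. The sketch below groups vertices with a union-find over pipelined edges and returns the failed vertex's region. It rests on simplifying assumptions: vertex IDs are strings, edges are given as (producer, consumer, pipelined) triples, and unlike Flink's RestartPipelinedRegionFailoverStrategy it does not add neighboring regions (for example consumers of restarted results). The class `PipelinedRegions` is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch: a region is a connected component over pipelined edges only. */
final class PipelinedRegions {

    /** A data exchange between two vertices; pipelined == false means a blocking exchange. */
    static final class Edge {
        final String producer;
        final String consumer;
        final boolean pipelined;

        Edge(String producer, String consumer, boolean pipelined) {
            this.producer = producer;
            this.consumer = consumer;
            this.pipelined = pipelined;
        }
    }

    // Union-find parent pointers, one entry per vertex.
    private final Map<String, String> parent = new HashMap<>();

    PipelinedRegions(Set<String> vertices, List<Edge> edges) {
        for (String v : vertices) {
            parent.put(v, v);
        }
        for (Edge e : edges) {
            if (e.pipelined) {
                // Pipelined producer and consumer run together, so they share a region.
                parent.put(find(e.producer), find(e.consumer));
            }
        }
    }

    private String find(String v) {
        String root = v;
        while (!parent.get(root).equals(root)) {
            root = parent.get(root);
        }
        // Path compression: point every vertex on the path directly at the root.
        String cur = v;
        while (!cur.equals(root)) {
            String next = parent.get(cur);
            parent.put(cur, root);
            cur = next;
        }
        return root;
    }

    /** All vertices that share a pipelined region with the failed vertex. */
    Set<String> regionOf(String failedVertex) {
        String root = find(failedVertex);
        Set<String> region = new HashSet<>();
        for (String v : new ArrayList<>(parent.keySet())) {
            if (find(v).equals(root)) {
                region.add(v);
            }
        }
        return region;
    }
}
```

In a streaming job where every edge is pipelined, the whole connected graph forms a single region, so region failover then restarts everything connected to the failed SubTask.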
RestartBackoffTimeStrategy: when a Task fails, decides whether to restart and how long to wait before restarting.
- FixedDelayRestartBackoffTimeStrategy: allows the job to restart a fixed number of times, each after a specified delay.
- FailureRateRestartBackoffTimeStrategy: allows restarts with a fixed delay as long as the failure rate stays within a configured limit.
- NoRestartBackoffTimeStrategy: never restarts.
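The failure-rate variant can be sketched with a sliding window of failure timestamps: restarts are allowed while no more than `maxFailuresPerInterval` failures fall inside the last `intervalMs`. The class name, the injectable clock, and the exact window-boundary handling are assumptions of this sketch, not a copy of FailureRateRestartBackoffTimeStrategy.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.LongSupplier;

/** Hypothetical failure-rate restart strategy based on a sliding time window. */
final class FailureRateBackoff {
    private final int maxFailuresPerInterval;
    private final long intervalMs;
    private final long backoffMs;
    private final LongSupplier clock; // e.g. System::currentTimeMillis; injectable for tests
    private final Deque<Long> failureTimestamps = new ArrayDeque<>();

    FailureRateBackoff(int maxFailuresPerInterval, long intervalMs, long backoffMs, LongSupplier clock) {
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.intervalMs = intervalMs;
        this.backoffMs = backoffMs;
        this.clock = clock;
    }

    void notifyFailure(Throwable cause) {
        long now = clock.getAsLong();
        failureTimestamps.addLast(now);
        // Forget failures that have slid out of the window.
        while (!failureTimestamps.isEmpty() && now - failureTimestamps.peekFirst() > intervalMs) {
            failureTimestamps.removeFirst();
        }
    }

    boolean canRestart() {
        // Restart only while the failures inside the window stay within the limit.
        return failureTimestamps.size() <= maxFailuresPerInterval;
    }

    long getBackoffTime() {
        return backoffMs;
    }
}
```

Unlike the fixed-delay strategy, this one never runs out of restarts as long as failures are spread out widely enough.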
SchedulingStrategy: the scheduling policy for Task execution instances.
- EagerSchedulingStrategy: eager scheduling; all Tasks are scheduled at the same time.
- LazyFromSourcesSchedulingStrategy: schedules downstream Tasks once the partition data they consume is ready; used for batch jobs.
- PipelinedRegionSchedulingStrategy: treats the Tasks connected by pipelined edges as one Region and uses the Region as its scheduling granularity.
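The lazy-from-sources idea can be sketched as follows: sources are scheduled first, and any other vertex is scheduled once all of its input partitions are ready. Two assumptions are made here: a producer's partition counts as ready when the producer finishes (which matches blocking exchanges), and a consumer waits for all inputs (Flink also had an "any input" constraint). The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of lazy-from-sources scheduling over a static job graph. */
final class LazyFromSourcesSketch {
    private final Map<String, Set<String>> inputsOf = new HashMap<>(); // consumer -> its producers
    private final Set<String> finished = new HashSet<>();
    private final Set<String> scheduled = new HashSet<>();

    LazyFromSourcesSketch(Map<String, Set<String>> inputsOf) {
        this.inputsOf.putAll(inputsOf);
    }

    /** Initially only vertices without inputs (sources) are scheduled. */
    List<String> startScheduling() {
        List<String> toSchedule = new ArrayList<>();
        for (Map.Entry<String, Set<String>> e : inputsOf.entrySet()) {
            if (e.getValue().isEmpty()) {
                toSchedule.add(e.getKey());
            }
        }
        scheduled.addAll(toSchedule);
        return toSchedule;
    }

    /** Called when a producer finishes; returns the consumers that just became schedulable. */
    List<String> onFinished(String producer) {
        finished.add(producer);
        List<String> ready = new ArrayList<>();
        for (Map.Entry<String, Set<String>> e : inputsOf.entrySet()) {
            String consumer = e.getKey();
            boolean consumesThis = e.getValue().contains(producer);
            if (!scheduled.contains(consumer) && consumesThis && finished.containsAll(e.getValue())) {
                ready.add(consumer);
            }
        }
        scheduled.addAll(ready);
        return ready;
    }
}
```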
2.2.2 Restart Task
private void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) {
    if (failureHandlingResult.canRestart()) {
        restartTasksWithDelay(failureHandlingResult);
    } else {
        failJob(failureHandlingResult.getError());
    }
}
private void restartTasksWithDelay(final FailureHandlingResult failureHandlingResult) {
    final Set<ExecutionVertexID> verticesToRestart =
            failureHandlingResult.getVerticesToRestart();
    final Set<ExecutionVertexVersion> executionVertexVersions =
            new HashSet<>(
                    executionVertexVersioner
                            .recordVertexModifications(verticesToRestart)
                            .values());
    final boolean globalRecovery = failureHandlingResult.isGlobalFailure();
    addVerticesToRestartPending(verticesToRestart);
    // cancel all SubTasks that need to be restarted
    final CompletableFuture<?> cancelFuture = cancelTasksAsync(verticesToRestart);
    delayExecutor.schedule(
            () ->
                    FutureUtils.assertNoException(
                            cancelFuture.thenRunAsync( // restart once cancellation has completed
                                    restartTasks(executionVertexVersions, globalRecovery),
                                    getMainThreadExecutor())),
            failureHandlingResult.getRestartDelayMS(),
            TimeUnit.MILLISECONDS);
}
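The "cancel, wait for the delay, then restart" pattern of restartTasksWithDelay can be reproduced with plain JDK concurrency utilities. This is a sketch: `cancelAll`, the string task IDs, and the use of one ScheduledExecutorService in place of Flink's delayExecutor and main-thread executor are assumptions. As in the original, the delay timer and the cancellation proceed concurrently, and the restart runs only once both the delay has elapsed and every cancel future has completed.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Hypothetical sketch of "cancel, wait for the delay, then restart". */
final class DelayedRestartSketch {

    /** Completes once every per-task cancel future has completed (similar in spirit to combineAll). */
    static CompletableFuture<Void> cancelAll(
            List<String> taskIds, Function<String, CompletableFuture<?>> cancelOne) {
        List<CompletableFuture<?>> futures =
                taskIds.stream().map(cancelOne).collect(Collectors.toList());
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
    }

    /** After delayMs, chains the restart onto the cancel future and runs it on the executor. */
    static CompletableFuture<Void> restartWithDelay(
            CompletableFuture<Void> cancelFuture,
            Runnable restart,
            long delayMs,
            ScheduledExecutorService executor) {
        CompletableFuture<Void> restarted = new CompletableFuture<>();
        executor.schedule(
                () -> {
                    cancelFuture
                            .thenRunAsync(restart, executor)
                            .whenComplete((ignored, error) -> {
                                // Propagate failures instead of silently dropping them.
                                if (error != null) {
                                    restarted.completeExceptionally(error);
                                } else {
                                    restarted.complete(null);
                                }
                            });
                },
                delayMs,
                TimeUnit.MILLISECONDS);
        return restarted;
    }
}
```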
2.2.3 Cancel Task
- Cancellation first cancels SubTasks that are still waiting for a Slot to be allocated; for SubTasks already in the DEPLOYING or RUNNING state, the TaskExecutor must be told to stop them, and the scheduler waits until that completes.
private CompletableFuture<?> cancelTasksAsync(final Set<ExecutionVertexID> verticesToRestart) {
    // clean up all the related pending requests to avoid that immediately returned slot
    // is used to fulfill the pending requests of these tasks
    verticesToRestart.stream().forEach(executionSlotAllocator::cancel); // cancel SubTasks that may still be waiting for a Slot
    final List<CompletableFuture<?>> cancelFutures =
            verticesToRestart.stream()
                    .map(this::cancelExecutionVertex) // start stopping each SubTask
                    .collect(Collectors.toList());
    return FutureUtils.combineAll(cancelFutures);
}
public void cancel() {
    while (true) {
        // retry whenever the state transition fails because the state changed concurrently
        ExecutionState current = this.state;
        if (current == CANCELING || current == CANCELED) {
            // already taken care of, no need to cancel again
            return;
        } else if (current == RUNNING || current == DEPLOYING) {
            // set the state to CANCELING and send an RPC asking the TaskExecutor to stop the SubTask
            if (startCancelling(NUM_CANCEL_CALL_TRIES)) {
                return;
            }
        } else if (current == FINISHED) {
            // the task has finished, but its output will no longer be consumed, so release its result partitions
            sendReleaseIntermediateResultPartitionsRpcCall();
            return;
        } else if (current == FAILED) {
            return;
        } else if (current == CREATED || current == SCHEDULED) {
            // from here, we can directly switch to cancelled, because no task has been deployed
            if (cancelAtomically()) {
                return;
            }
        } else {
            throw new IllegalStateException(current.name());
        }
    }
}
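The retry loop in cancel() exists because state changes are made with compare-and-set: if another thread changes the state between the read and the write, the CAS fails and the loop re-reads the state. A minimal sketch with an AtomicReference follows; the class name and the two callbacks (`sendCancelRpc`, `releasePartitions`) are hypothetical stand-ins for the RPC calls in the original.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of a CAS-based cancel loop; not Flink's Execution class. */
final class CancelLoopSketch {

    enum ExecutionState { CREATED, SCHEDULED, DEPLOYING, RUNNING, FINISHED, CANCELING, CANCELED, FAILED }

    private final AtomicReference<ExecutionState> state;
    private final Runnable sendCancelRpc;     // hypothetical: ask the TaskExecutor to stop the task
    private final Runnable releasePartitions; // hypothetical: release the produced result partitions

    CancelLoopSketch(ExecutionState initial, Runnable sendCancelRpc, Runnable releasePartitions) {
        this.state = new AtomicReference<>(initial);
        this.sendCancelRpc = sendCancelRpc;
        this.releasePartitions = releasePartitions;
    }

    ExecutionState state() {
        return state.get();
    }

    void cancel() {
        while (true) {
            ExecutionState current = state.get();
            switch (current) {
                case CANCELING:
                case CANCELED:
                case FAILED:
                    return; // nothing left to do
                case FINISHED:
                    releasePartitions.run();
                    return;
                case RUNNING:
                case DEPLOYING:
                    if (state.compareAndSet(current, ExecutionState.CANCELING)) {
                        sendCancelRpc.run();
                        return;
                    }
                    break; // lost a race with another state change: re-read and retry
                case CREATED:
                case SCHEDULED:
                    // Nothing has been deployed yet, so go straight to CANCELED.
                    if (state.compareAndSet(current, ExecutionState.CANCELED)) {
                        return;
                    }
                    break;
                default:
                    throw new IllegalStateException(current.name());
            }
        }
    }
}
```

Calling cancel() a second time on a CANCELING execution is a no-op, so no duplicate cancel RPC is sent.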
- After cancellation completes, the Task exit process described in section 1 notifies the ExecutionGraph, which updates its data along the chain ExecutionGraph.updateState(TaskExecutionStateTransition) -> ExecutionGraph.updateStateInternal(TaskExecutionStateTransition, Execution) -> Execution.completeCancelling(…) -> Execution.finishCancellation(boolean) -> ExecutionGraph.deregisterExecution(Execution). deregisterExecution removes the stopped Execution from currentExecutions.
2.2.4 Start Task
private Runnable restartTasks(
        final Set<ExecutionVertexVersion> executionVertexVersions,
        final boolean isGlobalRecovery) {
    return () -> {
        final Set<ExecutionVertexID> verticesToRestart =
                executionVertexVersioner.getUnmodifiedExecutionVertices(
                        executionVertexVersions);
        removeVerticesFromRestartPending(verticesToRestart);
        // create a new execution attempt (Execution) for each SubTask
        resetForNewExecutions(verticesToRestart);
        try {
            // restore the SubTasks' state
            restoreState(verticesToRestart, isGlobalRecovery);
        } catch (Throwable t) {
            handleGlobalFailure(t);
            return;
        }
        // start scheduling: request Slots and deploy
        schedulingStrategy.restartTasks(verticesToRestart);
    };
}
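The versioning step at the top of restartTasks guards against races: each vertex's version is recorded when the restart is planned, and if the vertex is modified again during the restart delay (for example by a second failover), the older restart skips it. A minimal sketch of that idea follows; `VertexVersionerSketch` and its methods are hypothetical, loosely modeled on recordVertexModifications and getUnmodifiedExecutionVertices.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of vertex versioning used to drop outdated restart requests. */
final class VertexVersionerSketch {
    private final Map<String, Long> versions = new HashMap<>();

    /** Bumps the version of each vertex and returns a (vertex -> new version) snapshot. */
    Map<String, Long> recordModifications(Set<String> vertices) {
        Map<String, Long> snapshot = new HashMap<>();
        for (String v : vertices) {
            long next = versions.getOrDefault(v, 0L) + 1;
            versions.put(v, next);
            snapshot.put(v, next);
        }
        return snapshot;
    }

    /** Vertices whose current version still equals the one in the snapshot. */
    Set<String> unmodified(Map<String, Long> snapshot) {
        Set<String> result = new HashSet<>();
        for (Map.Entry<String, Long> e : snapshot.entrySet()) {
            if (e.getValue().equals(versions.get(e.getKey()))) {
                result.add(e.getKey());
            }
        }
        return result;
    }
}
```

If a first failover plans to restart {a, b} and a second one then plans to restart {b}, the first restart only handles a, and b is left to the newer restart.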