Flink Yarn Per Job - The RM Starts the SlotManager
2022-08-05 11:05:00 【hyunbar】

ResourceManager
public final void onStart() throws Exception {
    try {
        startResourceManagerServices();
    } catch (Throwable t) {
        final ResourceManagerException exception = new ResourceManagerException(String.format("Could not start the ResourceManager %s", getAddress()), t);
        onFatalError(exception);
        throw exception;
    }
}

private void startResourceManagerServices() throws Exception {
    try {
        leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
        // Create the Yarn RM and NM clients, then initialize and start them
        initialize();
        // Start the ResourceManager through the leader election service
        leaderElectionService.start(this);
        jobLeaderIdService.start(new JobLeaderIdActionsImpl());
        registerTaskExecutorMetrics();
    } catch (Exception e) {
        handleStartResourceManagerServicesException(e);
    }
}
Create the Yarn RM and NM clients, then initialize and start them
Start the ResourceManager through the leader election service
Creating the Yarn RM and NM clients
ActiveResourceManager
@Override
protected void initialize() throws ResourceManagerException {
    try {
        resourceManagerDriver.initialize(
            this,
            new GatewayMainThreadExecutor(),
            ioExecutor);
    } catch (Exception e) {
        throw new ResourceManagerException("Cannot initialize resource provider.", e);
    }
}
AbstractResourceManagerDriver
@Override
public final void initialize(
        ResourceEventHandler<WorkerType> resourceEventHandler,
        ScheduledExecutor mainThreadExecutor,
        Executor ioExecutor) throws Exception {
    this.resourceEventHandler = Preconditions.checkNotNull(resourceEventHandler);
    this.mainThreadExecutor = Preconditions.checkNotNull(mainThreadExecutor);
    this.ioExecutor = Preconditions.checkNotNull(ioExecutor);
    // Follow this call into the concrete driver
    initializeInternal();
}

protected abstract void initializeInternal() throws Exception;
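The split between a final initialize() and an abstract initializeInternal() is the template method pattern: the base class stores and validates the shared collaborators, then hands control to the concrete driver. The following is a minimal sketch of that shape only. The class names SketchDriver and SketchYarnDriver and the log list are hypothetical stand-ins for illustration, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Entry point of the sketch; all names below are hypothetical, not Flink code.
class TemplateMethodDemo {
    static List<String> run() {
        SketchYarnDriver driver = new SketchYarnDriver();
        driver.initialize(() -> { });
        return driver.log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}

// Simplified stand-in for the role AbstractResourceManagerDriver plays.
abstract class SketchDriver {
    protected Runnable eventHandler;
    protected final List<String> log = new ArrayList<>();

    // Template method: final, so subclasses cannot skip the shared setup.
    public final void initialize(Runnable eventHandler) {
        this.eventHandler = Objects.requireNonNull(eventHandler);
        log.add("base initialized");
        initializeInternal();
    }

    // Hook that each concrete driver fills in.
    protected abstract void initializeInternal();
}

// Simplified stand-in for the role YarnResourceManagerDriver plays.
class SketchYarnDriver extends SketchDriver {
    @Override
    protected void initializeInternal() {
        log.add("RM client started");
        log.add("NM client started");
    }
}
```

Because initialize() is final, every driver goes through the same null checks before its own setup runs, which is the property the real code relies on.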
YarnResourceManagerDriver
@Override
protected void initializeInternal() throws Exception {
    final YarnContainerEventHandler yarnContainerEventHandler = new YarnContainerEventHandler();
    try {
        // Create the client for Yarn's ResourceManager, then initialize and start it
        resourceManagerClient = yarnResourceManagerClientFactory.createResourceManagerClient(
            yarnHeartbeatIntervalMillis,
            yarnContainerEventHandler);
        resourceManagerClient.init(yarnConfig);
        resourceManagerClient.start();
        final RegisterApplicationMasterResponse registerApplicationMasterResponse = registerApplicationMaster();
        getContainersFromPreviousAttempts(registerApplicationMasterResponse);
        taskExecutorProcessSpecContainerResourcePriorityAdapter =
            new TaskExecutorProcessSpecContainerResourcePriorityAdapter(
                registerApplicationMasterResponse.getMaximumResourceCapability(),
                ExternalResourceUtils.getExternalResources(flinkConfig, YarnConfigOptions.EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX));
    } catch (Exception e) {
        throw new ResourceManagerException("Could not start resource manager client.", e);
    }
    // Create the client for Yarn's NodeManager, then initialize and start it
    nodeManagerClient = yarnNodeManagerClientFactory.createNodeManagerClient(yarnContainerEventHandler);
    nodeManagerClient.init(yarnConfig);
    nodeManagerClient.start();
}
Create the client for Yarn's ResourceManager, then initialize and start it
Create the client for Yarn's NodeManager, then initialize and start it
Starting the SlotManager
StandaloneLeaderElectionService
@Override
public void start(LeaderContender newContender) throws Exception {
    if (contender != null) {
        // Service was already started
        throw new IllegalArgumentException("Leader election service cannot be started multiple times.");
    }
    contender = Preconditions.checkNotNull(newContender);
    // directly grant leadership to the given contender
    contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);
}
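With the standalone election service there is no real election: start() grants leadership to the single contender right away and refuses to be started twice. A minimal sketch of that behavior follows. The Contender interface, StandaloneElection class, and the all-zero DEFAULT_LEADER_ID value are assumptions made for this illustration and are not taken from the listing above.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical, simplified model of a standalone leader election service.
class StandaloneElectionDemo {
    interface Contender {
        void grantLeadership(UUID leaderSessionId);
    }

    // Assumed fixed leader id for the sketch.
    static final UUID DEFAULT_LEADER_ID = new UUID(0L, 0L);

    static final class StandaloneElection {
        private Contender contender;

        void start(Contender newContender) {
            if (contender != null) {
                throw new IllegalArgumentException("Leader election service cannot be started multiple times.");
            }
            contender = Objects.requireNonNull(newContender);
            // No competition in standalone mode: leadership is granted immediately.
            contender.grantLeadership(DEFAULT_LEADER_ID);
        }
    }

    static final class RecordingContender implements Contender {
        UUID granted;

        @Override
        public void grantLeadership(UUID leaderSessionId) {
            granted = leaderSessionId;
        }
    }

    // Starts the service once and reports which session id the contender received.
    static UUID startAndGetGrantedId() {
        RecordingContender contender = new RecordingContender();
        new StandaloneElection().start(contender);
        return contender.granted;
    }

    // Returns true if a second start() on the same service is rejected.
    static boolean secondStartFails() {
        StandaloneElection election = new StandaloneElection();
        election.start(new RecordingContender());
        try {
            election.start(new RecordingContender());
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(startAndGetGrantedId() + " " + secondStartFails());
    }
}
```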
ResourceManager
public void grantLeadership(final UUID newLeaderSessionID) {
    final CompletableFuture<Boolean> acceptLeadershipFuture = clearStateFuture
        // Follow this call
        .thenComposeAsync((ignored) -> tryAcceptLeadership(newLeaderSessionID), getUnfencedMainThreadExecutor());
    ... ...
}

private CompletableFuture<Boolean> tryAcceptLeadership(final UUID newLeaderSessionID) {
    if (leaderElectionService.hasLeadership(newLeaderSessionID)) {
        ... ...
        startServicesOnLeadership();
        return prepareLeadershipAsync().thenApply(ignored -> true);
    } else {
        return CompletableFuture.completedFuture(false);
    }
}
private void startServicesOnLeadership() {
    // Start the heartbeat services: TaskManager, JobMaster
    startHeartbeatServices();
    // Start the SlotManager
    slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());
    onLeadership();
}
Start the heartbeat services: TaskManager, JobMaster
Start the SlotManager
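The leadership hand-off in grantLeadership and tryAcceptLeadership is an asynchronous chain: wait for the previous state to be cleared, hop onto the main thread executor, and only start services if the offered session id is still the valid one. Below is a rough sketch of that chain using plain CompletableFuture. The names grant, tryAccept, and prepare, and the comparison of two UUIDs in place of leaderElectionService.hasLeadership, are simplifications assumed for the example.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of the accept-leadership future chain; not Flink code.
class AcceptLeadershipDemo {
    // Stand-in for prepareLeadershipAsync(): nothing to prepare in the sketch.
    static CompletableFuture<Void> prepare() {
        return CompletableFuture.completedFuture(null);
    }

    // Accept only if the offered id matches the id that currently holds leadership.
    static CompletableFuture<Boolean> tryAccept(UUID offered, UUID current, Runnable startServices) {
        if (offered.equals(current)) {
            startServices.run();
            return prepare().thenApply(ignored -> true);
        }
        return CompletableFuture.completedFuture(false);
    }

    // Runs the whole chain on a single-threaded "main thread" executor and waits for the result.
    static boolean grant(UUID offered, UUID current) {
        ExecutorService mainThread = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Void> clearStateFuture = CompletableFuture.completedFuture(null);
            return clearStateFuture
                .thenComposeAsync(ignored -> tryAccept(offered, current, () -> { }), mainThread)
                .join();
        } finally {
            mainThread.shutdown();
        }
    }

    public static void main(String[] args) {
        UUID id = UUID.randomUUID();
        System.out.println(grant(id, id));
        System.out.println(grant(id, UUID.randomUUID()));
    }
}
```

Using thenComposeAsync with an explicit executor is what keeps the state changes on one thread, so the services are never started concurrently with the clean-up of a previous leadership term.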
private void startHeartbeatServices() {
    taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
        resourceId,
        new TaskManagerHeartbeatListener(),
        getMainThreadExecutor(),
        log);
    jobManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
        resourceId,
        new JobManagerHeartbeatListener(),
        getMainThreadExecutor(),
        log);
}
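As the boss of all resources, the ResourceManager naturally has to keep in touch with its underlings, the TaskManagers, and with the JobMasters, which is why it creates a heartbeat sender for each.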
SlotManagerImpl
public void start(ResourceManagerId newResourceManagerId, Executor newMainThreadExecutor, ResourceActions newResourceActions) {
    LOG.info("Starting the SlotManager.");
    this.resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
    mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
    resourceActions = Preconditions.checkNotNull(newResourceActions);
    started = true;
    taskManagerTimeoutsAndRedundancyCheck = scheduledExecutor.scheduleWithFixedDelay(
        () -> mainThreadExecutor.execute(
            // Check for timed-out and redundant TaskManagers
            () -> checkTaskManagerTimeoutsAndRedundancy()),
        0L,
        taskManagerTimeout.toMilliseconds(),
        TimeUnit.MILLISECONDS);
    slotRequestTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(
        () -> mainThreadExecutor.execute(
            () -> checkSlotRequestTimeouts()),
        0L,
        slotRequestTimeout.toMilliseconds(),
        TimeUnit.MILLISECONDS);
    registerSlotManagerMetrics();
}
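Both periodic checks in start() follow the same pattern: a scheduler thread only fires the timer, and the actual check is handed to the main thread executor so that it never runs concurrently with other SlotManager state changes. The sketch below shows that hand-off with plain java.util.concurrent classes. The thread name "main-thread-executor", the 10 ms delay, and the runChecks helper are assumptions chosen for the example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of "timer thread triggers, main thread executes"; not Flink code.
class PeriodicCheckDemo {
    // Runs the periodic check at least `runs` times and returns the names of the threads that ran it.
    static List<String> runChecks(int runs) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        ExecutorService mainThread =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread-executor"));
        List<String> threads = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(runs);

        // The timer only forwards the work; the check body runs on the main thread executor.
        ScheduledFuture<?> check = timer.scheduleWithFixedDelay(
            () -> mainThread.execute(() -> {
                threads.add(Thread.currentThread().getName());
                done.countDown();
            }),
            0L,
            10L,
            TimeUnit.MILLISECONDS);

        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("checks did not run in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            check.cancel(false);
            timer.shutdownNow();
            mainThread.shutdownNow();
        }
        return threads;
    }

    public static void main(String[] args) {
        System.out.println(runChecks(3));
    }
}
```

Note that scheduleWithFixedDelay measures the delay from the end of one trigger to the start of the next, and the initial delay of 0 means the first check is submitted immediately.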
Checking for timed-out and redundant TaskManagers
void checkTaskManagerTimeoutsAndRedundancy() {
    if (!taskManagerRegistrations.isEmpty()) {
        long currentTime = System.currentTimeMillis();
        ArrayList<TaskManagerRegistration> timedOutTaskManagers = new ArrayList<>(taskManagerRegistrations.size());
        // first retrieve the timed out TaskManagers
        for (TaskManagerRegistration taskManagerRegistration : taskManagerRegistrations.values()) {
            if (currentTime - taskManagerRegistration.getIdleSince() >= taskManagerTimeout.toMilliseconds()) {
                // we collect the instance ids first in order to avoid concurrent modifications by the
                // ResourceActions.releaseResource call
                timedOutTaskManagers.add(taskManagerRegistration);
            }
        }
        int slotsDiff = redundantTaskManagerNum * numSlotsPerWorker - freeSlots.size();
        if (freeSlots.size() == slots.size()) {
            // No need to keep redundant taskManagers if no job is running:
            // release all timed-out TaskManagers.
            releaseTaskExecutors(timedOutTaskManagers, timedOutTaskManagers.size());
        } else if (slotsDiff > 0) {
            // Keep enough redundant taskManagers from time to time:
            // allocate more so that enough spare TaskManagers are always available.
            int requiredTaskManagers = MathUtils.divideRoundUp(slotsDiff, numSlotsPerWorker);
            allocateRedundantTaskManagers(requiredTaskManagers);
        } else {
            // second we trigger the release resource callback which can decide upon the resource release
            int maxReleaseNum = (-slotsDiff) / numSlotsPerWorker;
            releaseTaskExecutors(timedOutTaskManagers, Math.min(maxReleaseNum, timedOutTaskManagers.size()));
        }
    }
}
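The three branches above boil down to a little integer arithmetic on slotsDiff, which is easier to follow with concrete numbers. The sketch below reimplements just that arithmetic. The method names toAllocate and toRelease are invented for the example, and divideRoundUp is a local stand-in assumed to behave like MathUtils.divideRoundUp (ceiling division for positive inputs).

```java
// Hypothetical worked example of the redundancy arithmetic; not Flink code.
class RedundancyMathDemo {
    // Ceiling division for non-negative dividend and positive divisor.
    static int divideRoundUp(int dividend, int divisor) {
        return (dividend + divisor - 1) / divisor;
    }

    // How many extra TaskManagers the check would request.
    static int toAllocate(int redundantTaskManagerNum, int numSlotsPerWorker, int freeSlots, int totalSlots) {
        int slotsDiff = redundantTaskManagerNum * numSlotsPerWorker - freeSlots;
        if (freeSlots == totalSlots || slotsDiff <= 0) {
            return 0; // nothing running, or already enough spare slots
        }
        return divideRoundUp(slotsDiff, numSlotsPerWorker);
    }

    // How many timed-out TaskManagers the check would release at most.
    static int toRelease(int redundantTaskManagerNum, int numSlotsPerWorker, int freeSlots, int totalSlots, int timedOut) {
        int slotsDiff = redundantTaskManagerNum * numSlotsPerWorker - freeSlots;
        if (freeSlots == totalSlots) {
            return timedOut; // no job is running: release every timed-out TaskManager
        }
        if (slotsDiff > 0) {
            return 0; // short on spare slots: keep everything
        }
        return Math.min((-slotsDiff) / numSlotsPerWorker, timedOut);
    }

    public static void main(String[] args) {
        // 2 redundant TMs x 4 slots = 8 spare slots wanted, 3 free: slotsDiff = 5, ceil(5 / 4) = 2
        System.out.println(toAllocate(2, 4, 3, 16));
        // 1 redundant TM x 4 slots = 4 wanted, 13 free: slotsDiff = -9, 9 / 4 = 2, min(2, 3) = 2
        System.out.println(toRelease(1, 4, 13, 16, 3));
        // all 16 slots free: release all 3 timed-out TMs
        System.out.println(toRelease(1, 4, 16, 16, 3));
    }
}
```

The release branch rounds down while the allocate branch rounds up, so the check errs on the side of keeping at least the configured number of spare slots.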
