Flink Yarn Per Job - JobManager Requests Slots
2022-08-05 11:05:00 【hyunbar】
When the JobMaster starts, it starts the SlotPool and registers with the ResourceManager.
Starting the SlotPool
JobMaster
private void startJobMasterServices() throws Exception {
    // Start the heartbeat services for the TaskManagers and the ResourceManager.
    startHeartbeatServices();
    // start the slot pool make sure the slot pool now accepts messages for this leader
    slotPool.start(getFencingToken(), getAddress(), getMainThreadExecutor());
    //TODO: Remove once the ZooKeeperLeaderRetrieval returns the stored address upon start
    // try to reconnect to previously known leader (the ResourceManager)
    reconnectToResourceManager(new FlinkException("Starting JobMaster component."));
    // job is ready to go, try to establish connection with resource manager
    //   - activate leader retrieval for the resource manager
    //   - on notification of the leader, the connection will be established and
    //     the slot pool will start requesting slots
    resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
}
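Start the heartbeat services (TaskManager, ResourceManager)
Start the SlotPool
Reconnect to the previously known ResourceManager
Establish the connection to the ResourceManager, after which the SlotPool starts requesting slots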
Registering with the RM
StandaloneLeaderRetrievalService
@Override
public void start(LeaderRetrievalListener listener) {
    checkNotNull(listener, "Listener must not be null.");
    synchronized (startStopLock) {
        checkState(!started, "StandaloneLeaderRetrievalService can only be started once.");
        started = true;
        // directly notify the listener, because we already know the leading JobManager's address
        listener.notifyLeaderAddress(leaderAddress, leaderId);
    }
}
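Why does the listener fire inside start()? With a fixed, non-elected leader there is nothing to wait for, so the service can hand over the known address right away. Below is a minimal sketch of that idea, not Flink's implementation: `Listener`, `FixedLeaderRetrieval`, and the address string "rm-address" are hypothetical names introduced only for illustration.

```java
import java.util.Objects;
import java.util.UUID;

final class StandaloneRetrievalSketch {

    /** Hypothetical stand-in for Flink's LeaderRetrievalListener. */
    interface Listener {
        void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId);
    }

    /** Hypothetical retrieval service whose leader is fixed at construction time. */
    static final class FixedLeaderRetrieval {
        private final Object startStopLock = new Object();
        private final String leaderAddress;
        private final UUID leaderId;
        private boolean started;

        FixedLeaderRetrieval(String leaderAddress, UUID leaderId) {
            this.leaderAddress = Objects.requireNonNull(leaderAddress);
            this.leaderId = Objects.requireNonNull(leaderId);
        }

        void start(Listener listener) {
            Objects.requireNonNull(listener, "Listener must not be null.");
            synchronized (startStopLock) {
                if (started) {
                    throw new IllegalStateException("The service can only be started once.");
                }
                started = true;
                // No election to wait for: the listener hears about the leader immediately.
                listener.notifyLeaderAddress(leaderAddress, leaderId);
            }
        }
    }

    /** Starts the service once and returns the address the listener was given. */
    static String demo() {
        String[] seen = new String[1];
        FixedLeaderRetrieval service = new FixedLeaderRetrieval("rm-address", new UUID(0L, 1L));
        service.start((address, sessionId) -> seen[0] = address);
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println("listener notified with: " + demo());
    }
}
```

The listener callback runs synchronously on the thread that calls start(), which is presumably why the real listener immediately hands the work off with runAsync.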
ResourceManagerLeaderListener in TaskExecutor (JobMaster declares an inner listener of the same name that takes an analogous path)
private final class ResourceManagerLeaderListener implements LeaderRetrievalListener {
    @Override
    public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
        runAsync(
            () -> notifyOfNewResourceManagerLeader(
                leaderAddress,
                ResourceManagerId.fromUuidOrNull(leaderSessionID)));
    }
}
private void notifyOfNewResourceManagerLeader(String newLeaderAddress, ResourceManagerId newResourceManagerId) {
    resourceManagerAddress = createResourceManagerAddress(newLeaderAddress, newResourceManagerId);
    reconnectToResourceManager(new FlinkException(String.format("ResourceManager leader changed to new address %s", resourceManagerAddress)));
}
private void reconnectToResourceManager(Exception cause) {
    closeResourceManagerConnection(cause);
    startRegistrationTimeout();
    tryConnectToResourceManager();
}
private void tryConnectToResourceManager() {
    if (resourceManagerAddress != null) {
        connectToResourceManager();
    }
}
private void connectToResourceManager() {
    ... ...
    resourceManagerConnection.start();
}
RegisteredRpcConnection
public void start() {
    checkState(!closed, "The RPC connection is already closed");
    checkState(!isConnected() && pendingRegistration == null, "The RPC connection is already started");
    // Create the registration object.
    final RetryingRegistration<F, G, S> newRegistration = createNewRegistration();
    if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
        // Start registering; once registration succeeds, onRegistrationSuccess() is called.
        newRegistration.startRegistration();
    } else {
        // concurrent start operation
        newRegistration.cancel();
    }
}
private RetryingRegistration<F, G, S> createNewRegistration() {
    RetryingRegistration<F, G, S> newRegistration = checkNotNull(generateRegistration());
    ... ...
    return newRegistration;
}
Create the registration object
Start the registration; once it succeeds, onRegistrationSuccess() is called
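The compareAndSet in start() is what keeps two racing callers from both starting a registration: only the caller that swaps the field from null wins, and the loser cancels its own attempt. A small sketch of that guard follows. `Registration` and `Connection` are hypothetical stand-ins, and the checkState preconditions of the original are deliberately left out so that the losing branch can be exercised from a single thread.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

final class RegistrationStartSketch {

    /** Hypothetical stand-in for RetryingRegistration; it only records what happened to it. */
    static final class Registration {
        volatile boolean started;
        volatile boolean cancelled;

        void startRegistration() {
            started = true;
        }

        void cancel() {
            cancelled = true;
        }
    }

    /** Hypothetical stand-in for RegisteredRpcConnection. */
    static final class Connection {
        private static final AtomicReferenceFieldUpdater<Connection, Registration> UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(
                Connection.class, Registration.class, "pendingRegistration");

        // Must be volatile and non-static for the field updater to accept it.
        private volatile Registration pendingRegistration;

        /** Creates a registration and starts it only if no other registration is pending. */
        Registration start() {
            Registration newRegistration = new Registration();
            if (UPDATER.compareAndSet(this, null, newRegistration)) {
                newRegistration.startRegistration();
            } else {
                // Another start() got there first, so this attempt is dropped.
                newRegistration.cancel();
            }
            return newRegistration;
        }
    }

    public static void main(String[] args) {
        Connection connection = new Connection();
        Registration first = connection.start();
        Registration second = connection.start();
        System.out.println("first started: " + first.started);
        System.out.println("second cancelled: " + second.cancelled);
    }
}
```

A field updater rather than an AtomicReference keeps the field a plain volatile reference, which is the same shape the REGISTRATION_UPDATER constant in the excerpt suggests.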
SlotPool requests slots
ResourceManagerConnection in JobMaster
protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) {
    runAsync(() -> {
        // filter out outdated connections
        //noinspection ObjectEquality
        if (this == resourceManagerConnection) {
            establishResourceManagerConnection(success);
        }
    });
}
private void establishResourceManagerConnection(final JobMasterRegistrationSuccess success) {
    final ResourceManagerId resourceManagerId = success.getResourceManagerId();
    ... ...
    // The SlotPool connects to the ResourceManager and requests resources.
    slotPool.connectToResourceManager(resourceManagerGateway);
    ... ...
}
The SlotPool connects to the ResourceManager and requests resources
SlotPoolImpl
public void connectToResourceManager(@Nonnull ResourceManagerGateway resourceManagerGateway) {
    this.resourceManagerGateway = checkNotNull(resourceManagerGateway);
    // work on all slots waiting for this connection
    for (PendingRequest pendingRequest : waitingForResourceManager.values()) {
        requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
    }
    // all sent off
    waitingForResourceManager.clear();
}
private void requestSlotFromResourceManager(
        final ResourceManagerGateway resourceManagerGateway,
        final PendingRequest pendingRequest) {
    ... ...
    CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway.requestSlot(
        jobMasterId,
        new SlotRequest(jobId, allocationId, pendingRequest.getResourceProfile(), jobManagerAddress),
        rpcTimeout);
    ... ...
}
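connectToResourceManager drains waitingForResourceManager, which implies that slot requests arriving before the connection exists are parked rather than dropped. The sketch below illustrates that buffer-then-flush pattern under this assumption. `Gateway`, `MiniSlotPool`, `requestNewSlot`, and the allocation id strings are hypothetical, and the parking step itself is inferred from the loop above rather than shown in the excerpt.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class PendingRequestFlushSketch {

    /** Hypothetical stand-in for ResourceManagerGateway. */
    interface Gateway {
        void requestSlot(String allocationId);
    }

    /** Hypothetical, heavily simplified slot pool. */
    static final class MiniSlotPool {
        // Insertion-ordered so that parked requests are sent in arrival order.
        private final Map<String, String> waitingForResourceManager = new LinkedHashMap<>();
        private Gateway gateway;

        void requestNewSlot(String allocationId) {
            if (gateway == null) {
                // Not connected yet: park the request (assumed behaviour).
                waitingForResourceManager.put(allocationId, allocationId);
            } else {
                gateway.requestSlot(allocationId);
            }
        }

        void connectToResourceManager(Gateway newGateway) {
            this.gateway = newGateway;
            // Work on all requests that were waiting for this connection.
            for (String allocationId : waitingForResourceManager.values()) {
                newGateway.requestSlot(allocationId);
            }
            waitingForResourceManager.clear();
        }

        int waitingCount() {
            return waitingForResourceManager.size();
        }
    }

    /** Two requests before the connection, one after; returns what the gateway received. */
    static List<String> demo() {
        List<String> sent = new ArrayList<>();
        MiniSlotPool pool = new MiniSlotPool();
        pool.requestNewSlot("alloc-1");
        pool.requestNewSlot("alloc-2");
        pool.connectToResourceManager(sent::add);
        pool.requestNewSlot("alloc-3");
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```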
Requesting slots from the RM
ResourceManager
@Override
public CompletableFuture<Acknowledge> requestSlot(
        JobMasterId jobMasterId,
        SlotRequest slotRequest,
        final Time timeout) {
    JobID jobId = slotRequest.getJobId();
    JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);
    if (null != jobManagerRegistration) {
        if (Objects.equals(jobMasterId, jobManagerRegistration.getJobMasterId())) {
            log.info("Request slot with profile {} for job {} with allocation id {}.",
                slotRequest.getResourceProfile(),
                slotRequest.getJobId(),
                slotRequest.getAllocationId());
            try {
                // The RM's internal SlotManager takes over the request; when no registered
                // slot matches, it ends up asking YARN's ResourceManager for resources.
                slotManager.registerSlotRequest(slotRequest);
            } catch (ResourceManagerException e) {
                return FutureUtils.completedExceptionally(e);
            }
            return CompletableFuture.completedFuture(Acknowledge.get());
        } else {
            return FutureUtils.completedExceptionally(new ResourceManagerException("The job leader's id " +
                jobManagerRegistration.getJobMasterId() + " does not match the received id " + jobMasterId + '.'));
        }
    } else {
        return FutureUtils.completedExceptionally(new ResourceManagerException("Could not find registered job manager for job " + jobId + '.'));
    }
}
The RM's internal SlotManager takes over the request and, when no registered slot matches, asks YARN's ResourceManager for resources
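Before anything reaches the SlotManager, requestSlot performs two checks: the job must have a registered JobManager, and the caller's JobMasterId must match the registered one. A failed check comes back as an exceptionally completed future instead of a thrown exception. A minimal sketch of that validation, with plain strings standing in for JobID and JobMasterId; `MiniResourceManager` and its members are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

final class RequestSlotCheckSketch {

    /** Hypothetical, simplified resource manager that only validates the caller. */
    static final class MiniResourceManager {
        private final Map<String, String> jobMasterIdByJob = new HashMap<>();
        int acceptedRequests;

        void registerJobMaster(String jobId, String jobMasterId) {
            jobMasterIdByJob.put(jobId, jobMasterId);
        }

        CompletableFuture<String> requestSlot(String jobMasterId, String jobId) {
            String registered = jobMasterIdByJob.get(jobId);
            if (registered == null) {
                return failed("Could not find registered job manager for job " + jobId + '.');
            }
            if (!Objects.equals(jobMasterId, registered)) {
                // A stale or foreign JobMaster must not obtain slots for this job.
                return failed("The job leader's id " + registered
                    + " does not match the received id " + jobMasterId + '.');
            }
            acceptedRequests++; // the real code hands the request to the SlotManager here
            return CompletableFuture.completedFuture("ACK");
        }

        private static CompletableFuture<String> failed(String message) {
            CompletableFuture<String> future = new CompletableFuture<>();
            future.completeExceptionally(new IllegalStateException(message));
            return future;
        }
    }

    public static void main(String[] args) {
        MiniResourceManager rm = new MiniResourceManager();
        rm.registerJobMaster("job-1", "jm-A");
        System.out.println(rm.requestSlot("jm-A", "job-1").join());
        System.out.println(rm.requestSlot("jm-B", "job-1").isCompletedExceptionally());
    }
}
```

Returning a failed future keeps the RPC handler non-throwing, so the caller observes the rejection through the same CompletableFuture it is already waiting on.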
SlotManagerImpl
public boolean registerSlotRequest(SlotRequest slotRequest) throws ResourceManagerException {
    checkInit();
    ... ...
        try {
            internalRequestSlot(pendingSlotRequest);
        } catch (ResourceManagerException e) {
            // requesting the slot failed --> remove pending slot request
            pendingSlotRequests.remove(slotRequest.getAllocationId());
            throw new ResourceManagerException("Could not fulfill slot request " + slotRequest.getAllocationId() + '.', e);
        }
        return true;
    }
}
private void internalRequestSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException {
    final ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();
    OptionalConsumer.of(findMatchingSlot(resourceProfile))
        .ifPresent(taskManagerSlot -> allocateSlot(taskManagerSlot, pendingSlotRequest))
        .ifNotPresent(() -> fulfillPendingSlotRequestWithPendingTaskManagerSlot(pendingSlotRequest));
}
private void fulfillPendingSlotRequestWithPendingTaskManagerSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException {
    ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();
    Optional<PendingTaskManagerSlot> pendingTaskManagerSlotOptional = findFreeMatchingPendingTaskManagerSlot(resourceProfile);
    if (!pendingTaskManagerSlotOptional.isPresent()) {
        pendingTaskManagerSlotOptional = allocateResource(resourceProfile);
    }
    ... ...
}
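The two methods above form a fallback chain: use a free registered slot if one matches, otherwise reuse a slot that is already pending from an earlier worker request, and only then request a new worker. The sketch below models that order with slot sizes as plain integers. Everything in it is a simplifying assumption for illustration: `MiniSlotManager`, the `Outcome` enum, the fixed slotsPerWorker, and the UNFULFILLABLE branch (the excerpt elides what happens after allocateResource).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

final class SlotMatchingSketch {

    enum Outcome { ALLOCATED_FREE_SLOT, ASSIGNED_PENDING_SLOT, REQUESTED_NEW_WORKER, UNFULFILLABLE }

    /** Hypothetical slot manager; a slot is just its size in arbitrary units. */
    static final class MiniSlotManager {
        private final List<Integer> freeSlots = new ArrayList<>();
        private final List<Integer> pendingSlots = new ArrayList<>();
        private final int workerSlotSize;
        private final int slotsPerWorker;
        int requestedWorkers;

        MiniSlotManager(int workerSlotSize, int slotsPerWorker) {
            this.workerSlotSize = workerSlotSize;
            this.slotsPerWorker = slotsPerWorker;
        }

        void registerFreeSlot(int size) {
            freeSlots.add(size);
        }

        Outcome request(int required) {
            // 1. A registered, free slot that is large enough.
            if (takeFirstFitting(freeSlots, required).isPresent()) {
                return Outcome.ALLOCATED_FREE_SLOT;
            }
            // 2. A slot of a worker that was requested earlier but has not registered yet.
            if (takeFirstFitting(pendingSlots, required).isPresent()) {
                return Outcome.ASSIGNED_PENDING_SLOT;
            }
            // 3. A new worker, provided its slots would be large enough (assumed rule).
            if (workerSlotSize < required) {
                return Outcome.UNFULFILLABLE;
            }
            requestedWorkers++;
            for (int i = 0; i < slotsPerWorker; i++) {
                pendingSlots.add(workerSlotSize);
            }
            takeFirstFitting(pendingSlots, required); // reserve one of the new pending slots
            return Outcome.REQUESTED_NEW_WORKER;
        }

        private static Optional<Integer> takeFirstFitting(List<Integer> slots, int required) {
            for (int i = 0; i < slots.size(); i++) {
                if (slots.get(i) >= required) {
                    return Optional.of(slots.remove(i));
                }
            }
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        MiniSlotManager manager = new MiniSlotManager(1024, 2);
        manager.registerFreeSlot(1024);
        for (int i = 0; i < 4; i++) {
            System.out.println(manager.request(512));
        }
    }
}
```

Checking pending slots before requesting a worker matters because one new worker can bring several slots, so a second request can often ride on a container that is already on its way.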
Flink's RM requests resources from YARN's RM
SlotManagerImpl
private Optional<PendingTaskManagerSlot> allocateResource(ResourceProfile requestedSlotResourceProfile) {
    ... ...
    if (!resourceActions.allocateResource(defaultWorkerResourceSpec)) {
        // resource cannot be allocated
        return Optional.empty();
    }
    ... ...
}
ResourceActionsImpl in ResourceManager
public boolean allocateResource(WorkerResourceSpec workerResourceSpec) {
    validateRunsInMainThread();
    return startNewWorker(workerResourceSpec);
}
ActiveResourceManager
public boolean startNewWorker(WorkerResourceSpec workerResourceSpec) {
    requestNewWorker(workerResourceSpec);
    return true;
}
private void requestNewWorker(WorkerResourceSpec workerResourceSpec) {
    // Derive the TaskExecutor process spec from the configuration.
    final TaskExecutorProcessSpec taskExecutorProcessSpec =
        TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec);
    final int pendingCount = pendingWorkerCounter.increaseAndGet(workerResourceSpec);
    log.info("Requesting new worker with resource spec {}, current pending count: {}.",
        workerResourceSpec,
        pendingCount);
    // Request the resource.
    CompletableFuture<WorkerType> requestResourceFuture = resourceManagerDriver.requestResource(taskExecutorProcessSpec);
    FutureUtils.assertNoException(
        requestResourceFuture.handle((worker, exception) -> {
            if (exception != null) {
                final int count = pendingWorkerCounter.decreaseAndGet(workerResourceSpec);
                log.warn("Failed requesting worker with resource spec {}, current pending count: {}, exception: {}",
                    workerResourceSpec,
                    count,
                    exception);
                requestWorkerIfRequired();
            } else {
                final ResourceID resourceId = worker.getResourceID();
                workerNodeMap.put(resourceId, worker);
                currentAttemptUnregisteredWorkers.put(resourceId, workerResourceSpec);
                log.info("Requested worker {} with resource spec {}.",
                    resourceId.getStringWithMetadata(),
                    workerResourceSpec);
            }
            return null;
        }));
}
Derive the TaskExecutor process spec from the configuration
Request the resource
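requestNewWorker does its bookkeeping in two phases: the pending count goes up before the request is sent, and the handle callback either rolls it back on failure or records the new worker on success. The sketch below reproduces only that bookkeeping. `MiniActiveResourceManager`, the string spec and worker ids, and the externally supplied future are hypothetical, and the retry via requestWorkerIfRequired() is omitted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class PendingWorkerSketch {

    /** Hypothetical, single-threaded stand-in for ActiveResourceManager. */
    static final class MiniActiveResourceManager {
        private final Map<String, Integer> pendingWorkerCounter = new HashMap<>();
        private final Map<String, String> workerNodeMap = new HashMap<>();

        /** The driver's future is passed in so that the caller decides how the request ends. */
        CompletableFuture<Void> requestNewWorker(String spec, CompletableFuture<String> driverFuture) {
            // Count the worker as pending before the outcome is known.
            pendingWorkerCounter.merge(spec, 1, Integer::sum);
            return driverFuture.handle((workerId, exception) -> {
                if (exception != null) {
                    // The request failed: undo the increment.
                    pendingWorkerCounter.merge(spec, -1, Integer::sum);
                } else {
                    // The worker exists but has not registered yet, so it stays pending here.
                    workerNodeMap.put(workerId, spec);
                }
                return null;
            });
        }

        int pendingCount(String spec) {
            return pendingWorkerCounter.getOrDefault(spec, 0);
        }

        int knownWorkers() {
            return workerNodeMap.size();
        }
    }

    public static void main(String[] args) {
        MiniActiveResourceManager rm = new MiniActiveResourceManager();
        CompletableFuture<String> failing = new CompletableFuture<>();
        rm.requestNewWorker("spec", failing);
        System.out.println("pending after request: " + rm.pendingCount("spec"));
        failing.completeExceptionally(new RuntimeException("container request failed"));
        System.out.println("pending after failure: " + rm.pendingCount("spec"));
    }
}
```

The plain HashMap is safe here only because the sketch is single-threaded; the validateRunsInMainThread() call in the excerpt suggests the real code relies on a similar single-thread guarantee.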
YarnResourceManagerDriver
public CompletableFuture<YarnWorkerNode> requestResource(TaskExecutorProcessSpec taskExecutorProcessSpec) {
    checkInitialized();
    final CompletableFuture<YarnWorkerNode> requestResourceFuture = new CompletableFuture<>();
    final Optional<TaskExecutorProcessSpecContainerResourcePriorityAdapter.PriorityAndResource> priorityAndResourceOpt =
        taskExecutorProcessSpecContainerResourcePriorityAdapter.getPriorityAndResource(taskExecutorProcessSpec);
    if (!priorityAndResourceOpt.isPresent()) {
        requestResourceFuture.completeExceptionally(
            new ResourceManagerException(
                String.format("Could not compute the container Resource from the given TaskExecutorProcessSpec %s. " +
                        "This usually indicates the requested resource is larger than Yarn's max container resource limit.",
                    taskExecutorProcessSpec)));
    } else {
        final Priority priority = priorityAndResourceOpt.get().getPriority();
        final Resource resource = priorityAndResourceOpt.get().getResource();
        resourceManagerClient.addContainerRequest(getContainerRequest(resource, priority));
        // make sure we transmit the request fast and receive fast news of granted allocations
        resourceManagerClient.setHeartbeatInterval(containerRequestHeartbeatIntervalMillis);
        requestResourceFutures.computeIfAbsent(taskExecutorProcessSpec, ignore -> new LinkedList<>()).add(requestResourceFuture);
        log.info("Requesting new TaskExecutor container with resource {}, priority {}.", taskExecutorProcessSpec, priority);
    }
    return requestResourceFuture;
}
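requestResource returns a future that is still incomplete: the container request is sent asynchronously, and the future is queued per process spec in requestResourceFutures. A plausible reading is that a later allocation callback completes the oldest queued future for the matching spec. The sketch below shows that queueing pattern under this assumption. `MiniDriver`, `onContainerAllocated`, and the string ids are hypothetical, and the completion side is not part of the excerpt.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

final class RequestFutureQueueSketch {

    /** Hypothetical, simplified stand-in for a resource manager driver. */
    static final class MiniDriver {
        private final Map<String, Queue<CompletableFuture<String>>> requestResourceFutures = new HashMap<>();

        /** Queues a future for the given spec; the caller gets it back still incomplete. */
        CompletableFuture<String> requestResource(String spec) {
            CompletableFuture<String> future = new CompletableFuture<>();
            requestResourceFutures.computeIfAbsent(spec, ignore -> new LinkedList<>()).add(future);
            return future;
        }

        /**
         * Hypothetical callback, assumed to run when the cluster grants a container.
         * Completes the oldest waiting request for the spec; returns false if none waits.
         */
        boolean onContainerAllocated(String spec, String containerId) {
            Queue<CompletableFuture<String>> queue = requestResourceFutures.get(spec);
            if (queue == null || queue.isEmpty()) {
                return false;
            }
            queue.poll().complete(containerId);
            return true;
        }
    }

    public static void main(String[] args) {
        MiniDriver driver = new MiniDriver();
        CompletableFuture<String> first = driver.requestResource("spec");
        CompletableFuture<String> second = driver.requestResource("spec");
        driver.onContainerAllocated("spec", "container-1");
        System.out.println("first done: " + first.isDone());
        System.out.println("second done: " + second.isDone());
    }
}
```

Keying the queues by spec lets several identical requests wait side by side, with granted containers handed out in request order.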