当前位置:网站首页>Flink Yarn Per Job - 提交应用
Flink Yarn Per Job - 提交应用
2022-08-03 09:01:00 【hyunbar】

YarnClusterDescriptor
/**
 * Excerpt of the application-master start-up routine; most of the body is
 * elided in this listing (marked with "... ...").
 *
 * The only step shown is the hand-off of the submission context to the
 * YARN client via {@code yarnClient.submitApplication(appContext)}.
 * NOTE(review): {@code appContext} is created in the elided part — its
 * construction is not visible here.
 *
 * @return an ApplicationReport (per the signature; the return statement is elided)
 * @throws Exception declared by the signature
 */
private ApplicationReport startAppMaster(
Configuration configuration,
String applicationName,
String yarnClusterEntrypoint,
JobGraph jobGraph,
YarnClient yarnClient,
YarnClientApplication yarnApplication,
ClusterSpecification clusterSpecification) throws Exception {
... ...
// Submit the prepared application submission context through the YARN client.
yarnClient.submitApplication(appContext);
... ...
}
YarnClientImpl extends YarnClient
/**
 * Client-side submission: wraps the submission context in a
 * SubmitApplicationRequest and sends it through {@code rmClient}.
 * The remainder of the method (including the return) is elided in this
 * listing (marked with "... ...").
 *
 * @param appContext submission context; its application id must be set
 * @throws YarnException declared by the signature; the visible code throws
 *         ApplicationIdNotProvidedException when the context carries no id
 * @throws IOException declared by the signature
 */
public ApplicationId submitApplication(ApplicationSubmissionContext appContext) throws YarnException, IOException {
ApplicationId applicationId = appContext.getApplicationId();
// Reject the submission up front when no application id was assigned.
if (applicationId == null) {
throw new ApplicationIdNotProvidedException("ApplicationId is not provided in ApplicationSubmissionContext");
} else {
// Build an empty request record and attach the submission context to it.
SubmitApplicationRequest request = (SubmitApplicationRequest)Records.newRecord(SubmitApplicationRequest.class);
request.setApplicationSubmissionContext(appContext);
// Important: this is the call that actually submits the request via rmClient.
this.rmClient.submitApplication(request);
... ...
}
}

ApplicationClientProtocolPBClientImpl implements ApplicationClientProtocol
/**
 * Sends the submission request through the protobuf proxy.
 *
 * The request is cast to its PB implementation to obtain the proto message,
 * which is passed to {@code proxy.submitApplication} with a null controller;
 * the proxy's result is wrapped in a SubmitApplicationResponsePBImpl.
 *
 * @param request the submission request; cast to SubmitApplicationRequestPBImpl
 * @return the wrapped response
 * @throws YarnException declared by the signature
 * @throws IOException declared by the signature
 */
public SubmitApplicationResponse submitApplication(SubmitApplicationRequest request)
        throws YarnException, IOException {
    final SubmitApplicationRequestPBImpl pbRequest = (SubmitApplicationRequestPBImpl) request;
    final SubmitApplicationRequestProto wireRequest = pbRequest.getProto();
    final RpcController noController = null;
    try {
        return new SubmitApplicationResponsePBImpl(
                proxy.submitApplication(noController, wireRequest));
    } catch (ServiceException rpcFailure) {
        // Hand the ServiceException to RPCUtil for unwrapping and rethrow.
        RPCUtil.unwrapAndThrowException(rpcFailure);
        // Fallback return required by the compiler after the helper call.
        return null;
    }
}
ClientRMService
/**
 * Handles an application submission request.
 *
 * Steps, in order: resolve the calling user, return early if the
 * application id is already registered in the RM context, fill in defaults
 * for queue / name / type (truncating an over-long type), then delegate to
 * RMAppManager and record the outcome in the audit log.
 *
 * @param request request carrying the ApplicationSubmissionContext
 * @return an empty SubmitApplicationResponse
 * @throws YarnException if the current user cannot be determined, or if
 *         RMAppManager rejects the submission (rethrown after audit logging)
 */
@Override
public SubmitApplicationResponse submitApplication(
    SubmitApplicationRequest request) throws YarnException {
  final ApplicationSubmissionContext appContext =
      request.getApplicationSubmissionContext();
  final ApplicationId appId = appContext.getApplicationId();

  // The submission context is validated for safety. Only fields that do not
  // depend on the RM's configuration are checked here; RM-configuration
  // dependent fields are validated in RMAppManager.
  String user = null;
  try {
    // Safety check: resolve the short name of the calling user.
    user = UserGroupInformation.getCurrentUser().getShortUserName();
  } catch (IOException ie) {
    LOG.warn("Unable to get the current user.", ie);
    RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
        ie.getMessage(), "ClientRMService",
        "Exception in submitting application", appId);
    throw RPCUtil.getRemoteException(ie);
  }

  // If the app is already present in rmContext, just return a response.
  final boolean alreadySubmitted = rmContext.getRMApps().get(appId) != null;
  if (alreadySubmitted) {
    LOG.info("This is an earlier submitted application: " + appId);
    return SubmitApplicationResponse.newInstance();
  }

  // Fill in defaults for fields the client left unset.
  if (appContext.getQueue() == null) {
    // DEFAULT_QUEUE_NAME = "default"
    appContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
  }
  if (appContext.getApplicationName() == null) {
    // DEFAULT_APPLICATION_NAME = "N/A"
    appContext.setApplicationName(YarnConfiguration.DEFAULT_APPLICATION_NAME);
  }
  final String appType = appContext.getApplicationType();
  if (appType == null) {
    // DEFAULT_APPLICATION_TYPE = "YARN"
    appContext.setApplicationType(YarnConfiguration.DEFAULT_APPLICATION_TYPE);
  } else if (appType.length() > YarnConfiguration.APPLICATION_TYPE_LENGTH) {
    // Cut an over-long type down to the configured maximum length.
    appContext.setApplicationType(
        appType.substring(0, YarnConfiguration.APPLICATION_TYPE_LENGTH));
  }

  try {
    // Call RMAppManager to submit the application directly.
    final long submitTime = System.currentTimeMillis();
    rmAppManager.submitApplication(appContext, submitTime, user);

    LOG.info("Application with id " + appId.getId() +
        " submitted by user " + user);
    RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
        "ClientRMService", appId);
  } catch (YarnException e) {
    LOG.info("Exception in submitting application with id " +
        appId.getId(), e);
    RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
        e.getMessage(), "ClientRMService",
        "Exception in submitting application", appId);
    throw e;
  }

  return recordFactory.newRecordInstance(SubmitApplicationResponse.class);
}
// Repeated excerpt of the key step in ClientRMService.submitApplication:
// call RMAppManager to submit the application directly, passing the
// submission context, the current wall-clock time and the resolved user.
rmAppManager.submitApplication(submissionContext,
System.currentTimeMillis(), user);

边栏推荐
猜你喜欢

获取JDcookie的方法

Path Prefixes (倍增!树上の二分)

The Transformer, BERT, GPT paper intensive reading notes

行业洞察 | 如何更好的实现与虚拟人的互动体验?

RSTP(端口角色+端口状态+工作机制)|||| 交换机接口分析

【论文笔记】基于动作空间划分的MAXQ自动分层方法

线程介绍与使用

English Grammar - Adverbial Clauses

Industry SaaS Microservice Stability Guarantee Actual Combat

Using pipreqs export requirements needed for the project. TXT (rather than the whole environment)
随机推荐
Batch PNG format can be converted to JPG format
【LeetCode】112. Path sum
响应式布局经典范例——巨幅背景大标题
WPS EXCEL 筛选指定长度的文本 内容 字符串
dflow入门1——HelloWorld!
SQL每日一练(牛客新题库)——第5天:高级查询
The display of the article list and the basics of creating articles and article details
LINGO 18.0软件安装包下载及安装教程
10分钟带你入门chrome(谷歌)浏览器插件开发
unity的game界面里有canvas的线框?如何隐藏掉?
STP普通生成树安全特性— bpduguard特性 + bpdufilter特性 + guard root 特性 III loopguard技术( 详解+配置)
线性表
Network LSTM both short-term and long-term memory
【LeetCode】622. Design Circular Queue
Path Prefixes (倍增!树上の二分)
JMeter接口自动化发包与示例
验证浮点数输入
QT中线程调用GUI主线程控件的问题
机器学习(公式推导与代码实现)--sklearn机器学习库
面渣逆袭:MySQL六十六问,两万字+五十图详解