当前位置:网站首页>Flink Yarn Per Job - 提交应用
Flink Yarn Per Job - 提交应用
2022-08-03 09:01:00 【hyunbar】

YarnClusterDescriptor
/**
 * Excerpt of {@code YarnClusterDescriptor#startAppMaster}. The portions the
 * article left out are marked with "... ..." and are not reproduced here.
 *
 * <p>The only step visible in this excerpt is passing {@code appContext} to
 * {@code yarnClient.submitApplication}.
 *
 * @return an {@code ApplicationReport}; how it is obtained lies in the elided code
 * @throws Exception declared by the signature; concrete causes are not visible here
 */
private ApplicationReport startAppMaster(
Configuration configuration,
String applicationName,
String yarnClusterEntrypoint,
JobGraph jobGraph,
YarnClient yarnClient,
YarnClientApplication yarnApplication,
ClusterSpecification clusterSpecification) throws Exception {
... ...
// Hand the submission context to the YARN client. appContext is not declared
// in this excerpt (presumably built in the elided code above -- verify).
yarnClient.submitApplication(appContext);
... ...
}
YarnClientImpl extends YarnClient
/**
 * Excerpt of {@code YarnClientImpl#submitApplication}. The tail of the method
 * is elided in the article ("... ...").
 *
 * <p>Visible behavior: rejects a context that carries no application id;
 * otherwise creates a {@code SubmitApplicationRequest} record, attaches the
 * submission context to it and forwards it through {@code rmClient}.
 *
 * @param appContext the submission context; must carry an application id
 * @return the application id (the return statement is in the elided code)
 * @throws ApplicationIdNotProvidedException if {@code appContext.getApplicationId()} is null
 * @throws YarnException declared by the signature
 * @throws IOException declared by the signature
 */
public ApplicationId submitApplication(ApplicationSubmissionContext appContext) throws YarnException, IOException {
ApplicationId applicationId = appContext.getApplicationId();
if (applicationId == null) {
throw new ApplicationIdNotProvidedException("ApplicationId is not provided in ApplicationSubmissionContext");
} else {
// Build an empty request record and attach the caller's submission context.
SubmitApplicationRequest request = (SubmitApplicationRequest)Records.newRecord(SubmitApplicationRequest.class);
request.setApplicationSubmissionContext(appContext);
// Important: this call forwards the request via rmClient.
this.rmClient.submitApplication(request);
... ...
}
}

ApplicationClientProtocolPBClientImpl implements ApplicationClientProtocol
/**
 * Sends a submit-application request through the protobuf RPC proxy.
 *
 * <p>The request is converted to its protobuf form via {@code getProto()},
 * passed to {@code proxy.submitApplication}, and the reply is wrapped in a
 * {@code SubmitApplicationResponsePBImpl}.
 *
 * @param request the request; cast to {@code SubmitApplicationRequestPBImpl}
 * @return the wrapped response, or {@code null} if a {@code ServiceException}
 *         was caught and {@code RPCUtil.unwrapAndThrowException} returned normally
 * @throws YarnException declared by the signature
 * @throws IOException declared by the signature
 */
public SubmitApplicationResponse submitApplication(SubmitApplicationRequest request) throws YarnException, IOException {
    final SubmitApplicationRequestProto protoPayload =
        ((SubmitApplicationRequestPBImpl) request).getProto();

    SubmitApplicationResponse result = null;
    try {
        result = new SubmitApplicationResponsePBImpl(
            this.proxy.submitApplication((RpcController) null, protoPayload));
    } catch (ServiceException rpcFailure) {
        // Delegate translation of the RPC failure; result stays null if this returns.
        RPCUtil.unwrapAndThrowException(rpcFailure);
    }
    return result;
}
ClientRMService
/**
 * Handles a submit-application request in {@code ClientRMService}.
 *
 * <p>Steps, in order: resolve the calling user's short name; return an empty
 * response if the application id is already present in {@code rmContext};
 * fill in default queue, name and type where they are null; truncate an
 * over-long application type; then hand the context to {@code rmAppManager}.
 * Success and failure are both recorded through {@code RMAuditLogger}.
 *
 * @param request carries the {@code ApplicationSubmissionContext} to submit
 * @return a new, empty {@code SubmitApplicationResponse}
 * @throws YarnException if the current user cannot be determined (wrapped via
 *         {@code RPCUtil.getRemoteException}) or if {@code rmAppManager} rejects
 *         the submission (rethrown as-is)
 */
@Override
public SubmitApplicationResponse submitApplication(
    SubmitApplicationRequest request) throws YarnException {
  final ApplicationSubmissionContext appContext =
      request.getApplicationSubmissionContext();
  final ApplicationId appId = appContext.getApplicationId();

  // Only the fields that do not depend on the RM's configuration are
  // validated here; configuration-dependent ones are validated in RMAppManager.
  String submitter = null;
  try {
    // Safety check: determine who is making this call.
    submitter = UserGroupInformation.getCurrentUser().getShortUserName();
  } catch (IOException lookupFailure) {
    LOG.warn("Unable to get the current user.", lookupFailure);
    // submitter is still null here because the lookup itself failed.
    RMAuditLogger.logFailure(submitter, AuditConstants.SUBMIT_APP_REQUEST,
        lookupFailure.getMessage(), "ClientRMService",
        "Exception in submitting application", appId);
    throw RPCUtil.getRemoteException(lookupFailure);
  }

  // An application already registered in rmContext is a repeat submission:
  // answer with an empty response and do nothing else.
  final boolean alreadyRegistered = rmContext.getRMApps().get(appId) != null;
  if (alreadyRegistered) {
    LOG.info("This is an earlier submitted application: " + appId);
    return SubmitApplicationResponse.newInstance();
  }

  // Fall back to defaults for anything the client left unset.
  if (appContext.getQueue() == null) {
    // DEFAULT_QUEUE_NAME is "default" (per the original annotation).
    appContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
  }
  if (appContext.getApplicationName() == null) {
    // DEFAULT_APPLICATION_NAME is "N/A" (per the original annotation).
    appContext.setApplicationName(YarnConfiguration.DEFAULT_APPLICATION_NAME);
  }

  final String requestedType = appContext.getApplicationType();
  if (requestedType == null) {
    // DEFAULT_APPLICATION_TYPE is "YARN" (per the original annotation).
    appContext.setApplicationType(YarnConfiguration.DEFAULT_APPLICATION_TYPE);
  } else if (requestedType.length() > YarnConfiguration.APPLICATION_TYPE_LENGTH) {
    // Keep only the leading APPLICATION_TYPE_LENGTH characters.
    appContext.setApplicationType(
        requestedType.substring(0, YarnConfiguration.APPLICATION_TYPE_LENGTH));
  }

  try {
    // Call RMAppManager directly to submit the application.
    rmAppManager.submitApplication(appContext,
        System.currentTimeMillis(), submitter);
    LOG.info("Application with id " + appId.getId() +
        " submitted by user " + submitter);
    RMAuditLogger.logSuccess(submitter, AuditConstants.SUBMIT_APP_REQUEST,
        "ClientRMService", appId);
  } catch (YarnException submitFailure) {
    LOG.info("Exception in submitting application with id " +
        appId.getId(), submitFailure);
    RMAuditLogger.logFailure(submitter, AuditConstants.SUBMIT_APP_REQUEST,
        submitFailure.getMessage(), "ClientRMService",
        "Exception in submitting application", appId);
    throw submitFailure;
  }

  return recordFactory.newRecordInstance(SubmitApplicationResponse.class);
}
// Repeated excerpt of the submission call shown inside
// ClientRMService#submitApplication:
// call RMAppManager to submit application directly, passing the submission
// context, the current wall-clock time in milliseconds, and the user.
rmAppManager.submitApplication(submissionContext,
System.currentTimeMillis(), user);

边栏推荐
猜你喜欢

HCIP练习02(OSPF)

What are pseudo-classes and pseudo-elements? The difference between pseudo-classes and pseudo-elements

【LeetCode】226. Flip the binary tree

STP生成树(端口状态+端口角色+收敛机制 )|||| STP优化技术( uplinkfast技术+Portfast技术+backbonefast技术 )详解

Redisson实现分布式锁

多媒体数据处理实验2:PCA

Scala parallel collections, parallel concurrency, thread safety issues, ThreadLocal

Exception: Dataset not found.解决办法

RSTP(端口角色+端口状态+工作机制)|||| 交换机接口分析

flutter 应用 抓包
随机推荐
面渣逆袭:MySQL六十六问,两万字+五十图详解
10分钟带你入门chrome(谷歌)浏览器插件开发
JMeter接口自动化发包与示例
0day_Topsec上网行为管理RCE
scala reduce、reduceLeft、reduceRight、fold、foldLeft、foldRight
Exch:重命名或删除默认邮箱数据库
LeetCode第三题(Longest Substring Without Repeating Characters)三部曲之二:编码实现
C# 一周入门高级编程之《C#-继承》Day One
scala 并行集合、并行并发、线程安全问题、ThreadLocal
HCIP实验(06)
基于二次型性能指标的燃料电池过氧比RBF-PID控制
Industry SaaS Microservice Stability Guarantee Actual Combat
QImage的指针问题
unity的game界面里有canvas的线框?如何隐藏掉?
HCIP练习03(重发布)
LeetCode 每日一题——622. 设计循环队列
AUC的两种计算方式
分析型数据库性能测试总结
dflow入门2——Slices
编程踩坑合集