当前位置:网站首页>Flink Yarn Per Job - Submit application

Flink Yarn Per Job - Submit application

2022-08-03 09:03:00 hyunbar

图片

YarnClusterDescriptor

/**
 * Excerpt of the routine that starts the application master on YARN.
 *
 * <p>Only the submission step is shown here; the lines marked {@code ... ...}
 * are elided portions of the real method and are not valid Java on their own.
 * NOTE(review): {@code appContext} is not declared in the visible lines -
 * presumably it is built in the elided part; confirm against the full source.
 *
 * @throws Exception declared by the signature; the visible excerpt does not
 *     show which calls raise it
 */
private ApplicationReport startAppMaster(
    Configuration configuration,
    String applicationName,
    String yarnClusterEntrypoint,
    JobGraph jobGraph,
    YarnClient yarnClient,
    YarnClientApplication yarnApplication,
    ClusterSpecification clusterSpecification) throws Exception {
    ... ... 
   // Hand the submission context to the YARN client; this is the call the
   // rest of the walkthrough follows.
   yarnClient.submitApplication(appContext);

    ... ...
}
YarnClientImpl extends YarnClient
/**
 * Excerpt of the client-side submission entry point.
 *
 * <p>Reads the application id from the submission context and rejects the
 * call when it is missing; otherwise wraps the context in a
 * {@code SubmitApplicationRequest} and forwards it through {@code rmClient}.
 * The remainder of the method (marked {@code ... ...}), including the return
 * statement, is elided in this excerpt.
 *
 * @param appContext submission context carrying the application id
 * @throws ApplicationIdNotProvidedException if the context has no application id
 */
public ApplicationId submitApplication(ApplicationSubmissionContext appContext) throws YarnException, IOException {
    ApplicationId applicationId = appContext.getApplicationId();
    if (applicationId == null) {
        throw new ApplicationIdNotProvidedException("ApplicationId is not provided in ApplicationSubmissionContext");
    } else {
        // Create an empty request record and attach the submission context to it.
        SubmitApplicationRequest request = (SubmitApplicationRequest)Records.newRecord(SubmitApplicationRequest.class);
        request.setApplicationSubmissionContext(appContext);
       // Important: this is the call that forwards the request via rmClient.
        this.rmClient.submitApplication(request);
       ... ...
       }
}

图片

ApplicationClientProtocolPBClientImpl implements ApplicationClientProtocol
/**
 * Sends an application-submission request through the protobuf proxy.
 *
 * <p>The request is converted to its protobuf form, passed to
 * {@code proxy.submitApplication}, and the protobuf reply is wrapped in a
 * {@code SubmitApplicationResponsePBImpl}.
 *
 * @param request submission request; it is cast to
 *     {@code SubmitApplicationRequestPBImpl} to obtain the protobuf message
 * @return the wrapped response from the proxy
 * @throws YarnException declared by the signature
 * @throws IOException declared by the signature
 */
public SubmitApplicationResponse submitApplication(SubmitApplicationRequest request)
        throws YarnException, IOException {
    // Extract the protobuf message before entering the try block.
    final SubmitApplicationRequestPBImpl pbRequest = (SubmitApplicationRequestPBImpl) request;
    final SubmitApplicationRequestProto proto = pbRequest.getProto();

    try {
        return new SubmitApplicationResponsePBImpl(
                this.proxy.submitApplication((RpcController) null, proto));
    } catch (ServiceException serviceFailure) {
        // Delegate translation of the failure to RPCUtil; the trailing
        // return is what the method yields if that call returns normally.
        RPCUtil.unwrapAndThrowException(serviceFailure);
        return null;
    }
}

ClientRMService

@Override
public SubmitApplicationResponse submitApplication(
    SubmitApplicationRequest request) throws YarnException {
  ApplicationSubmissionContext submissionContext = request
      .getApplicationSubmissionContext();
  ApplicationId applicationId = submissionContext.getApplicationId();

  // ApplicationSubmissionContext needs to be validated for safety - only
  // those fields that are independent of the RM's configuration will be
  // checked here, those that are dependent on RM configuration are validated
  // in RMAppManager.

  String user = null;
  try {
    // Safety
      // 安全校验
    user = UserGroupInformation.getCurrentUser().getShortUserName();
  } catch (IOException ie) {
    LOG.warn("Unable to get the current user.", ie);
    RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
        ie.getMessage(), "ClientRMService",
        "Exception in submitting application", applicationId);
    throw RPCUtil.getRemoteException(ie);
  }

  // Check whether app has already been put into rmContext,
  // If it is, simply return the response
  if (rmContext.getRMApps().get(applicationId) != null) {
    LOG.info("This is an earlier submitted application: " + applicationId);
    return SubmitApplicationResponse.newInstance();
  }

  if (submissionContext.getQueue() == null) {
  // DEFAULT_QUEUE_NAME = "default"
    submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
  }
  if (submissionContext.getApplicationName() == null) {
    submissionContext.setApplicationName(
      //  DEFAULT_APPLICATION_NAME = "N/A"
        YarnConfiguration.DEFAULT_APPLICATION_NAME);
  }
  if (submissionContext.getApplicationType() == null) {
    submissionContext
      // DEFAULT_APPLICATION_TYPE = "YARN"
      .setApplicationType(YarnConfiguration.DEFAULT_APPLICATION_TYPE);
  } else {
    if (submissionContext.getApplicationType().length() > YarnConfiguration.APPLICATION_TYPE_LENGTH) {
      submissionContext.setApplicationType(submissionContext
        .getApplicationType().substring(0,
          YarnConfiguration.APPLICATION_TYPE_LENGTH));
    }
  }

  try {
    // call RMAppManager to submit application directly
      // 直接调用 RMAppManager 提交应用
    rmAppManager.submitApplication(submissionContext,
        System.currentTimeMillis(), user);

    LOG.info("Application with id " + applicationId.getId() + 
        " submitted by user " + user);
    RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
        "ClientRMService", applicationId);
  } catch (YarnException e) {
    LOG.info("Exception in submitting application with id " +
        applicationId.getId(), e);
    RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
        e.getMessage(), "ClientRMService",
        "Exception in submitting application", applicationId);
    throw e;
  }

  SubmitApplicationResponse response = recordFactory
      .newRecordInstance(SubmitApplicationResponse.class);
  return response;
}
// call RMAppManager to submit application directly
// The call passes the submission context, the current time in milliseconds,
// and the user.
rmAppManager.submitApplication(submissionContext,
    System.currentTimeMillis(), user);

图片

原网站

版权声明
本文为[hyunbar]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/215/202208030901150899.html