Flink YARN Per-Job - Creating and Starting the Dispatcher, ResourceManager, and JobManager
2022-08-03 09:01:00 hyunbar
In per-job mode, the entry point loaded and run in the AM container is the main() method of YarnJobClusterEntrypoint.
YarnClusterDescriptor
public ClusterClientProvider<ApplicationId> deployJobCluster(
ClusterSpecification clusterSpecification,
JobGraph jobGraph,
boolean detached) throws ClusterDeploymentException {
try {
return deployInternal(
clusterSpecification,
"Flink per-job cluster",
getYarnJobClusterEntrypoint(),
jobGraph,
detached);
} catch (Exception e) {
throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e);
}
}
deployInternal() passes getYarnJobClusterEntrypoint() as the entry class for the ApplicationMaster; when YARN launches the AM container, it invokes the main() method of that class.
YarnJobClusterEntrypoint
// ------------------------------------------------------------------------
// The executable entry point for the Yarn Application Master Process
// for a single Flink job.
// ------------------------------------------------------------------------
public static void main(String[] args) {
// startup checks and logging
EnvironmentInformation.logEnvironmentInfo(LOG, YarnJobClusterEntrypoint.class.getSimpleName(), args);
SignalHandler.register(LOG);
JvmShutdownSafeguard.installAsShutdownHook(LOG);
Map<String, String> env = System.getenv();
final String workingDirectory = env.get(ApplicationConstants.Environment.PWD.key());
Preconditions.checkArgument(
workingDirectory != null,
"Working directory variable (%s) not set",
ApplicationConstants.Environment.PWD.key());
try {
YarnEntrypointUtils.logYarnEnvironmentInformation(env, LOG);
} catch (IOException e) {
LOG.warn("Could not log YARN environment information.", e);
}
final Configuration dynamicParameters = ClusterEntrypointUtils.parseParametersOrExit(
args,
new DynamicParametersConfigurationParserFactory(),
YarnJobClusterEntrypoint.class);
final Configuration configuration = YarnEntrypointUtils.loadConfiguration(workingDirectory, dynamicParameters, env);
YarnJobClusterEntrypoint yarnJobClusterEntrypoint = new YarnJobClusterEntrypoint(configuration);
// important
ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);
}
ClusterEntrypoint
// --------------------------------------------------
// Helper methods
// --------------------------------------------------
public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
try {
// important
clusterEntrypoint.startCluster();
} catch (ClusterEntrypointException e) {
... ...
}
public void startCluster() throws ClusterEntrypointException {
LOG.info("Starting {}.", getClass().getSimpleName());
try {
replaceGracefulExitWithHaltIfConfigured(configuration);
PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration);
configureFileSystems(configuration, pluginManager);
SecurityContext securityContext = installSecurityContext(configuration);
securityContext.runSecured((Callable<Void>) () -> {
// important
runCluster(configuration, pluginManager);
return null;
});
}
... ...
}
private void runCluster(Configuration configuration, PluginManager pluginManager) throws Exception {
synchronized (lock) {
// initialize the services: RPC and related infrastructure
initializeServices(configuration, pluginManager);
// write host information into configuration
configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());
// create the factory for the Dispatcher and ResourceManager objects
final DispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);
// create and start the components inside the JobManager:
// RpcService, HAServices, BlobServer, HeartbeatServices,
// MetricRegistry, ExecutionGraphStore, etc.
clusterComponent = dispatcherResourceManagerComponentFactory.create(
configuration,
ioExecutor,
commonRpcService,
haServices,
blobServer,
heartbeatServices,
metricRegistry,
archivedExecutionGraphStore,
new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
this);
clusterComponent.getShutDownFuture().whenComplete(
(ApplicationStatus applicationStatus, Throwable throwable) -> {
if (throwable != null) {
shutDownAsync(
ApplicationStatus.UNKNOWN,
ExceptionUtils.stringifyException(throwable),
false);
} else {
// This is the general shutdown path. If a separate more specific shutdown was
// already triggered, this will do nothing
shutDownAsync(
applicationStatus,
null,
true);
}
});
}
}
Initialize the RPC-related services
Write the host information into the configuration
Create the factory for the Dispatcher and ResourceManager objects
Create and start the components inside the JobManager process:
RpcService
HAService
BlobServer
HeartbeatServices
MetricRegistry
ExecutionGraphStore
The three core components inside the JobManager process (Dispatcher, ResourceManager, WebMonitorEndpoint) are created by the factory below.
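Before drilling into the factory, here is a minimal, self-contained sketch of the "write host information into configuration" step above, using Flink's public Configuration API. The class name and host value are made up for illustration; JobManagerOptions.ADDRESS and JobManagerOptions.PORT correspond to the jobmanager.rpc.address / jobmanager.rpc.port keys in flink-conf.yaml.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;

public class JobManagerAddressConfig {
    public static void main(String[] args) {
        Configuration configuration = new Configuration();
        // what runCluster() does once the RPC service is up
        configuration.setString(JobManagerOptions.ADDRESS, "container-host-0");
        configuration.setInteger(JobManagerOptions.PORT, 6123);

        // downstream components read the JobManager address back from the same options
        System.out.println(configuration.getString(JobManagerOptions.ADDRESS));
        System.out.println(configuration.getInteger(JobManagerOptions.PORT));
    }
}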
DefaultDispatcherResourceManagerComponentFactory
@Override
public DispatcherResourceManagerComponent create(
Configuration configuration,
Executor ioExecutor,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
BlobServer blobServer,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
ArchivedExecutionGraphStore archivedExecutionGraphStore,
MetricQueryServiceRetriever metricQueryServiceRetriever,
FatalErrorHandler fatalErrorHandler) throws Exception {
LeaderRetrievalService dispatcherLeaderRetrievalService = null;
LeaderRetrievalService resourceManagerRetrievalService = null;
WebMonitorEndpoint<?> webMonitorEndpoint = null;
ResourceManager<?> resourceManager = null;
DispatcherRunner dispatcherRunner = null;
try {
dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever();
final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
rpcService,
DispatcherGateway.class,
DispatcherId::fromUuid,
new ExponentialBackoffRetryStrategy(12, Duration.ofMillis(10), Duration.ofMillis(50)));
final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
rpcService,
ResourceManagerGateway.class,
ResourceManagerId::fromUuid,
new ExponentialBackoffRetryStrategy(12, Duration.ofMillis(10), Duration.ofMillis(50)));
final ScheduledExecutorService executor = WebMonitorEndpoint.createExecutorService(
configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
"DispatcherRestEndpoint");
final long updateInterval = configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
final MetricFetcher metricFetcher = updateInterval == 0
? VoidMetricFetcher.INSTANCE
: MetricFetcherImpl.fromConfiguration(
configuration,
metricQueryServiceRetriever,
dispatcherGatewayRetriever,
executor);
// create the endpoint that receives front-end REST requests (e.g. jobs submitted from the web UI)
webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
configuration,
dispatcherGatewayRetriever,
resourceManagerGatewayRetriever,
blobServer,
executor,
metricFetcher,
highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
fatalErrorHandler);
log.debug("Starting Dispatcher REST endpoint.");
webMonitorEndpoint.start();
final String hostname = RpcUtils.getHostname(rpcService);
// create the ResourceManager for YARN mode
resourceManager = resourceManagerFactory.createResourceManager(
configuration,
ResourceID.generate(),
rpcService,
highAvailabilityServices,
heartbeatServices,
fatalErrorHandler,
new ClusterInformation(hostname, blobServer.getPort()),
webMonitorEndpoint.getRestBaseUrl(),
metricRegistry,
hostname,
ioExecutor);
final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint, ioExecutor);
final PartialDispatcherServices partialDispatcherServices = new PartialDispatcherServices(
configuration,
highAvailabilityServices,
resourceManagerGatewayRetriever,
blobServer,
heartbeatServices,
() -> MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, hostname),
archivedExecutionGraphStore,
fatalErrorHandler,
historyServerArchivist,
metricRegistry.getMetricQueryServiceGatewayRpcAddress(),
ioExecutor);
log.debug("Starting Dispatcher.");
// create and start the Dispatcher => the Dispatcher will create and start the JobMaster
dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner(
highAvailabilityServices.getDispatcherLeaderElectionService(),
fatalErrorHandler,
new HaServicesJobGraphStoreFactory(highAvailabilityServices),
ioExecutor,
rpcService,
partialDispatcherServices);
log.debug("Starting ResourceManager.");
// start the ResourceManager
resourceManager.start();
resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);
return new DispatcherResourceManagerComponent(
dispatcherRunner,
DefaultResourceManagerService.createFor(resourceManager),
dispatcherLeaderRetrievalService,
resourceManagerRetrievalService,
webMonitorEndpoint,
fatalErrorHandler);
}
... ...
}
Create the endpoint that receives REST requests from the front end (applications submitted via the web UI)
Create the ResourceManager for YARN mode
Create and start the Dispatcher => the Dispatcher will create and start the JobMaster
Start the ResourceManager
Creating the YarnResourceManager
ResourceManagerFactory
public ResourceManager<T> createResourceManager(
Configuration configuration,
ResourceID resourceId,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
FatalErrorHandler fatalErrorHandler,
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl,
MetricRegistry metricRegistry,
String hostname,
Executor ioExecutor) throws Exception {
final ResourceManagerMetricGroup resourceManagerMetricGroup = ResourceManagerMetricGroup.create(metricRegistry, hostname);
final SlotManagerMetricGroup slotManagerMetricGroup = SlotManagerMetricGroup.create(metricRegistry, hostname);
// important
final ResourceManagerRuntimeServices resourceManagerRuntimeServices = createResourceManagerRuntimeServices(
configuration, rpcService, highAvailabilityServices, slotManagerMetricGroup);
return createResourceManager(
configuration,
resourceId,
rpcService,
highAvailabilityServices,
heartbeatServices,
fatalErrorHandler,
clusterInformation,
webInterfaceUrl,
resourceManagerMetricGroup,
resourceManagerRuntimeServices,
ioExecutor);
}
ResourceManagerFactory
protected abstract ResourceManager<T> createResourceManager(
Configuration configuration,
ResourceID resourceId,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
FatalErrorHandler fatalErrorHandler,
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl,
ResourceManagerMetricGroup resourceManagerMetricGroup,
ResourceManagerRuntimeServices resourceManagerRuntimeServices,
Executor ioExecutor) throws Exception;
private ResourceManagerRuntimeServices createResourceManagerRuntimeServices(
Configuration configuration,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
SlotManagerMetricGroup slotManagerMetricGroup) throws ConfigurationException {
// important
return ResourceManagerRuntimeServices.fromConfiguration(
createResourceManagerRuntimeServicesConfiguration(configuration),
highAvailabilityServices,
rpcService.getScheduledExecutor(),
slotManagerMetricGroup);
}
YarnResourceManagerFactory
protected ResourceManagerDriver<YarnWorkerNode> createResourceManagerDriver(Configuration configuration, String webInterfaceUrl, String rpcAddress) {
final YarnResourceManagerDriverConfiguration yarnResourceManagerDriverConfiguration = new YarnResourceManagerDriverConfiguration(System.getenv(), rpcAddress, webInterfaceUrl);
return new YarnResourceManagerDriver(
configuration,
yarnResourceManagerDriverConfiguration,
DefaultYarnResourceManagerClientFactory.getInstance(),
DefaultYarnNodeManagerClientFactory.getInstance());
}
YarnResourceManagerDriver
public YarnResourceManagerDriver(
Configuration flinkConfig,
YarnResourceManagerDriverConfiguration configuration,
YarnResourceManagerClientFactory yarnResourceManagerClientFactory,
YarnNodeManagerClientFactory yarnNodeManagerClientFactory) {
super(flinkConfig, GlobalConfiguration.loadConfiguration(configuration.getCurrentDir()));
this.yarnConfig = new YarnConfiguration();
this.requestResourceFutures = new HashMap<>();
this.configuration = configuration;
// YARN heartbeat interval (yarn.heartbeat.interval, configured in seconds)
final int yarnHeartbeatIntervalMS = flinkConfig.getInteger(
YarnConfigOptions.HEARTBEAT_DELAY_SECONDS) * 1000;
// AM expiry interval on the YARN side (yarn.am.liveness-monitor.expiry-interval-ms)
final long yarnExpiryIntervalMS = yarnConfig.getLong(
YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS);
if (yarnHeartbeatIntervalMS >= yarnExpiryIntervalMS) {
log.warn("The heartbeat interval of the Flink Application master ({}) is greater " +
"than YARN's expiry interval ({}). The application is likely to be killed by YARN.",
yarnHeartbeatIntervalMS, yarnExpiryIntervalMS);
}
yarnHeartbeatIntervalMillis = yarnHeartbeatIntervalMS;
containerRequestHeartbeatIntervalMillis = flinkConfig.getInteger(YarnConfigOptions.CONTAINER_REQUEST_HEARTBEAT_INTERVAL_MILLISECONDS);
this.registerApplicationMasterResponseReflector = new RegisterApplicationMasterResponseReflector(log);
this.yarnResourceManagerClientFactory = yarnResourceManagerClientFactory;
this.yarnNodeManagerClientFactory = yarnNodeManagerClientFactory;
}
Creating the SlotManager
ResourceManagerRuntimeServices
public static ResourceManagerRuntimeServices fromConfiguration(
... ...
Inside fromConfiguration, the SlotManager (together with the JobLeaderIdService) used by the ResourceManager is assembled.
Creating and starting the Dispatcher
DefaultDispatcherResourceManagerComponentFactory
dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner(
highAvailabilityServices.getDispatcherLeaderElectionService(),
fatalErrorHandler,
new HaServicesJobGraphStoreFactory(highAvailabilityServices),
ioExecutor,
rpcService,
partialDispatcherServices);
DefaultDispatcherRunnerFactory implements DispatcherRunnerFactory
public DispatcherRunner createDispatcherRunner(
LeaderElectionService leaderElectionService,
FatalErrorHandler fatalErrorHandler,
JobGraphStoreFactory jobGraphStoreFactory,
Executor ioExecutor,
RpcService rpcService,
PartialDispatcherServices partialDispatcherServices) throws Exception {
final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory = dispatcherLeaderProcessFactoryFactory.createFactory(
jobGraphStoreFactory,
ioExecutor,
rpcService,
partialDispatcherServices,
fatalErrorHandler);
// important
return DefaultDispatcherRunner.create(
leaderElectionService,
fatalErrorHandler,
dispatcherLeaderProcessFactory);
}
DefaultDispatcherRunner
public static DispatcherRunner create(
LeaderElectionService leaderElectionService,
FatalErrorHandler fatalErrorHandler,
DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory) throws Exception {
final DefaultDispatcherRunner dispatcherRunner = new DefaultDispatcherRunner(
leaderElectionService,
fatalErrorHandler,
dispatcherLeaderProcessFactory);
return DispatcherRunnerLeaderElectionLifecycleManager.createFor(dispatcherRunner, leaderElectionService);
}
DispatcherRunnerLeaderElectionLifecycleManager
public static <T extends DispatcherRunner & LeaderContender> DispatcherRunner createFor(T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {
return new DispatcherRunnerLeaderElectionLifecycleManager<>(dispatcherRunner, leaderElectionService);
}
private DispatcherRunnerLeaderElectionLifecycleManager(T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {
this.dispatcherRunner = dispatcherRunner;
this.leaderElectionService = leaderElectionService;
// start the Dispatcher's leader election
leaderElectionService.start(dispatcherRunner);
}
StandaloneLeaderElectionService implements LeaderElectionService
@Override
public void start(LeaderContender newContender) throws Exception {
if (contender != null) {
// Service was already started
throw new IllegalArgumentException("Leader election service cannot be started multiple times.");
}
contender = Preconditions.checkNotNull(newContender);
// directly grant leadership to the given contender
contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);
}
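To make the control flow clearer, here is a self-contained toy model of this contract (the interface and class names below are hypothetical, not Flink's): a "standalone" election service grants leadership immediately inside start(), which is why starting the leader election service is what kicks off the Dispatcher leader process in a non-HA per-job deployment.
import java.util.UUID;

interface Contender {
    void grantLeadership(UUID leaderSessionId);
}

final class StandaloneElection {
    private Contender contender;

    void start(Contender newContender) {
        if (contender != null) {
            throw new IllegalStateException("already started");
        }
        contender = newContender;
        // no ZooKeeper in standalone mode: grant leadership right away
        contender.grantLeadership(new UUID(0L, 0L));
    }
}

public class LeaderElectionDemo {
    public static void main(String[] args) {
        new StandaloneElection().start(
            sessionId -> System.out.println("granted leadership: " + sessionId));
    }
}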
DefaultDispatcherRunner
@Override
public void grantLeadership(UUID leaderSessionID) {
runActionIfRunning(() -> startNewDispatcherLeaderProcess(leaderSessionID));
}
private void startNewDispatcherLeaderProcess(UUID leaderSessionID) {
// stop any previous dispatcher leader process first
stopDispatcherLeaderProcess();
// create the new dispatcher leader process
dispatcherLeaderProcess = createNewDispatcherLeaderProcess(leaderSessionID);
final DispatcherLeaderProcess newDispatcherLeaderProcess = dispatcherLeaderProcess;
// continue: start the new leader process once the previous one has terminated
FutureUtils.assertNoException(
previousDispatcherLeaderProcessTerminationFuture.thenRun(newDispatcherLeaderProcess::start));
}
AbstractDispatcherLeaderProcess
public final void start() {
runIfStateIs(
State.CREATED,
this::startInternal);
}
private void startInternal() {
log.info("Start {}.", getClass().getSimpleName());
state = State.RUNNING;
onStart();
}
JobDispatcherLeaderProcess
@Override
protected void onStart() {
final DispatcherGatewayService dispatcherService = dispatcherGatewayServiceFactory.create(
DispatcherId.fromUuid(getLeaderSessionId()),
Collections.singleton(jobGraph),
ThrowingJobGraphWriter.INSTANCE);
completeDispatcherSetup(dispatcherService);
}
DefaultDispatcherGatewayServiceFactory implements AbstractDispatcherLeaderProcess.DispatcherGatewayServiceFactory
@Override
public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
DispatcherId fencingToken,
Collection<JobGraph> recoveredJobs,
JobGraphWriter jobGraphWriter) {
final Dispatcher dispatcher;
try {
// create the Dispatcher
dispatcher = dispatcherFactory.createDispatcher(
rpcService,
fencingToken,
recoveredJobs,
(dispatcherGateway, scheduledExecutor, errorHandler) -> new NoOpDispatcherBootstrap(),
PartialDispatcherServicesWithJobGraphStore.from(partialDispatcherServices, jobGraphWriter));
} catch (Exception e) {
throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
}
// start the Dispatcher; execution continues in its onStart()
dispatcher.start();
return DefaultDispatcherGatewayService.from(dispatcher);
}
RpcEndpoint
/**
* Triggers start of the rpc endpoint. This tells the underlying rpc server that the rpc endpoint is ready
* to process remote procedure calls.
*/
public final void start() {
// the endpoint is actually started through its own gateway (the RpcServer)
rpcServer.start();
}
AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, RpcServer
@Override
public void start() {
rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());
}
@Override
public void stop() {
rpcEndpoint.tell(ControlMessages.STOP, ActorRef.noSender());
}
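The tell(START)/tell(STOP) calls above enqueue control messages into the endpoint's actor mailbox, and the endpoint's onStart()/onStop() then run in its own main thread. A toy, Akka-free sketch of that idea (all names are hypothetical, not Flink's or Akka's):
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ControlMessageDemo {
    enum ControlMessage { START, STOP }

    public static void main(String[] args) throws Exception {
        BlockingQueue<ControlMessage> mailbox = new LinkedBlockingQueue<>();

        // the "endpoint": a single thread draining its mailbox
        Thread endpoint = new Thread(() -> {
            try {
                while (true) {
                    ControlMessage msg = mailbox.take();
                    if (msg == ControlMessage.START) {
                        System.out.println("onStart() runs in the endpoint's own thread");
                    } else {
                        System.out.println("onStop()");
                        return;
                    }
                }
            } catch (InterruptedException ignored) { }
        });
        endpoint.start();

        mailbox.put(ControlMessage.START);   // analogous to rpcServer.start() ~ tell(START)
        mailbox.put(ControlMessage.STOP);    // analogous to rpcServer.stop()  ~ tell(STOP)
        endpoint.join();
    }
}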
Starting the ResourceManager
DefaultDispatcherResourceManagerComponentFactory
log.debug("Starting ResourceManager.");
// start the ResourceManager
resourceManager.start();
ResourceManager
@Override
public final void onStart() throws Exception {
try {
startResourceManagerServices();
} catch (Throwable t) {
final ResourceManagerException exception = new ResourceManagerException(String.format("Could not start the ResourceManager %s", getAddress()), t);
onFatalError(exception);
throw exception;
}
}
private void startResourceManagerServices() throws Exception {
try {
leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
// create, initialize and start the clients for YARN's RM and NM
initialize();
// start the ResourceManager through the leader election service
leaderElectionService.start(this);
jobLeaderIdService.start(new JobLeaderIdActionsImpl());
registerTaskExecutorMetrics();
} catch (Exception e) {
handleStartResourceManagerServicesException(e);
}
}
YarnResourceManagerDriver
@Override
protected void initializeInternal() throws Exception {
final YarnContainerEventHandler yarnContainerEventHandler = new YarnContainerEventHandler();
try {
// create the client for YARN's ResourceManager, then initialize and start it
resourceManagerClient = yarnResourceManagerClientFactory.createResourceManagerClient(
yarnHeartbeatIntervalMillis,
yarnContainerEventHandler);
resourceManagerClient.init(yarnConfig);
resourceManagerClient.start();
final RegisterApplicationMasterResponse registerApplicationMasterResponse = registerApplicationMaster();
getContainersFromPreviousAttempts(registerApplicationMasterResponse);
taskExecutorProcessSpecContainerResourcePriorityAdapter =
new TaskExecutorProcessSpecContainerResourcePriorityAdapter(
registerApplicationMasterResponse.getMaximumResourceCapability(),
ExternalResourceUtils.getExternalResources(flinkConfig, YarnConfigOptions.EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX));
} catch (Exception e) {
throw new ResourceManagerException("Could not start resource manager client.", e);
}
// create the client for YARN's NodeManagers, then initialize and start it
nodeManagerClient = yarnNodeManagerClientFactory.createNodeManagerClient(yarnContainerEventHandler);
nodeManagerClient.init(yarnConfig);
nodeManagerClient.start();
}
Create the client for YARN's ResourceManager, then initialize and start it
Create the client for YARN's NodeManagers, then initialize and start it
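For context, the two clients above are the standard Hadoop YARN asynchronous clients. The following is a minimal, generic sketch of that pattern, not Flink's YarnContainerEventHandler logic: the handler bodies are illustrative placeholders and the 5000 ms heartbeat value is just an example.
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class YarnAsyncClientsSketch {

    public static void main(String[] args) {
        YarnConfiguration yarnConfig = new YarnConfiguration();

        // client towards YARN's ResourceManager: heartbeats, container requests/allocations
        AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient =
                AMRMClientAsync.createAMRMClientAsync(5000, new AmRmHandler());
        resourceManagerClient.init(yarnConfig);
        resourceManagerClient.start();

        // client towards YARN's NodeManagers: starting and stopping containers
        NMClientAsync nodeManagerClient = NMClientAsync.createNMClientAsync(new NmHandler());
        nodeManagerClient.init(yarnConfig);
        nodeManagerClient.start();
    }

    /** Callbacks for allocation results from the YARN ResourceManager. */
    static class AmRmHandler implements AMRMClientAsync.CallbackHandler {
        @Override public void onContainersAllocated(List<Container> containers) {
            containers.forEach(c -> System.out.println("allocated " + c.getId()));
        }
        @Override public void onContainersCompleted(List<ContainerStatus> statuses) { }
        @Override public void onShutdownRequest() { }
        @Override public void onNodesUpdated(List<NodeReport> updatedNodes) { }
        @Override public float getProgress() { return 0.0f; }
        @Override public void onError(Throwable e) { e.printStackTrace(); }
    }

    /** Callbacks for container start/stop results from the NodeManagers. */
    static class NmHandler implements NMClientAsync.CallbackHandler {
        @Override public void onContainerStarted(ContainerId id, Map<String, ByteBuffer> response) { }
        @Override public void onContainerStatusReceived(ContainerId id, ContainerStatus status) { }
        @Override public void onContainerStopped(ContainerId id) { }
        @Override public void onStartContainerError(ContainerId id, Throwable t) { }
        @Override public void onGetContainerStatusError(ContainerId id, Throwable t) { }
        @Override public void onStopContainerError(ContainerId id, Throwable t) { }
    }
}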
The Dispatcher starts the JobManager (JobMaster)
Dispatcher
@Override
public void onStart() throws Exception {
try {
// start the dispatcher services
startDispatcherServices();
} catch (Throwable t) {
final DispatcherException exception = new DispatcherException(String.format("Could not start the Dispatcher %s", getAddress()), t);
onFatalError(exception);
throw exception;
}
// start the JobMaster
startRecoveredJobs();
this.dispatcherBootstrap = this.dispatcherBootstrapFactory.create(
getSelfGateway(DispatcherGateway.class),
this.getRpcService().getScheduledExecutor() ,
this::onFatalError);
}
Start the dispatcher services
Start the JobMaster
private void startRecoveredJobs() {
for (JobGraph recoveredJob : recoveredJobs) {
// drill down
runRecoveredJob(recoveredJob);
}
recoveredJobs.clear();
}
private void runRecoveredJob(final JobGraph recoveredJob) {
checkNotNull(recoveredJob);
try {
// drill down
runJob(recoveredJob, ExecutionType.RECOVERY);
} catch (Throwable throwable) {
onFatalError(new DispatcherException(String.format("Could not start recovered job %s.", recoveredJob.getJobID()), throwable));
}
}
private void runJob(JobGraph jobGraph, ExecutionType executionType) {
Preconditions.checkState(!runningJobs.containsKey(jobGraph.getJobID()));
long initializationTimestamp = System.currentTimeMillis();
// drill down
CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph, initializationTimestamp);
DispatcherJob dispatcherJob = DispatcherJob.createFor(
jobManagerRunnerFuture,
jobGraph.getJobID(),
jobGraph.getName(),
initializationTimestamp);
runningJobs.put(jobGraph.getJobID(), dispatcherJob);
final JobID jobId = jobGraph.getJobID();
... ...
}
CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph, long initializationTimestamp) {
final RpcService rpcService = getRpcService();
return CompletableFuture.supplyAsync(
() -> {
try {
// create the JobMaster
JobManagerRunner runner = jobManagerRunnerFactory.createJobManagerRunner(
jobGraph,
configuration,
rpcService,
highAvailabilityServices,
heartbeatServices,
jobManagerSharedServices,
new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
fatalErrorHandler,
initializationTimestamp);
// start the JobMaster
runner.start();
return runner;
} catch (Exception e) {
throw new CompletionException(new JobInitializationException(jobGraph.getJobID(), "Could not instantiate JobManager.", e));
}
},
ioExecutor); // do not use main thread executor. Otherwise, Dispatcher is blocked on JobManager creation
}
Create the JobMaster
Start the JobMaster
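Note the comment on the ioExecutor above: creating the JobManagerRunner can be slow (it ends up building the ExecutionGraph), so it must not run on the Dispatcher's main thread. A small generic sketch of that supplyAsync-on-a-dedicated-executor pattern (the names and the string result are illustrative only):
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncRunnerCreation {
    public static void main(String[] args) {
        ExecutorService ioExecutor = Executors.newFixedThreadPool(4);

        CompletableFuture<String> runnerFuture = CompletableFuture.supplyAsync(() -> {
            // stands in for createJobManagerRunner(...): potentially slow initialization
            return "runner-for-job-42";
        }, ioExecutor);

        // the caller thread is never blocked while the runner is being created
        runnerFuture.thenAccept(runner -> System.out.println("started " + runner)).join();
        ioExecutor.shutdown();
    }
}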
DefaultJobManagerRunnerFactory implements JobManagerRunnerFactory
@Override
public JobManagerRunner createJobManagerRunner(
JobGraph jobGraph,
Configuration configuration,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
JobManagerSharedServices jobManagerServices,
JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
FatalErrorHandler fatalErrorHandler,
long initializationTimestamp) throws Exception {
final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration);
final SlotPoolFactory slotPoolFactory = SlotPoolFactory.fromConfiguration(configuration);
final SchedulerNGFactory schedulerNGFactory = SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration);
final ShuffleMaster<?> shuffleMaster = ShuffleServiceLoader.loadShuffleServiceFactory(configuration).createShuffleMaster(configuration);
final JobMasterServiceFactory jobMasterFactory = new DefaultJobMasterServiceFactory(
jobMasterConfiguration,
slotPoolFactory,
rpcService,
highAvailabilityServices,
jobManagerServices,
heartbeatServices,
jobManagerJobMetricGroupFactory,
fatalErrorHandler,
schedulerNGFactory,
shuffleMaster);
// drill down
return new JobManagerRunnerImpl(
jobGraph,
jobMasterFactory,
highAvailabilityServices,
jobManagerServices.getLibraryCacheManager().registerClassLoaderLease(jobGraph.getJobID()),
jobManagerServices.getScheduledExecutorService(),
fatalErrorHandler,
initializationTimestamp);
}
JobManagerRunnerImpl
public JobManagerRunnerImpl(
final JobGraph jobGraph,
final JobMasterServiceFactory jobMasterFactory,
final HighAvailabilityServices haServices,
final LibraryCacheManager.ClassLoaderLease classLoaderLease,
final Executor executor,
final FatalErrorHandler fatalErrorHandler,
long initializationTimestamp) throws Exception {
... ...
// now start the JobManager
this.jobMasterService = jobMasterFactory.createJobMasterService(jobGraph, this, userCodeLoader, initializationTimestamp);
}
DefaultJobMasterServiceFactory implements JobMasterServiceFactory
@Override
public JobMaster createJobMasterService(
JobGraph jobGraph,
OnCompletionActions jobCompletionActions,
ClassLoader userCodeClassloader,
long initializationTimestamp) throws Exception {
return new JobMaster(
rpcService,
jobMasterConfiguration,
ResourceID.generate(),
jobGraph,
haServices,
slotPoolFactory,
jobManagerSharedServices,
heartbeatServices,
jobManagerJobMetricGroupFactory,
jobCompletionActions,
fatalErrorHandler,
userCodeClassloader,
schedulerNGFactory,
shuffleMaster,
lookup -> new JobMasterPartitionTrackerImpl(
jobGraph.getJobID(),
shuffleMaster,
lookup
),
new DefaultExecutionDeploymentTracker(),
DefaultExecutionDeploymentReconciler::new,
initializationTimestamp);
}
Inside the JobManager process, a JobMaster is ultimately created to execute the job.
The JobMaster constructor
public JobMaster(
... ...) throws Exception {
super(rpcService, AkkaRpcServiceUtils.createRandomName(JOB_MANAGER_NAME), null);
... ...
this.executionDeploymentTracker = executionDeploymentTracker;
this.executionDeploymentReconciler = executionDeploymentReconcilerFactory.create(executionStateReconciliationHandler);
this.jobMasterConfiguration = checkNotNull(jobMasterConfiguration);
this.resourceId = checkNotNull(resourceId);
this.jobGraph = checkNotNull(jobGraph);
this.rpcTimeout = jobMasterConfiguration.getRpcTimeout();
this.highAvailabilityServices = checkNotNull(highAvailabilityService);
this.blobWriter = jobManagerSharedServices.getBlobWriter();
this.scheduledExecutorService = jobManagerSharedServices.getScheduledExecutorService();
this.jobCompletionActions = checkNotNull(jobCompletionActions);
this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
this.userCodeLoader = checkNotNull(userCodeLoader);
this.schedulerNGFactory = checkNotNull(schedulerNGFactory);
this.heartbeatServices = checkNotNull(heartbeatServices);
this.jobMetricGroupFactory = checkNotNull(jobMetricGroupFactory);
this.initializationTimestamp = initializationTimestamp;
this.retrieveTaskManagerHostName = jobMasterConfiguration.getConfiguration()
.getBoolean(JobManagerOptions.RETRIEVE_TASK_MANAGER_HOSTNAME);
final String jobName = jobGraph.getName();
final JobID jid = jobGraph.getJobID();
log.info("Initializing job {} ({}).", jobName, jid);
resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever();
this.slotPool = checkNotNull(slotPoolFactory).createSlotPool(jid);
this.registeredTaskManagers = new HashMap<>(4);
this.partitionTracker = checkNotNull(partitionTrackerFactory)
.create(resourceID -> {
Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManagerInfo = registeredTaskManagers.get(resourceID);
if (taskManagerInfo == null) {
return Optional.empty();
}
return Optional.of(taskManagerInfo.f1);
});
this.backPressureStatsTracker = checkNotNull(jobManagerSharedServices.getBackPressureStatsTracker());
this.shuffleMaster = checkNotNull(shuffleMaster);
this.jobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph);
// create the scheduler; during creation the JobGraph is converted into an ExecutionGraph
this.schedulerNG = createScheduler(executionDeploymentTracker, jobManagerJobMetricGroup);
this.jobStatusListener = null;
this.resourceManagerConnection = null;
this.establishedResourceManagerConnection = null;
this.accumulators = new HashMap<>();
this.taskManagerHeartbeatManager = NoOpHeartbeatManager.getInstance();
this.resourceManagerHeartbeatManager = NoOpHeartbeatManager.getInstance();
}
Creating the scheduler converts the JobGraph into an ExecutionGraph
SchedulerBase
public SchedulerBase(
... ...) throws Exception {
this.log = checkNotNull(log);
this.jobGraph = checkNotNull(jobGraph);
... ...
// drill down
this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup, checkNotNull(shuffleMaster), checkNotNull(partitionTracker), checkNotNull(executionDeploymentTracker), initializationTimestamp);
this.schedulingTopology = executionGraph.getSchedulingTopology();
stateLocationRetriever =
executionVertexId -> getExecutionVertex(executionVertexId).getPreferredLocationBasedOnState();
inputsLocationsRetriever = new ExecutionGraphToInputsLocationsRetrieverAdapter(executionGraph);
this.coordinatorMap = createCoordinatorMap();
}
private ExecutionGraph createAndRestoreExecutionGraph(
JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
ShuffleMaster<?> shuffleMaster,
JobMasterPartitionTracker partitionTracker,
ExecutionDeploymentTracker executionDeploymentTracker,
long initializationTimestamp) throws Exception {
// drill down
ExecutionGraph newExecutionGraph = createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster, partitionTracker, executionDeploymentTracker, initializationTimestamp);
... ...
return newExecutionGraph;
}
private ExecutionGraph createExecutionGraph(
... ...
return ExecutionGraphBuilder.buildGraph(
null,
jobGraph,
... ...);
}
ExecutionGraphBuilder
public static ExecutionGraph buildGraph(
... ...
checkNotNull(jobGraph, "job graph cannot be null");
final String jobName = jobGraph.getName();
final JobID jobId = jobGraph.getJobID();
... ...
// create a new execution graph, if none exists so far
final ExecutionGraph executionGraph;
try {
// if no execution graph exists yet, create a new one
executionGraph = (prior != null) ? prior :
new ExecutionGraph(
jobInformation,
futureExecutor,
ioExecutor,
rpcTimeout,
restartStrategy,
maxPriorAttemptsHistoryLength,
failoverStrategyFactory,
slotProvider,
classLoader,
blobWriter,
allocationTimeout,
partitionReleaseStrategyFactory,
shuffleMaster,
partitionTracker,
jobGraph.getScheduleMode(),
executionDeploymentListener,
executionStateUpdateListener,
initializationTimestamp);
} catch (IOException e) {
throw new JobException("Could not create the ExecutionGraph.", e);
}
// set the basic properties
try {
executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
}
catch (Throwable t) {
log.warn("Cannot create JSON plan for job", t);
// give the graph an empty plan
executionGraph.setJsonPlan("{}");
}
// initialize the vertices that have a master initialization hook
// file output formats create directories here, input formats create splits
final long initMasterStart = System.nanoTime();
log.info("Running initialization on master for job {} ({}).", jobName, jobId);
for (JobVertex vertex : jobGraph.getVertices()) {
String executableClass = vertex.getInvokableClassName();
if (executableClass == null || executableClass.isEmpty()) {
throw new JobSubmissionException(jobId,
"The vertex " + vertex.getID() + " (" + vertex.getName() + ") has no invokable class.");
}
try {
vertex.initializeOnMaster(classLoader);
}
catch (Throwable t) {
throw new JobExecutionException(jobId,
"Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(), t);
}
}
log.info("Successfully ran initialization on master in {} ms.",
(System.nanoTime() - initMasterStart) / 1_000_000);
// topologically sort the job vertices and attach the graph to the existing one
List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
if (log.isDebugEnabled()) {
log.debug("Adding {} vertices from job graph {} ({}).", sortedTopology.size(), jobName, jobId);
}
// attach the topologically sorted job vertices to the executionGraph data structure
executionGraph.attachJobGraph(sortedTopology);
if (log.isDebugEnabled()) {
log.debug("Successfully created execution graph from job graph {} ({}).", jobName, jobId);
}
... ...
executionGraph.enableCheckpointing(
chkConfig,
triggerVertices,
ackVertices,
confirmVertices,
hooks,
checkpointIdCounter,
completedCheckpoints,
rootBackend,
checkpointStatsTracker);
}
// create all the metrics for the Execution Graph
metrics.gauge(RestartTimeGauge.METRIC_NAME, new RestartTimeGauge(executionGraph));
metrics.gauge(DownTimeGauge.METRIC_NAME, new DownTimeGauge(executionGraph));
metrics.gauge(UpTimeGauge.METRIC_NAME, new UpTimeGauge(executionGraph));
executionGraph.getFailoverStrategy().registerMetrics(metrics);
return executionGraph;
}
If no execution graph exists yet, a new one is created
The JobGraph is topologically sorted to obtain the list of all JobVertices
The topologically sorted job vertices are attached to the executionGraph data structure
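A tiny standalone illustration of what "sorted topologically from the sources" means here (the vertex names are made up): each vertex appears only after all of the vertices it consumes from, which is the order in which attachJobGraph builds the corresponding execution-side vertices.
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class TopologicalOrderDemo {
    public static void main(String[] args) {
        // a small job: two sources feeding a map, which feeds a sink
        Map<String, List<String>> inputsOf = new LinkedHashMap<>();
        inputsOf.put("Source-1", List.of());
        inputsOf.put("Source-2", List.of());
        inputsOf.put("Map", List.of("Source-1", "Source-2"));
        inputsOf.put("Sink", List.of("Map"));

        List<String> sorted = new ArrayList<>();
        Set<String> placed = new HashSet<>();
        while (sorted.size() < inputsOf.size()) {
            for (Map.Entry<String, List<String>> vertex : inputsOf.entrySet()) {
                // a vertex can be placed once all of its inputs have been placed
                if (!placed.contains(vertex.getKey()) && placed.containsAll(vertex.getValue())) {
                    sorted.add(vertex.getKey());
                    placed.add(vertex.getKey());
                }
            }
        }
        System.out.println(sorted); // [Source-1, Source-2, Map, Sink]
    }
}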
JobManagerRunnerImpl
@Override
public void start() throws Exception {
leaderElectionService.start(this);
}
StandaloneLeaderElectionService
public void start(LeaderContender newContender) throws Exception {
if (contender != null) {
// Service was already started
throw new IllegalArgumentException("Leader election service cannot be started multiple times.");
}
contender = Preconditions.checkNotNull(newContender);
// directly grant leadership to the given contender
contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);
}
@Override
public void grantLeadership(final UUID leaderSessionID) {
... ...
return verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
... ...
}
private CompletableFuture<Void> verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) {
final CompletableFuture<JobSchedulingStatus> jobSchedulingStatusFuture = getJobSchedulingStatus();
return jobSchedulingStatusFuture.thenCompose(
jobSchedulingStatus -> {
if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
return jobAlreadyDone();
} else {
// start the JobMaster
return startJobMaster(leaderSessionId);
}
});
}
private CompletionStage<Void> startJobMaster(UUID leaderSessionId) {
log.info("JobManager runner for job {} ({}) was granted leadership with session id {} at {}.",
jobGraph.getName(), jobGraph.getJobID(), leaderSessionId, jobMasterService.getAddress());
... ...
final CompletableFuture<Acknowledge> startFuture;
try {
// start the JobMaster service
startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));
} catch (Exception e) {
return FutureUtils.completedExceptionally(new FlinkException("Failed to start the JobMaster.", e));
}
final CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture = leaderGatewayFuture;
return startFuture.thenAcceptAsync(
(Acknowledge ack) -> confirmLeaderSessionIdIfStillLeader(
leaderSessionId,
jobMasterService.getAddress(),
currentLeaderGatewayFuture),
executor);
}
Starting the JobMaster service
JobMaster
public CompletableFuture<Acknowledge> start(final JobMasterId newJobMasterId) throws Exception {
// make sure we receive RPC and async calls
start();
// asynchronous, non-blocking call
return callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT);
}