Flink Yarn Per-Job - Submission Flow, Part 1
2022-08-01 23:43:00 【hyunbar】
AbstractJobClusterExecutor.java
@Override
public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration, @Nonnull final ClassLoader userCodeClassloader) throws Exception {
    /* TODO convert the stream graph (StreamGraph) into a job graph (JobGraph) */
    final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
    /* TODO cluster descriptor: creates and starts the YarnClient; holds the yarn/flink configuration and environment information */
    try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
        final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
        /* TODO cluster-specific resource configuration: JobManager memory, TaskManager memory, slots per TaskManager */
        final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
        final ClusterClientProvider<ClusterID> clusterClientProvider = clusterDescriptor
                .deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode());
        LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());
        return CompletableFuture.completedFuture(
                new ClusterClientJobClientAdapter<>(clusterClientProvider, jobGraph.getJobID(), userCodeClassloader));
    }
}
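For context, here is a minimal sketch (not from the original post; class and job names are illustrative) of a user job whose env.execute() call ends up in the executor above when submitted with the per-job target, e.g. bin/flink run -t yarn-per-job -c WordCountJob app.jar:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WordCountJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("hello", "flink").print();
        // execute() builds the StreamGraph and hands it to the configured
        // PipelineExecutor; with -t yarn-per-job that leads into execute() above
        env.execute("word-count");
    }
}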
Converting the StreamGraph to a JobGraph
**(1) Locate the createJobGraph method**
1) PipelineExecutorUtils
public static JobGraph getJobGraph(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws MalformedURLException {
    checkNotNull(pipeline);
    checkNotNull(configuration);
    final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
    // continue down the call chain
    final JobGraph jobGraph = FlinkPipelineTranslationUtil
            .getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism());
    configuration
            .getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID)
            .ifPresent(strJobID -> jobGraph.setJobID(JobID.fromHexString(strJobID)));
    jobGraph.addJars(executionConfigAccessor.getJars());
    jobGraph.setClasspaths(executionConfigAccessor.getClasspaths());
    jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());
    return jobGraph;
}
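The jars and classpaths attached to the JobGraph here are read from PipelineOptions entries in the Configuration. A small sketch (not from the original post; the paths are made up for illustration) of supplying them programmatically:

import java.util.Arrays;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;

public class PipelineConfigExample {
    public static Configuration build() {
        Configuration configuration = new Configuration();
        // pipeline.jars -> read back by ExecutionConfigAccessor#getJars()
        configuration.set(PipelineOptions.JARS,
                Arrays.asList("file:///opt/jobs/udf.jar"));
        // pipeline.classpaths -> read back by getClasspaths()
        configuration.set(PipelineOptions.CLASSPATHS,
                Arrays.asList("http://repo.example.com/libs/"));
        return configuration;
    }
}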
2) FlinkPipelineTranslationUtil
public static JobGraph getJobGraph(
        Pipeline pipeline,
        Configuration optimizerConfiguration,
        int defaultParallelism) {
    FlinkPipelineTranslator pipelineTranslator = getPipelineTranslator(pipeline);
    // continue down the call chain
    return pipelineTranslator.translateToJobGraph(pipeline,
            optimizerConfiguration,
            defaultParallelism);
}
3) StreamGraphTranslator implements FlinkPipelineTranslator
@Override
public JobGraph translateToJobGraph(
        Pipeline pipeline,
        Configuration optimizerConfiguration,
        int defaultParallelism) {
    ...
    StreamGraph streamGraph = (StreamGraph) pipeline;
    // the key call
    return streamGraph.getJobGraph(null);
}
4) StreamGraph
public JobGraph getJobGraph(@Nullable JobID jobID) {
    return StreamingJobGraphGenerator.createJobGraph(this, jobID);
}
5) StreamingJobGraphGenerator
public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) {
    return new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph();
}
private JobGraph createJobGraph() {
    preValidate();
    // make sure that all vertices start immediately
    /* TODO in streaming mode the schedule mode starts all vertices together: Eager */
    jobGraph.setScheduleMode(streamGraph.getScheduleMode());
    jobGraph.enableApproximateLocalRecovery(streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());
    // Generate deterministic hashes for the nodes in order to identify them across
    // submission iff they didn't change.
    // Traverse the StreamGraph breadth-first and generate a hash id for each StreamNode,
    // so that re-submitting an unchanged topology yields the same hashes every time
    Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
    // Generate legacy version hashes for backwards compatibility
    List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
    for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
        legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
    }
    /* TODO the most important step: generates the JobVertex and JobEdge instances, chaining as many nodes together as possible */
    setChaining(hashes, legacyHashes);
    /* TODO serialize each JobVertex's incoming edges into its StreamConfig (the outgoing edges were already written during setChaining) */
    setPhysicalEdges();
    /* TODO assign each JobVertex to a SlotSharingGroup by group name, and set up CoLocationGroups for iteration heads and tails */
    setSlotSharingAndCoLocation();
    setManagedMemoryFraction(
            Collections.unmodifiableMap(jobVertices),
            Collections.unmodifiableMap(vertexConfigs),
            Collections.unmodifiableMap(chainedConfigs),
            id -> streamGraph.getStreamNode(id).getManagedMemoryOperatorScopeUseCaseWeights(),
            id -> streamGraph.getStreamNode(id).getManagedMemorySlotScopeUseCases());
    configureCheckpointing();
    jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());
    JobGraphUtils.addUserArtifactEntries(streamGraph.getUserArtifacts(), jobGraph);
    // set the ExecutionConfig last when it has been finalized
    try {
        /* TODO serialize the StreamGraph's ExecutionConfig into the JobGraph's configuration */
        jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
    }
    catch (IOException e) {
        throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
                "This indicates that non-serializable types (like custom serializers) were registered");
    }
    return jobGraph;
}
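The deterministic hashes generated above are what operator uids pin down: giving an operator an explicit uid keeps its hash, and therefore its state assignment, stable across re-submissions even if the surrounding graph changes. A minimal sketch (not from the original post; assumes the standard DataStream uid API):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StableUidExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3)
           // an explicit uid fixes this operator's hash, so its state can be
           // matched across re-submissions and savepoint restores
           .uid("source-numbers")
           .print();
        env.execute("stable-uids");
    }
}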
**(1) Generate JobVertex and JobEdge instances, chaining as many nodes together as possible**
1) StreamingJobGraphGenerator
Operator chaining starts at index 1 because index 0 is reserved for chained source inputs.
/**
 * Sets up task chains from the source {@link StreamNode} instances.
 *
 * <p>This will recursively create all {@link JobVertex} instances.
 */
private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {
    // we separate out the sources that run as inputs to another operator (chained inputs)
    // from the sources that needs to run as the main (head) operator.
    final Map<Integer, OperatorChainInfo> chainEntryPoints = buildChainedInputsAndGetHeadInputs(hashes, legacyHashes);
    final Collection<OperatorChainInfo> initialEntryPoints = new ArrayList<>(chainEntryPoints.values());
    // iterate over a copy of the values, because this map gets concurrently modified
    // build the node chains starting from the sources
    for (OperatorChainInfo info : initialEntryPoints) {
        // build the node chain and return the current node's physical out edges;
        // startNodeId != currentNodeId means currentNode is a child node inside the chain
        createChain(
                info.getStartNodeId(),
                1, // operators start at position 1 because 0 is for chained source inputs
                info,
                chainEntryPoints);
    }
}
private List<StreamEdge> createChain(
        final Integer currentNodeId,
        final int chainIndex,
        final OperatorChainInfo chainInfo,
        final Map<Integer, OperatorChainInfo> chainEntryPoints) {
    Integer startNodeId = chainInfo.getStartNodeId();
    if (!builtVertices.contains(startNodeId)) {
        /* TODO transitional collection of out edges, used to generate the final JobEdges; note it excludes edges inside the chain */
        List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
        List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
        List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
        StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);
        /* TODO split the current node's out edges into chainable and nonChainable */
        for (StreamEdge outEdge : currentNode.getOutEdges()) {
            if (isChainable(outEdge, streamGraph)) {
                chainableOutputs.add(outEdge);
            } else {
                nonChainableOutputs.add(outEdge);
            }
        }
        /* TODO recursively call createChain, extending the current chain along chainable edges */
        for (StreamEdge chainable : chainableOutputs) {
            transitiveOutEdges.addAll(
                    createChain(chainable.getTargetId(), chainIndex + 1, chainInfo, chainEntryPoints));
        }
        /* TODO for non-chainable edges, record the out edge and start a new chain at the target */
        for (StreamEdge nonChainable : nonChainableOutputs) {
            transitiveOutEdges.add(nonChainable);
            createChain(
                    nonChainable.getTargetId(),
                    1, // operators start at position 1 because 0 is for chained source inputs
                    chainEntryPoints.computeIfAbsent(
                            nonChainable.getTargetId(),
                            (k) -> chainInfo.newChain(nonChainable.getTargetId())),
                    chainEntryPoints);
        }
        /* TODO build the current node's display name, e.g. "Keyed Aggregation -> Sink: Unnamed" */
        chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs, Optional.ofNullable(chainEntryPoints.get(currentNodeId))));
        chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
        chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));
        OperatorID currentOperatorId = chainInfo.addNodeToChain(currentNodeId, chainedNames.get(currentNodeId));
        if (currentNode.getInputFormat() != null) {
            getOrCreateFormatContainer(startNodeId).addInputFormat(currentOperatorId, currentNode.getInputFormat());
        }
        if (currentNode.getOutputFormat() != null) {
            getOrCreateFormatContainer(startNodeId).addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
        }
        /* TODO if the current node is the chain's start node, create the JobVertex and return its StreamConfig; otherwise create an empty StreamConfig */
        StreamConfig config = currentNodeId.equals(startNodeId)
                ? createJobVertex(startNodeId, chainInfo)
                : new StreamConfig(new Configuration());
        /* TODO populate the JobVertex's StreamConfig, essentially serializing the StreamNode's configuration into it */
        setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs, chainInfo.getChainedSources());
        if (currentNodeId.equals(startNodeId)) {
            /* TODO the chain's start node is marked as chain start (a node that belongs to no chain is also marked as chain start) */
            config.setChainStart();
            config.setChainIndex(chainIndex);
            config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
            /* TODO connect the current node (headOfChain) to all of its transitive out edges */
            for (StreamEdge edge : transitiveOutEdges) {
                /* TODO build a JobEdge from the StreamEdge and create the IntermediateDataSet that links the JobVertex and the JobEdge */
                connect(startNodeId, edge);
            }
            /* TODO write the physical out edges into the config; used at deployment time */
            config.setOutEdgesInOrder(transitiveOutEdges);
            /* TODO write the StreamConfigs of all child nodes in the chain into the head node's CHAINED_TASK_CONFIG entry */
            config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
        } else {
            /* TODO the current node is a child node inside the chain */
            chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>());
            config.setChainIndex(chainIndex);
            StreamNode node = streamGraph.getStreamNode(currentNodeId);
            config.setOperatorName(node.getOperatorName());
            /* TODO add the current node's StreamConfig to this chain's config map */
            chainedConfigs.get(startNodeId).put(currentNodeId, config);
        }
        config.setOperatorID(currentOperatorId);
        if (chainableOutputs.isEmpty()) {
            config.setChainEnd();
        }
        /* TODO return the out edges that leave the chain */
        return transitiveOutEdges;
    } else {
        return new ArrayList<>();
    }
}
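Whether an edge lands in chainableOutputs or nonChainableOutputs can be influenced from the DataStream API. A sketch (not from the original post; operator names are illustrative) of the standard chaining knobs:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingControlExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.disableOperatorChaining();       // turn chaining off globally
        env.fromElements("a", "b", "c")
           .map(String::toUpperCase).name("upper")
           .startNewChain()                     // force a chain boundary before "upper"
           .filter(s -> !s.isEmpty()).name("non-empty")
           .disableChaining()                   // keep "non-empty" out of any chain
           .print();
        env.execute("chaining-control");
    }
}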
Creating and starting the YarnClient
1) StandaloneClientFactory implements ClusterClientFactory
The standalone implementation simply wraps the configuration; it is the YARN factory below that creates and starts the YarnClient together with the yarn/flink configuration and environment information.
public StandaloneClusterDescriptor createClusterDescriptor(Configuration configuration) {
    checkNotNull(configuration);
    return new StandaloneClusterDescriptor(configuration);
}
2) YarnClusterClientFactory
private YarnClusterDescriptor getClusterDescriptor(Configuration configuration) {
    /* TODO create the YarnClient */
    final YarnClient yarnClient = YarnClient.createYarnClient();
    final YarnConfiguration yarnConfiguration = new YarnConfiguration();
    /* TODO initialize and start the YarnClient */
    yarnClient.init(yarnConfiguration);
    yarnClient.start();
    return new YarnClusterDescriptor(
            configuration,
            yarnConfiguration,
            yarnClient,
            YarnClientYarnClusterInformationRetriever.create(yarnClient),
            false);
}
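The YarnClient created here is plain Hadoop client API. A standalone sketch (not from the original post) of the same init/start lifecycle, assuming HADOOP_CONF_DIR or YARN_CONF_DIR points at a reachable cluster:

import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class YarnClientProbe {
    public static void main(String[] args) throws Exception {
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(new YarnConfiguration()); // loads yarn-site.xml from the classpath
        yarnClient.start();
        try {
            // list the applications the ResourceManager currently knows about
            for (ApplicationReport app : yarnClient.getApplications()) {
                System.out.println(app.getApplicationId() + " " + app.getYarnApplicationState());
            }
        } finally {
            yarnClient.stop();
        }
    }
}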
Cluster resource configuration
**(1) Memory configuration**
JobManager memory = jobmanager.memory.process.size
TaskManager memory = taskmanager.memory.process.size
Slots per TaskManager = taskmanager.numberOfTaskSlots
public ClusterSpecification getClusterSpecification(Configuration configuration) {
    checkNotNull(configuration);
    // JobManager memory: jobmanager.memory.process.size
    final int jobManagerMemoryMB = JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
            configuration,
            JobManagerOptions.TOTAL_PROCESS_MEMORY)
            .getTotalProcessMemorySize()
            .getMebiBytes();
    // TaskManager memory: taskmanager.memory.process.size
    final int taskManagerMemoryMB = TaskExecutorProcessUtils
            .processSpecFromConfig(TaskExecutorProcessUtils.getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption(
                    configuration, TaskManagerOptions.TOTAL_PROCESS_MEMORY))
            .getTotalProcessMemorySize()
            .getMebiBytes();
    // slots per TaskManager: taskmanager.numberOfTaskSlots
    int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
    return new ClusterSpecification.ClusterSpecificationBuilder()
            .setMasterMemoryMB(jobManagerMemoryMB)
            .setTaskManagerMemoryMB(taskManagerMemoryMB)
            .setSlotsPerTaskManager(slotsPerTaskManager)
            .createClusterSpecification();
}
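The three values can be set programmatically on the same Configuration that getClusterSpecification reads. A sketch (not from the original post; the sizes are arbitrary examples):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;

public class ClusterSpecConfigExample {
    public static Configuration build() {
        Configuration configuration = new Configuration();
        // jobmanager.memory.process.size
        configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1600m"));
        // taskmanager.memory.process.size
        configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1728m"));
        // taskmanager.numberOfTaskSlots
        configuration.set(TaskManagerOptions.NUM_TASK_SLOTS, 2);
        return configuration;
    }
}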
Cluster deployment
YarnClusterDescriptor
public ClusterClientProvider<ApplicationId> deployJobCluster(
        ClusterSpecification clusterSpecification,
        JobGraph jobGraph,
        boolean detached) throws ClusterDeploymentException {
    try {
        // 1)
        return deployInternal(
                clusterSpecification,
                "Flink per-job cluster",
                // 2)
                getYarnJobClusterEntrypoint(),
                jobGraph,
                detached);
    } catch (Exception e) {
        throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e);
    }
}
**(1) The deployInternal method**
/**
 * This method will block until the ApplicationMaster/JobManager have been deployed on YARN.
 *
 * @param clusterSpecification Initial cluster specification for the Flink cluster to be deployed
 * @param applicationName name of the Yarn application to start
 * @param yarnClusterEntrypoint Class name of the Yarn cluster entry point.
 * @param jobGraph A job graph which is deployed with the Flink cluster, {@code null} if none
 * @param detached True if the cluster should be started in detached mode
 */
private ClusterClientProvider<ApplicationId> deployInternal(
        ClusterSpecification clusterSpecification,
        String applicationName,
        String yarnClusterEntrypoint,
        @Nullable JobGraph jobGraph,
        boolean detached) throws Exception {
    final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
    if (HadoopUtils.isKerberosSecurityEnabled(currentUser)) {
        boolean useTicketCache = flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
        if (!HadoopUtils.areKerberosCredentialsValid(currentUser, useTicketCache)) {
            throw new RuntimeException("Hadoop security with Kerberos is enabled but the login user " +
                    "does not have Kerberos credentials or delegation tokens!");
        }
    }
    /* TODO pre-deployment checks: jar path, configuration, YARN maximum vcores ... */
    isReadyForDeployment(clusterSpecification);
    // ------------------ Check if the specified queue exists --------------------
    /* TODO check that the requested yarn queue exists */
    checkYarnQueues(yarnClient);
    // ------------------ Check if the YARN ClusterClient has the requested resources --------------
    /* TODO check that yarn has enough resources */
    // Create application via yarnClient
    final YarnClientApplication yarnApplication = yarnClient.createApplication();
    final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
    Resource maxRes = appResponse.getMaximumResourceCapability();
    final ClusterResourceDescription freeClusterMem;
    try {
        freeClusterMem = getCurrentFreeClusterResources(yarnClient);
    } catch (YarnException | IOException e) {
        failSessionDuringDeployment(yarnClient, yarnApplication);
        throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e);
    }
    final int yarnMinAllocationMB = yarnConfiguration.getInt(
            YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
            YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
    if (yarnMinAllocationMB <= 0) {
        throw new YarnDeploymentException("The minimum allocation memory "
                + "(" + yarnMinAllocationMB + " MB) configured via '" + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB
                + "' should be greater than 0.");
    }
    final ClusterSpecification validClusterSpecification;
    try {
        validClusterSpecification = validateClusterResources(
                clusterSpecification,
                yarnMinAllocationMB,
                maxRes,
                freeClusterMem);
    } catch (YarnDeploymentException yde) {
        failSessionDuringDeployment(yarnClient, yarnApplication);
        throw yde;
    }
    LOG.info("Cluster specification: {}", validClusterSpecification);
    final ClusterEntrypoint.ExecutionMode executionMode = detached ?
            ClusterEntrypoint.ExecutionMode.DETACHED
            : ClusterEntrypoint.ExecutionMode.NORMAL;
    flinkConfiguration.setString(ClusterEntrypoint.EXECUTION_MODE, executionMode.toString());
    /* TODO start the ApplicationMaster */
    ApplicationReport report = startAppMaster(
            flinkConfiguration,
            applicationName,
            yarnClusterEntrypoint,
            jobGraph,
            yarnClient,
            yarnApplication,
            validClusterSpecification);
    // print the application id for user to cancel themselves.
    if (detached) {
        final ApplicationId yarnApplicationId = report.getApplicationId();
        logDetachedClusterInformation(yarnApplicationId, LOG);
    }
    setClusterEntrypointInfoToConfig(report);
    return () -> {
        try {
            return new RestClusterClient<>(flinkConfiguration, report.getApplicationId());
        } catch (Exception e) {
            throw new RuntimeException("Error while creating RestClusterClient.", e);
        }
    };
}
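The lambda returned at the end is the ClusterClientProvider handed back to the executor. A hedged sketch (the helper class below is illustrative, not Flink code) of how a caller might consume it:

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.hadoop.yarn.api.records.ApplicationId;

public class ClusterClientExample {
    // illustrative helper, not part of Flink
    public static void printJobStatus(
            ClusterClientProvider<ApplicationId> provider, JobID jobId) throws Exception {
        try (ClusterClient<ApplicationId> client = provider.getClusterClient()) {
            // queries the REST endpoint of the freshly deployed JobManager
            System.out.println(client.getJobStatus(jobId).get());
        }
    }
}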
1) Pre-deployment checks: jar path, configuration, YARN maximum vcores
private void isReadyForDeployment(ClusterSpecification clusterSpecification) throws Exception {
    if (this.flinkJarPath == null) {
        throw new YarnDeploymentException("The Flink jar path is null");
    }
    if (this.flinkConfiguration == null) {
        throw new YarnDeploymentException("Flink configuration object has not been set");
    }
    // Check if we don't exceed YARN's maximum virtual cores.
    final int numYarnMaxVcores = yarnClusterInformationRetriever.getMaxVcores();
    int configuredAmVcores = flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES);
    if (configuredAmVcores > numYarnMaxVcores) {
        throw new IllegalConfigurationException(
                String.format("The number of requested virtual cores for application master %d" +
                        " exceeds the maximum number of virtual cores %d available in the Yarn Cluster.",
                        configuredAmVcores, numYarnMaxVcores));
    }
    int configuredVcores = flinkConfiguration.getInteger(YarnConfigOptions.VCORES, clusterSpecification.getSlotsPerTaskManager());
    // don't configure more than the maximum configured number of vcores
    if (configuredVcores > numYarnMaxVcores) {
        throw new IllegalConfigurationException(
                String.format("The number of requested virtual cores per node %d" +
                        " exceeds the maximum number of virtual cores %d available in the Yarn Cluster." +
                        " Please note that the number of virtual cores is set to the number of task slots by default" +
                        " unless configured in the Flink config with '%s.'",
                        configuredVcores, numYarnMaxVcores, YarnConfigOptions.VCORES.key()));
    }
    // check if required Hadoop environment variables are set. If not, warn user
    if (System.getenv("HADOOP_CONF_DIR") == null &&
            System.getenv("YARN_CONF_DIR") == null) {
        LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. " +
                "The Flink YARN Client needs one of these to be set to properly load the Hadoop " +
                "configuration for accessing YARN.");
    }
}
2) Checking whether YARN has enough free resources
private ClusterResourceDescription getCurrentFreeClusterResources(YarnClient yarnClient) throws YarnException, IOException {
    List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
    int totalFreeMemory = 0;
    int containerLimit = 0;
    int[] nodeManagersFree = new int[nodes.size()];
    for (int i = 0; i < nodes.size(); i++) {
        NodeReport rep = nodes.get(i);
        int free = rep.getCapability().getMemory() - (rep.getUsed() != null ? rep.getUsed().getMemory() : 0);
        nodeManagersFree[i] = free;
        totalFreeMemory += free;
        if (free > containerLimit) {
            containerLimit = free;
        }
    }
    return new ClusterResourceDescription(totalFreeMemory, containerLimit, nodeManagersFree);
}
**Finding the minimum resource configuration**
RM_SCHEDULER_MINIMUM_ALLOCATION_MB = yarn.scheduler.minimum-allocation-mb
DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB = 1024 (MB)
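The YARN scheduler normalizes every container request up to a multiple of yarn.scheduler.minimum-allocation-mb. A small illustrative sketch of that rounding (not Flink code):

public class YarnAllocationRounding {
    // YARN rounds each container request up to the next multiple of
    // yarn.scheduler.minimum-allocation-mb (illustrative arithmetic only)
    static int normalize(int requestedMB, int minAllocationMB) {
        return ((requestedMB + minAllocationMB - 1) / minAllocationMB) * minAllocationMB;
    }

    public static void main(String[] args) {
        // with the 1024 MB default, a 1600 MB JobManager request becomes a 2048 MB container
        System.out.println(normalize(1600, 1024)); // prints 2048
    }
}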
