Flink--Source-code analysis of the conditions for operator chaining
2022-07-03 09:32:00 【Samooyou】
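To improve execution efficiency, Flink merges some operators together into an operator chain. The whole operator chain is scheduled as a single JobVertex onto one slot and runs there as one task. This avoids the network overhead of sending data between the upstream and downstream operators, as well as the cost of serializing and deserializing that data.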
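The following toy Java model illustrates the saving described above. It is not Flink's runtime: `runChained` and `runUnchained` are hypothetical helpers, and the `byte[]` queue only stands in for a network channel between two tasks. Chained operators roughly hand each record to the next operator through a direct method call in the same thread. Unchained operators must serialize each record, transfer it, and deserialize it on the receiving side.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Function;

public class ChainingSketch {
    // Chained (toy model): the first operator's output goes straight into the
    // second operator via a method call, with no copy and no serialization.
    static List<Integer> runChained(List<String> input,
                                    Function<String, Integer> first,
                                    Function<Integer, Integer> second) {
        List<Integer> out = new ArrayList<>();
        for (String record : input) {
            out.add(second.apply(first.apply(record)));
        }
        return out;
    }

    // Not chained (toy model): every record is serialized into a byte buffer,
    // queued in a stand-in "channel", and deserialized by the downstream side.
    static List<Integer> runUnchained(List<String> input,
                                      Function<String, Integer> first,
                                      Function<Integer, Integer> second) {
        Queue<byte[]> channel = new ArrayDeque<>();
        for (String record : input) {
            Integer value = first.apply(record);
            channel.add(value.toString().getBytes(StandardCharsets.UTF_8)); // serialize
        }
        List<Integer> out = new ArrayList<>();
        while (!channel.isEmpty()) {
            int value = Integer.parseInt(new String(channel.poll(), StandardCharsets.UTF_8)); // deserialize
            out.add(second.apply(value));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> input = List.of("a", "bb", "ccc");
        Function<String, Integer> length = String::length;
        Function<Integer, Integer> doubled = x -> x * 2;
        System.out.println(runChained(input, length, doubled));   // [2, 4, 6]
        System.out.println(runUnchained(input, length, doubled)); // [2, 4, 6]
    }
}
```

Both paths produce the same result. The unchained path simply does extra per-record work, and this work is what chaining removes.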
How does Flink decide whether operators can form an operator chain? The answer is in the source file StreamingJobGraphGenerator.java:
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
    StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
    StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

    StreamOperatorFactory<?> headOperator = upStreamVertex.getOperatorFactory();
    StreamOperatorFactory<?> outOperator = downStreamVertex.getOperatorFactory();

    return downStreamVertex.getInEdges().size() == 1
        && outOperator != null
        && headOperator != null
        && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
        && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
        && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
            headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
        && (edge.getPartitioner() instanceof ForwardPartitioner)
        && edge.getShuffleMode() != ShuffleMode.BATCH
        && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
        && streamGraph.isChainingEnabled();
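}

An edge is chainable only if all of the following conditions hold:

- downStreamVertex.getInEdges().size() == 1: the downstream operator must have exactly one input edge. Operators that consume two or more streams, such as those produced by connect, union or join, have more than one input edge, so they cannot be chained to their upstream.
- upStreamVertex.isSameSlotSharingGroup(downStreamVertex): the upstream and downstream operators must be in the same SlotSharingGroup. Each operator can set its group with .slotSharingGroup(). An operator that does not set one inherits the group of its inputs (when they all share one), and the default group is named "default". To form an operator chain, the two operators must therefore end up with the same group, either by leaving it unset or by setting the same value explicitly.
- outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD || headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS): the downstream operator's ChainingStrategy must be ALWAYS, and the upstream operator's must be ALWAYS or HEAD. Calling disableChaining() on an operator sets its strategy to NEVER, so it cannot chain with any operator upstream or downstream. Calling startNewChain() sets its strategy to HEAD, so it can no longer chain with its upstream operator, only with its downstream one. Source operators default to HEAD, and most other built-in operators (map, filter, flatMap and so on) default to ALWAYS.
- edge.getPartitioner() instanceof ForwardPartitioner: data must flow between the two operators through a ForwardPartitioner. Flink supports several partitioning schemes, including shuffle, rebalance, rescale, broadcast, key group (keyBy) and forward. If no partitioning method (keyBy, shuffle, rebalance, rescale, broadcast) is called explicitly between the two operators and their parallelism is equal, the edge defaults to ForwardPartitioner. Otherwise it defaults to RebalancePartitioner.
- edge.getShuffleMode() != ShuffleMode.BATCH: the edge must not use the blocking BATCH shuffle mode. If no shuffle mode is specified, the data exchange is pipelined, so this condition normally holds.
- upStreamVertex.getParallelism() == downStreamVertex.getParallelism(): the upstream and downstream operators must have the same parallelism.
- streamGraph.isChainingEnabled(): chaining must not be disabled for the whole job. In other words, the job must not have called disableOperatorChaining() on its StreamExecutionEnvironment.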
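The conditions above can be explored with the following minimal, dependency-free Java model (Java 16+ for records). It is a sketch, not Flink code. `Node`, `Edge`, the three enums and `defaultPartitioner` are hypothetical stand-ins for Flink's StreamNode, StreamEdge, ChainingStrategy, partitioner classes and ShuffleMode. The model mirrors the conjunction in isChainable() but leaves out the operator-factory null checks.

```java
import java.util.Objects;

public class ChainableSketch {
    // Hypothetical, simplified stand-ins for the Flink types used by isChainable().
    enum ChainingStrategy { ALWAYS, NEVER, HEAD }

    enum Partitioner { FORWARD, REBALANCE, RESCALE, SHUFFLE, BROADCAST, KEY_GROUP }

    enum ShuffleMode { PIPELINED, BATCH, UNDEFINED }

    // Hypothetical node carrying only the fields that the check reads.
    record Node(String name, ChainingStrategy strategy, String slotSharingGroup,
                int parallelism, int inEdges) {}

    // Hypothetical edge between an upstream and a downstream node.
    record Edge(Node up, Node down, Partitioner partitioner, ShuffleMode shuffleMode) {}

    // Default when no keyBy/shuffle/rebalance/rescale/broadcast was called:
    // forward for equal parallelism, rebalance otherwise.
    static Partitioner defaultPartitioner(Partitioner explicit, int upParallelism, int downParallelism) {
        if (explicit != null) {
            return explicit;
        }
        return upParallelism == downParallelism ? Partitioner.FORWARD : Partitioner.REBALANCE;
    }

    // Same conjunction as isChainable(), one condition per line.
    static boolean isChainable(Edge edge, boolean chainingEnabled) {
        Node up = edge.up();
        Node down = edge.down();
        return down.inEdges() == 1                                                   // single input edge
                && Objects.equals(up.slotSharingGroup(), down.slotSharingGroup())    // same slot sharing group
                && down.strategy() == ChainingStrategy.ALWAYS                        // downstream is ALWAYS
                && (up.strategy() == ChainingStrategy.HEAD
                    || up.strategy() == ChainingStrategy.ALWAYS)                     // upstream is HEAD or ALWAYS
                && edge.partitioner() == Partitioner.FORWARD                         // forward exchange
                && edge.shuffleMode() != ShuffleMode.BATCH                           // not a blocking exchange
                && up.parallelism() == down.parallelism()                            // equal parallelism
                && chainingEnabled;                                                  // not disabled job-wide
    }

    public static void main(String[] args) {
        Node source = new Node("source", ChainingStrategy.HEAD, "default", 2, 0);
        Node map = new Node("map", ChainingStrategy.ALWAYS, "default", 2, 1);
        Edge edge = new Edge(source, map, defaultPartitioner(null, 2, 2), ShuffleMode.PIPELINED);
        System.out.println(isChainable(edge, true)); // true

        // map after startNewChain(): its strategy is HEAD, so it cannot join the source's chain.
        Node newChainMap = new Node("map", ChainingStrategy.HEAD, "default", 2, 1);
        System.out.println(isChainable(new Edge(source, newChainMap, Partitioner.FORWARD, ShuffleMode.PIPELINED), true)); // false

        // Parallelism 2 -> 4 with no explicit partitioning: the default becomes REBALANCE.
        Node wideMap = new Node("map", ChainingStrategy.ALWAYS, "default", 4, 1);
        Partitioner p = defaultPartitioner(null, 2, 4);
        System.out.println(p + " " + isChainable(new Edge(source, wideMap, p, ShuffleMode.PIPELINED), true)); // REBALANCE false

        // Chaining disabled for the whole job.
        System.out.println(isChainable(edge, false)); // false
    }
}
```

In a real job, the corresponding switches are the DataStream methods startNewChain(), disableChaining() and slotSharingGroup(...), plus StreamExecutionEnvironment.disableOperatorChaining() for the whole job.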