Flink operator chaining conditions: source code analysis
2022-07-03 10:47:00 【Samooyou】
To improve execution efficiency, Flink merges some operators into an operator chain. The whole operator chain is then scheduled into a single slot as one JobVertex, which avoids the network overhead of sending data between upstream and downstream operators, as well as the cost of serializing and deserializing that data.
How does Flink decide whether two operators can form an operator chain? The answer is in the source file StreamingJobGraphGenerator.java:
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
    StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
    StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
    StreamOperatorFactory<?> headOperator = upStreamVertex.getOperatorFactory();
    StreamOperatorFactory<?> outOperator = downStreamVertex.getOperatorFactory();
    return downStreamVertex.getInEdges().size() == 1
            && outOperator != null
            && headOperator != null
            && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
            && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
            && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD
                || headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
            && (edge.getPartitioner() instanceof ForwardPartitioner)
            && edge.getShuffleMode() != ShuffleMode.BATCH
            && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
            && streamGraph.isChainingEnabled();
}

downStreamVertex.getInEdges().size() == 1: The downstream operator must have exactly one input edge. Operators that consume more than one stream, such as connect, union, and join, have more than one input edge and therefore cannot be chained to their upstream operators.

upStreamVertex.isSameSlotSharingGroup(downStreamVertex): The upstream and downstream operators must be in the same SlotSharingGroup. Any operator can set its group with .slotSharingGroup(); if none is set, the group is "default". To chain two operators, either leave the group unset on both or set it to the same value on both.

outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD || headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS): The downstream operator's ChainingStrategy must be ALWAYS, and the upstream operator's ChainingStrategy must be ALWAYS or HEAD. If an operator calls disableChaining(), its ChainingStrategy becomes NEVER and it cannot form an operator chain with any upstream or downstream operator. If an operator calls startNewChain(), its ChainingStrategy becomes HEAD: it cannot chain with its upstream operator, but it can still chain with downstream operators. Source operators default to HEAD; most other built-in operators default to ALWAYS.

edge.getPartitioner() instanceof ForwardPartitioner: The partitioner between the upstream and downstream operators must be ForwardPartitioner. The available partitioning modes include shuffle, rebalance, rescale, broadcast, key group (keyBy), and forward. If no other partitioning mode (keyBy, shuffle, rebalance, rescale, broadcast) is explicitly requested between the two operators and their parallelism is the same, ForwardPartitioner is used by default.

edge.getShuffleMode() != ShuffleMode.BATCH: The edge must not use the BATCH shuffle mode. If no shuffle mode is specified, the data exchange is pipelined by default.

upStreamVertex.getParallelism() == downStreamVertex.getParallelism(): The upstream and downstream operators must have the same parallelism.

streamGraph.isChainingEnabled(): Chaining must not have been disabled for the whole job, that is, disableOperatorChaining() has not been called on the execution environment.
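
The conditions above can be tried out without a Flink dependency using a small model. The following is only a sketch under stated assumptions: ChainingSketch, Node, Strategy, and the boolean parameters forwardPartitioner, batchShuffle, and chainingEnabled are hypothetical stand-ins invented for illustration, not Flink classes, and the null checks on the operator factories are left out.

```java
import java.util.Objects;

// Simplified, Flink-free model of the chaining check. All names here are hypothetical.
final class ChainingSketch {

    enum Strategy { ALWAYS, HEAD, NEVER }

    // Minimal stand-in for a StreamNode: only the fields that the check reads.
    static final class Node {
        final String name;
        final Strategy strategy;
        final String slotSharingGroup;
        final int parallelism;
        final int inEdgeCount;

        Node(String name, Strategy strategy, String slotSharingGroup,
             int parallelism, int inEdgeCount) {
            this.name = name;
            this.strategy = strategy;
            this.slotSharingGroup = slotSharingGroup;
            this.parallelism = parallelism;
            this.inEdgeCount = inEdgeCount;
        }
    }

    // Same conjunction as isChainable above, minus the operator factory null checks.
    static boolean isChainable(Node up, Node down, boolean forwardPartitioner,
                               boolean batchShuffle, boolean chainingEnabled) {
        return down.inEdgeCount == 1
                && Objects.equals(up.slotSharingGroup, down.slotSharingGroup)
                && down.strategy == Strategy.ALWAYS
                && (up.strategy == Strategy.HEAD || up.strategy == Strategy.ALWAYS)
                && forwardPartitioner
                && !batchShuffle
                && up.parallelism == down.parallelism
                && chainingEnabled;
    }

    public static void main(String[] args) {
        Node source = new Node("source", Strategy.HEAD, "default", 2, 0);
        Node map = new Node("map", Strategy.ALWAYS, "default", 2, 1);
        // Models an operator on which startNewChain() was called.
        Node filter = new Node("filter", Strategy.HEAD, "default", 2, 1);
        // Models a sink with a different parallelism, so the edge is not forward.
        Node sink = new Node("sink", Strategy.ALWAYS, "default", 4, 1);

        System.out.println(isChainable(source, map, true, false, true));   // true
        System.out.println(isChainable(map, filter, true, false, true));   // false: downstream is HEAD
        System.out.println(isChainable(filter, sink, false, false, true)); // false: not forward, parallelism differs
        System.out.println(isChainable(source, map, true, false, false));  // false: chaining disabled for the job
    }
}
```

In this model a single failing condition is enough to break the chain, which matches the && conjunction in the original method: source and map chain together, while filter starts a new chain and sink ends up in a separate one.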
Original article: "flink operator chain" on Jianshu