Flink operator chain conditions: source code analysis
2022-07-03 10:47:00 【Samooyou】
To improve execution efficiency, Flink merges certain operators into an operator chain. The whole chain is then scheduled as a single JobVertex and executed in one slot, which avoids the network overhead of sending records between the upstream and downstream operators, as well as the cost of serializing and deserializing those records.
How does Flink decide whether two operators can form an operator chain? The answer is in StreamingJobGraphGenerator.java:
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
    StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
    StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
    StreamOperatorFactory<?> headOperator = upStreamVertex.getOperatorFactory();
    StreamOperatorFactory<?> outOperator = downStreamVertex.getOperatorFactory();
    return downStreamVertex.getInEdges().size() == 1
            && outOperator != null
            && headOperator != null
            && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
            && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
            && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
                headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
            && (edge.getPartitioner() instanceof ForwardPartitioner)
            && edge.getShuffleMode() != ShuffleMode.BATCH
            && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
            && streamGraph.isChainingEnabled();
}

downStreamVertex.getInEdges().size() == 1: The downstream operator must have exactly one input edge. It therefore cannot be an operator such as connect, union, or join, which has more than one input edge because it consumes more than one stream.

upStreamVertex.isSameSlotSharingGroup(downStreamVertex): The upstream and downstream operators must be in the same SlotSharingGroup. Every operator can set its group with .slotSharingGroup(); if none is specified, the group is default. To chain two operators, either leave the group unset on both or set it to the same value on both.

outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD || headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS): The downstream operator's ChainingStrategy must be ALWAYS, and the upstream operator's ChainingStrategy must be ALWAYS or HEAD. Calling disableChaining() on an operator sets its ChainingStrategy to NEVER, so it cannot be chained with any upstream or downstream operator. Calling startNewChain() sets its ChainingStrategy to HEAD, so it cannot be chained with its upstream operator but can still be chained with downstream operators. Source operators default to HEAD; other operators default to ALWAYS.

edge.getPartitioner() instanceof ForwardPartitioner: The partitioner between the upstream and downstream operators must be ForwardPartitioner. The available partitioning modes include shuffle, rebalance, rescale, broadcast, keyBy, and forward. If no other partitioning mode (keyBy, shuffle, rebalance, rescale, broadcast) is requested explicitly between the two operators and they have the same parallelism, the edge defaults to ForwardPartitioner.

edge.getShuffleMode() != ShuffleMode.BATCH: The shuffle mode of the edge must not be BATCH. If not specified, it defaults to PIPELINED.

upStreamVertex.getParallelism() == downStreamVertex.getParallelism(): The upstream and downstream operators must have the same parallelism.

streamGraph.isChainingEnabled(): Chaining must not have been disabled for the whole job, that is, disableOperatorChaining() has not been called on the StreamExecutionEnvironment.
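To see how these conditions combine, here is a minimal, self-contained sketch that models the same check without depending on Flink. The class ChainabilitySketch and its nested Node and Edge types are hypothetical stand-ins for StreamNode and StreamEdge, and the boolean fields forwardPartitioner and batchShuffle are assumptions that replace the real partitioner and shuffle-mode objects. It is an illustration of the logic, not Flink's implementation.

```java
import java.util.Objects;

public class ChainabilitySketch {

    // The three strategies discussed above.
    enum ChainingStrategy { ALWAYS, HEAD, NEVER }

    // Hypothetical, simplified stand-in for a StreamNode.
    static final class Node {
        final String slotSharingGroup;
        final ChainingStrategy strategy;
        final int parallelism;
        final int inEdgeCount;

        Node(String slotSharingGroup, ChainingStrategy strategy, int parallelism, int inEdgeCount) {
            this.slotSharingGroup = slotSharingGroup;
            this.strategy = strategy;
            this.parallelism = parallelism;
            this.inEdgeCount = inEdgeCount;
        }
    }

    // Hypothetical, simplified stand-in for a StreamEdge.
    static final class Edge {
        final Node upstream;
        final Node downstream;
        final boolean forwardPartitioner; // true if the edge uses forward partitioning
        final boolean batchShuffle;       // true if the shuffle mode is BATCH

        Edge(Node upstream, Node downstream, boolean forwardPartitioner, boolean batchShuffle) {
            this.upstream = upstream;
            this.downstream = downstream;
            this.forwardPartitioner = forwardPartitioner;
            this.batchShuffle = batchShuffle;
        }
    }

    // Same conjunction of conditions as isChainable, applied to the simplified model.
    static boolean isChainable(Edge edge, boolean chainingEnabled) {
        Node up = edge.upstream;
        Node down = edge.downstream;
        return down.inEdgeCount == 1
                && Objects.equals(up.slotSharingGroup, down.slotSharingGroup)
                && down.strategy == ChainingStrategy.ALWAYS
                && (up.strategy == ChainingStrategy.HEAD || up.strategy == ChainingStrategy.ALWAYS)
                && edge.forwardPartitioner
                && !edge.batchShuffle
                && up.parallelism == down.parallelism
                && chainingEnabled;
    }

    public static void main(String[] args) {
        Node source = new Node("default", ChainingStrategy.HEAD, 4, 0);
        Node map = new Node("default", ChainingStrategy.ALWAYS, 4, 1);
        Node newChain = new Node("default", ChainingStrategy.HEAD, 4, 1);   // as after startNewChain()
        Node narrowSink = new Node("default", ChainingStrategy.ALWAYS, 2, 1);

        // Source (HEAD) followed by map (ALWAYS), forward edge, same parallelism.
        System.out.println(isChainable(new Edge(source, map, true, false), true));      // true
        // Downstream strategy is HEAD, so it cannot join its upstream operator.
        System.out.println(isChainable(new Edge(map, newChain, true, false), true));    // false
        // Different parallelism, and therefore no forward partitioning.
        System.out.println(isChainable(new Edge(map, narrowSink, false, false), true)); // false
        // Chaining disabled for the whole job.
        System.out.println(isChainable(new Edge(source, map, true, false), false));     // false
    }
}
```

Because the check is a plain conjunction, a single failing condition is enough to split two operators into separate JobVertex instances, which is why changing the parallelism of one operator often breaks a chain.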
Original article: flink operater chain - Jianshu