当前位置:网站首页>Flink--Chain的条件源码分析
Flink--Chain的条件源码分析
2022-07-03 09:32:00 【Samooyou】
flink为了提高执行效率,会将部分算子进行合并,合并后组成operator chain,这样整个operator chain作为一个JobVertex被调度到一个slot上去执行,避免了上下游算子发送数据时带来的网络开销以及数据序列化反序列化的性能损耗。
flink如何判断算子能组成一个operate chain呢?答案在源码StreamingJobGraphGenerator.java中:
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
StreamOperatorFactory<?> headOperator = upStreamVertex.getOperatorFactory();
StreamOperatorFactory<?> outOperator = downStreamVertex.getOperatorFactory();
return downStreamVertex.getInEdges().size() == 1
&& outOperator != null
&& headOperator != null
&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
&& outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
&& (edge.getPartitioner() instanceof ForwardPartitioner)
&& edge.getShuffleMode() != ShuffleMode.BATCH
&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
&& streamGraph.isChainingEnabled();
}downStreamVertex.getInEdges().size() == 1:下游算子的入边必须为1,下游算子不能是connect,union,join,这些算子入边都是大于1,因为他需要接入两个流upStreamVertex.isSameSlotSharingGroup(downStreamVertex):上游算子和下游算子都在同一个SlotSharingGroup:每个算子都可以通过.slotSharingGroup()来指定该算子的SlotSharingGroup,默认不指定都是Default,如果想要上下游算子组成operater chain,上下游的SlotSharingGroup必须相同,要么都不指定,要么设置为相同的值outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD || headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS):下游算子的ChainingStrategy策略必须是ALWAYS,上游算子的ChainingStrategy策略必须是ALWAYS或者HEAD:如果算子调用了disableChaining()的话,该算子的ChainingStrategy会变成NEVER,不能与上下游任何算子组成operator chain;如果算子调用了startNewChain(),那么这个算子的ChainingStrategy就变成HEAD;不能与上游算子组成chain,只能与下游算子组成chain。Source算子默认是HEAD,其他算子默认都是ALWAYSedge.getPartitioner() instanceof ForwardPartitioner:上下游算子之间的数据分区方式是ForwardPartitioner:数据分区有几种分区方式:shuffle,rebalance,rescale,broadcast,keyedgroup,forward等。如果上下游算子之间没有显示调用其他分区模式(keyby,shuffle,rebalance,rescale,broadcast)并且上下游算子并发度相同的情况下,默认都是ForwardPartitioner模式edge.getShuffleMode() != ShuffleMode.BATCH:如果没有指定默认都是PIPELINEDupStreamVertex.getParallelism() == downStreamVertex.getParallelism():上下游算子的并发度必须相同streamGraph.isChainingEnabled():job没有调用disableChaining()
边栏推荐
- [untitled] numpy learning
- Leetcode skimming ---202
- extern关键字
- ThreadLocal principle and usage scenario
- 6、 Data definition language of MySQL (1)
- Leetcode刷题---35
- Unity group engineering practice project "the strongest takeaway" planning case & error correction document
- Pytoch has been installed, but vs code still displays no module named 'torch‘
- High imitation bosom friend manke comic app
- Install yolov3 (Anaconda)
猜你喜欢

Pytorch ADDA code learning notes

Introduction to deep learning linear algebra (pytorch)

Hands on deep learning pytorch version exercise solution - 2.4 calculus
![[untitled]](/img/2b/177970366174e50e75b5c820c95d08.jpg)
[untitled]

A detailed explanation of vector derivative and matrix derivative

侯捷——STL源码剖析 笔记

Tensorflow—Image segmentation

Drop out (pytoch)

Tensorflow—Neural Style Transfer

2018 y7000 upgrade hard disk + migrate and upgrade black apple
随机推荐
Leetcode刷题---10
Common scenarios in which Seata distributed transactions fail and do not take effect (transactions do not rollback)
Numpy quick start (I) -- pre knowledge (create array + constant + data type)
High imitation wechat
Leetcode刷题---1
Practical part: conversion of Oracle Database Standard Edition (SE) to Enterprise Edition (EE)
六、MySQL之数据定义语言(一)
Leetcode skimming ---278
Codeup: word replacement
Leetcode刷题---189
Leetcode skimming ---283
神经网络入门之预备知识(PyTorch)
Hands on deep learning pytorch version exercise answer - 2.2 preliminary knowledge / data preprocessing
Multilayer perceptron (pytorch)
Raspberry pie 4B deploys lnmp+tor and builds a website on dark web
C#项目-寝室管理系统(1)
Hands on deep learning pytorch version exercise solution - 2.5 automatic differentiation
Leetcode skimming ---189
User recommendation preference model based on attention enhanced knowledge perception
Type de contenu « Application / X - www - form - urlencoded; Charset = utf - 8 'not supported