当前位置:网站首页>flink 物理分区( 随机分区、 轮询分区、重缩放分区、 广播、 全局分区、自定义分区 )
flink 物理分区( 随机分区、 轮询分区、重缩放分区、 广播、 全局分区、自定义分区 )
2022-06-11 12:06:00 【但行益事莫问前程】
文章目录
前言
Flink中keyBy是一种按照键的哈希值来进行重新分区的操作,至于分区是否均匀、每个key 的数据具体会分到哪一区无法控制,因此keyBy 是一种逻辑分区(logical partitioning)操作。只有物理分区(physical partitioning),才真正控制分区策略精准地调配数据。
物理分区与 keyBy 另一大区别在于,keyBy 之后得到的是一个 KeyedStream,而物理分区之后结果仍是 DataStream,且流中元素数据类型保持不变。分区算子并不对数据进行转换处理,只是定义了数据的传输方式
1. 随机分区(shuffle)
Partitions elements randomly according to a uniform distribution
通过调用 DataStream 的.shuffle()方法,将数据随机地分配到下游算子的并行任务中去
随机分区服从均匀分布(uniform distribution),所以可以把流中的数据随机打乱,均匀地传递到下游任务分区。因为是完全随机的,所以对于同样的输入数据, 每次执行得到的结果也不会相同。

dataStream.shuffle();
2. 轮询分区(Round-Robin)
按照先后顺序将数据做依次分发。通过调用 DataStream 的.rebalance()方法,就可以实现轮询重分区。rebalance使用的是 Round-Robin 负载均衡算法,可以将输入流数据平均分配到下游的并行任务中去
dataStream.rebalance()
3. 重缩放分区(rescale)
重缩放分区和轮询分区非常相似。当调用 rescale()方法时,其实底层也是使用 Round-Robin算法进行轮询,但是只会将数据轮询发送到下游并行任务的一部分中。 rebalance 的方式是每个发牌人都面向所有人发牌;而 rescale的做法是分成小团体,发牌人只给自己团体内的所有人轮流发牌。
由于 rebalance 是所有分区数据的“重新平衡”,当 TaskManager 数据量较多时,这种跨节点的网络传输必然影响效率;而如果配置的 task slot 数量合适,用 rescale 的方式进行“局部重缩放”,就可以让数据只在当前 TaskManager 的多个 slot 之间重新分配,从而避免了网络传输带来的损耗。
从底层实现上看,rebalance 和 rescale 的根本区别在于任务之间的连接机制不同。rebalance将会针对所有上游任务(发送数据方)和所有下游任务(接收数据方)之间建立通信通道,这是一个笛卡尔积的关系;而 rescale 仅仅针对每一个任务和下游对应的部分任务之间建立通信通道,节省了很多资源。
dataStream.rescale()
4. 广播(broadcast)
经过广播之后,数据会在不同的分区都保留一份,可能进行重复处理。可以通过调用 DataStream 的 broadcast()方法,将输入数据复制并发送到下游算子的所有并行任务中去。
dataStream.broadcast();
5. 全局分区(global)
通过调用.global()方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了 1,所以使用这个操作需要非常谨慎,可能对程序造成很大的压力
6. 自定义分区(Custom)
当 Flink 提 供 的 所 有 分 区 策 略 都 不 能 满 足 用 户 的 需 求 时 , 可 以 通 过 使 用partitionCustom()方法来自定义分区策略。
dataStream.partitionCustom(partitioner, "someKey");
dataStream.partitionCustom(partitioner, 0);
在调用时,方法需要传入两个参数,第一个是自定义分区器(Partitioner)对象,第二个是应用分区器的字段,它的指定方式与 keyBy 指定 key 基本一样:可以通过字段名称指定,也可以通过字段位置索引来指定,还可以实现一个 KeySelector
public class CustomPartitionTest {
//对一组自然数按照奇偶性进行重分区
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 将自然数按照奇偶分区
env.fromElements(1, 2, 3, 4, 5, 6, 7, 8)
.partitionCustom(new Partitioner<Integer>() {
@Override
public int partition(Integer key, int numPartitions) {
return key % 2;
}
}, new KeySelector<Integer, Integer>() {
@Override
public Integer getKey(Integer value) throws Exception {
return value;
}
})
.print()
.setParallelism(2);
env.execute();
}
}

边栏推荐
- Use compiler option '--downleveliteration' to allow iteration of iterations
- Zhouhongyi's speech at the China Network Security Annual Conference: 360 secure brain builds a data security system
- centos安装mysql5.7
- Generate statement is not synthesized
- Hamiltonian graph
- Four ways to create threads
- Eulato
- 带你了解直接插入排序(C语言)
- Weekly Postgres world news 2022w08
- Is the SSL certificate reliable in ensuring the information security of the website?
猜你喜欢
随机推荐
C # set or verify the format of text field in PDF
进度条加载
线程五种状态(线程生命周期)
Yapi installation
记一次 mysql 主从不同步问题排查
广东市政安全施工资料管理软件2022新表格来啦
JS addition and multiplication error resolution number precision
arguments.callee 实现函数递归调用
Use of Chinese input method input event composition
C reads TXT file to generate word document
CVPR 2022 | 文本引导的实体级别图像操作ManiTrans
(解决)Splunk 之 kv-store down 问题
Use cache to reduce network requests
Where is it safer to open an account for soda ash futures? How is the deposit calculated?
Addition of large numbers (C language)
你管这破玩意儿叫 MQ?
软件项目管理 7.1.项目进度基本概念
中国联通 22春招 群面
Elk - hearthbeat implements service monitoring
Iframe value transfer









