当前位置:网站首页>flink自定义轮询分区产生的问题
flink自定义轮询分区产生的问题
2022-08-04 05:27:00 【第一片心意】
1. 项目背景
1.1. 介绍
从4个分区的kafka特定主题中接受设备数据,然后根据主题的key进行轮询分区。第一次按顺序分配,已分配完的之后直接获取之前的分配结果即可,以保证key相同的数据可以进入下个算子的同一个分区。
1.2. 代码
1.2.1. 自定义分区
/**
* 自定义轮询分区器<br>
* 对于一个未被分区的key,对其分配下一个分区索引;对于一个已经被分区过的key,对其分配已经分配过的分区索引
*
* @author wzq
* @date 2019-12-04
**/
class CustomPartitioner extends Partitioner[String] {
private val map: mutable.Map[String, Int] = mutable.HashMap[String, Int]()
private var num: Int = 0
override def partition(key: String, numPartitions: Int): Int = {
if (map.contains(key)) {
map.getOrElse(key, 0)
} else {
val partitionIndex = num
map.put(key, num)
num += 1
if (num == numPartitions) {
num = 0
}
partitionIndex
}
}
}
1.2.2. 具体使用
.partitionCustom(new CustomPartitioner, _.f0))
解释:
读取kafka中的数据,将key和message组装为tuple2类型,这儿使用自定义分区,将key作为分区依据。
2. 问题
2.1. 介绍
kafka主题是4个分区,所以flink的dataSource也是4个并行度,source接下来的算子是10个并行度。刚开始运行时完全没有问题,同一个key永远都能进入分区下个算子的同一个并行度。但是有一天,上游(也就是kafka)的生产者更换了代码,但是并没有跟我们说他们修改了生产者默认的分区规则,导致在源头上同一个key会进入到kafka不同的分区。然后我这儿就悲剧了,flink任务一直在运行,从来没有重启过,但是上游更换了代码之后,同一个key被我的自定义分区分配到了不同的并行度,从而对业务实现产生了影响。
2.2. 原因分析
由于上游更换代码导致同一个key进入到kafka不同分区,因此flink的source的不同并行度也会接收到同一个key。然后每个并行度对同一个key第一次分配并行度索引的时间不同,因此从flink这儿来看,同一个key就会被分配到不同的并行度。也就是说,每个source并行度都会维护自己的并行度分配规则,对同一个key,在不同的source并行度中,获取到的并行度索引并不一样。
3. 解决方案
3.1. 上游解决
让kafka producer使用默认的分区机制,将同一个key永远发往同一个分区。
优点:
flink任务无需改动任何代码。
缺点;
需要上游开发人员配合。
3.2. keyBy
keyBy算子使用的是hash分区,对于同一个key字符串,取到的hash值永远一样,使用该分区方式,永远会将同一个可以分配到同一个并行度。
优点:
实现简单。
缺点:
会造成数据倾斜。
3.3. 数据先汇总再轮询分区
在使用自定义分区之前,使用map算子将所有数据收集起来,然后使用自定义分区。
优点:
没有数据倾斜。
缺点;
map算子只有一个并行度,可能会成为性能瓶颈。
3.4. 先hash再轮询分区
在使用自定义分区之前,先通过keyBy算子,将同一个Key发往固定的下游算子的某个并行度,然后在通过自定义分区进行轮询。
优点:
没有数据倾斜。
缺点:
数据处理链路拉长,可能会增加数据处理延迟。
边栏推荐
猜你喜欢
随机推荐
Swoole学习(一)
【问题解决】同一机器上Flask部署TensorRT报错记录
解决JDBC在web工程中无法获取配置文件
Linux环境下redis的下载、安装和启动(建议收藏)
Unity行为树AI分享
二月、三月校招面试复盘总结(二)
The string class introduction
Code Refactoring: For Unit Testing
关系型数据库-MySQL:二进制日志(binlog)
FLV格式详解
智能合约安全——delegatecall (1)
编程Go:学习目录
自动化运维工具Ansible(5)流程控制
关系型数据库-MySQL:多实例配置
关于事件捕获和事件冒泡的顺序,以及如何处理事件冒泡带来的影响
对象存储-分布式文件系统-MinIO-2:服务端部署
个人练习三剑客基础之模仿CSDN首页
webrtc中的任务队列TaskQueue
再识关联容器
MySql data recovery method personal summary