当前位置:网站首页>flink自定义轮询分区产生的问题
flink自定义轮询分区产生的问题
2022-08-04 05:27:00 【第一片心意】
1. 项目背景
1.1. 介绍
从4个分区的kafka特定主题中接受设备数据,然后根据主题的key进行轮询分区。第一次按顺序分配,已分配完的之后直接获取之前的分配结果即可,以保证key相同的数据可以进入下个算子的同一个分区。
1.2. 代码
1.2.1. 自定义分区
/**
* 自定义轮询分区器<br>
* 对于一个未被分区的key,对其分配下一个分区索引;对于一个已经被分区过的key,对其分配已经分配过的分区索引
*
* @author wzq
* @date 2019-12-04
**/
class CustomPartitioner extends Partitioner[String] {
private val map: mutable.Map[String, Int] = mutable.HashMap[String, Int]()
private var num: Int = 0
override def partition(key: String, numPartitions: Int): Int = {
if (map.contains(key)) {
map.getOrElse(key, 0)
} else {
val partitionIndex = num
map.put(key, num)
num += 1
if (num == numPartitions) {
num = 0
}
partitionIndex
}
}
}1.2.2. 具体使用
.partitionCustom(new CustomPartitioner, _.f0))解释:
读取kafka中的数据,将key和message组装为tuple2类型,这儿使用自定义分区,将key作为分区依据。
2. 问题
2.1. 介绍
kafka主题是4个分区,所以flink的dataSource也是4个并行度,source接下来的算子是10个并行度。刚开始运行时完全没有问题,同一个key永远都能进入分区下个算子的同一个并行度。但是有一天,上游(也就是kafka)的生产者更换了代码,但是并没有跟我们说他们修改了生产者默认的分区规则,导致在源头上同一个key会进入到kafka不同的分区。然后我这儿就悲剧了,flink任务一直在运行,从来没有重启过,但是上游更换了代码之后,同一个key被我的自定义分区分配到了不同的并行度,从而对业务实现产生了影响。
2.2. 原因分析
由于上游更换代码导致同一个key进入到kafka不同分区,因此flink的source的不同并行度也会接收到同一个key。然后每个并行度对同一个key第一次分配并行度索引的时间不同,因此从flink这儿来看,同一个key就会被分配到不同的并行度。也就是说,每个source并行度都会维护自己的并行度分配规则,对同一个key,在不同的source并行度中,获取到的并行度索引并不一样。
3. 解决方案
3.1. 上游解决
让kafka producer使用默认的分区机制,将同一个key永远发往同一个分区。
优点:
flink任务无需改动任何代码。
缺点;
需要上游开发人员配合。
3.2. keyBy
keyBy算子使用的是hash分区,对于同一个key字符串,取到的hash值永远一样,使用该分区方式,永远会将同一个可以分配到同一个并行度。
优点:
实现简单。
缺点:
会造成数据倾斜。
3.3. 数据先汇总再轮询分区
在使用自定义分区之前,使用map算子将所有数据收集起来,然后使用自定义分区。
优点:
没有数据倾斜。
缺点;
map算子只有一个并行度,可能会成为性能瓶颈。
3.4. 先hash再轮询分区
在使用自定义分区之前,先通过keyBy算子,将同一个Key发往固定的下游算子的某个并行度,然后在通过自定义分区进行轮询。
优点:
没有数据倾斜。
缺点:
数据处理链路拉长,可能会增加数据处理延迟。
边栏推荐
- Unity动画生成工具
- 关于let var 和const的区别以及使用
- ORACLE LINUX 6.5 安装重启后Kernel panic - not syncing : Fatal exception
- 8.30难题留坑:计数器问题和素数等差数列问题
- warning C4251: “std::vector&lt;_Ty&gt;”需要有 dll 接口由 class“Test”的客户端使用错误
- 即时通讯网 即时通讯音视频开发
- 自动化运维工具Ansible(6)Jinja2模板
- Canal mysql data synchronization
- webrtc中的任务队列TaskQueue
- 解决安装nbextensions后使用Jupyter Notebook时出现template_paths相关错误的问题
猜你喜欢
随机推荐
webrtc中的引用计框架
Vulnhub:Sar-1
登录页面js手写
智能合约安全——私有数据访问
关于事件捕获和事件冒泡的顺序,以及如何处理事件冒泡带来的影响
Shell(3)条件控制语句
关系型数据库-MySQL:多实例配置
想好了吗?
Unity DOTS学习教程汇总
箭头函数的使用
7.16 Day22---MYSQL (Dao mode encapsulates JDBC)
JS代码预编译
flink-sql所有表格式format
Oracle备份脚本
Set集合与Map集合
7.18 Day23 - the markup language
NFT市场可二开开源系统
关系型数据库-MySQL:二进制日志(binlog)
自己学习爬虫写的基础小函数
Programming hodgepodge (3)


![Deploy LVS-DR cluster [experimental]](/img/ad/84e05a6421d668b0b6ba6eeba0c730.jpg)






