当前位置:网站首页>完美解决keyby造成的数据倾斜导致吞吐量降低的问题
完美解决keyby造成的数据倾斜导致吞吐量降低的问题
2022-08-04 05:27:00 【第一片心意】
1. 问题现象
最近在做一个类似页面pv的累加统计,根据页面id维度来统计一段时间内收到了数据。
下面模拟的是处理数据的原始程序。
2. 原始处理
2.1.模拟kafka源
import org.apache.flink.streaming.api.functions.source.SourceFunction
import scala.util.Random
/**
* 每1毫秒发出一个二元组,第一个元素为随机获取到numbers中的一个数字,第二个元素为1,表示统计字段,本示例只进行累加,模拟接收到的数据的条数
*
* @author ziqiang.wang
* @date 2022-01-02 17:33
* */
class Tuple2Source extends SourceFunction[(String, Int)] {
private val random: Random = new Random()
private var flag: Boolean = true;
/**
* 数字数组,数据分布不均匀
*/
private val numbers: Array[Int] = Array[Int](1, 1, 1, 1, 1, 2, 2, 3, 4, 5)
override def run(ctx: SourceFunction.SourceContext[(String, Int)]): Unit = {
while (flag) {
val index: Int = random.nextInt(10)
ctx.collect((numbers(index).toString), 1)
Thread.sleep(1)
}
}
override def cancel(): Unit = {
flag = false
}
}原始需求中,接收到的是kafka数据,为字符串类型,然后通过map转换,提取出页面的id,然后转化为二元组。上面自定义的source就是模拟的原始数据转化为二元组之后的数据。注意里面的numbers,数字1出现了5次,数字2出现了2次,剩下的数字出现了1次,以此来模拟数据分布不均匀的生产数据。
2.2. 对数据进行窗口累加
object NoMiniBatchDemo extends Serializable {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(10)
val source: DataStream[(String, Int)] = env.addSource(new Tuple2Source).name("tuple2")
source
.keyBy(_._1)
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.reduce((x, y) => (x._1, x._2 + y._2)).name("windowAgg")
.print("结果数据:")
env.execute("NoMiniBatch")
}
}具体计算时,根据二元组第一个字段进行keyby,然后reduce,对第二个元素进行累加即可。但是由于页面很少,而且每个页面的数据量分布很不均匀,所以造成了很大的数据倾斜。

从上图可以看出,运行一段时间之后发现,数据倾斜非常严重,而且由于key数量比较少,又很多并行度一直是收不到数据的。在这种情况下,即使是增加算子的并行度,也是无法解决根本问题的。
网上有种方案是,在key后面添加一个固定的随机数,亦或是对key取hash,然后和key一起拼接起来作为key,虽然能在一定程度上改善数据倾斜的问题,但还是无法彻底决绝数据倾斜造成的低吞吐量问题。
3. 两阶段聚合解决方案

上图是flink观望中对于flink sql的一种优化,优化方案为:Local-Global Aggregation,意思就是,现在本地进行一次聚合,然后将数据发送到下游的Agg聚合算子,然后再进行全局聚合。这种方案类似于MapReduce中的combiner,先在上游对数据进行一次聚合,以减少发送到下游的数据量,从而使降低下游的数据处理量,以增加总体的吞吐量。
3.1. 本地聚合
/**
* 不断聚合接收到的数据,当算子子任务处理的数据量或者是时间达到要求,则输出所有累计结果,以减少发往下游的数据量
*
* @author ziqiang.wang
* @date 2022-01-02 17:57
* */
class LocalAggProcess extends ProcessFunction[(String, Int), (String, Int)] with CheckpointedFunction {
// 状态,保存中间结果middleResult表的所有元素
var listState: ListState[(String, Int)] = _
// 当前并行度已处理的数据量
var count: Long = 0
// 处理数据量达到该阈值,输出所有中间结果
val outThreshold: Long = 100
// 上次输出的时间
var lastOutTimeMillis: Long = System.currentTimeMillis()
// 当前时间
var currentTimeMillis: Long = System.currentTimeMillis()
// 输出时间间隔
val outDuration: Long = 1000
// 保存中间聚合结果
var middleResult: mutable.Map[String, Int] = mutable.Map[String, Int]()
override def open(parameters: Configuration): Unit = {
// 每秒更新一次当前时间,而不是每处理一条数据就更新一次当前时间,以减少获取系统时间造成的资源消耗
new Timer("更新当前时间", true).scheduleAtFixedRate(new TimerTask {
override def run(): Unit = currentTimeMillis = System.currentTimeMillis()
}, 1000, 1000)
}
override def processElement(value: (String, Int), ctx: ProcessFunction[(String, Int), (String, Int)]#Context, out: Collector[(String, Int)]): Unit = {
if (middleResult.contains(value._1)) {
middleResult.put(value._1, middleResult(value._1) + value._2)
} else {
middleResult.put(value._1, value._2)
}
count += 1
// 如果该并行度处理数据量达到输出阈值,或者是当前时间达到输出时间间隔,则输出所有中间结果
if (count % outThreshold == 0 || currentTimeMillis - lastOutTimeMillis >= outDuration) {
middleResult.foreach(entry => out.collect((entry._1, entry._2)))
lastOutTimeMillis = currentTimeMillis
middleResult.clear()
}
}
/**
* 做checkpoint快照时执行,将中间结果保存到listState状态中
*/
override def snapshotState(context: FunctionSnapshotContext): Unit = {
listState.clear()
middleResult.foreach(entry => listState.add(entry._1, entry._2))
}
/**
* 初始化和从checkpoint恢复时调用
*/
override def initializeState(context: FunctionInitializationContext): Unit = {
val descriptor: ListStateDescriptor[(String, Int)] = new ListStateDescriptor[(String, Int)]("中间结果", TypeInformation.of(new TypeHint[(String, Int)] {}))
listState = context.getOperatorStateStore.getListState(descriptor)
if (context.isRestored) {
middleResult.clear()
listState.get().forEach(entry => middleResult.put(entry._1, entry._2))
}
}
}3.2. 两阶段聚合实现
object LocalAggDemo extends Serializable {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(10)
val source: DataStream[(String, Int)] = env.addSource(new Tuple2Source).name("tuple2")
source
.process(new LocalAggProcess).name("miniBatch")
.keyBy(_._1)
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.reduce((x, y) => (x._1, x._2 + y._2)).name("windowAgg")
.print("结果数据:")
env.execute("NoMiniBatch")
}
}
从上图可以看出,resuce算子接收到的数据明显比不使用两阶段聚合时少的多。在本地聚合算自重,我设置的是满足100条或5秒中任何一个条件时输出数据,在生产环境中,可以根据实际接收到的数据量来对这两个条件进行调整。
边栏推荐
猜你喜欢
随机推荐
Cannot read properties of null (reading 'insertBefore')
Swoole学习(一)
MySQL log articles, binlog log of MySQL log, detailed explanation of binlog log
MySQL database (basic)
【论文阅读笔记】无监督行人重识别中的采样策略
LCP 17. Quick Calculation Robot
[NSSRound#1 Basic]
Swoole学习(二)
C language -- operator details
Sublime Text 3 2021.8.3 个人配置
将两个DataTable合并——DataTable.Merge 方法
程序、进程、线程、协程的概念及区别
跳转页面实时调用后台接口,更新页面数据
Unity自动生成阻挡Collider的GameObject工具
自动化运维工具Ansible(2)ad-hoc
对象存储-分布式文件系统-MinIO-1:概念
Oracle备份脚本
【问题解决】同一机器上Flask部署TensorRT报错记录
warning C4251: “std::vector<_Ty>”需要有 dll 接口由 class“Test”的客户端使用错误
实现登录密码混合动态因子,且动态因子隐式
![Embedded system driver primary [3] - _IO model in character device driver foundation](/img/c7/21fc0651964a6a435e8ec5743b7662.png)








