当前位置:网站首页>完美解决keyby造成的数据倾斜导致吞吐量降低的问题

完美解决keyby造成的数据倾斜导致吞吐量降低的问题

2022-08-04 05:27:00 第一片心意

1. 问题现象

        最近在做一个类似页面pv的累加统计,根据页面id维度来统计一段时间内收到了数据。

        下面模拟的是处理数据的原始程序。

2. 原始处理

2.1.模拟kafka源

import org.apache.flink.streaming.api.functions.source.SourceFunction
import scala.util.Random

/**
 * 每1毫秒发出一个二元组,第一个元素为随机获取到numbers中的一个数字,第二个元素为1,表示统计字段,本示例只进行累加,模拟接收到的数据的条数
 *
 * @author ziqiang.wang
 * @date 2022-01-02 17:33
 * */
class Tuple2Source extends SourceFunction[(String, Int)] {

  private val random: Random = new Random()
  private var flag: Boolean = true;
  /**
   * 数字数组,数据分布不均匀
   */
  private val numbers: Array[Int] = Array[Int](1, 1, 1, 1, 1, 2, 2, 3, 4, 5)

  override def run(ctx: SourceFunction.SourceContext[(String, Int)]): Unit = {
    while (flag) {
      val index: Int = random.nextInt(10)
      ctx.collect((numbers(index).toString), 1)
      Thread.sleep(1)
    }
  }

  override def cancel(): Unit = {
    flag = false
  }
}

        原始需求中,接收到的是kafka数据,为字符串类型,然后通过map转换,提取出页面的id,然后转化为二元组。上面自定义的source就是模拟的原始数据转化为二元组之后的数据。注意里面的numbers,数字1出现了5次,数字2出现了2次,剩下的数字出现了1次,以此来模拟数据分布不均匀的生产数据。

2.2. 对数据进行窗口累加

object NoMiniBatchDemo extends Serializable {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(10)
    val source: DataStream[(String, Int)] = env.addSource(new Tuple2Source).name("tuple2")
    source
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
      .reduce((x, y) => (x._1, x._2 + y._2)).name("windowAgg")
      .print("结果数据:")
    env.execute("NoMiniBatch")
  }

}

        具体计算时,根据二元组第一个字段进行keyby,然后reduce,对第二个元素进行累加即可。但是由于页面很少,而且每个页面的数据量分布很不均匀,所以造成了很大的数据倾斜。

        从上图可以看出,运行一段时间之后发现,数据倾斜非常严重,而且由于key数量比较少,又很多并行度一直是收不到数据的。在这种情况下,即使是增加算子的并行度,也是无法解决根本问题的。

        网上有种方案是,在key后面添加一个固定的随机数,亦或是对key取hash,然后和key一起拼接起来作为key,虽然能在一定程度上改善数据倾斜的问题,但还是无法彻底决绝数据倾斜造成的低吞吐量问题。

 3. 两阶段聚合解决方案

        上图是flink观望中对于flink sql的一种优化,优化方案为:Local-Global Aggregation,意思就是,现在本地进行一次聚合,然后将数据发送到下游的Agg聚合算子,然后再进行全局聚合。这种方案类似于MapReduce中的combiner,先在上游对数据进行一次聚合,以减少发送到下游的数据量,从而使降低下游的数据处理量,以增加总体的吞吐量。

3.1. 本地聚合

/**
 * 不断聚合接收到的数据,当算子子任务处理的数据量或者是时间达到要求,则输出所有累计结果,以减少发往下游的数据量
 *
 * @author ziqiang.wang
 * @date 2022-01-02 17:57
 * */
class LocalAggProcess extends ProcessFunction[(String, Int), (String, Int)] with CheckpointedFunction {

  // 状态,保存中间结果middleResult表的所有元素
  var listState: ListState[(String, Int)] = _
  // 当前并行度已处理的数据量
  var count: Long = 0
  // 处理数据量达到该阈值,输出所有中间结果
  val outThreshold: Long = 100
  // 上次输出的时间
  var lastOutTimeMillis: Long = System.currentTimeMillis()
  // 当前时间
  var currentTimeMillis: Long = System.currentTimeMillis()
  // 输出时间间隔
  val outDuration: Long = 1000
  // 保存中间聚合结果
  var middleResult: mutable.Map[String, Int] = mutable.Map[String, Int]()

  override def open(parameters: Configuration): Unit = {
    // 每秒更新一次当前时间,而不是每处理一条数据就更新一次当前时间,以减少获取系统时间造成的资源消耗
    new Timer("更新当前时间", true).scheduleAtFixedRate(new TimerTask {
      override def run(): Unit = currentTimeMillis = System.currentTimeMillis()
    }, 1000, 1000)
  }


  override def processElement(value: (String, Int), ctx: ProcessFunction[(String, Int), (String, Int)]#Context, out: Collector[(String, Int)]): Unit = {
    if (middleResult.contains(value._1)) {
      middleResult.put(value._1, middleResult(value._1) + value._2)
    } else {
      middleResult.put(value._1, value._2)
    }
    count += 1
    // 如果该并行度处理数据量达到输出阈值,或者是当前时间达到输出时间间隔,则输出所有中间结果
    if (count % outThreshold == 0 || currentTimeMillis - lastOutTimeMillis >= outDuration) {
      middleResult.foreach(entry => out.collect((entry._1, entry._2)))
      lastOutTimeMillis = currentTimeMillis
      middleResult.clear()
    }
  }

  /**
   * 做checkpoint快照时执行,将中间结果保存到listState状态中
   */
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    listState.clear()
    middleResult.foreach(entry => listState.add(entry._1, entry._2))
  }

  /**
   * 初始化和从checkpoint恢复时调用
   */
  override def initializeState(context: FunctionInitializationContext): Unit = {
    val descriptor: ListStateDescriptor[(String, Int)] = new ListStateDescriptor[(String, Int)]("中间结果", TypeInformation.of(new TypeHint[(String, Int)] {}))
    listState = context.getOperatorStateStore.getListState(descriptor)
    if (context.isRestored) {
      middleResult.clear()
      listState.get().forEach(entry => middleResult.put(entry._1, entry._2))
    }
  }
}

3.2. 两阶段聚合实现

object LocalAggDemo extends Serializable {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(10)
    val source: DataStream[(String, Int)] = env.addSource(new Tuple2Source).name("tuple2")
    source
      .process(new LocalAggProcess).name("miniBatch")
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
      .reduce((x, y) => (x._1, x._2 + y._2)).name("windowAgg")
      .print("结果数据:")
    env.execute("NoMiniBatch")
  }

}

        从上图可以看出,resuce算子接收到的数据明显比不使用两阶段聚合时少的多。在本地聚合算自重,我设置的是满足100条或5秒中任何一个条件时输出数据,在生产环境中,可以根据实际接收到的数据量来对这两个条件进行调整。 

原网站

版权声明
本文为[第一片心意]所创,转载请带上原文链接,感谢
https://blog.csdn.net/u012443641/article/details/122279538