当前位置:网站首页>Flink优化及相关
Flink优化及相关
2022-08-02 21:03:00 【boyzwz】
一、Flink的优化
1、MiniBatch 聚合
默认情况下,无界聚合算子是逐条处理输入的记录,即:(1)从状态中读取累加器,(2)累加/撤回记录至累加器,(3)将累加器写回状态,(4)下一条记录将再次从(1)开始处理。
MiniBatch 聚合的核心思想是将一组输入的数据缓存在聚合算子内部的缓冲区中。当输入的数据被触发处理时,每个 key 只需一个操作即可访问状态。这样可以大大减少状态开销并获得更好的吞吐量。但是,这可能会增加一些延迟,因为它会缓冲一些记录而不是立即处理它们。这是吞吐量和延迟之间的权衡。
下图说明了 mini-batch 聚合如何减少状态操作。
开启MiniBatch 聚合(命令行执行):
-- sql中开启
-- 开启
set table.exec.mini-batch.enabled=true;
-- 最大缓存时间
set table.exec.mini-batch.allow-latency='5 s';
-- 批次大小
set table.exec.mini-batch.size=1000;
2、 Local-Global 聚合
Local-Global 聚合是为解决数据倾斜问题提出的,通过将一组聚合分为两个阶段,首先在上游进行本地聚合,然后在下游进行全局聚合,类似于 MapReduce 中的 Combine + Reduce 模式。即数据在map端进行预聚合,减少shuffle阶段数据传输量,这可以大大减少网络 shuffle 和状态访问的成本。
开启 Local-Global 聚合(依赖于启用了 mini-batch):
-- 开启预聚合需要先开启MiniBatch
set table.exec.mini-batch.enabled=true;
-- 最大缓存时间
set table.exec.mini-batch.allow-latency='5 s';
-- 批次大小
set table.exec.mini-batch.size=1000;
-- 开启预聚合
set table.optimizer.agg-phase-strategy=TWO_PHASE;
二、Flink的反压
Flink中上游的任务生产数据的速度比下游任务消费数据的速度大,Flink就会发生反压,反压会从下游向上游传播,直到sourcetask会降低拉取数据速度,避免flink任务执行报错 。
Flink中下游任务若接收到的数据过多,来不及处理,则会先将数据放在内存中,当内存不够时,会告诉上游,上游会降低发送数据的速度,并将数据缓存起来,若上游内存不足时会继续向上游传递,直至source task,降低读取数据的速度
1、Flink内部反压
增加flink任务的并行度(即增加资源,加快数据处理)
开启MiniBatch和预聚合(会增加延迟)
2、将数据写入外部系统
Flink处理完数据向外部系统输出数据的速度大于外部系统写入的速度,可增加并行度,加快写入速度
Flink写入hbase可以开启异步IO,lookup.async=true,只支持hbase2.2以上版本
-- 在创建flink sink表时添加设置
-- 每批次最大值,会增加延迟
'sink.buffer-flush.max-rows'='1000'
--提高写入数据并行度,增加成本
'sink.parallelism' ='3'
三、Flink Join
1、Regular Joins
两个动态表关联,会将两个表的数据一直保存在状态中,随着数据的读取,状态会越来越大;
类似于批处理中两表关联
-- eg
SELECT * FROM Orders
INNER/LEFT/FULL JOIN Product
ON Orders.productId = Product.id
2、Interval Joins
关联一段时间内的数据,只需在状态中保存该时间段内的数据
-- eg
SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.order_id
AND o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_time
-- s.ship_time - INTERVAL '4' HOUR <= o.order_time <= s.ship_time 才会关联
3、Temporal Joins
流表关联版本表(拉链表),不需要将两个表的数据一直保存在状态
-- Temporal Joins
SELECT
order_id,
price,
orders.currency,
conversion_rate,
order_time
FROM orders
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
ON orders.currency = currency_rates.currency;
4、流表关联维表
常规Join:
从数据库中读取维表数据保存在状态中与流表关联,若在数据库中维表的数据发生改变,在flink中无法捕获更新,因为flink读取数据库表为有界流,读取完数据任务就结束了,所以只能关联到任务刚启动时读取的数据
Lookup Join:
Lookup Join关联方式,流表需要有一个时间字段
Lookup Join是在流表中每来一条数据时,再通过关联字段到维表底层数据库中查询数据进行关联,不用在状态中保存数据库数据,但会增加延迟
优化,可以在flink中设置缓存数据,每次在数据库查找以后,将数据加载到缓存中,可以在下次关联数据时,先在缓存中查找,缓存中没有再去数据库中查找
'lookup.cache.max-rows' = '100' ,-- 开启缓存,指定缓存数据量,可以提高关联性能
'lookup.cache.ttl' = '30s' -- 缓存过期时间,一般会按照维表更新频率设置
-- eg
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
边栏推荐
- golang 刷leetcode:统计打字方案数
- How to use windbg check c # a thread stack size?
- 典型相关分析CCA计算过程
- 包管理工具npm- node package management相关知识 、检查包更新、NPM包上传、更换镜像、npm ERR! registry error parsing json
- 解道9-编程技术6
- golang刷leetcode: 小于等于 K 的最长二进制子序列
- 如何成为一名正义黑客?你应该学习什么?
- ICLR 2022最佳论文:基于对比消歧的偏标签学习
- golang 刷leetcode:将字符串翻转到单调递增
- 解道6-编程技术3
猜你喜欢
ICLR 2022最佳论文:基于对比消歧的偏标签学习
Redis是如何轻松实现系统秒杀的?
[C题目]力扣138. 复制带随机指针的链表
VisualStudio 制作Dynamic Link Library动态链接库文件
Wiring diagrams of switches, motors, circuit breakers, thermocouples, and meters
汉源高科千兆4光4电工业级网管型智能环网冗余以太网交换机防浪涌防雷导轨式安装
主成分分析(PCA)
微软SQL服务器被黑客入侵以窃取代理服务的带宽
X 2 Earn必须依靠旁氏启动?GameFi的出路在哪?(下)
无线振弦采集仪远程修改参数的方式
随机推荐
数据库分析与优化
引用类型 ,值类型 ,小坑。
【流媒体】推流与拉流简介
汇编语言中b和bl关键字的区别
【干货】分库分表最佳实践
Redis是如何轻松实现系统秒杀的?
【C语言进阶】--指针典题剖析
js how to get the browser zoom ratio
SSM整合步骤(重点)
PLC working principle animation
WPF development through practical 】 【 automatic production management platform
回文自动机+CodeTON Round 2 C,D
人尽皆知的云原生,到底是大势所趋还是过度炒作?
【目标检测】YOLOv5:640与1280分辨率效果对比
并发与并行
56.【全局变量和局部变量专题】
ECCV 2022 | ByteTrack: 简单高效的数据关联方法
封装和包、访问修饰权限
解道9-编程技术6
Common tools and test methods for interface testing (Introduction)