当前位置:网站首页>Flink优化的方方面面
Flink优化的方方面面
2022-08-02 21:03:00 【大学生爱编程】
1. MinBatch聚合(微批处理:每隔一段时间更新状态)
flink默认每一条数据都会更新状态,此处我们缓存一批数据进行更新
优缺点:增加吞吐量但增加延迟,综合考虑
设置MinBatch:
-- sql中开启
-- 开启
set table.exec.mini-batch.enabled=true;
-- 最大缓存时间
set table.exec.mini-batch.allow-latency='5 s';
-- 批次大小
set table.exec.mini-batch.size=1000;
2. Local-Global 本地加全局聚合
开启预聚合需要先开启MiniBatch
-- 开启MinBatch
set table.exec.mini-batch.enabled=true;
-- 最大缓存时间
set table.exec.mini-batch.allow-latency='5 s';
-- 批次大小
set table.exec.mini-batch.size=1000;
-- 开启预聚合
set table.optimizer.agg-phase-strategy=TWO_PHASE;
2.1 datagen-blackhole示例
写入黑洞是用于高性能测试的,写入命令行会造成限制
上游产生数据太多,下游处理数据太少,造成反压,但是任务也不会失败会报警告
灰色运行状态:上游反压
-- source 表
CREATE TABLE words (
word STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10000', -- 每秒生成多少行数据
'fields.word.length' = '2' --字段长度限制
);
-- 黑洞
CREATE TABLE blackhole_table (
word STRING,
c BIGINT
)
WITH ('connector' = 'blackhole')
-- 执行查询
insert into blackhole_table
select word,count(1) as c from
words
group by word
2.2 图解
出现反压造成数据延迟的问题,处理不过来的数据保存在内存中,内存增加的时候会向上游反馈,上游数据放在自己的内存中,再向flink源数据进行反馈,直到sourcetask降低读取数据速率
2.3 datagen数据写入数据库调节反压示例
2.3.1 开启预聚合优化Flink内部反压(参数互相配合,调节测试)
上游生产数据的速度时50万每秒,下游消费数据的速度10万每秒 —反压
--source
CREATE TABLE words (
word STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '500000', -- 每秒生成多少行数据
'fields.word.length' = '2' --字段长度限制
);
-- 黑洞
CREATE TABLE blackhole_table (
word STRING,
c BIGINT
)
WITH ('connector' = 'blackhole');
--sql语句
insert into blackhole_table
select word,count(1) as c from
words
group by word;
开启minibatch和预聚合,预聚合之后上游发送到数据下游数据量会减少,flink内部没有发生反压了
set table.exec.mini-batch.enabled=true;
set table.exec.mini-batch.allow-latency='5 s';
set table.exec.mini-batch.size=1000;
set table.optimizer.agg-phase-strategy=TWO_PHASE;
2.3.2 写入MySQL出现外部反压(对写入方式进行调节)
将数据保存到mysql,写入数据的速度只能达到1600/s - 反压
数据源还是上面的datagen
--mysql sink
CREATE TABLE word_count (
word STRING,
c BIGINT,
PRIMARY KEY (word) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://master:3306/bigdata',
'table-name' = 'word_count',
'username' = 'root',
'password' = '123456'
)
--sql语句
insert into word_count
select word,count(1) as c from
words
group by word
1.此时出现反压,我们采用批量写入等操作,增加批次大小(数据不是一条条写,会增加延迟),和提高并行度(资源申请耗时,增加成本),可以解决反压
2.删除表word_count 后添加sink表的设置:
--sink表
CREATE TABLE word_count (
word STRING,
c BIGINT,
PRIMARY KEY (word) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://master:3306/bigdata',
'table-name' = 'word_count',
'username' = 'root',
'password' = '123456',
'sink.buffer-flush.max-rows'='1000' ,-- 每批次最大值,会增加延迟
'sink.parallelism' ='3' --提高写入数据并行度,增加成本
);
--sql语句
insert into word_count
select word,count(1) as c from
words
group by word;
2.3.3 数据写入hbase出现反压
-- source 表
CREATE TABLE words (
word STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '500000', -- 每秒生成的数据行数据
'fields.word.length' = '2' --字段长度限制
);
-- hbase sink
drop table hbase_word_count;
CREATE TABLE hbase_word_count (
word STRING,
info ROW<c BIGINT>,
PRIMARY KEY (word) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'word_count',
'zookeeper.quorum' = 'master:2181,node1:2181,node2:2181',
'sink.parallelism' = '3', -- 写入数据并行度
'sink.buffer-flush.max-rows' = '3000' -- 写入数据批次大小
);
--先再habse中创建表
create 'word_count','info'
--- 将数据写入hbase
insert into hbase_word_count
select word,ROW(c) as info from (
select word,count(1) as c from
words
group by word
) as a;
手动切分region,效果也一般看不出来,如果是分布式的效果预期会好一些
3. 反压总结
上游生产数据速度比下游消费数据速度大,flink会发生反压,反压会从下游向上游传播,直到sourcetask降低拉去数据速度,避免flink任务执行报错
3.1flink内部发生反压
增加flink任务并行度,就是加资源,申请资源时间长,成本提高
-- flink sql
SET 'parallelism.default' = '2';
会增加延迟:
set table.exec.mini-batch.enabled=true;
-- 最大缓存时间
set table.exec.mini-batch.allow-latency='5 s';
-- 批次大小
set table.exec.mini-batch.size=1000;
-- 开启预聚合
set table.optimizer.agg-phase-strategy=TWO_PHASE;
3.2 数据保存到外部发生反压
写入MySQL:
-- 每批次最大值,会增加延迟
'sink.buffer-flush.max-rows'='1000'
--提高写入数据并行度,增加成本
'sink.parallelism' ='3'
写入hbase:
-- 每批次最大值,会增加延迟
'sink.buffer-flush.max-rows'='1000'
--提高写入数据并行度,增加成本
'sink.parallelism' ='3'
异步IO(只支持hbase2.2以上版本):
flink查询hbase可以开启异步IO:
lookup.async=true
边栏推荐
猜你喜欢
随机推荐
golang刷letcode:公司命名
Li Mu hands-on deep learning V2-BERT pre-training and code implementation
Xcode13.1 run engineering error fatal error: 'IFlyMSC/IFly h' file not found
Flink Yarn Per Job - 启动AM
Ansible安装与配置
Which thread pool does Async use?
Electrical diagram of power supply system
快速学会ansible的安装
封装和包、访问修饰权限
《分布式微服务电商》专题(一)-项目简介
Swin Transformer 论文精读,并解析其模型结构
JMeter的基本使用
面试官居然问我:删库后,除了跑路还能干什么?
Use the TCP protocol, we won't lost package?
SSM整合步骤(重点)
HCIP--路由策略实验
[C题目]力扣138. 复制带随机指针的链表
golang刷letcode:公平分发饼干
DataGrip 安装教程 详细版
PyRosetta 安装方法之Conda安装