当前位置:网站首页>MySQL-FlinkCDC-Hudi enters the lake in real time
MySQL-FlinkCDC-Hudi enters the lake in real time
2022-08-02 07:51:00 【smile 0628】
服务器基础环境
Maven和JDK环境版本
Hadoop版本
Hadoop环境变量配置
export HADOOP_HOME=/opt/module/hadoop-3.1.3
export HADOOP_CALSSPATH=`$HADOOP_HOME/bin/hadoop classpath`
Hudi编译环境配置
Maven的setting.xml配置修改
<mirrors>
<mirror>
<id>alimaven</id>
<mirrorOf>central,!cloudera</mirrorOf>
<name>aliyun maven</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</mirror>
</mirrors>
下载Hudi源码包
git clone https://github.com/apache/hudi.git
修改Hudi集成Flink和Hive编译依赖版本配置
packaging/hudi-flink-bundle/
pom.xml文件根据hiveThe environment is modified by itself
编译Hudi指定Flink和Hadoop和Hive版本信息
可加 –e –X 参数查看编译ERROR异常和DEBUG信息
说明:默认scala2.11、默认不包含hive依赖
mvn clean install -DskipTests-Drat.skip=true -Dflink1.13 -Dscala-2.11 -Dhadoop.version=3.1.3 -Pflink-bundle-shade-hive3
Hudi编译结果说明
hudi-flink-bundle.jar
是flink用来写入和读取数据
hudi-mr-bundle.jar
是hive需要用来读hudi数据
Flink环境配置
版本说明:Flink1.13.6和scala2.11版本
Flink_HOME下的yaml配置
# state.backend: filesystem
state.backend: rocksdb
# 开启增量checkpoint
state.backend.incremental: true
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
state.checkpoints.dir: hdfs://nameservice/flink/flink-checkpoints
classloader.check-leaked-classloader: false
classloader.resolve-order: parent-first
FLINK_HOME的lib下添加依赖
flink-sql-connector-mysql-cdc-2.2.1.jar
flink-sql-connector-kafka_2.11-1.13.6.jar
--- Hadoop home lib下copy过来
hadoop-mapreduce-client-common-3.1.3.jar
hadoop-mapreduce-client-core-3.1.3.jar
hadoop-mapreduce-client-jobclient-3.1.3.jar
guava-27.0-jre.jar
--- hudi编译jar copy过来
hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar
启动Flink Yarn Session服务
export HADOOP_HOME='/opt/module/hadoop-3.1.3'
export HADOOP_CONF_DIR='/opt/module/hadoop-3.1.3/etc/hadoop'
export HADOOP_CONFIG_DIR='/opt/module/hadoop-3.1.3/etc/hadoop'
export HADOOP_CLASSPATH=`hadoop classpath`
export HADOOP_CALSSPATH=`$HADOOP_HOME/bin/hadoop classpath`
bin/yarn-session.sh -s 2 -jm 2048 -tm 2048 -nm ys_hudi -d
各类UI查看
启动Flinksql Client
bin/sql-client.sh embedded -s yarn-session
FlinkCDC sink Hudi过程
MySQL测试建表语句
create table users_cdc(
id bigint auto_increment primary key,
name varchar(20) null,
birthday timestamp default CURRENT_TIMESTAMP not null,
ts timestamp default CURRENT_TIMESTAMP not null
);
FlinkCDC DDL语句
CREATE TABLE mysql_users (
id BIGINT PRIMARY KEY NOT ENFORCED ,
name STRING,
birthday TIMESTAMP(3),
ts TIMESTAMP(3)
) WITH (
'connector'= 'mysql-cdc',
'hostname'= 'bigdata',
'port'= '3306',
'username'= 'root',
'password'= 'root',
'server-time-zone'= 'Asia/Shanghai',
'debezium.snapshot.mode'='initial',
'database-name'= 'db1',
'table-name'= 'users_cdc'
);
查询mysql-cdc表
select * from mysql_users;
由于目前MySQL users_cdc表是空,所以flinksql 查询没有数据 只有表结构;
创建临时视图,增加分区列,方便同步hive分区表
create view mycdc_v AS SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') as `partition` FROM mysql_users;
设置checkpoint间隔时间,存储路径已在flink-conf配置设置全局路径
建议:测试环境 可设置秒级别(不能太小),生产环境可设置分钟级别
set execution.checkpointing.interval=30sec;
Flinksql 创建 cdc sink hudi文件,并自动同步hive分区表DDL 语句
CREATE TABLE mysqlcdc_sync_hive01(
id bigint ,
name string,
birthday TIMESTAMP(3),
ts TIMESTAMP(3),
`partition` VARCHAR(20),
primary key(id) not enforced --必须指定uuid 主键
)
PARTITIONED BY (`partition`)
with(
'connector'='hudi'
,'path'= 'hdfs://bigdata:8020/hudi/mysql_cdc_sync_hive_01'
, 'hoodie.datasource.write.recordkey.field'= 'id'-- 主键
, 'write.precombine.field'= 'ts'-- 自动precombine的字段
, 'write.tasks'= '1'
, 'compaction.tasks'= '1'
, 'write.rate.limit'= '2000'-- 限速
, 'table.type'= 'MERGE_ON_READ'-- 默认COPY_ON_WRITE,可选MERGE_ON_READ
, 'compaction.async.enabled'= 'true'-- 是否开启异步压缩
, 'compaction.trigger.strategy'= 'num_commits'-- 按次数压缩
, 'compaction.delta_commits'= '1'-- 默认为5
, 'changelog.enabled'= 'true'-- 开启changelog变更
, 'read.streaming.enabled'= 'true'-- 开启流读
, 'read.streaming.check-interval'= '3'-- 检查间隔,默认60s
, 'hive_sync.enable'= 'true'-- 开启自动同步hive
, 'hive_sync.mode'= 'hms'-- 自动同步hive模式,默认jdbc模式
, 'hive_sync.metastore.uris'= 'thrift://bigdata:9083'-- hive metastore地址
, 'hive_sync.jdbc_url'= 'jdbc:hive2://bigdata:10000'-- hiveServer地址
, 'hive_sync.table'= 'mysql_cdc_sync_hive_01'-- hive 新建表名
, 'hive_sync.db'= 'czs'-- hive 新建数据库名
, 'hive_sync.username'= 'root'-- HMS 用户名
, 'hive_sync.password'= 'root'-- HMS 密码
, 'hive_sync.support_timestamp'= 'true'-- 兼容hive timestamp类型
);
Flink sql mysql cdc数据写入hudi文件数据
insert into mysqlcdc_sync_hive01 select id,name,birthday,ts,`partition` from mycdc_v;
![\[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-IVHFBpJa-1659193677348)(C:\Users\Chen Zhangsheng\AppData\Roaming\Typora\typora-user-images\image-20220728095449769.png)\]](https://img-blog.csdnimg.cn/3949de15ec1c43768c0ee1e3b3f7a8d4.png)
说明:目前还没写入测试数据,hudi目录只生成一些状态标记文件,还未生成分区目录以及.log 和.parquet数据文件,具体含义可见hudi官方文档.
Mysql数据源写入测试数据
insert into users_cdc (name) values ('cdc01');
Flinksql 查询mysql cdc insert数据
set sql-client.execution.result-mode=tableau;
select * from mysql_users; -- 查询到一条insert数据
Flink web UI页面可以看到DAG 各个环节产生一条测试数据
Flinksql 查询 sink的hudi表数据
select * from mysqlcdc_sync_hive01; --已查询到一条insert数据
Hdfs上Hudi文件目录变化情况
Hive分区表和数据自动同步情况
查看自动创建hive表结构
show create table mysql_cdc_sync_hive_01_ro;
show create table mysql_cdc_sync_hive_01_rt;
查看自动生成的表分区信息
show partitions mysql_cdc_sync_hive_01_ro;
show partitions mysql_cdc_sync_hive_01_rt;
说明:已自动生产hudi MOR模式的ro、rt
ro表只能查parquet文件数据,input是HoodieParquetInputFormat
rt表parquet文件数据和log文件数据都可查,input是HoodieParquetRealtimeInputFormat
Hive访问Hudi数据
说明:需要引入hudi-hadoop-mr-bundle
引入方式如下:
- 引入到$HIVE_HOME/lib下
- 引入到$HIVE_HOME/auxlibCustom third-party dependency modification hive-site.xml配置文件
- Hive shell命令行引入 Session级别有效
其中1和3配置完后需要重启hive-server服务
查询Hive 分区表数据
select * from mysql_cdc_sync_hive_01_ro; --已查询到mysq insert的一条数据
select * from mysql_cdc_sync_hive_01_rt; --已查询到mysq insert的一条数据
select name,ts from mysql_cdc_sync_hive_01_ro where `partition`='20220728'; --条件查询
select count(1) from mysql_cdc_sync_hive_01_ro; --Hive ro表count查询
select count(1) from mysql_cdc_sync_hive_01_ro; --Hive rt表count查询
hive count异常解决
引入hudi-hadoop-mr-bundle
依赖
hive> add jar hdfs://bigdata:8020/czs/hudi-hadoop-mr-bundle-xxx.jar;
hive> set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
Mysql数据源写入多条测试数据
insert into users_cdc (name) values ('cdc02');
insert into users_cdc (name) values ('cdc03');
insert into users_cdc (name) values ('cdc04');
insert into users_cdc (name) values ('cdc05');
insert into users_cdc (name) values ('cdc06');
Flink web UI DAG中数据链路情况
Hdfs上Hudifile directory changes
状态文件说明:
(1)requested:表示一个动作已被安排,但尚未启动
(2)inflight:表示当前正在执行操作
(3)completed:表示在时间线上完成了操作
Flink jobmanager log sync hive过程详细日志
搜索mysqlcdc_sync_hive01
即定义的hudi表名
Mysql数据源更新数据
update users_cdc set name = 'cdc05-bj'where id = 5;
Flinksql 查询cdc update数据 产生两条binlog数据
说明:flinksql 查询最终只有一条+I有效数据,且数据已更新
Flink web UI DAG接受到两条binlog数据,但最终compact和sink只有一条有效数据
MySQLThe data source deletes the data
delete from users_cdc where id = 3;
select * from users_cdc;
Flink Web UI job DAG中捕获一条新数据:
Hudi文件类型说明:
(1)commits: 表示将一批数据原子性写入表中
(2)cleans: 清除表中不在需要的旧版本文件的后台活动
(3)delta_commit:增量提交是指将一批数据原子性写入MergeOnRead类型的表中,其中部分或者所有数据可以写入增量日志中
(4)compaction: 协调hudi中差异数据结构的后台活动,例如:将更新从基于行的日志文件变成列格式.在内部,压缩的表现为时间轴上的特殊提交
(5)rollback:表示提交操作不成功且已经回滚,会删除在写入过程中产生的数据
Flink CK情况
设置的30s一次CK
边栏推荐
猜你喜欢
随机推荐
Facebook社媒营销的5大技巧,迅速提高独立站转化率!
责任链模式(Chain Of Responsibility)
【机器学习】实验1布置:基于决策树的英雄联盟游戏胜负预测
交换部分 VLAN
封装class类一次性解决全屏问题
修改apt-get源为国内镜像源
OC-NSDictionary
CSRF-跨站请求伪造-相关知识
【请教】SQL语句按列1去重来计算列2之和
张驰课堂:六西格玛测量系统的误差分析与判定
倍福使用AdsRemote组件实现和C#的ADS通讯
有关 sql中的 concat()函数问题,如何拼接
(2022牛客多校五)C-Bit Transmission(思维)
Go 实现分布式锁
第06章 索引的数据结构【2.索引及调优篇】【MySQL高级】
2022年数据泄露平均成本高达435万美元,创历史新高!
2022.07.31(LC_6132_使数组中所有元素都等于零)
spark架构
LeetCode 2360. 图中的最长环
_2_顺序表