当前位置:网站首页>MySQL-FlinkCDC-Hudi实时入湖
MySQL-FlinkCDC-Hudi实时入湖
2022-08-02 06:47:00 【笑一笑0628】
服务器基础环境
Maven和JDK环境版本
Hadoop版本
Hadoop环境变量配置
export HADOOP_HOME=/opt/module/hadoop-3.1.3
export HADOOP_CALSSPATH=`$HADOOP_HOME/bin/hadoop classpath`
Hudi编译环境配置
Maven的setting.xml配置修改
<mirrors>
<mirror>
<id>alimaven</id>
<mirrorOf>central,!cloudera</mirrorOf>
<name>aliyun maven</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</mirror>
</mirrors>
下载Hudi源码包
git clone https://github.com/apache/hudi.git
修改Hudi集成Flink和Hive编译依赖版本配置
packaging/hudi-flink-bundle/
pom.xml文件根据hive环境自行修改
编译Hudi指定Flink和Hadoop和Hive版本信息
可加 –e –X 参数查看编译ERROR异常和DEBUG信息
说明:默认scala2.11、默认不包含hive依赖
mvn clean install -DskipTests-Drat.skip=true -Dflink1.13 -Dscala-2.11 -Dhadoop.version=3.1.3 -Pflink-bundle-shade-hive3
Hudi编译结果说明
hudi-flink-bundle.jar
是flink用来写入和读取数据
hudi-mr-bundle.jar
是hive需要用来读hudi数据
Flink环境配置
版本说明:Flink1.13.6和scala2.11版本
Flink_HOME下的yaml配置
# state.backend: filesystem
state.backend: rocksdb
# 开启增量checkpoint
state.backend.incremental: true
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
state.checkpoints.dir: hdfs://nameservice/flink/flink-checkpoints
classloader.check-leaked-classloader: false
classloader.resolve-order: parent-first
FLINK_HOME的lib下添加依赖
flink-sql-connector-mysql-cdc-2.2.1.jar
flink-sql-connector-kafka_2.11-1.13.6.jar
--- Hadoop home lib下copy过来
hadoop-mapreduce-client-common-3.1.3.jar
hadoop-mapreduce-client-core-3.1.3.jar
hadoop-mapreduce-client-jobclient-3.1.3.jar
guava-27.0-jre.jar
--- hudi编译jar copy过来
hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar
启动Flink Yarn Session服务
export HADOOP_HOME='/opt/module/hadoop-3.1.3'
export HADOOP_CONF_DIR='/opt/module/hadoop-3.1.3/etc/hadoop'
export HADOOP_CONFIG_DIR='/opt/module/hadoop-3.1.3/etc/hadoop'
export HADOOP_CLASSPATH=`hadoop classpath`
export HADOOP_CALSSPATH=`$HADOOP_HOME/bin/hadoop classpath`
bin/yarn-session.sh -s 2 -jm 2048 -tm 2048 -nm ys_hudi -d
各类UI查看
启动Flinksql Client
bin/sql-client.sh embedded -s yarn-session
FlinkCDC sink Hudi过程
MySQL测试建表语句
create table users_cdc(
id bigint auto_increment primary key,
name varchar(20) null,
birthday timestamp default CURRENT_TIMESTAMP not null,
ts timestamp default CURRENT_TIMESTAMP not null
);
FlinkCDC DDL语句
CREATE TABLE mysql_users (
id BIGINT PRIMARY KEY NOT ENFORCED ,
name STRING,
birthday TIMESTAMP(3),
ts TIMESTAMP(3)
) WITH (
'connector'= 'mysql-cdc',
'hostname'= 'bigdata',
'port'= '3306',
'username'= 'root',
'password'= 'root',
'server-time-zone'= 'Asia/Shanghai',
'debezium.snapshot.mode'='initial',
'database-name'= 'db1',
'table-name'= 'users_cdc'
);
查询mysql-cdc表
select * from mysql_users;
由于目前MySQL users_cdc表是空,所以flinksql 查询没有数据 只有表结构;
创建临时视图,增加分区列,方便同步hive分区表
create view mycdc_v AS SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') as `partition` FROM mysql_users;
设置checkpoint间隔时间,存储路径已在flink-conf配置设置全局路径
建议:测试环境 可设置秒级别(不能太小),生产环境可设置分钟级别
set execution.checkpointing.interval=30sec;
Flinksql 创建 cdc sink hudi文件,并自动同步hive分区表DDL 语句
CREATE TABLE mysqlcdc_sync_hive01(
id bigint ,
name string,
birthday TIMESTAMP(3),
ts TIMESTAMP(3),
`partition` VARCHAR(20),
primary key(id) not enforced --必须指定uuid 主键
)
PARTITIONED BY (`partition`)
with(
'connector'='hudi'
,'path'= 'hdfs://bigdata:8020/hudi/mysql_cdc_sync_hive_01'
, 'hoodie.datasource.write.recordkey.field'= 'id'-- 主键
, 'write.precombine.field'= 'ts'-- 自动precombine的字段
, 'write.tasks'= '1'
, 'compaction.tasks'= '1'
, 'write.rate.limit'= '2000'-- 限速
, 'table.type'= 'MERGE_ON_READ'-- 默认COPY_ON_WRITE,可选MERGE_ON_READ
, 'compaction.async.enabled'= 'true'-- 是否开启异步压缩
, 'compaction.trigger.strategy'= 'num_commits'-- 按次数压缩
, 'compaction.delta_commits'= '1'-- 默认为5
, 'changelog.enabled'= 'true'-- 开启changelog变更
, 'read.streaming.enabled'= 'true'-- 开启流读
, 'read.streaming.check-interval'= '3'-- 检查间隔,默认60s
, 'hive_sync.enable'= 'true'-- 开启自动同步hive
, 'hive_sync.mode'= 'hms'-- 自动同步hive模式,默认jdbc模式
, 'hive_sync.metastore.uris'= 'thrift://bigdata:9083'-- hive metastore地址
, 'hive_sync.jdbc_url'= 'jdbc:hive2://bigdata:10000'-- hiveServer地址
, 'hive_sync.table'= 'mysql_cdc_sync_hive_01'-- hive 新建表名
, 'hive_sync.db'= 'czs'-- hive 新建数据库名
, 'hive_sync.username'= 'root'-- HMS 用户名
, 'hive_sync.password'= 'root'-- HMS 密码
, 'hive_sync.support_timestamp'= 'true'-- 兼容hive timestamp类型
);
Flink sql mysql cdc数据写入hudi文件数据
insert into mysqlcdc_sync_hive01 select id,name,birthday,ts,`partition` from mycdc_v;
![\[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-IVHFBpJa-1659193677348)(C:\Users\陈张圣\AppData\Roaming\Typora\typora-user-images\image-20220728095449769.png)\]](https://img-blog.csdnimg.cn/3949de15ec1c43768c0ee1e3b3f7a8d4.png)
说明:目前还没写入测试数据,hudi目录只生成一些状态标记文件,还未生成分区目录以及.log 和.parquet数据文件,具体含义可见hudi官方文档。
Mysql数据源写入测试数据
insert into users_cdc (name) values ('cdc01');
Flinksql 查询mysql cdc insert数据
set sql-client.execution.result-mode=tableau;
select * from mysql_users; -- 查询到一条insert数据
Flink web UI页面可以看到DAG 各个环节产生一条测试数据
Flinksql 查询 sink的hudi表数据
select * from mysqlcdc_sync_hive01; --已查询到一条insert数据
Hdfs上Hudi文件目录变化情况
Hive分区表和数据自动同步情况
查看自动创建hive表结构
show create table mysql_cdc_sync_hive_01_ro;
show create table mysql_cdc_sync_hive_01_rt;
查看自动生成的表分区信息
show partitions mysql_cdc_sync_hive_01_ro;
show partitions mysql_cdc_sync_hive_01_rt;
说明:已自动生产hudi MOR模式的ro、rt
ro表只能查parquet文件数据,input是HoodieParquetInputFormat
rt表parquet文件数据和log文件数据都可查,input是HoodieParquetRealtimeInputFormat
Hive访问Hudi数据
说明:需要引入hudi-hadoop-mr-bundle
引入方式如下:
- 引入到$HIVE_HOME/lib下
- 引入到$HIVE_HOME/auxlib自定义第三方依赖修改 hive-site.xml配置文件
- Hive shell命令行引入 Session级别有效
其中1和3配置完后需要重启hive-server服务
查询Hive 分区表数据
select * from mysql_cdc_sync_hive_01_ro; --已查询到mysq insert的一条数据
select * from mysql_cdc_sync_hive_01_rt; --已查询到mysq insert的一条数据
select name,ts from mysql_cdc_sync_hive_01_ro where `partition`='20220728'; --条件查询
select count(1) from mysql_cdc_sync_hive_01_ro; --Hive ro表count查询
select count(1) from mysql_cdc_sync_hive_01_ro; --Hive rt表count查询
hive count异常解决
引入hudi-hadoop-mr-bundle
依赖
hive> add jar hdfs://bigdata:8020/czs/hudi-hadoop-mr-bundle-xxx.jar;
hive> set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
Mysql数据源写入多条测试数据
insert into users_cdc (name) values ('cdc02');
insert into users_cdc (name) values ('cdc03');
insert into users_cdc (name) values ('cdc04');
insert into users_cdc (name) values ('cdc05');
insert into users_cdc (name) values ('cdc06');
Flink web UI DAG中数据链路情况
Hdfs上Hudi文件目录的变化
状态文件说明:
(1)requested:表示一个动作已被安排,但尚未启动
(2)inflight:表示当前正在执行操作
(3)completed:表示在时间线上完成了操作
Flink jobmanager log sync hive过程详细日志
搜索mysqlcdc_sync_hive01
即定义的hudi表名
Mysql数据源更新数据
update users_cdc set name = 'cdc05-bj'where id = 5;
Flinksql 查询cdc update数据 产生两条binlog数据
说明:flinksql 查询最终只有一条+I有效数据,且数据已更新
Flink web UI DAG接受到两条binlog数据,但最终compact和sink只有一条有效数据
MySQL数据源删除数据
delete from users_cdc where id = 3;
select * from users_cdc;
Flink Web UI job DAG中捕获一条新数据:
Hudi文件类型说明:
(1)commits: 表示将一批数据原子性写入表中
(2)cleans: 清除表中不在需要的旧版本文件的后台活动
(3)delta_commit:增量提交是指将一批数据原子性写入MergeOnRead类型的表中,其中部分或者所有数据可以写入增量日志中
(4)compaction: 协调hudi中差异数据结构的后台活动,例如:将更新从基于行的日志文件变成列格式。在内部,压缩的表现为时间轴上的特殊提交
(5)rollback:表示提交操作不成功且已经回滚,会删除在写入过程中产生的数据
Flink CK情况
设置的30s一次CK
边栏推荐
- Xilinx约束学习笔记—— 时序约束
- Resolving C# non-static field, method or property "islandnum.Program.getIslandCount(int[][], int, int)" requires an object reference
- 【云原生】如何快速部署Kubernetes
- Unity Shader学习(七)纹理图像的简单使用
- Analysis of GCC compiler technology
- 自然语言处理 文本预处理(下)(张量表示、文本数据分析、文本特征处理等)
- 结构体大小计算--结构体内存对齐
- request.getSession(), the story
- Project development specification
- 【ROS基础】rosbag 的使用方法
猜你喜欢
第06章 索引的数据结构【2.索引及调优篇】【MySQL高级】
_2_顺序表
MPLS的相关技术
论文《Deep Multifaceted Transformers for Multi-objective Ranking in Large-Scale E-commerce Recommender》
Clapper that can interact with the audience in real time
2022夏暑假每日一题(六)
How does abaqus quickly import the assembly of other cae files?
关于ue4.27像素流送打包后的本地服务器问题
jvm 二之 栈帧内部结构
About the local server problem after ue4.27 pixel streaming package
随机推荐
Servlet
雷达人体存在感应器方案,智能物联网感知技术,实时感应人体存在
sql 远程访问链接服务器
Facebook社媒营销的5大技巧,迅速提高独立站转化率!
Connection reset by peer problem analysis
【暑期每日一题】洛谷 P1192 台阶问题
C#重点问题之Struct和Class的异同
How does abaqus quickly import the assembly of other cae files?
解决C#非静态字段、方法或属性“islandnum.Program.getIslandCount(int[][], int, int)”要求对象引用
CAT1 4G+以太网开发板腾讯云手机微信小程序显示温度和下发控制
July 18-July 31, 2022 (Ue4 video tutorials and documentation, 20 hours. Total 1412 hours, 8588 hours left)
docker 安装mysql
数据库概论之MySQL表的增删改查2
Connection reset by peer 问题解析
交换--STP协议
【杂】pip换国内源教程及国内源地址
请教一下,Flink SQL ,JDBC sink 入 mysql 库,想要搞一个自增主键,要怎么写
【故障诊断分析】基于matlab FFT轴承故障诊断【含Matlab源码 2001期】
逆变器锁相原理及DSP实现
【npm install 报错问题合集】- npm ERR! code ENOTEMPTY npm ERR! syscall rmdir