当前位置:网站首页>基于Flink CDC打通数据实时入湖
基于Flink CDC打通数据实时入湖
2022-07-29 14:14:00 【InfoQ】
Flink CDC介绍

{
"before": { --更新之前的数据
"id": 001,
"name": "tom"
},
"after": { --更新之后的数据
"id": 001,
"name": "jerry"
},
"source": {...},
"op": "u",
"ts_ms": 1589362330904,
"transaction": null
}
- 业务解耦:无需入侵业务,和业务完全解耦,也就是业务端无感知数据同步的存在。
- 性能消耗:业务数据库性能消耗小,数据同步延迟低。
- 同步易用:使用SQL方式执行CDC同步任务,极大的降低使用维护门槛。
- 数据完整:完整的数据库变更记录,不会丢失任何记录,Flink 自身支持 Exactly Once。
Apache Iceberg介绍

- ACID:不会读到不完整的commit数据,基于乐观锁实现,支持并发commit,支持Row-level delete,支持upsert操作。
- 增量快照:Commit后的数据即可见,在Flink实时入湖场景下,数据可见根据checkpoint的时间间隔来确定的,增量形式也可回溯历史快照。
- 开放的表格式:对于一个真正的开放表格式,支持多种数据存储格式,如:parquet、orc、avro等,支持多种计算引擎,如:Spark、Flink、Hive、Trino/Presto。
- 流批接口支持:支持流式写入、批量写入,支持流式读取、批量读取。下文的测试中,主要测试了流式写入和批量读取的功能。
Flink CDC打通数据实时导入Iceberg实践
数据入湖环境准备
id STRING,
name STRING
) WITH (
'connector' = 'kafka',
'topic' = 'topic_name',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'debezium-json'
);
CREATE TABLE iceberg_catalog.default.IcebergTable ( id STRING, name STRING );SET table.dynamic-table-options.enabled=true;
INSERT INTO IcebergTable /*+OPTIONS('equality-field-columns'='id')*/ SELECT * FROM KafkaTable;SET execution.type=batch;
SELECT COUNT(*) FROM IcebergTable;SET execution.type=streaming;
SELECT COUNT(*) FROM IcebergTable;数据入湖速度测试
TaskManager 内存4G,slot:1
Checkpoint 1分钟
测试数据列数 10列
测试数据行数 1000万
iceberg存储格式 parquetINSERT INTO IcebergTable SELECT * FROM KafkaTable;
--并行度1 12.2万/秒
--并行度2 19.6万/秒
--并行度4 28.3万/秒INSERT INTO IcebergTable /*+OPTIONS('equality-field-columns'='id')*/ SELECT * FROM KafkaTable;
--导入的数据 只有数据插入 只有数据更新
--并行度1 3.2万/秒 2.9万/秒
--并行度2 4.9万/秒 4.2万/秒
--并行度4 6.1万/秒 5.1万/秒- append方式导入速度远大于upsert导入数据速度。在使用的时候,如没有更新数据的场景时,则不需要upsert方式导入数据。
- 导入速度随着并行度的增加而增加。
- upsert方式数据的插入和更新速度相差不大,主要得益于MOR原因。
数据入湖任务运维
Table table = ...
Actions.forTable(table)
.rewriteDataFiles()
.targetSizeInBytes(100 * 1024 * 1024) // 100 MB
.execute();Table table = ...
Actions.forTable(table)
.expireSnapshots()
.expireOlderThan(System.currentTimeMillis())
.retainLast(5)
.execute();Table table = ...
Actions.forTable(table)
.removeOrphanFiles()
.execute();
数据入湖问题讨论
未来规划
整合Iceberg到实时计算平台

准实时数仓探索

边栏推荐
- 上线前配置
- Programmers are a group with a high incidence of occupational diseases. Don’t be naive to think that it’s just as simple as being bald.
- 【论文阅读】异常检测的视频通过Self-Supervised和多任务学习
- 如何返回一个数字的所有质因数?
- 使用云服务器从0开始搭建云端Jupyter Lab|Notebook
- hyperbench:plugin.Open(“./fabric“): plugin was built with a different version of package golang.
- 蚂蚁三面滑铁卢!遭分布式截胡,靠这些笔记潜修30天,挺进京东
- 嵌入式开发经验分享,把学习当作一种兴趣
- 力扣 206.反转链表--递归解决
- 有关包装类的一道经典面试题
猜你喜欢

Google Cloud X Kyligence|如何从业务视角管理数据湖?

【JS面试题】面试官问我:遍历一个数组用 for 和 forEach 哪个更快?

AQS源码阅读与强软弱虚4种引用以及ThreadLocal原理与源码

测试时间的评估:开发时间的1/3~1/2

Alibaba CTO Cheng Li: open source is the source of basic software!

力扣 206.反转链表--递归解决

打卡广汽本田喜悦安全驾驶中心,体验最刁钻的场地训练

【论文阅读】异常检测的视频通过Self-Supervised和多任务学习

【C语言】AI三子棋的成长之路

【Postman】下载与安装(新手图文教程)
随机推荐
力扣之顺序表
StarRocks 2.3 新版本特性介绍
C51 存储类型与存储模式
基于降噪自编码器与改进卷积神经网络的采煤机健康状态识别
深开鸿:万物智联的大江上,升起一轮开源鸿蒙月
MySQL 是如何实现 ACID 的?
1124. 骑马修栅栏
已解决SyntaxError: invalid character in identifier
验证二叉树的前序序列化[抽象前序遍历]
FPGA刷题——跨时钟域传输(FIFO+打拍+握手)
关于内部类
有关包装类的一道经典面试题
面试官:生产环境中 CPU 利用率飙高怎么办?
带你搞懂 Redis 中的两个策略
Shared memory - shmget filling holes
共享内存 - shmget填坑记
国内helm快速安装和添加常用charts仓库
xss内容总结
TCP流量控制和拥塞控制
C#线程操作UI控件