当前位置:网站首页>基于Flink CDC打通数据实时入湖
基于Flink CDC打通数据实时入湖
2022-07-29 14:14:00 【InfoQ】
Flink CDC介绍

{
"before": { --更新之前的数据
"id": 001,
"name": "tom"
},
"after": { --更新之后的数据
"id": 001,
"name": "jerry"
},
"source": {...},
"op": "u",
"ts_ms": 1589362330904,
"transaction": null
}

- 业务解耦:无需入侵业务,和业务完全解耦,也就是业务端无感知数据同步的存在。
- 性能消耗:业务数据库性能消耗小,数据同步延迟低。
- 同步易用:使用SQL方式执行CDC同步任务,极大的降低使用维护门槛。
- 数据完整:完整的数据库变更记录,不会丢失任何记录,Flink 自身支持 Exactly Once。
Apache Iceberg介绍

- ACID:不会读到不完整的commit数据,基于乐观锁实现,支持并发commit,支持Row-level delete,支持upsert操作。
- 增量快照:Commit后的数据即可见,在Flink实时入湖场景下,数据可见根据checkpoint的时间间隔来确定的,增量形式也可回溯历史快照。
- 开放的表格式:对于一个真正的开放表格式,支持多种数据存储格式,如:parquet、orc、avro等,支持多种计算引擎,如:Spark、Flink、Hive、Trino/Presto。
- 流批接口支持:支持流式写入、批量写入,支持流式读取、批量读取。下文的测试中,主要测试了流式写入和批量读取的功能。
Flink CDC打通数据实时导入Iceberg实践
数据入湖环境准备
id STRING,
name STRING
) WITH (
'connector' = 'kafka',
'topic' = 'topic_name',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'debezium-json'
);
CREATE TABLE iceberg_catalog.default.IcebergTable ( id STRING, name STRING );
SET table.dynamic-table-options.enabled=true;
INSERT INTO IcebergTable /*+OPTIONS('equality-field-columns'='id')*/ SELECT * FROM KafkaTable;
SET execution.type=batch;
SELECT COUNT(*) FROM IcebergTable;
SET execution.type=streaming;
SELECT COUNT(*) FROM IcebergTable;
数据入湖速度测试
TaskManager 内存4G,slot:1
Checkpoint 1分钟
测试数据列数 10列
测试数据行数 1000万
iceberg存储格式 parquet
INSERT INTO IcebergTable SELECT * FROM KafkaTable;
--并行度1 12.2万/秒
--并行度2 19.6万/秒
--并行度4 28.3万/秒
INSERT INTO IcebergTable /*+OPTIONS('equality-field-columns'='id')*/ SELECT * FROM KafkaTable;
--导入的数据 只有数据插入 只有数据更新
--并行度1 3.2万/秒 2.9万/秒
--并行度2 4.9万/秒 4.2万/秒
--并行度4 6.1万/秒 5.1万/秒
- append方式导入速度远大于upsert导入数据速度。在使用的时候,如没有更新数据的场景时,则不需要upsert方式导入数据。
- 导入速度随着并行度的增加而增加。
- upsert方式数据的插入和更新速度相差不大,主要得益于MOR原因。
数据入湖任务运维
Table table = ...
Actions.forTable(table)
.rewriteDataFiles()
.targetSizeInBytes(100 * 1024 * 1024) // 100 MB
.execute();
Table table = ...
Actions.forTable(table)
.expireSnapshots()
.expireOlderThan(System.currentTimeMillis())
.retainLast(5)
.execute();
Table table = ...
Actions.forTable(table)
.removeOrphanFiles()
.execute();

数据入湖问题讨论
未来规划
整合Iceberg到实时计算平台

准实时数仓探索

边栏推荐
- 潘多拉 IOT 开发板学习(RT-Thread)—— 实验19 MQTT 协议通信实验(学习笔记)
- 日志打印不规范,被CTO骂了一顿~
- C#线程操作UI控件
- Topic 1125: - delegate * C language training
- 如何编辑CAD图库里的图纸
- Alibaba CTO Cheng Li: open source is the source of basic software!
- 验证二叉树的前序序列化[抽象前序遍历]
- mysql datetime格式化日期(mysql start with)
- 打卡广汽本田喜悦安全驾驶中心,体验最刁钻的场地训练
- 如何使用SparkSQL做一些简单的数据分析和可视化展示?
猜你喜欢
2022开放原子全球开源峰会数据库分论坛圆满召开
C#线程操作UI控件
Understand the yolov7 network structure
威纶通触摸屏制作自定义欢迎界面的几种方法介绍
进程间通信 --- system V三种通信方式(图文案例讲解)
部门例会上做测试分享,不知道分享什么内容?
【堡垒机小知识】硬件堡垒机是什么意思?其与云堡垒机有什么区别?
企业需要知道的5个 IAM 最佳实践
How to merge the code when there is a code conflict in the collaborative development of multiple people?
web会话管理与xss攻击
随机推荐
The core principles of electronic games
1191. 家谱树
plsql连接oracle使用完毕之后关闭问题
iMedicalLIS监听程序(1)
基于变胞机构的移动机器人构型设计研究综述
480-82(59、151)
Understand the yolov7 network structure
你真的会用Console.log吗?
教育部等五部门联合推荐优质课外资源,腾讯产品青少年模式首发
部门例会上做测试分享,不知道分享什么内容?
The key to cracking AI full-process development problems
有关包装类的一道经典面试题
如何使用SparkSQL做一些简单的数据分析和可视化展示?
479-82(54、11)
性能优化竟白屏,难道真是我的锅?
2022年了!还在用定时器实现动画?赶紧试试requestAnimationFrame吧!
【FreeSwitch开发实践】自定义模块创建与使用
唯物辩证法-矛盾论(普遍性+特殊性+斗争性+同一性)
交叉编译工具链的安装和配置过程
已解决SyntaxError: invalid character in identifier