Flink_CDC setup and basic usage
2022-07-30 10:13:00 【m0_54850467】
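1. Introduction to CDC:
CDC (Change Data Capture): in the broad sense, any technique that can capture data changes can be called CDC. In practice, however, CDC usually refers to capturing changes in databases (including common ones such as MySQL, Oracle, and MongoDB); it is a technique for capturing data changes in a database.
There are many CDC technologies on the market today; common ones include Flink CDC, DataX, Canal, Sqoop, Kettle, Oracle GoldenGate, and Debezium. DataX, Sqoop, and Kettle implement CDC mainly by querying: offline scheduled query jobs issue batch requests. This mode of operation cannot guarantee data consistency, and its latency is also poor. Flink CDC, Canal, Debezium, and Oracle GoldenGate are log-based CDC technologies. They process log data in real time as a stream, which preserves data consistency and provides real-time data to other services.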
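The difference between the two families can be illustrated with a small, self-contained simulation. This is only a sketch: the `Table` class, its in-memory `log` (a stand-in for a database log), and the helpers `poll_changes` and `read_log` are hypothetical names invented for this illustration, not the API of any tool listed above. It shows how periodic snapshot polling only sees the net effect between two polls, while reading the log sees every event.

```python
from dataclasses import dataclass, field


@dataclass
class Table:
    """Toy table that records every change in an append-only log."""
    rows: dict = field(default_factory=dict)
    log: list = field(default_factory=list)

    def insert(self, key, value):
        self.rows[key] = value
        self.log.append(("INSERT", key, value))

    def update(self, key, value):
        self.rows[key] = value
        self.log.append(("UPDATE", key, value))

    def delete(self, key):
        del self.rows[key]
        self.log.append(("DELETE", key, None))


def poll_changes(previous, current):
    """Query-based CDC: diff two snapshots taken by scheduled queries."""
    changes = []
    for key, value in current.items():
        if key not in previous:
            changes.append(("INSERT", key, value))
        elif previous[key] != value:
            changes.append(("UPDATE", key, value))
    for key in previous:
        if key not in current:
            changes.append(("DELETE", key, None))
    return changes


def read_log(table, offset):
    """Log-based CDC: read every event appended since the given offset."""
    return table.log[offset:]


table = Table()
table.insert(1, "scooter")
snapshot = dict(table.rows)  # first scheduled poll
offset = len(table.log)      # log position at the same moment

# Several changes happen before the next poll.
table.update(1, "scooter v2")
table.update(1, "scooter v3")
table.insert(2, "hammer")
table.delete(2)

polled = poll_changes(snapshot, dict(table.rows))
logged = read_log(table, offset)

print(polled)  # only the net effect on row 1
print(logged)  # all four events, in order
```

In this toy run the polling side never learns that row 2 existed, and it sees one update where two happened, which is the consistency gap described above.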
2. Introduction to Flink_CDC:
At present, our company mainly uses Canal to monitor the MySQL binlog, sends the log data to Kafka in real time, and then uses a Flink job to distribute that log data to other services in real time. With this approach the data pipeline is long, latency is poor, and operations are fairly complex.
Flink_CDC emerged to address these pain points of traditional real-time database synchronization. Flink_CDC disguises itself as a MySQL slave node and reads the master node's full and incremental data in real time. It captures every data change as a complete change record, without issuing the full table scans and filtering that query-based CDC relies on. It is efficient, requires no changes to application code, is fully decoupled from the business logic, and is simple to operate.
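The "full data first, then incremental data" idea can be sketched as a two-phase sync. The code below is a simplified illustration under stated assumptions: `source_rows`, `binlog`, `apply_to_source`, and `apply_to_replica` are hypothetical names, and the real connector coordinates the snapshot and the binlog position in a more elaborate way than recording a list index.

```python
source_rows = {1: "scooter", 2: "car battery"}
binlog = []  # append-only list of (op, key, value), a stand-in for the MySQL binlog


def apply_to_source(op, key, value=None):
    """Apply a write to the source table and append it to the log."""
    if op == "DELETE":
        source_rows.pop(key)
    else:  # INSERT or UPDATE
        source_rows[key] = value
    binlog.append((op, key, value))


def apply_to_replica(replica, event):
    """Replay one logged event on the downstream copy."""
    op, key, value = event
    if op == "DELETE":
        replica.pop(key, None)
    else:  # INSERT or UPDATE
        replica[key] = value


# Phase 1 (full): remember the log position, then copy the existing rows.
start_offset = len(binlog)
replica = dict(source_rows)

# Writes keep arriving on the source after the snapshot.
apply_to_source("INSERT", 3, "hammer")
apply_to_source("UPDATE", 1, "scooter v2")
apply_to_source("DELETE", 2)

# Phase 2 (incremental): replay everything logged since the snapshot.
for event in binlog[start_offset:]:
    apply_to_replica(replica, event)

print(replica == source_rows)  # True
```

Because the replica starts from a snapshot and then replays the log from the recorded position, it ends up identical to the source without ever rescanning the table.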
3. Deploying Flink_CDC:
3.1 Dependency versions
Environment: Linux (CentOS 7)
Flink: 1.13.1
Flink_CDC: flink-sql-connector-mysql-cdc-2.1.0.jar
MySQL version: 8.0.13
MySQL driver: mysql-connector-java-8.0.13.jar
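3.2 Environment setup
3.2.1 Install the Java environment (not covered here);
3.2.2 Install the database (not covered here);
3.2.3 Set up the Flink environment (standalone mode);
1. Download Flink.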
cd /home
wget https://archive.apache.org/dist/flink/flink-1.13.1/flink-1.13.1-bin-scala_2.11.tgz
2. Extract Flink:
tar -zxvf flink-1.13.1-bin-scala_2.11.tgz
3. Edit the Flink configuration file to set the Java environment
cd flink-1.13.1
vim conf/flink-conf.yaml
Add the setting: env.java.home: /home/jdk/jdk1.8.0_291
4. Upload the Flink_CDC connector jar and the MySQL driver jar:
cd /home/flink-1.13.1/lib
Upload:
flink-sql-connector-mysql-cdc-2.1.0.jar
mysql-connector-java-8.0.13.jar
5. Start the Flink cluster:
/home/flink-1.13.1/bin/start-cluster.sh
3.3 Create the MySQL table:
CREATE TABLE `products` (
`id` int NOT NULL,
`name` varchar(45) DEFAULT NULL,
`description` varchar(45) DEFAULT NULL,
`weight` decimal(10,3) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
3.4 Start flink-sql-client
The test here is done mainly through Flink's SQL client.
/home/flink-1.13.1/bin/sql-client.sh
3.5 Create the Flink_CDC virtual table:
CREATE TABLE `products_cdc` (
id INT NOT NULL,
name varchar(45),
description varchar(45),
weight DECIMAL(10,3)
) WITH (
'scan.incremental.snapshot.enabled' = 'false',
'connector' = 'mysql-cdc',
'hostname' = '0.0.0.0',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'test_db',
'table-name' = 'products'
);
### If 'scan.incremental.snapshot.enabled' = 'false' is not set, the following error is reported:
Caused by: org.apache.flink.table.api.ValidationException: The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled' , default: true (fallback keys: [])' to 'true'
at com.alibaba.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.validatePrimaryKeyIfEnableParallel(MySqlTableSourceFactory.java:186)
at com.alibaba.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.createDynamicTableSource(MySqlTableSourceFactory.java:85)
at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:134)
... 30 more
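### Cause:
In Flink 1.13, the DDL of a MySQL CDC source table is validated, and the error is raised because no primary key (Primary Key) is defined for the table created with the MySQL CDC WITH parameters. This version adds support for splitting the table into chunks by PK so that the data can be read by multiple parallel readers.
### Solution:
On Flink 1.13, if you need to read the MySQL data with multiple parallel readers, add the PK information to the DDL.
On Flink 1.13, if you do not need to read the MySQL data with multiple parallel readers, add the scan.incremental.snapshot.enabled parameter to the DDL and set its value to false; no PK information is required.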
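To see why parallel reading depends on a primary key, here is a minimal sketch of splitting a table into key ranges that independent readers can fetch concurrently. It is an illustration only, not the connector's actual splitting algorithm: `split_into_chunks`, `read_chunk`, and the `chunk_size` argument are hypothetical names, and an in-memory list stands in for the MySQL table.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_chunks(min_id, max_id, chunk_size):
    """Return half-open [start, end) primary-key ranges covering min_id..max_id."""
    chunks = []
    start = min_id
    while start <= max_id:
        end = min(start + chunk_size, max_id + 1)
        chunks.append((start, end))
        start = end
    return chunks


def read_chunk(rows, chunk):
    """Read only the rows whose primary key falls inside one chunk."""
    start, end = chunk
    return [row for row in rows if start <= row["id"] < end]


# Stand-in for the products table, keyed by id.
rows = [{"id": i, "name": f"product-{i}"} for i in range(1, 11)]

chunks = split_into_chunks(1, 10, 4)
print(chunks)  # [(1, 5), (5, 9), (9, 11)]

# Each chunk can be read by a separate worker; map() keeps chunk order.
with ThreadPoolExecutor(max_workers=3) as pool:
    parts = list(pool.map(lambda chunk: read_chunk(rows, chunk), chunks))

result = [row for part in parts for row in part]
print(len(result))  # 10
```

Without a key to split on, there is no way to carve the table into non-overlapping ranges like these, which is why the DDL must either declare the PK or turn the incremental snapshot off.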
3.6 Query the CDC table data:
3.6.1 View the table data
select * from products_cdc;
3.6.2 Insert a new row in the database:
insert into products(id,name,description,weight) values(5,'gg','haha',60);
3.6.3 Observe how the data in the products_cdc table changes;
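What the query result reflects is a stream of change rows being folded into a table. The sketch below replays such a stream using Flink's row-kind tags (+I insert, -U update-before, +U update-after, -D delete). The helper `apply_changelog` is a hypothetical name, and the update to row 5 is an invented follow-up change for illustration; the steps above only perform the insert.

```python
def apply_changelog(events):
    """Materialize a table keyed by id from rows tagged +I / -U / +U / -D."""
    table = {}
    for op, row in events:
        key = row["id"]
        if op in ("+I", "+U"):
            table[key] = row          # new row, or new version of a row
        elif op in ("-U", "-D"):
            table.pop(key, None)      # retract the old version, or delete
        else:
            raise ValueError(f"unknown row kind: {op}")
    return table


events = [
    # The insert from step 3.6.2.
    ("+I", {"id": 5, "name": "gg", "description": "haha", "weight": 60}),
    # Hypothetical follow-up: an UPDATE arrives as a retract/emit pair.
    ("-U", {"id": 5, "name": "gg", "description": "haha", "weight": 60}),
    ("+U", {"id": 5, "name": "gg", "description": "haha", "weight": 61}),
]

table = apply_changelog(events)
print(table[5]["weight"])  # 61
```

Replaying the same events up to any point gives the table state at that point, which is how a continuously running query stays in step with the source table.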
That completes the example of using flink-sql-client to capture both the full data and the incremental changes from MySQL.