当前位置:网站首页>Flink_CDC construction and simple use
Flink_CDC construction and simple use
2022-08-02 01:00:00 【m0_67392811】
Flink_CDC搭建及简单使用
1.CDC简介:
CDC (Change Data Capture) ,在广义的概念上,只要能捕获数据变更的技术,都可以称为 CDC .但通常我们说的CDC 技术主要面向数据库(包括常见的mysql,Oracle, MongoDB等)的变更,是一种用于捕获数据库中数据变更的技术.
目前市面上的CDC技术非常多,常见的主要包括Flink CDC,DataX,Canal,Sqoop,Kettle,Oracle Goldengate,Debezium等.DataX,Sqoop和kettle的CDC实现技术主要是基于查询的方式实现的,通过离线调度查询作业,实现批处理请求.这种作业方式无法保证数据的一致性,实时性也较差.Flink CDC,Canal,Debezium和Oracle Goldengate是基于日志的CDC技术.这种技术,利用流处理的方式,实时处理日志数据,保证了数据的一致性,为其他服务提供了实时数据.
2.Flink_CDC简介:
目前公司主要是通过canal监控mysql的binlog日志,然后将日志数据实时发送到kafka中,通过flink程序,将日志数据实时下发到其他服务中.这种方式,数据链路长,实时性效果较差,运维也比较复杂.
Flink_CDC技术的出现,解决了传统数据库实时同步的痛点.Flink_CDC通过伪装成msql的slave节点,实时读取master节点全量和增量数据,它能够捕获所有数据的变化,捕获完整的变更记录,无需像查询CDC那样发起全表的扫描过滤,高效且无需入侵代码,完全与业务解耦,运维及其简单.
3.Flink_CDC部署:
3.1 依赖版本
环境:Linux(Centos7)
Flink : 1.31.1
Flink_CDC: flink-sql-connector-mysql-cdc-2.1.0.jar
mysql版本:8.0.13
mysql驱动包:mysql-connector-java-8.0.13.jar
3.2环境搭建
3.2.1安装java环境(不再赘述);
3.2.2安装数据库(不在赘述);
3.2.3搭建Flink环境(单机模式);
1.获取flink版本.
cd /home
wget https://archive.apache.org/dist/flink/flink-1.9.0/flink-1.13.1-bin-scala_2.11.tgz
2.解压flink:
tar -zxvf flink-1.13.1-bin-scala_2.11.tgz
3.编辑flink配置文件,配置java环境
cd flink-1.13.1
vim conf/flink-conf.yaml
添加配置:env.java.home=/home/jdk/jdk1.8.0_291
4.上传flink_CDC驱动包和mysql驱动包:
cd flink-1.13.1/lib
上传:
flink-sql-connector-mysql-cdc-2.1.0.jar
mysql-connector-java-8.0.13.jar
5.启动flink集群:
/bin/start-cluster.sh
3.3创建mysql表:
CREATE TABLE `products` (
`id` int NOT NULL,
`name` varchar(45) DEFAULT NULL,
`description` varchar(45) DEFAULT NULL,
`weight` decimal(10,3) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
3.4启动flink-sql-client
本次主要是通过flink的sql客户端来测试的.
./flink-1.13.1/bin/sql-client.sh
3.5创建Flink_CDC虚拟表:
CREATE TABLE `products_cdc` (
id INT NOT NULL,
name varchar(32),
description varchar(45),
weight DECIMAL(10,3)
) WITH (
'scan.incremental.snapshot.enabled' = 'false',
'connector' = 'mysql-cdc',
'hostname' = '0.0.0.0',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'test_db',
'table-name' = 'products'
);
###如果未设置'scan.incremental.snapshot.enabled' = 'false',会报错:
Caused by: org.apache.flink.table.api.ValidationException: The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled' , default: true (fallback keys: [])' to 'true'
at com.alibaba.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.validatePrimaryKeyIfEnableParallel(MySqlTableSourceFactory.java:186)
at com.alibaba.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.createDynamicTableSource(MySqlTableSourceFactory.java:85)
at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:134)
... 30 more
###报错原因:
MySQL CDC源表在Flink 1.13版本会进行语法检查,在MySQL CDC DDL WITH参数中,未设置主键(Primary Key)信息.因为Flink 1.13版本,新增支持按PK分片,进行多并发读取数据的功能.
###解决方案:
如果在Flink 1.13版本,您需要多并发读取MySQL数据,则在DDL中添加PK信息.
如果在Flink 1.13版本,您不需要多并发读取MySQL数据,则在DDL中添加scan.incremental.snapshot.enabled 参数,且把该参数值设置为false,无需设置PK信息.
3.6查询CDC表数据:
3.6.1 查看表数据
select * from products_cdc;
3.6.2 在数据库中新增一条数据:
insert into products(id,name,description,weight) values(5,‘gg’,‘haha’,60);
3.6.3观察products_cdc表数据变化;
到此,通过flink-sql-client来增量获取mysql全量和增量数据变化的样例已结束.
先自我介绍一下,小编13年上师交大毕业,曾经在小公司待过,去过华为OPPO等大厂,18年进入阿里,直到现在.深知大多数初中级java工程师,想要升技能,往往是需要自己摸索成长或是报班学习,但对于培训机构动则近万元的学费,着实压力不小.自己不成体系的自学效率很低又漫长,而且容易碰到天花板技术停止不前.因此我收集了一份《java开发全套学习资料》送给大家,初衷也很简单,就是希望帮助到想自学又不知道该从何学起的朋友,同时减轻大家的负担.添加下方名片,即可获取全套学习资料哦
边栏推荐
猜你喜欢
IDEA版Postman插件Restful Fast Request,细节到位,功能好用
feign异常传递的两种方式 fallbackfactory和全局处理 获取服务端自定义异常
MLX90640 红外热成像仪测温模块开发笔记(完整版)
Debian侵犯Rust商标,妥协改名还是会得到豁免?
go笔记——map
Microsoft PC Manager V2.1 beta version officially released
第 45 届ICPC亚洲区域赛(上海)G-Fibonacci
BGP first experiment
ERROR 2003 (HY000) Can‘t connect to MySQL server on ‘localhost3306‘ (10061)
from origin ‘null‘ has been blocked by CORS policy Cross origin requests are only supported for
随机推荐
青蛙跳台阶
GO GOPROXY代理设置
管理基础知识19
How to use the go language standard library fmt package
管理基础知识18
dayjs时间处理库的基本使用
C语言实现扫雷游戏
uni-app project summary
NFT到底有哪些实际用途?
Go语学习笔记 - gorm使用 - gorm处理错误 Web框架Gin(十)
Don't concatenate strings with jOOQ
Looking back at 5 recessionary times in history: How might this time be different?
go笔记——map
期货开户如何确定期货公司正规性?
网络请求技术--跨域
交返是做日内交易的必要条件
管理基础知识13
抖音数据接口API-获取用户主页信息-监控直播开启
严格模式,use strict
字符串分割函数strtok练习