当前位置:网站首页>Flink_CDC construction and simple use
Flink_CDC construction and simple use
2022-08-02 01:00:00 【m0_67392811】
Flink_CDC搭建及简单使用
1.CDC简介:
CDC (Change Data Capture) ,在广义的概念上,只要能捕获数据变更的技术,都可以称为 CDC .但通常我们说的CDC 技术主要面向数据库(包括常见的mysql,Oracle, MongoDB等)的变更,是一种用于捕获数据库中数据变更的技术.
目前市面上的CDC技术非常多,常见的主要包括Flink CDC,DataX,Canal,Sqoop,Kettle,Oracle Goldengate,Debezium等.DataX,Sqoop和kettle的CDC实现技术主要是基于查询的方式实现的,通过离线调度查询作业,实现批处理请求.这种作业方式无法保证数据的一致性,实时性也较差.Flink CDC,Canal,Debezium和Oracle Goldengate是基于日志的CDC技术.这种技术,利用流处理的方式,实时处理日志数据,保证了数据的一致性,为其他服务提供了实时数据.
2.Flink_CDC简介:
目前公司主要是通过canal监控mysql的binlog日志,然后将日志数据实时发送到kafka中,通过flink程序,将日志数据实时下发到其他服务中.这种方式,数据链路长,实时性效果较差,运维也比较复杂.
Flink_CDC技术的出现,解决了传统数据库实时同步的痛点.Flink_CDC通过伪装成msql的slave节点,实时读取master节点全量和增量数据,它能够捕获所有数据的变化,捕获完整的变更记录,无需像查询CDC那样发起全表的扫描过滤,高效且无需入侵代码,完全与业务解耦,运维及其简单.
3.Flink_CDC部署:
3.1 依赖版本
环境:Linux(Centos7)
Flink : 1.31.1
Flink_CDC: flink-sql-connector-mysql-cdc-2.1.0.jar
mysql版本:8.0.13
mysql驱动包:mysql-connector-java-8.0.13.jar
3.2环境搭建
3.2.1安装java环境(不再赘述);
3.2.2安装数据库(不在赘述);
3.2.3搭建Flink环境(单机模式);
1.获取flink版本.
cd /home
wget https://archive.apache.org/dist/flink/flink-1.9.0/flink-1.13.1-bin-scala_2.11.tgz
2.解压flink:
tar -zxvf flink-1.13.1-bin-scala_2.11.tgz
3.编辑flink配置文件,配置java环境
cd flink-1.13.1
vim conf/flink-conf.yaml
添加配置:env.java.home=/home/jdk/jdk1.8.0_291
4.上传flink_CDC驱动包和mysql驱动包:
cd flink-1.13.1/lib
上传:
flink-sql-connector-mysql-cdc-2.1.0.jar
mysql-connector-java-8.0.13.jar

5.启动flink集群:
/bin/start-cluster.sh
3.3创建mysql表:
CREATE TABLE `products` (
`id` int NOT NULL,
`name` varchar(45) DEFAULT NULL,
`description` varchar(45) DEFAULT NULL,
`weight` decimal(10,3) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
3.4启动flink-sql-client
本次主要是通过flink的sql客户端来测试的.
./flink-1.13.1/bin/sql-client.sh
3.5创建Flink_CDC虚拟表:
CREATE TABLE `products_cdc` (
id INT NOT NULL,
name varchar(32),
description varchar(45),
weight DECIMAL(10,3)
) WITH (
'scan.incremental.snapshot.enabled' = 'false',
'connector' = 'mysql-cdc',
'hostname' = '0.0.0.0',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'test_db',
'table-name' = 'products'
);
###如果未设置'scan.incremental.snapshot.enabled' = 'false',会报错:
Caused by: org.apache.flink.table.api.ValidationException: The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled' , default: true (fallback keys: [])' to 'true'
at com.alibaba.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.validatePrimaryKeyIfEnableParallel(MySqlTableSourceFactory.java:186)
at com.alibaba.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.createDynamicTableSource(MySqlTableSourceFactory.java:85)
at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:134)
... 30 more
###报错原因:
MySQL CDC源表在Flink 1.13版本会进行语法检查,在MySQL CDC DDL WITH参数中,未设置主键(Primary Key)信息.因为Flink 1.13版本,新增支持按PK分片,进行多并发读取数据的功能.
###解决方案:
如果在Flink 1.13版本,您需要多并发读取MySQL数据,则在DDL中添加PK信息.
如果在Flink 1.13版本,您不需要多并发读取MySQL数据,则在DDL中添加scan.incremental.snapshot.enabled 参数,且把该参数值设置为false,无需设置PK信息.
3.6查询CDC表数据:
3.6.1 查看表数据
select * from products_cdc;
3.6.2 在数据库中新增一条数据:
insert into products(id,name,description,weight) values(5,‘gg’,‘haha’,60);
3.6.3观察products_cdc表数据变化;
到此,通过flink-sql-client来增量获取mysql全量和增量数据变化的样例已结束.
先自我介绍一下,小编13年上师交大毕业,曾经在小公司待过,去过华为OPPO等大厂,18年进入阿里,直到现在.深知大多数初中级java工程师,想要升技能,往往是需要自己摸索成长或是报班学习,但对于培训机构动则近万元的学费,着实压力不小.自己不成体系的自学效率很低又漫长,而且容易碰到天花板技术停止不前.因此我收集了一份《java开发全套学习资料》送给大家,初衷也很简单,就是希望帮助到想自学又不知道该从何学起的朋友,同时减轻大家的负担.添加下方名片,即可获取全套学习资料哦
边栏推荐
- MYSQL(基本篇)——一篇文章带你走进MYSQL的奇妙世界
- Go 1.18 的那些事——工作区、模糊测试、泛型
- ECMAScript 2022 正式发布,有你了解过的吗?
- 交返是做日内交易的必要条件
- go mode tidy出现报错go warning “all“ matched no packages
- ERROR 2003 (HY000) Can‘t connect to MySQL server on ‘localhost3306‘ (10061)
- Redis - message publish and subscribe
- go泛型使用方法
- 百度、百图生科 | HelixFold-Single: 使用蛋白质语言模型作为替代进行无MSA蛋白质结构预测
- Kubernetes — 核心资源对象 — 存储
猜你喜欢

String splitting function strtok exercise

Docker安装canal、mysql进行简单测试与实现redis和mysql缓存一致性

【CodeTON Round 2 (Div. 1 + Div. 2, Rated, Prizes!)(A~D)】

网络请求技术--跨域

NFT到底有哪些实际用途?

Go语学习笔记 - gorm使用 - gorm处理错误 Web框架Gin(十)

Flex布局详解

好的期货公司开户让人省心省钱

Pytorch seq2seq model architecture to achieve English translation tasks

632. Minimum interval
随机推荐
傅立叶变换相关公式
These 4 computer notepad software, you have to try
管理基础知识19
PHP to read data from TXT file
22.卷积神经网络实战-Lenet5
flyway的快速入门教程
Constructor, this keyword, method overloading, local variables and member variables
Kunpeng compile and debug plug-in actual combat
Are test points the same as test cases?
期货开户如何确定期货公司正规性?
ERROR 1819 (HY000) Your password does not satisfy the current policy requirements
管理基础知识14
管理基础知识21
管理基础知识11
Automatic conversion of Oracle-style implicit joins to ANSI JOINs using jOOQ
IDEA找不到Database解决方法
flask获取post请求参数
Realize deletion - a specified letter in a string, such as: the string "abcd", delete the "a" letter in it, the remaining "bcd", you can also pass multiple characters to be deleted, and pass "ab" can
Docker安装canal、mysql进行简单测试与实现redis和mysql缓存一致性
input禁止输入