[Infrastructure] Deploying and Configuring Flink / Flink CDC (MySQL / ES)
2022-07-06 11:33:00 【0xYGC】
Introduction
Method / Steps
1: Deploying Flink
- Add Flink to the environment variables (e.g. append to /etc/profile):
# flink
export FLINK_HOME=/usr/local/flink/flink-1.15.0/
export PATH=$FLINK_HOME/bin:$PATH
# Reload the environment configuration
source /etc/profile
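To confirm the environment variables took effect, a quick sanity check (recent Flink releases support the --version flag on the flink launcher; treat this as optional):
# Print the resolved Flink home and the Flink version
echo $FLINK_HOME
flink --version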
- Flink configuration (conf/flink-conf.yaml)
# Allow access to the Web UI / REST API from outside the host
rest.bind-address: 0.0.0.0
- Start Flink
# Start the Flink cluster (from the bin directory)
./start-cluster.sh
# Stop the Flink cluster
#./stop-cluster.sh
After a successful start, visit port 8081 on the server to see the Flink Web UI:
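Two optional sanity checks, assuming a standalone cluster on the default REST port 8081:
# A standalone session cluster runs two JVM processes:
jps    # should list StandaloneSessionClusterEntrypoint and TaskManagerRunner
# Query the REST API for a cluster overview
curl http://localhost:8081/overview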
2: Installing the Flink CDC connector and setting up a demo
2.1 Upload the Flink CDC connector JAR and the MySQL driver JAR, placing them in the lib directory under the Flink home directory (a copy sketch follows below).
For the Elasticsearch SQL connector JAR, the version suffix after the dash is the Flink version; make sure it matches your Flink release.
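A minimal copy sketch; the JAR names below assume Flink 1.15.0 with Flink CDC 2.2.x and are only examples, so substitute the versions you actually downloaded:
# Copy the connector JARs into Flink's lib directory
cp flink-sql-connector-mysql-cdc-2.2.1.jar $FLINK_HOME/lib/
cp flink-sql-connector-elasticsearch7-1.15.0.jar $FLINK_HOME/lib/
# Restart the cluster so the new JARs are picked up
$FLINK_HOME/bin/stop-cluster.sh && $FLINK_HOME/bin/start-cluster.sh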
2.2 Streaming ETL job demo
-- Check whether binlog is enabled; OFF means it is disabled
show variables like 'log_bin';
If it is not enabled, add the following to the MySQL configuration file (my.cnf); a restart is required afterwards (see the step after the config):
[mysqld]
# Enable binlog
log-bin = mysql-bin
# Use row-based binlog format
binlog-format = ROW
# Required for MySQL replication; must not collide with the slaveId used by Canal
server_id = 1
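MySQL must be restarted for these settings to take effect; the service name varies by distribution (mysqld on RHEL/CentOS, mysql on Debian/Ubuntu):
# Restart MySQL, then re-run the log_bin check above
sudo systemctl restart mysqld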
2.2.1 Create the database and the products and orders tables in MySQL, and insert sample data
-- MySQL
CREATE DATABASE flinkcdc;
USE flinkcdc;
CREATE TABLE products (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;
INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water resistent black wind breaker"),
(default,"spare tire","24 inch spare tire");
CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;
INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
(default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
(default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
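As a sanity check, you can preview in plain MySQL the join that the streaming job will later maintain continuously:
-- Batch equivalent of the streaming enrichment used later
SELECT o.order_id, o.customer_name, o.price, p.name AS product_name
FROM orders AS o
LEFT JOIN products AS p ON o.product_id = p.id;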
3: Creating Flink CDC source tables
3.1 Start the Flink SQL CLI
./bin/sql-client.sh
- Enable checkpointing, with a checkpoint every 3 seconds:
Flink SQL> SET 'execution.checkpointing.interval' = '3s';
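Optionally, switch the result display mode so query results are printed directly in the terminal instead of the interactive table view (sql-client.execution.result-mode is a standard SQL client option in Flink 1.13+):
Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';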
3.2 Create the CDC source tables in the SQL CLI
- Use the Flink SQL CLI to create the corresponding tables, which will continuously sync data from the underlying database tables:
CREATE TABLE products (
id INT,
name STRING,
description STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.3.23',
'port' = '3307',
'username' = 'root',
'password' = 'My123456',
'database-name' = 'flinkcdc',
'table-name' = 'products'
);
CREATE TABLE orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.3.23',
'port' = '3307',
'username' = 'root',
'password' = 'My123456',
'database-name' = 'flinkcdc',
'table-name' = 'orders'
);
- Query the tables to verify they were created successfully:
select * from orders;
select * from products;
4: Writing to Elasticsearch with Flink CDC
4.1 Create the Elasticsearch sink table
- Create an enriched_orders table, used to write the joined order data into Elasticsearch:
CREATE TABLE enriched_orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
product_name STRING,
product_description STRING,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://192.168.1.71:9200',
'index' = 'enriched_orders'
);
- Submit the streaming job that joins orders with products and continuously writes the enriched rows into Elasticsearch:
insert into enriched_orders
select
o.order_id as order_id,
o.order_date as order_date,
o.customer_name as customer_name,
o.price as price,
o.product_id as product_id,
o.order_status as order_status,
p.name as product_name,
p.description as product_description
from orders as o
left join products as p on o.product_id=p.id;
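You can also query Elasticsearch directly to confirm documents are arriving (host and port taken from the sink DDL above):
# List indices; enriched_orders should appear once the job has written data
curl 'http://192.168.1.71:9200/_cat/indices?v'
# Fetch a few enriched documents
curl 'http://192.168.1.71:9200/enriched_orders/_search?pretty'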
4.2 Open Kibana to see the data of the enriched order table:
Next, modify data in the MySQL tables; the order data shown in Kibana will update in real time, as in the example below:
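For example, run the following statements in MySQL one at a time and watch the enriched_orders index change after each one (order_id 10004 assumes the sample inserts above, where AUTO_INCREMENT started at 10001):
-- Insert a new order
INSERT INTO orders VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);
-- Update its status
UPDATE orders SET order_status = true WHERE order_id = 10004;
-- Delete it again
DELETE FROM orders WHERE order_id = 10004;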