当前位置:网站首页>【基础架构】Flink/Flink-CDC的部署和配置(MySQL / ES)
【基础架构】Flink/Flink-CDC的部署和配置(MySQL / ES)
2022-07-06 11:33:00 【0xYGC】
简介
方法 / 步骤
一: 部署Flink
- 添加Flink 到环境变量
#flink
export FLINK_HOME=/usr/local/flink/flink-1.15.0/
export PATH=$FLINK_HOME/bin:$PATH
# 重载环境变量配置
source /etc/profile
- Flink 配置
# 开启外网访问
rest.bind-address: 0.0.0.0
- 启动Flink
# 启动Flink集群
./start-cluster.sh
# 停止Flink集群
#./stop-cluster.sh
启动成功以后访问 服务的8081端口,可看到Flink Web UI 界面:
二: 配置同步插件Flink_CDC并配置Demo
2.1 上传Flink_CDC驱动包和MySQL驱动包:放到主文件的lib目录下
elasticsearch连接器-后面的是flink的版本,要保证和flink版本一致。
ES SQL驱动包
2.2流式ETL作业demo
# 校验是否开启binlog 如果显示OFF则代表未开启
show variables like 'log_bin';
如果没有开启,找到配置文件添加配置
[mysqld]
#开启binlog
log-bin = mysql-bin
#选择row模式
binlog-format = ROW
#配置mysql replication需要定义,不能喝canal的slaveId重复
server_id = 1
2.2.1 MySQL创建数据库和表 products,orders,并插入数据
-- MySQL
CREATE DATABASE flinkcdc;
USE flinkcdc;
CREATE TABLE products (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;
INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water resistent black wind breaker"),
(default,"spare tire","24 inch spare tire");
CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;
INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
(default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
(default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
三: 创建Flink_CDC虚拟表
3.1 启动Flink SQL CLI
./bin/sql-client.sh
- 开启 checkpoint,每隔3秒做一次 checkpoint
Flink SQL> SET execution.checkpointing.interval = 3s;
3.2 控制台创建CDC虚拟表
- 使用 Flink SQL CLI 创建对应的表,用于同步这些底层数据库表的数据:
CREATE TABLE products (
id INT,
name STRING,
description STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.3.23',
'port' = '3307',
'username' = 'root',
'password' = 'My123456',
'database-name' = 'flinkcdc',
'table-name' = 'products'
);
CREATE TABLE orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.3.23',
'port' = '3307',
'username' = 'root',
'password' = 'My123456',
'database-name' = 'flinkcdc',
'table-name' = 'orders'
);
- 查询是否添加成功
select * from orders;
select * from products;
四: Flink_CDC创建ES数据
4.1 创建ES数据
- 创建 enriched_orders 表, 用来将关联后的订单数据写入 Elasticsearch 中
CREATE TABLE enriched_orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
product_name STRING,
product_description STRING,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://192.168.1.71:9200',
'index' = 'enriched_orders'
);
- 创建 enriched_orders 表, 用来将关联后的订单数据写入 Elasticsearch 中
insert into enriched_orders
select
o.order_id as order_id,
o.order_date as order_date,
o.customer_name as customer_name,
o.price as price,
o.product_id as product_id,
o.order_status as order_status,
p.name as product_name,
p.description as product_description
from orders as o
left join products as p on o.product_id=p.id;
4.2 访问 Kibana 可看到订单宽表的数据:
接下来,修改 MySQL 和 Postgres 数据库中表的数据,Kibana中显示的订单数据也将实时更新:
参考资料 & 致谢
[1] [GitHub仓库] flink
[2] [GitHub仓库] link-cdc-connectors
[3] [FlinkCDC实时同步MySQL数据到ES
[4] 从 MySQL 到 ElasticSearch
边栏推荐
- Php+redis realizes the function of canceling orders over time
- 关于图像的读取及处理等
- Problems encountered in using RT thread component fish
- 学习探索-无缝轮播图
- Swagger2 reports an error illegal DefaultValue null for parameter type integer
- [paper notes] transunet: transformers make strongencoders for medical image segmentation
- ModuleNotFoundError: No module named ‘PIL‘解决方法
- Mysql Information Schema 学习(二)--Innodb表
- C language daily practice - day 22: Zero foundation learning dynamic planning
- R语言使用dt函数生成t分布密度函数数据、使用plot函数可视化t分布密度函数数据(t Distribution)
猜你喜欢
Low CPU load and high loadavg processing method
三面蚂蚁金服成功拿到offer,Android开发社招面试经验
Pychrm Community Edition calls matplotlib pyplot. Solution of imshow() function image not popping up
Mind map + source code + Notes + project, ByteDance + JD +360+ Netease interview question sorting
安装Mysql报错:Could not create or access the registry key needed for the...
Interface test tool - postman
Documents to be used in IC design process
今日直播 | “人玑协同 未来已来”2022弘玑生态伙伴大会蓄势待发
How word displays modification traces
Based on butterfly species recognition
随机推荐
保证接口数据安全的10种方案
Mind map + source code + Notes + project, ByteDance + JD +360+ Netease interview question sorting
ACTF 2022圆满落幕,0ops战队二连冠!!
CPU负载很低,loadavg很高处理方法
Xingnuochi technology's IPO was terminated: it was planned to raise 350million yuan, with an annual revenue of 367million yuan
【计算情与思】扫地僧、打字员、信息恐慌与奥本海默
深入分析,Android面试真题解析火爆全网
AutoCAD - what is the default lineweight for centerline drawing and CAD? Can I modify it?
面试突击63:MySQL 中如何去重?
R语言ggplot2可视化:使用ggpubr包的ggstripchart函数可视化分组点状条带图(dot strip plot)、设置add参数为不同水平点状条带图添加箱图
A method of removing text blur based on pixel repair
通俗的讲解,带你入门协程
Sanmian ant financial successfully got the offer, and has experience in Android development agency recruitment and interview
R language ggplot2 visualization: use the ggstripchart function of ggpubr package to visualize the grouped dot strip plot, and set the add parameter to add box plots for different levels of dot strip
全套教学资料,阿里快手拼多多等7家大厂Android面试真题
R语言使用dt函数生成t分布密度函数数据、使用plot函数可视化t分布密度函数数据(t Distribution)
A popular explanation will help you get started
零基础入门PolarDB-X:搭建高可用系统并联动数据大屏
Detailed idea and code implementation of infix expression to suffix expression
接雨水问题解析