当前位置:网站首页>[infrastructure] deployment and configuration of Flink / Flink CDC (MySQL / es)
[infrastructure] deployment and configuration of Flink / Flink CDC (MySQL / es)
2022-07-06 19:37:00 【0xYGC】
brief introduction
Method / step
One : Deploy Flink
- add to Flink To environment variable
#flink
export FLINK_HOME=/usr/local/flink/flink-1.15.0/
export PATH=$FLINK_HOME/bin:$PATH
# Overload environment variable configuration
source /etc/profile
- Flink To configure
# Open Internet access
rest.bind-address: 0.0.0.0
- start-up Flink
# start-up Flink colony
./start-cluster.sh
# stop it Flink colony
#./stop-cluster.sh
Visit after successful startup Service 8081 port , You can see Flink Web UI Interface :
Two : Configure the synchronization plug-in Flink_CDC And configuration Demo
2.1 Upload Flink_CDC Drive package and MySQL Drive pack : Put it in the main file lib Under the table of contents
elasticsearch The connector - The back is flink Version of , We need to make sure that we are in touch with each other flink Versions, .
ES SQL Drive pack
2.2 streaming ETL Homework demo
# Check whether it is turned on binlog If it shows OFF It means it is not turned on
show variables like 'log_bin';
If it's not on , Find the configuration file and add the configuration
[mysqld]
# Turn on binlog
log-bin = mysql-bin
# choice row Pattern
binlog-format = ROW
# To configure mysql replication Need to define , Can't drink canal Of slaveId repeat
server_id = 1
2.2.1 MySQL Create databases and tables products,orders, And insert data
-- MySQL
CREATE DATABASE flinkcdc;
USE flinkcdc;
CREATE TABLE products (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;
INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water resistent black wind breaker"),
(default,"spare tire","24 inch spare tire");
CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;
INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
(default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
(default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
3、 ... and : establish Flink_CDC Virtual table
3.1 start-up Flink SQL CLI
./bin/sql-client.sh
- Turn on checkpoint, every other 3 Do it every second checkpoint
Flink SQL> SET execution.checkpointing.interval = 3s;
3.2 Console creation CDC Virtual table
- Use Flink SQL CLI Create the corresponding table , Used to synchronize the data of these underlying database tables :
CREATE TABLE products (
id INT,
name STRING,
description STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.3.23',
'port' = '3307',
'username' = 'root',
'password' = 'My123456',
'database-name' = 'flinkcdc',
'table-name' = 'products'
);
CREATE TABLE orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.3.23',
'port' = '3307',
'username' = 'root',
'password' = 'My123456',
'database-name' = 'flinkcdc',
'table-name' = 'orders'
);
- Whether the query is added successfully
select * from orders;
select * from products;
Four : Flink_CDC establish ES data
4.1 establish ES data
- establish enriched_orders surface , Used to write the associated order data into Elasticsearch in
CREATE TABLE enriched_orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
product_name STRING,
product_description STRING,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://192.168.1.71:9200',
'index' = 'enriched_orders'
);
- establish enriched_orders surface , Used to write the associated order data into Elasticsearch in
insert into enriched_orders
select
o.order_id as order_id,
o.order_date as order_date,
o.customer_name as customer_name,
o.price as price,
o.product_id as product_id,
o.order_status as order_status,
p.name as product_name,
p.description as product_description
from orders as o
left join products as p on o.product_id=p.id;
4.2 visit Kibana You can see the data of the order wide table :
Next , modify MySQL and Postgres Data from tables in the database ,Kibana The order data displayed in will also be updated in real time :
Reference material & thank
[1] [GitHub Warehouse ] flink
[2] [GitHub Warehouse ] link-cdc-connectors
[3] [FlinkCDC Real time synchronization MySQL Data to ES
[4] from MySQL To ElasticSearch
边栏推荐
- Chic Lang: attributeerror: partially initialized module 'CV2' has no attribute 'GAPI_ wip_ gst_ GStreamerPipe
- Sanmian ant financial successfully got the offer, and has experience in Android development agency recruitment and interview
- Black Horse - - Redis Chapter
- Leetcode 30. 串联所有单词的子串
- 算法面试经典100题,Android程序员最新职业规划
- Swiftui game source code Encyclopedia of Snake game based on geometryreader and preference
- Swagger2 reports an error illegal DefaultValue null for parameter type integer
- 终于可以一行代码也不用改了!ShardingSphere 原生驱动问世
- [translation] supply chain security project in toto moved to CNCF incubator
- 【翻译】Linkerd在欧洲和北美的采用率超过了Istio,2021年增长118%。
猜你喜欢
JDBC详解
【翻译】Linkerd在欧洲和北美的采用率超过了Istio,2021年增长118%。
[translation] micro survey of cloud native observation ability. Prometheus leads the trend, but there are still obstacles to understanding the health of the system
Hudi vs Delta vs Iceberg
Druid 数据库连接池 详解
Help improve the professional quality of safety talents | the first stage of personal ability certification and assessment has been successfully completed!
Mysql Information Schema 學習(一)--通用錶
【翻译】云原生观察能力微调查。普罗米修斯引领潮流,但要了解系统的健康状况仍有障碍...
系统性详解Redis操作Hash类型数据(带源码分析及测试结果)
zabbix 代理服务器 与 zabbix-snmp 监控
随机推荐
Learning and Exploration - function anti shake
[translation] linkerd's adoption rate in Europe and North America exceeded istio, with an increase of 118% in 2021.
MySQL information schema learning (II) -- InnoDB table
Leetcode topic [array] - 119 Yang Hui triangle II
蓝桥杯 微生物增殖 C语言
Zero foundation entry polardb-x: build a highly available system and link the big data screen
Reflection and illegalaccessexception exception during application
Leetcode 30. 串联所有单词的子串
zabbix 代理服务器 与 zabbix-snmp 监控
腾讯Android面试必问,10年Android开发经验
LeetCode_ Gray code_ Medium_ 89. Gray code
企业精益管理体系介绍
Intelligent supply chain management system solution for hardware and electromechanical industry: digital intelligent supply chain "creates new blood" for traditional industries
Countdown 2 days | live broadcast preview of Tencent cloud message queue data import platform
Dom 操作
Modulenotfounderror: no module named 'PIL' solution
Dark horse -- redis
力扣101题:对称二叉树
受益匪浅,安卓面试问题
Mysql Information Schema 学习(一)--通用表