[Infrastructure] Deployment and configuration of Flink / Flink CDC (MySQL / ES)

2022-07-06 19:37:00 0xYGC

Brief introduction

Integrated resource package

Method / steps

One: Deploy Flink

  • Add Flink to the environment variables (in /etc/profile):
#flink
export FLINK_HOME=/usr/local/flink/flink-1.15.0/
export PATH=$FLINK_HOME/bin:$PATH

# Reload the environment variable configuration
source /etc/profile
  • Configure Flink (conf/flink-conf.yaml):
# Allow access to the REST endpoint and Web UI from other machines
rest.bind-address: 0.0.0.0
  • Start Flink
# Start the Flink cluster (run from $FLINK_HOME/bin)
./start-cluster.sh

# Stop the Flink cluster
#./stop-cluster.sh
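If you prefer to script the configuration change instead of editing conf/flink-conf.yaml by hand, the following is a minimal Python sketch, assuming the flat "key: value" one-entry-per-line layout that flink-conf.yaml uses in Flink 1.15. The function name set_flink_conf is hypothetical and not part of Flink.

```python
import re


def set_flink_conf(text: str, key: str, value: str) -> str:
    """Set 'key: value' in flink-conf.yaml text.

    Replaces every active entry for the key; if there is none, replaces the
    first commented-out entry ("#key: ..."); otherwise appends a new line.
    """
    line = f"{key}: {value}"
    active = re.compile(rf"^{re.escape(key)}\s*:.*$", re.MULTILINE)
    commented = re.compile(rf"^#\s*{re.escape(key)}\s*:.*$", re.MULTILINE)
    if active.search(text):
        # A lambda avoids backslash processing of the replacement string.
        return active.sub(lambda _m: line, text)
    if commented.search(text):
        return commented.sub(lambda _m: line, text, count=1)
    if text and not text.endswith("\n"):
        text += "\n"
    return text + line + "\n"
```

Reading $FLINK_HOME/conf/flink-conf.yaml, passing it through set_flink_conf(text, "rest.bind-address", "0.0.0.0") and writing it back reproduces the manual edit above; restart the cluster afterwards so the setting takes effect.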

After a successful start, open port 8081 of the server in a browser to see the Flink Web UI:
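Besides the Web UI, the same port serves Flink's REST API, so the startup can also be checked from a script. The sketch below calls the GET /overview endpoint with the Python standard library; the helper names and the "ready" rule (at least one TaskManager and one free slot) are illustrative assumptions, not something Flink defines.

```python
import json
import urllib.request


def overview_url(host: str, port: int = 8081) -> str:
    """URL of the Flink REST API's cluster overview endpoint."""
    return f"http://{host}:{port}/overview"


def fetch_overview(host: str, port: int = 8081, timeout: float = 5.0) -> dict:
    """GET /overview and decode the JSON body (raises URLError if unreachable)."""
    with urllib.request.urlopen(overview_url(host, port), timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


def cluster_ready(overview: dict) -> bool:
    """Heuristic: at least one TaskManager registered and at least one free slot."""
    return overview.get("taskmanagers", 0) > 0 and overview.get("slots-available", 0) > 0
```

For example, cluster_ready(fetch_overview("your-flink-host")) run right after ./start-cluster.sh tells you whether a TaskManager has registered yet; the response also carries a "flink-version" field you can compare with the connector JARs below.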

Two: Configure the Flink CDC synchronization plug-in and a demo

2.1 Upload the Flink CDC connector JAR and the MySQL driver JAR, and put them in the lib directory of the Flink installation

For the Elasticsearch connector, the suffix of the JAR name is the Flink version; make sure it matches the Flink version you deployed. JARs in lib are only loaded at startup, so restart the cluster after adding them.
ES SQL connector JAR:
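A missing or mismatched JAR usually only shows up later as a "could not find any factory" error in the SQL client, so it can help to check lib before starting. This is a minimal sketch; the file-name patterns are assumptions based on the usual Maven artifact names (flink-sql-connector-mysql-cdc, mysql-connector-java or mysql-connector-j, flink-sql-connector-elasticsearch7) and should be adjusted to the files you actually downloaded.

```python
import re
from pathlib import Path

# Assumed JAR name patterns; adjust them to the files you actually downloaded.
REQUIRED_JARS = {
    "Flink CDC MySQL connector": re.compile(r"flink-sql-connector-mysql-cdc-.+\.jar"),
    "MySQL JDBC driver": re.compile(r"mysql-connector-(java|j)-.+\.jar"),
    "Elasticsearch 7 SQL connector": re.compile(r"flink-sql-connector-elasticsearch7-.+\.jar"),
}


def missing_jars(lib_dir: str) -> list:
    """Return the labels of required connector JARs not found in lib_dir."""
    names = [p.name for p in Path(lib_dir).glob("*.jar")]
    return [label for label, pattern in REQUIRED_JARS.items()
            if not any(pattern.fullmatch(name) for name in names)]


def es_connector_matches_flink(lib_dir: str, flink_version: str) -> bool:
    """True if some Elasticsearch 7 connector JAR name ends with the given Flink version."""
    return any(p.stem.endswith(flink_version)
               for p in Path(lib_dir).glob("flink-sql-connector-elasticsearch7-*.jar"))
```

Run it against $FLINK_HOME/lib with the version from the FLINK_HOME path above (1.15.0); an empty missing_jars result and a True version check mean the lib directory matches what this walkthrough expects.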

2.2 Streaming ETL job demo

# Check whether binlog is enabled; OFF means it is not enabled
show variables like 'log_bin';

If it is not enabled, add the following to the MySQL configuration file (for example my.cnf) and restart MySQL:

[mysqld]
# Enable binlog
log-bin = mysql-bin

# Use the ROW binlog format
binlog-format = ROW

# Required for MySQL replication; must be unique and must not duplicate canal's slaveId
server_id = 1 
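Before restarting MySQL, the edited option file can be sanity-checked offline. The sketch below uses Python's configparser and assumes a simple option file (no !include or !includedir directives, no indented continuation lines); the function name check_binlog_config is hypothetical. The SHOW VARIABLES query above remains the authoritative check on the running server.

```python
import configparser


def check_binlog_config(text: str) -> list:
    """Return a list of problems found in the [mysqld] section of a MySQL option file."""
    parser = configparser.ConfigParser(
        allow_no_value=True,          # "log-bin" may appear without a value
        interpolation=None,           # do not treat '%' specially
        strict=False,                 # tolerate repeated options
        inline_comment_prefixes=("#",),
    )
    parser.read_string(text)
    if not parser.has_section("mysqld"):
        return ["no [mysqld] section"]
    # MySQL treats '-' and '_' in option names as equivalent; normalize to '_'.
    opts = {k.replace("-", "_"): v for k, v in parser.items("mysqld")}
    problems = []
    if "log_bin" not in opts:
        problems.append("log-bin is not set")
    if (opts.get("binlog_format") or "").upper() != "ROW":
        problems.append("binlog-format is not ROW")
    server_id = opts.get("server_id") or ""
    if not server_id.isdigit() or int(server_id) == 0:
        problems.append("server_id must be a positive integer")
    return problems
```

Feeding it the [mysqld] block above should yield an empty list; each returned string names the setting that still needs fixing.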

2.2.1 Create the database and the products and orders tables in MySQL, and insert data

-- MySQL
CREATE DATABASE flinkcdc;
USE flinkcdc;
CREATE TABLE products (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;

INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
       (default,"car battery","12V car battery"),
       (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
       (default,"hammer","12oz carpenter's hammer"),
       (default,"hammer","14oz carpenter's hammer"),
       (default,"hammer","16oz carpenter's hammer"),
       (default,"rocks","box of assorted rocks"),
       (default,"jacket","water resistent black wind breaker"),
       (default,"spare tire","24 inch spare tire");

CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;

INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
       (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
       (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);

Three: Create the Flink CDC virtual tables

3.1 Start the Flink SQL CLI

./bin/sql-client.sh


  • Enable checkpointing, with a checkpoint every 3 seconds:
Flink SQL> SET execution.checkpointing.interval = 3s;

3.2 Create the CDC virtual tables in the SQL CLI

  • Use the Flink SQL CLI to create the corresponding tables, which synchronize the data of the underlying MySQL tables:
CREATE TABLE products (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.3.23',
    'port' = '3307',
    'username' = 'root',
    'password' = 'My123456',
    'database-name' = 'flinkcdc',
    'table-name' = 'products'
  );

CREATE TABLE orders (
    order_id INT,
    order_date TIMESTAMP(0),
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
    order_status BOOLEAN,
    PRIMARY KEY (order_id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.3.23',
    'port' = '3307',
    'username' = 'root',
    'password' = 'My123456',
    'database-name' = 'flinkcdc',
    'table-name' = 'orders'
  );


  • Query the tables to check that they were created successfully:
select * from orders;
select * from products;

Four: Write Flink CDC data to ES

4.1 Create the ES sink table

  • Create the enriched_orders table, used to write the joined order data into Elasticsearch:
CREATE TABLE enriched_orders (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   product_name STRING,
   product_description STRING,
   PRIMARY KEY (order_id) NOT ENFORCED
 ) WITH (
     'connector' = 'elasticsearch-7',
     'hosts' = 'http://192.168.1.71:9200',
     'index' = 'enriched_orders'
 );
  • Join orders with products and insert the result into enriched_orders, which writes it to Elasticsearch:
insert into enriched_orders
select 
	o.order_id      as order_id,
	o.order_date    as order_date,
	o.customer_name as customer_name,
	o.price         as price,
	o.product_id    as product_id,
	o.order_status  as order_status,
	p.name          as product_name,
	p.description   as product_description
from orders as o
left join products as p on o.product_id=p.id;

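To see what the LEFT JOIN should produce for the sample data from 2.2.1, here is a small batch analogy in Python. It is only an illustration with a reduced set of columns: the real job is a continuously updated streaming join, and the result is keyed by order_id here because enriched_orders declares that column as its primary key. Product ids start at 101 because of ALTER TABLE products AUTO_INCREMENT = 101.

```python
# Sample rows copied from the INSERT statements in 2.2.1 (name, description).
PRODUCTS = [
    ("scooter", "Small 2-wheel scooter"),
    ("car battery", "12V car battery"),
    ("12-pack drill bits", "12-pack of drill bits with sizes ranging from #40 to #3"),
    ("hammer", "12oz carpenter's hammer"),
    ("hammer", "14oz carpenter's hammer"),
    ("hammer", "16oz carpenter's hammer"),
    ("rocks", "box of assorted rocks"),
    ("jacket", "water resistent black wind breaker"),
    ("spare tire", "24 inch spare tire"),
]
products = {101 + i: row for i, row in enumerate(PRODUCTS)}

# (order_id, customer_name, product_id); order ids start at 10001.
ORDERS = [(10001, "Jark", 102), (10002, "Sally", 105), (10003, "Edward", 106)]


def enrich(orders, products):
    """Batch equivalent of the LEFT JOIN: unmatched orders keep None product fields."""
    result = {}
    for order_id, customer, product_id in orders:
        name, description = products.get(product_id, (None, None))
        result[order_id] = {
            "customer_name": customer,
            "product_id": product_id,
            "product_name": name,
            "product_description": description,
        }
    return result
```

So order 10001 should appear in Elasticsearch with product_name "car battery", and orders 10002 and 10003 with the 14oz and 16oz hammers.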

4.2 Open Kibana to see the data of the wide order table:

Next, modify the data in the MySQL tables, and the order data shown in Kibana is updated in real time.
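The same check can be done without Kibana by reading a document straight from Elasticsearch. Because enriched_orders declares PRIMARY KEY (order_id), the Flink Elasticsearch sink runs in upsert mode and, for a single-column key, should use the order_id value as the document _id. The sketch below relies on that; the helper names are hypothetical, and the host is the one used in the sink DDL above.

```python
import json
import urllib.request


def doc_url(es_host: str, index: str, doc_id) -> str:
    """Build the Elasticsearch 7 get-document URL: <host>/<index>/_doc/<id>."""
    return f"{es_host.rstrip('/')}/{index}/_doc/{doc_id}"


def get_enriched_order(es_host: str, order_id: int, index: str = "enriched_orders") -> dict:
    """Fetch one document and return its _source (raises HTTPError 404 if it does not exist)."""
    with urllib.request.urlopen(doc_url(es_host, index, order_id), timeout=5) as resp:
        body = json.loads(resp.read().decode("utf-8"))
    return body["_source"]
```

For example, get_enriched_order("http://192.168.1.71:9200", 10001) should return the joined row for Jark's order; calling it again after an UPDATE on orders or products in MySQL should show the new values once the job has processed the change.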



Copyright notice
This article was written by [0xYGC]; please include a link to the original when reposting. Thanks.
https://yzsam.com/2022/187/202207061133223768.html