Loading and consuming MySQL table data through Kafka/Flink: quick installation and configuration
2022-06-24 16:16:00 【User 7689089】
Note: many data-migration tools wrap Kafka and Flink, so out of curiosity I downloaded Kafka and Flink and tried deploying them myself. This post is a quick record of the installation process; there were plenty of pitfalls along the way, and I will share some of the typical errors in a follow-up post. Everything here is a single machine, single instance, single node; no distributed or cluster configuration is involved.
This article is divided into four stages:
1. MySQL installation
2. Kafka installation and configuration
3. Kafka consumption and testing
4. Flink loads and reads the MySQL table through the sql-client
========== Software versions:
Operating system: CentOS 7.4
1、mysql-5.7.22-linux-glibc2.12-x86_64
link :https://pan.baidu.com/s/1KlR-rRoHC97aX2j2ha05ng
Extraction code :ksi9
=======================
2、apache-zookeeper-3.5.6-bin.tar.gz
link :https://pan.baidu.com/s/1zOSeOK_ZiPmNzP8EuwwTBA
Extraction code :k1is
=====================
3、confluentinc-kafka-connect-jdbc-10.1.1.zip
link :https://pan.baidu.com/s/1jTOUiXNdNOBQnTiuuiDcOA
Extraction code :spwr
====================
4、kafka_2.11-2.4.0.tgz
link :https://pan.baidu.com/s/1u3Q_4F1nQSFWj7qG6ESDZA
Extraction code :x2oy
=================
5、flink-1.12.2-bin-scala_2.11.tgz
link :https://pan.baidu.com/s/1tPhpAmLlEhbeV8y-hNnb_A
Extraction code :qswm
=========== Java version and jar packages used:
mysql-connector-java-8.0.23.jar
link :https://pan.baidu.com/s/1XDQkUMzJm7dRn-L74Ar-PQ
Extraction code :shfy
===================================
flink-sql-connector-mysql-cdc-1.0.0.jar
link :https://pan.baidu.com/s/13z5ocJaebmOM71TXKOCnLQ
Extraction code :2ine
=============================
[root@localhost ~]# java -version
openjdk version "1.8.0_131"
OpenJDK Runtime Environment (build 1.8.0_131-b12)
OpenJDK 64-Bit Server VM (build 25.131-b12, mixed mode)
====================================================================
1. MySQL installation and initialization
Here are the main steps in quick succession. All of the software below is installed under /usr/local.
[root@localhost ~]# useradd mysql
[root@localhost ~]# passwd mysql
[root@localhost ~]# tar -xf mysql-5.7.22-linux-glibc2.12-x86_64.tar.gz
[root@localhost ~]# mv mysql-5.7.22-linux-glibc2.12-x86_64 /usr/local/mysql
[root@localhost ~]# mkdir -p /usr/local/mysql/data
[root@localhost ~]# chown mysql.mysql /usr/local/mysql/ -R
Configure the MySQL parameter file:
[root@localhost ~]# vim /etc/my.cnf
[root@localhost ~]# cat /etc/my.cnf
[mysqld]
basedir=/usr/local/mysql
datadir=/usr/local/mysql/data
socket=/tmp/mysql.sock
port=3306
server-id=100
log-bin
[root@localhost ~]# chown mysql.mysql /etc/my.cnf
[root@localhost ~]# su - mysql
Last login: Thu Apr 29 16:41:25 CST 2021 on pts/1
[mysql@localhost ~]$ vim .bash_profile
Add the MySQL path information so the mysql client programs can be called directly:
export MYSQL_HOME=/usr/local/mysql
export PATH=$PATH:$MYSQL_HOME/bin
[mysql@localhost ~]$ source .bash_profile
Initialize MySQL and start the database:
[mysql@localhost ~]$ mysqld --initialize --basedir=/usr/local/mysql --datadir=/usr/local/mysql/data --socket=/tmp/mysql.sock --port=3306
2021-04-29T08:44:26.802747Z 0 [Warning] Gtid table is not ready to be used. Table 'mysql.gtid_executed' cannot be opened.
2021-04-29T08:44:26.805748Z 1 [Note] A temporary password is generated for root@localhost: FvfKr?zGg3B9
[mysql@localhost ~]$
[mysql@localhost ~]$ mysqld_safe --defaults-file=/etc/my.cnf &
[1] 4135
[mysql@localhost ~]$ jobs
[1]+ Running mysqld_safe --defaults-file=/etc/my.cnf &
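Before logging in, a quick sanity check (my own addition, not part of the original steps) is to confirm the server is actually listening on port 3306:
[mysql@localhost ~]$ ss -lntp | grep 3306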
Change the initial root password, create a test database db1, and create a remote login account root@'%':
[mysql@localhost ~]$ mysql -uroot -p'FvfKr?zGg3B9'
mysql: [Warning] Using a password on the command line interface can be insecure.
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 2
Server version: 5.7.22-log
mysql> alter user root@localhost identified by '123123';
Query OK, 0 rows affected (0.00 sec)
mysql> flush privileges;
Query OK, 0 rows affected (0.00 sec)
mysql> create user root@'%' identified by '123123';
mysql> grant all privileges on *.* to root@'%';
mysql> create database db1; -------- the database that Kafka will be pointed at later
mysql> create table db1.t1(id int primary key, name varchar(20), time1 timestamp default now()); ------- the table Kafka will read and consume
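One note for later: the Flink mysql-cdc connector in part 4 reads MySQL's binlog, which has to be in row format. MySQL 5.7 already defaults to binlog_format=ROW, so the log-bin line in /etc/my.cnf above is sufficient, but if you want to make it explicit you can add the following under the [mysqld] section (an optional extra, not part of the original configuration):
binlog_format=ROW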
==================
2. Kafka quick configuration
Use the root operating system account for this part.
Kafka needs ZooKeeper for broker registration, metadata storage, consumer state, group management and offsets, so first unpack and start ZooKeeper.
[root@localhost ~]# tar -xf apache-zookeeper-3.5.6-bin.tar.gz
[root@localhost ~]# mv apache-zookeeper-3.5.6-bin /usr/local/zookeeper
[root@localhost ~]# cd /usr/local/zookeeper/
[root@localhost zookeeper]# ls
bin conf docs lib LICENSE.txt NOTICE.txt README.md README_packaging.txt
[root@localhost zookeeper]# cd conf/
[root@localhost conf]# ls
configuration.xsl log4j.properties zoo_sample.cfg
ZooKeeper reads zoo.cfg by default. The distribution ships a zoo_sample.cfg, so we can simply cp it under the new name; all other zk settings are left at their defaults.
[root@localhost conf]# cp zoo_sample.cfg zoo.cfg
[root@localhost conf]#
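For reference, the defaults in zoo_sample.cfg that this single-node setup relies on are roughly the following (taken from the stock sample file; clientPort is the one Kafka connects to later):
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/tmp/zookeeper
clientPort=2181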
Next, unpack Kafka and edit its configuration files.
[root@localhost ~]# tar -xf kafka_2.11-2.4.0.tgz
[root@localhost ~]# mv kafka_2.11-2.4.0 /usr/local/kafka
[root@localhost ~]# cd /usr/local/kafka/
[root@localhost kafka]# ls
bin config libs LICENSE NOTICE site-docs
[root@localhost kafka]# cd config/ ---------- switch to the config directory and find the server.properties file
[root@localhost config]# ls
connect-console-sink.properties connect-file-sink.properties connect-mirror-maker.properties log4j.properties tools-log4j.properties
connect-console-source.properties connect-file-source.properties connect-standalone.properties producer.properties trogdor.conf
connect-distributed.properties connect-log4j.properties consumer.properties server.properties zookeeper.properties
[root@localhost config]#
[root@localhost config]# vim server.properties
On a host with a single network card, listeners and advertised.listeners can be set to the same address (or advertised.listeners can simply be left unset):
################################################
listeners=PLAINTEXT://192.168.100.10:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.listeners=PLAINTEXT://192.168.100.10:9092
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181 ------------ the default ZooKeeper address and port
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
Unzip the JDBC connector plug-in and configure Kafka's Connect configuration files.
[root@localhost ~]# unzip confluentinc-kafka-connect-jdbc-10.1.1.zip
[root@localhost ~]# cd /usr/local/kafka/
Move the unzipped plug-in directory under the Kafka directory and rename it connect-jdbc; it shows up in the listing below.
[root@localhost kafka]# ls
bin config connect-jdbc libs LICENSE logs NOTICE site-docs
[root@localhost ~]# cd /usr/local/kafka/config
Edit the standalone Connect worker configuration:
[root@localhost config]# vim connect-standalone.properties
The changes are as follows:
# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=192.168.100.10:9092
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
# Directories the Connect worker loads plug-ins from: Kafka's own lib directory plus the lib directory of the plug-in we just moved in
plugin.path=/usr/local/kafka/lib,/usr/local/kafka/connect-jdbc/lib
[root@localhost lib]# pwd
/usr/local/kafka/connect-jdbc/lib
[root@localhost lib]#
This plug-in lib directory holds the kafka-connect-jdbc jar together with a set of bundled JDBC drivers:
[root@localhost lib]# ls
checker-qual-3.5.0.jar mssql-jdbc-8.4.1.jre8.jar oraclepki-19.7.0.0.jar postgresql-42.2.19.jar ucp-19.7.0.0.jar
common-utils-6.0.0.jar ojdbc8-19.7.0.0.jar orai18n-19.7.0.0.jar simplefan-19.7.0.0.jar xdb-19.7.0.0.jar
jtds-1.3.1.jar ojdbc8-production-19.7.0.0.pom osdt_cert-19.7.0.0.jar slf4j-api-1.7.30.jar xmlparserv2-19.7.0.0.jar
kafka-connect-jdbc-10.1.1.jar ons-19.7.0.0.jar osdt_core-19.7.0.0.jar sqlite-jdbc-3.25.2.jar
[root@localhost lib]#
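Note that the bundled drivers above do not include a MySQL driver, so the downloaded mysql-connector-java-8.0.23.jar also has to sit on the plugin path, otherwise the source connector cannot open the JDBC connection. Copying it into the plug-in's lib directory is one way to do that (the source path is wherever you downloaded the jar):
[root@localhost ~]# cp mysql-connector-java-8.0.23.jar /usr/local/kafka/connect-jdbc/lib/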
Add the connector's own parameter file:
[root@localhost etc]# pwd
/usr/local/kafka/connect-jdbc/etc
[root@localhost etc]# ls
sink-quickstart-sqlite.properties source-quickstart-sqlite.properties
[root@localhost etc]# cp source-quickstart-sqlite.properties /usr/local/kafka/config/mysql.properties
[root@localhost config]# vim mysql.properties
Configure it as follows:
# a table called 'users' will be written to the topic 'test-sqlite-jdbc-users'.
#connection.url=jdbc:sqlite:test.db
connection.url=jdbc:mysql://192.168.100.10:3306/db1?user=root&password=123123 # -- note the format: the database name is followed by ?, and user and password are joined with the & symbol
mode=incrementing
incrementing.column.name=id -- the incrementing reference column
topic.prefix=mysql- -- the Kafka topic names will start with this prefix
table.whitelist=t1,t2,t3 -- whitelist of tables to read from the MySQL database
# Define when identifiers should be quoted in DDL and DML statements.
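Putting the fragments together, the finished mysql.properties should end up looking roughly like this (a sketch: the connector name matches the one that appears in the startup log later on, tasks.max=1 is an assumption, and the remaining lines are the settings shown above):
name=source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://192.168.100.10:3306/db1?user=root&password=123123
mode=incrementing
incrementing.column.name=id
topic.prefix=mysql-
table.whitelist=t1,t2,t3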
==================
3. Kafka consumption and testing
Next, start the ZooKeeper and Kafka services.
Before starting the services, we can add each component's path to root's environment variables so the commands can be called directly. Doing this under root is not recommended in production, because a wrong path setting affects the whole system.
[root@localhost ~]# vim .bash_profile
export ZK_HOME=/usr/local/zookeeper
export KAFKA_HOME=/usr/local/kafka
export FLINK_HOME=/usr/local/flink ------ Flink is configured here in advance; it will be used later
export PATH=$PATH:$ZK_HOME/bin:$KAFKA_HOME/bin:$FLINK_HOME/bin
[root@localhost ~]#
[root@localhost ~]# source .bash_profile
Start the ZooKeeper service:
[root@localhost ~]# zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg ---- it loads the zoo.cfg configuration file by default
Starting zookeeper ... STARTED
[root@localhost ~]# ps -ef|grep zookeeper #-- check the service process
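Another optional check (not in the original steps) is to ask ZooKeeper itself for its status; a single-node instance should report Mode: standalone:
[root@localhost ~]# zkServer.sh status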
Start the Kafka service:
[root@localhost ~]# kafka-server-start.sh /usr/local/kafka/config/server.properties &
[1] 19309
[root@localhost ~]#
[2021-04-29 19:49:54,740] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2021-04-29 19:49:55,351] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2021-04-29 19:49:55,352] INFO starting (kafka.server.KafkaServer)
[root@localhost ~]# jobs
[1]+ Running kafka-server-start.sh /usr/local/kafka/config/server.properties &
[root@localhost ~]#
Testing data loading and consumption
Use Kafka's kafka-topics.sh to create a test topic and check whether the service is usable:
[root@localhost ~]# kafka-topics.sh --create --zookeeper 192.168.100.10:2181 --replication-factor 1 --partitions 3 --topic test-topic
Created topic test-topic.
List the topics that have been created:
[root@localhost ~]# kafka-topics.sh --list --zookeeper 192.168.100.10:2181
test-topic
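As an extra sanity check (my own addition, not in the original write-up), you can push a couple of messages through the test topic with the console producer and read them back with the console consumer, using the same broker address as above:
[root@localhost ~]# kafka-console-producer.sh --broker-list 192.168.100.10:9092 --topic test-topic
>hello
>kafka
[root@localhost ~]# kafka-console-consumer.sh --bootstrap-server 192.168.100.10:9092 --topic test-topic --from-beginning
hello
kafka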
Now start the Kafka-to-MySQL source connector and confirm that it can load the t1 table from the db1 database in MySQL.
[root@localhost ~]#
[root@localhost ~]# connect-standalone.sh /usr/local/kafka/config/connect-standalone.properties /usr/local/kafka/config/mysql.properties &
If the db1.t1 table is loaded successfully, you should see messages like the following:
[2021-04-29 19:59:19,874] INFO WorkerSourceTask{id=source-mysql-jdbc-autoincrement-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:209)
[2021-04-29 19:59:19,877] INFO Begin using SQL query: SELECT * FROM `db1`.`t1` WHERE `db1`.`t1`.`id` > ? ORDER BY `db1`.`t1`.`id` ASC (io.confluent.connect.jdbc.source.TableQuerier:164)
Reading the MySQL table data loaded into Kafka
Next, start the consumer side to consume the data that Kafka has already loaded from MySQL on the producer side. First look at the topics that were created:
[root@localhost config]# kafka-topics.sh --list --zookeeper 192.168.100.10:2181
__consumer_offsets
mysql-t1 --- this name comes from the topic.prefix=mysql- setting in mysql.properties plus the table name
test-topic
==========================
Insert a few rows into db1.t1 in MySQL and check them:
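The original post does not show the insert statements themselves; judging from the consumer payloads further below, they were along these lines (a reconstruction: the id and name values are taken from that output, and the time1 values simply came from the column's default now()):
mysql> insert into db1.t1(id,name) values (10,'test-kafka-consumer');
mysql> insert into db1.t1(id,name) values (20,'');
mysql> insert into db1.t1(id,name) values (30,'xxxxxxxxxxxx');
mysql> insert into db1.t1(id,name) values (40,'AAAAAAAAAAA');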
mysql> select * from db1.t1;
[root@localhost config]# kafka-console-consumer.sh --bootstrap-server 192.168.100.10:9092 --topic mysql-t1 --from-beginning
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"time1"}],"optional":false,"name":"t1"},"payload":{"id":10,"name":"test-kafka-consumer","time1":1619719214000}} --timestap This is converted to a timestamp value
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"time1"}],"optional":false,"name":"t1"},"payload":{"id":20,"name":"","time1":1619721861000}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"time1"}],"optional":false,"name":"t1"},"payload":{"id":30,"name":"xxxxxxxxxxxx","time1":1619721869000}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"time1"}],"optional":false,"name":"t1"},"payload":{"id":40,"name":"AAAAAAAAAAA","time1":1619721920000}}
4. Using Flink to load the MySQL data
For Flink we only need to unpack the distribution and then use the flink-sql-connector-mysql-cdc-1.0.0.jar driver to capture MySQL binlog changes, so that data changes are refreshed dynamically. As follows:
[root@localhost ~]# tar -xf flink-1.12.2-bin-scala_2.11.tgz
[root@localhost ~]# mv flink-1.12.2 /usr/local/flink
[root@localhost ~]#
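One step the post glosses over: for the CREATE TABLE statement below to find the mysql-cdc connector, the downloaded flink-sql-connector-mysql-cdc-1.0.0.jar has to be on Flink's classpath, typically by dropping it into the lib directory before the cluster is started (the source path is wherever you downloaded the jar):
[root@localhost ~]# cp flink-sql-connector-mysql-cdc-1.0.0.jar /usr/local/flink/lib/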
[root@localhost ~]# start-cluster.sh ---- start the Flink service directly
Starting cluster.
Starting standalonesession daemon on host localhost.
Starting taskexecutor daemon on host localhost.
[root@localhost ~]#
[root@localhost ~]# sql-client.sh embedded --- launch the Flink SQL client
No default environment specified.
Searching for '/usr/local/flink/conf/sql-client-defaults.yaml'...found.
Reading default environment from: file:/usr/local/flink/conf/sql-client-defaults.yaml
No session environment specified.
Command history file path: /root/.flink-sql-history
Flink SQL> CREATE TABLE flink_tab( id int primary key, name string, time1 string) -- the column names must match those of the t1 table in the MySQL database
> WITH (
> 'connector' = 'mysql-cdc', -- the connector type
> 'hostname' = '192.168.100.10', -- MySQL address
> 'port' = '3306', -- MySQL port
> 'username' = 'root', -- MySQL user name
> 'password' = '123123', -- MySQL password
> 'database-name' = 'db1', -- database name
> 'table-name' = 't1' -- table name
> );
[INFO] Table has been created.
Flink SQL>
Flink SQL> select * from flink_tab; ------- view the data
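Assuming the job starts cleanly, this query opens Flink's streaming result view showing the existing rows of t1. A quick way to see the CDC behaviour (my own addition, not in the original post) is to insert another row from a second terminal and watch it appear in the result view without re-running the query:
mysql> insert into db1.t1(id,name) values (50,'flink-cdc-test');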
The errors I ran into will be covered in a later update. The software packages used here are all packaged and shared above; if you are interested, you can give it a try yourself. The underlying principles can be studied from the official website and other write-ups online.
Flink official documentation on sql-client configuration: https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sqlClient.html