
Chapter 3.4: StarRocks Data Import - Flink Connector and CDC Second-Level Data Synchronization

2022-07-04 01:47:00 Driftwood follows the wind

Flink is currently a popular stream-processing framework. When it is connected to StarRocks, writing data in a “streaming” fashion directly over JDBC is not StarRocks-friendly: StarRocks is an MVCC database, and the core idea of its import design is “accumulate micro-batches and lower the write frequency”. StarRocks therefore developed its own flink-connector-starrocks, which internally buffers data into batches and imports each batch via Stream Load.
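To make the “micro-batch + lower frequency” idea concrete, here is a minimal Python sketch of a row buffer that releases a batch when either a row-count limit or a time interval is reached. It is not the connector's code: the class name MicroBatchBuffer and its parameters are hypothetical stand-ins for sink options such as sink.buffer-flush.max-rows and sink.buffer-flush.interval-ms, and each released batch stands for one Stream Load request.

```python
import time
from typing import Callable, List, Optional


class MicroBatchBuffer:
    """Hypothetical buffer: collect rows, release them as one batch.

    A batch is released when max_rows rows are buffered, or when interval_ms
    has passed since the first buffered row (checked only when a row arrives).
    """

    def __init__(self, max_rows: int, interval_ms: int,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_rows = max_rows
        self.interval_ms = interval_ms
        self.clock = clock  # injectable so the behaviour can be tested
        self.rows: List[str] = []
        self.first_ts: Optional[float] = None

    def add(self, row: str) -> Optional[List[str]]:
        """Buffer one row; return a batch if a flush was triggered, else None."""
        if not self.rows:
            self.first_ts = self.clock()
        self.rows.append(row)
        elapsed_ms = (self.clock() - self.first_ts) * 1000
        if len(self.rows) >= self.max_rows or elapsed_ms >= self.interval_ms:
            return self.flush()
        return None

    def flush(self) -> List[str]:
        """Release everything buffered so far (one Stream Load per batch)."""
        batch, self.rows, self.first_ts = self.rows, [], None
        return batch
```

A real sink typically also flushes from a background timer; this sketch only checks the interval when a row arrives, which keeps it deterministic.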

The StarRocks Flink Connector is now open source. Its GitHub address is:

GitHub - StarRocks/flink-connector-starrocks: https://github.com/StarRocks/flink-connector-starrocks

At the time of writing, flink-connector-starrocks supports not only writing data to StarRocks (Sink) but also reading data from StarRocks (Source). Both are covered below.

The Flink Connector section of the official StarRocks documentation is very detailed and provides code examples and detailed parameter descriptions:

Design background @ Flink-connector-starrocks @ StarRocks Docs: https://docs.starrocks.com/zh-cn/main/loading/Flink-connector-starrocks

Here we use Flink SQL for a simple demonstration and point out a few things to watch out for. First, set up the demo environment:

StarRocks: user name and password are both root; Broker name: hdfs_broker

Flink: version 1.13 recommended, 1.11 at minimum

flink-connector-starrocks: the official beta supports both Source and Sink

Flink connector jars: MySQL CDC dependency; Kafka reading dependency; JDBC MySQL dependency

Zookeeper: version 3.4.13

Kafka: no authentication

MySQL: MySQL Community Server; user name and password are both root

The jar packages above need to be copied into Flink's lib directory. Pay special attention to the CDC connector version; its correspondence with Flink versions is shown in the following table:

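| Flink CDC Connector version | Flink version |
| 1.0.0                       | 1.11.*        |
| 1.1.0                       | 1.11.*        |
| 1.2.0                       | 1.12.*        |
| 1.3.0                       | 1.12.*        |
| 1.4.0                       | 1.13.*        |
| 2.0.*                       | 1.13.*        |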

Reference:

About Flink CDC - Flink CDC 2.0.0 documentation: https://ververica.github.io/flink-cdc-connectors/release-2.0/content/about.html#supported-flink-versions
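The compatibility rule can be encoded as a small lookup, for example to check the jars before copying them into lib. A minimal Python sketch, assuming the six rows of the “Supported Flink Versions” table in the Flink CDC 2.0 documentation linked above; the function names are hypothetical:

```python
# Flink CDC connector release -> Flink minor version it targets
# (transcribed from the Flink CDC 2.0 "Supported Flink Versions" table).
CDC_TO_FLINK = {
    "1.0.0": "1.11",
    "1.1.0": "1.11",
    "1.2.0": "1.12",
    "1.3.0": "1.12",
    "1.4.0": "1.13",
    "2.0.*": "1.13",
}


def supported_flink(cdc_version: str) -> str:
    """Flink minor version (e.g. '1.13') targeted by a Flink CDC connector release."""
    if cdc_version in CDC_TO_FLINK:
        return CDC_TO_FLINK[cdc_version]
    if cdc_version.startswith("2.0."):
        return CDC_TO_FLINK["2.0.*"]
    raise KeyError(f"version not in the 2.0 compatibility table: {cdc_version}")


def is_compatible(cdc_version: str, flink_version: str) -> bool:
    """Compare only major.minor of the Flink version, e.g. '1.13.5' -> '1.13'."""
    flink_minor = ".".join(flink_version.split(".")[:2])
    return supported_flink(cdc_version) == flink_minor
```

For the jars used later in this article (flink-sql-connector-mysql-cdc-2.0.2.jar on Flink 1.13.5), is_compatible("2.0.2", "1.13.5") is True.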

Next, we briefly walk through five common business scenarios. The StarRocks tables are created with the Primary Key model, because it better supports real-time and frequently updated data.

Scenario 1: Flink reads Kafka data and writes it to StarRocks

Routine Load is StarRocks' built-in way of consuming Kafka data. It is simple to use and does not depend on external components, but when the Kafka data needs more complex ETL, Routine Load may not be up to the task. In that case we can use Flink to consume the Kafka data, clean and transform it, and then sink it to StarRocks.

Take real-time reporting, a very common business case, as an example: Flink processes the data written to Kafka in real time and synchronizes the results into StarRocks.

This scenario uses two jar packages: flink-connector-kafka_2.11-1.13.5.jar and flink-connector-starrocks-1.1.14-snapshot_flink-1.13_2.11.jar. Before running the scenario, make sure both jars are in the flink/lib directory.

1.1 Data preparation

Create the topics behavior and province in Kafka:

kafka-topics.sh --zookeeper --create --replication-factor 1 --partitions 1 --topic behavior

kafka-topics.sh --zookeeper --create --replication-factor 1 --partitions 1 --topic province

Produce data to the topic behavior:

kafka-console-producer.sh  --broker-list  --topic behavior

Enter the following records:


10002,ls,19,11,add


Produce data to the topic province:

kafka-console-producer.sh  --broker-list  --topic province

Enter the following records:

11,Beijing

61,shaanxi

1.2 Prepare StarRocks

Create the Primary Key model table s_province in StarRocks:

mysql> create database starrocks;

mysql> use starrocks;

mysql> CREATE TABLE IF NOT EXISTS starrocks.`s_province` (
  `uid` int(10) NOT NULL COMMENT "",
  `p_id` int(2) NOT NULL COMMENT "",
  `p_name` varchar(30) NULL COMMENT ""
) ENGINE=OLAP
PRIMARY KEY(`uid`)
DISTRIBUTED BY HASH(`uid`)
PROPERTIES (
"replication_num" = "1"
);


1.3 Prepare Flink

Start Flink (from Flink's bin directory):

./start-cluster.sh

Start the SQL client:

./sql-client.sh embedded

Run the following Flink SQL to create the upstream and downstream mapping tables.

Source side: create the Flink mapping table kafka_source_behavior for Kafka:

Flink SQL> CREATE TABLE kafka_source_behavior (
    uuid int,
    name string,
    age int,
    province_id int,
    behavior string
) WITH (
    'connector' = 'kafka',
    'topic' = 'behavior',
    'properties.bootstrap.servers' = '',
    'properties.group.id' = 'source_behavior',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'csv'
);

Create the mapping table kafka_source_province:

Flink SQL> CREATE TABLE kafka_source_province (
    pid int,
    p_name string
) WITH (
    'connector' = 'kafka',
    'topic' = 'province',
    'properties.bootstrap.servers' = '',
    'properties.group.id' = 'source_province',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'csv'
);

Sink side: create the Flink mapping table sink_province for StarRocks:

Flink SQL> CREATE TABLE sink_province (
   uid INT,
   p_id INT,
   p_name STRING,
   PRIMARY KEY (uid) NOT ENFORCED
) WITH (
   'connector' = 'starrocks',
   'jdbc-url' = '',
   'load-url' = '',
   'database-name' = 'starrocks',
   'table-name' = 's_province',
   'username' = 'root',
   'password' = 'root',
   'sink.buffer-flush.interval-ms' = '5000',
   'sink.properties.column_separator' = '\x01',
   'sink.properties.row_delimiter' = '\x02'
);
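Why \x01 and \x02? In a CSV-style Stream Load the body is split on these separators, so non-printing characters avoid clashes with commas or line breaks inside values. A Python sketch of how a buffered batch could be serialized with these two separators (the helper name is hypothetical; note that in the Flink SQL options above the separators are written as the text \x01, which Stream Load interprets as the byte 0x01, whereas in Python "\x01" is that byte itself):

```python
from typing import Iterable, List, Sequence

COLUMN_SEPARATOR = "\x01"  # corresponds to 'sink.properties.column_separator'
ROW_DELIMITER = "\x02"     # corresponds to 'sink.properties.row_delimiter'


def to_stream_load_body(rows: Iterable[Sequence[object]]) -> str:
    """Join fields with 0x01 and rows with 0x02 (hypothetical helper)."""
    lines: List[str] = [COLUMN_SEPARATOR.join(str(v) for v in row) for row in rows]
    return ROW_DELIMITER.join(lines)
```

A value such as "Beijing, China" stays in one column, because the comma is not a separator here.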


1.4 Run the synchronization task

Run the following Flink SQL to start the synchronization task:

Flink SQL> insert into sink_province select b.uuid as uid, b.province_id as p_id, p.p_name from kafka_source_behavior b join kafka_source_province p on b.province_id = p.pid;
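The INSERT above uses a regular inner join on province_id = pid. As a batch-mode illustration only (Flink's streaming join keeps both inputs in state and emits results as records arrive from either topic), the same logic in Python. The function name is hypothetical, and the test adds a hypothetical behavior record whose province has no match:

```python
from typing import Dict, Iterable, List, Tuple


def join_behavior_province(behavior_lines: Iterable[str],
                           province_lines: Iterable[str]) -> List[Tuple[int, int, str]]:
    """Batch version of:
    SELECT b.uuid AS uid, b.province_id AS p_id, p.p_name
    FROM kafka_source_behavior b JOIN kafka_source_province p
      ON b.province_id = p.pid
    """
    # Build a lookup from the province records: pid -> p_name (last value wins).
    provinces: Dict[int, str] = {}
    for line in province_lines:
        pid, p_name = line.split(",")
        provinces[int(pid)] = p_name

    # Probe it with each behavior record: uuid,name,age,province_id,behavior.
    result: List[Tuple[int, int, str]] = []
    for line in behavior_lines:
        uuid, _name, _age, province_id, _behavior = line.split(",")
        p_id = int(province_id)
        if p_id in provinces:  # inner join: unmatched records are dropped
            result.append((int(uuid), p_id, provinces[p_id]))
    return result
```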

1.5 Check the data in StarRocks

mysql -h192.168.110.101 -P9030 -uroot -proot

mysql> use starrocks;

mysql> select * from s_province;


+-------+------+---------+
| uid   | p_id | p_name  |
+-------+------+---------+
| 10003 |   61 | shaanxi |
| 10002 |   11 | Beijing |
| 10001 |   11 | Beijing |
+-------+------+---------+


Scenario 2: Flink reads MySQL data via JDBC and writes it to StarRocks

Reading MySQL data through Flink JDBC is uncommon in real-time scenarios. With JDBC, Flink reads the MySQL table only once, when the statement is executed, so this approach is better suited to offline scenarios. If MySQL holds data that needs complex processing, we can run scheduled jobs in Flink to clean it and then write the results into StarRocks.

This scenario uses two jar packages: flink-connector-jdbc_2.11-1.13.5.jar and flink-connector-starrocks-1.1.14-snapshot_flink-1.13_2.11.jar.

2.1 Prepare MySQL

Log in to MySQL on node02:

mysql -uroot -proot

Create the table s_user in MySQL:

mysql> use ODS;

mysql> CREATE TABLE `s_user` (
   `id` INT(11) NOT NULL,
   `name` VARCHAR(20) DEFAULT NULL,
   `p_id` INT(2) DEFAULT NULL,
   PRIMARY KEY (`id`)
);


Insert data:

mysql> insert into s_user values(10086,'lm',61),(10010, 'ls',11), (10000,'ll',61);

2.2 Prepare StarRocks

Go to node01 and connect to StarRocks:

mysql -h192.168.110.101 -P9030 -uroot -proot

Create the table s_user in StarRocks:

mysql> use starrocks;

mysql> CREATE TABLE IF NOT EXISTS starrocks.`s_user` (
   `id` int(10) NOT NULL COMMENT "",
   `name` varchar(20) NOT NULL COMMENT "",
   `p_id` INT(2) NULL COMMENT ""
) ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`)
PROPERTIES (
"replication_num" = "1"
);


2.3 Create the Flink mapping tables

Start Flink from its bin directory (skip this step if the cluster from the previous scenario is still running):

./start-cluster.sh

Start the SQL client:

./sql-client.sh embedded

Source side: create the mapping table source_mysql_suser for MySQL:

Flink SQL> CREATE TABLE source_mysql_suser (
   id INT,
   name STRING,
   p_id INT,
   PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://',
   'table-name' = 's_user',
   'username' = 'root',
   'password' = 'root'
);

Sink side: create the mapping table sink_starrocks_suser for StarRocks:

Flink SQL> CREATE TABLE sink_starrocks_suser (
   id INT,
   name STRING,
   p_id INT,
   PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'starrocks',
   'jdbc-url' = '',
   'load-url' = '',
   'database-name' = 'starrocks',
   'table-name' = 's_user',
   'username' = 'root',
   'password' = 'root',
   'sink.buffer-flush.interval-ms' = '5000',
   'sink.properties.column_separator' = '\x01',
   'sink.properties.row_delimiter' = '\x02'
);


2.4 Clean the data in Flink and write it to StarRocks

Here we apply only a simple WHERE filter; real business scenarios may involve complex multi-table joins:

Flink SQL> insert into sink_starrocks_suser select id,name,p_id from source_mysql_suser where p_id = 61;

Once the data has been written to StarRocks, the Flink job finishes. If rows in the MySQL s_user table are inserted, deleted or updated afterwards, Flink does not notice.
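A minimal Python model of this bounded behaviour, using the three rows inserted into MySQL above. The function name and the in-memory list are hypothetical stand-ins for the JDBC scan and the MySQL table:

```python
from typing import List, Tuple

Row = Tuple[int, str, int]  # (id, name, p_id)


def bounded_jdbc_read(table: List[Row], p_id: int) -> List[Row]:
    """Read the table once, apply WHERE p_id = ?, and return a final result."""
    snapshot = list(table)  # one-off scan when the job starts
    return [row for row in snapshot if row[2] == p_id]


mysql_s_user: List[Row] = [(10086, "lm", 61), (10010, "ls", 11), (10000, "ll", 61)]
loaded = bounded_jdbc_read(mysql_s_user, p_id=61)

# A row inserted after the job has finished...
mysql_s_user.append((12345, "SR", 61))
# ...is not in `loaded`; only running the job again would pick it up.
```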

2.5 Check the data in StarRocks

mysql -h192.168.110.101 -P9030 -uroot -proot

mysql> use starrocks;

mysql> select * from s_user;


+-------+------+------+
| id    | name | p_id |
+-------+------+------+
| 10000 | ll   |   61 |
| 10086 | lm   |   61 |
+-------+------+------+


Scenario 3: Flink reads StarRocks data and writes it to MySQL

We again use the MySQL s_user table and the StarRocks s_user table, but this time the flow is reversed: data is read from StarRocks and written to another business database, for example MySQL.

The jar packages used here are still flink-connector-jdbc_2.11-1.13.5.jar and flink-connector-starrocks-1.1.14-snapshot_flink-1.13_2.11.jar (the public beta build, which supports Source).

For the Source part, refer to the forum post and the dev branch on GitHub:

flink-connector-source feature beta package - Feature usage - StarRocks Database Forum: https://forum.starrocks.com/t/topic/1179


3.1 Create the Flink mapping tables

Start Flink from its bin directory (skip this step if the cluster from the previous scenario is still running):

./start-cluster.sh

Start the SQL client:

./sql-client.sh embedded

Source side: create the StarRocks mapping table source_starrocks_suser:

Flink SQL> CREATE TABLE source_starrocks_suser (
   id INT,
   name STRING,
   p_id INT
) WITH (
   'connector' = 'starrocks',
   'database-name' = 'starrocks',
   'table-name' = 's_user',
   'username' = 'root',
   'password' = 'root'
);


Sink side: create the mapping table sink_mysql_suser for MySQL:

Flink SQL> CREATE TABLE sink_mysql_suser (
   id INT,
   name STRING,
   p_id INT,
   PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://',
   'table-name' = 's_user',
   'username' = 'root',
   'password' = 'root'
);


3.2 Prepare MySQL

Log in to MySQL on node02:

mysql -uroot -proot

Clear the data in the MySQL s_user table to prepare for the new import:

mysql> use ODS;

mysql> truncate table s_user;

3.3 Run the import task in Flink

Here we simply move the data as-is; in real business scenarios the StarRocks data may first be grouped or joined before being imported. Run the import task in the sql-client on node01:

Flink SQL> insert into sink_mysql_suser select id,name,p_id from source_starrocks_suser;
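When the Flink DDL of a JDBC sink declares a primary key, Flink's JDBC connector writes in upsert mode, which for MySQL takes the form INSERT ... ON DUPLICATE KEY UPDATE; without a primary key it issues plain INSERTs. The Python sketch below only builds a statement of that shape for the s_user columns, to show why re-importing the same ids (for example without the TRUNCATE above) overwrites rows instead of failing on duplicate keys. It is not the connector's own SQL generator, and the function names are hypothetical:

```python
from typing import Sequence


def quote(identifier: str) -> str:
    """Backtick-quote a MySQL identifier."""
    return "`" + identifier.replace("`", "``") + "`"


def mysql_upsert_sql(table: str, columns: Sequence[str]) -> str:
    """Parameterized INSERT ... ON DUPLICATE KEY UPDATE for the given columns."""
    cols = ", ".join(quote(c) for c in columns)
    placeholders = ", ".join("?" for _ in columns)
    updates = ", ".join(f"{quote(c)}=VALUES({quote(c)})" for c in columns)
    return (f"INSERT INTO {quote(table)}({cols}) VALUES ({placeholders}) "
            f"ON DUPLICATE KEY UPDATE {updates}")
```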

3.4 Check the data in MySQL

mysql> select * from s_user;


+-------+------+------+
| id    | name | p_id |
+-------+------+------+
| 10000 | ll   |   61 |
| 10086 | lm   |   61 |
+-------+------+------+


Scenario 4: Synchronizing MySQL data to StarRocks with Flink CDC

When reading MySQL data through Flink JDBC in Scenario 2, we explained that the JDBC approach is a one-off import. If we want Flink to detect changes in the MySQL source and synchronize them in near real time, we need Flink CDC.

CDC (Change Data Capture) is a technique that captures the data-change records of a source database (Source) and synchronizes them to one or more destinations (Sink). Intuitively, when the data in the source changes, CDC makes the data in the target database change accordingly (limited to DML operations).
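To illustrate “changes in the source are replayed on the target”, here is a minimal Python sketch that applies a stream of change events to a table keyed by primary key, roughly the way a StarRocks Primary Key model table treats upserts and deletes. The op codes c/u/d follow Debezium's convention (Flink CDC's MySQL connector is built on Debezium); the event tuples and the function name are hypothetical simplifications:

```python
from typing import Dict, Iterable, Optional, Tuple

Row = Tuple[str, int]                   # (name, p_id)
Event = Tuple[str, int, Optional[Row]]  # (op, id, new row or None)


def apply_changes(table: Dict[int, Row], events: Iterable[Event]) -> Dict[int, Row]:
    """Replay change events on a copy of a table keyed by primary key."""
    result = dict(table)
    for op, key, row in events:
        if op in ("c", "u"):         # insert or update: upsert by key
            if row is None:
                raise ValueError(f"{op!r} event for id {key} has no row")
            result[key] = row
        elif op == "d":              # delete: drop the key if present
            result.pop(key, None)
        else:
            raise ValueError(f"unknown op: {op!r}")
    return result
```

Deleting a key that does not exist is simply a no-op, just as a DELETE that matches no row changes nothing in the source.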

Here we again use the MySQL s_user table and the StarRocks s_user table from the previous scenarios.

Scenario 4 needs two jar packages: flink-sql-connector-mysql-cdc-2.0.2.jar and flink-connector-starrocks-1.1.14-snapshot_flink-1.13_2.11.jar.

4.1 Prepare MySQL

First, enable binlog (in ROW format) for MySQL on node02:

vi /etc/my.cnf

Add the following at the end of the configuration file (these options belong in the [mysqld] section):

log-bin=mysql-bin  # enable binlog

binlog-format=ROW  # use ROW format

server_id=1        # required for MySQL replication
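Before restarting, it can help to double-check the settings. A Python sketch that reads the [mysqld] section of a my.cnf text and reports what Flink CDC would be missing. It assumes the lines above sit under [mysqld], normalizes - and _ in option names (MySQL treats them as equivalent), and checks only what is written in the file, not server defaults; the function names are hypothetical:

```python
from typing import Dict, List


def parse_mysqld_options(cnf_text: str) -> Dict[str, str]:
    """Collect key=value options from the [mysqld] section of a my.cnf text."""
    options: Dict[str, str] = {}
    section = None
    for raw in cnf_text.splitlines():
        line = raw.split("#", 1)[0].strip()  # drop trailing '# comments'
        if not line:
            continue
        if line.startswith("[") and line.endswith("]"):
            section = line[1:-1].strip().lower()
            continue
        if section != "mysqld":
            continue
        key, _, value = line.partition("=")
        options[key.strip().replace("-", "_").lower()] = value.strip()
    return options


def binlog_problems(options: Dict[str, str]) -> List[str]:
    """Reasons the options are not ready for Flink CDC (empty list if OK)."""
    problems = []
    if "log_bin" not in options:
        problems.append("binlog is not enabled (log-bin)")
    if options.get("binlog_format", "").upper() != "ROW":
        problems.append("binlog_format must be ROW")
    if not options.get("server_id"):
        problems.append("server_id is not set")
    return problems
```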

Save and exit, then restart the MySQL service:

systemctl restart mysqld

4.2 Prepare StarRocks

Clear the data in the StarRocks s_user table (this does not affect the subsequent synchronization task):

mysql -h192.168.110.101 -P9030 -uroot -proot

mysql> use starrocks;

mysql> truncate table s_user;

4.3 Prepare Flink

Start Flink from its bin directory (skip this step if the cluster from the previous scenario is still running):

./start-cluster.sh

Start the SQL client:

./sql-client.sh embedded

Source side: create the MySQL mapping table cdc_mysql_suser:

Flink SQL> CREATE TABLE cdc_mysql_suser (
   id INT,
   name STRING,
   p_id INT
) WITH (
   'connector' = 'mysql-cdc',
   'hostname' = '',
   'port' = '3306',
   'username' = 'root',
   'password' = 'root',
   'database-name' = 'ODS',
   'table-name' = 's_user'
);


Sink side: create the StarRocks mapping table cdc_starrocks_suser:

Flink SQL> CREATE TABLE cdc_starrocks_suser (
   id INT,
   name STRING,
   p_id INT,
   PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'starrocks',
   'jdbc-url' = '',
   'load-url' = '',
   'database-name' = 'starrocks',
   'table-name' = 's_user',
   'username' = 'root',
   'password' = 'root',
   'sink.buffer-flush.interval-ms' = '5000',
   'sink.properties.column_separator' = '\x01',
   'sink.properties.row_delimiter' = '\x02'
);


4.4 Run the synchronization task

Flink SQL> insert into cdc_starrocks_suser select id,name,p_id from cdc_mysql_suser;

Unlike JDBC, in the CDC scenario the synchronization task keeps running after the Flink SQL statement is executed. When data in MySQL changes, Flink picks up the change quickly and synchronizes it to StarRocks.

4.5 Observe the data

Check the data in MySQL:

mysql -uroot -proot

mysql> use ODS;

mysql> select * from s_user;


+-------+------+------+
| id    | name | p_id |
+-------+------+------+
| 10000 | ll   |   61 |
| 10086 | lm   |   61 |
+-------+------+------+


Check the data in StarRocks:

mysql -h192.168.110.101 -P9030 -uroot -proot

mysql> use starrocks;

mysql> select * from s_user;


+-------+------+------+
| id    | name | p_id |
+-------+------+------+
| 10000 | ll   |   61 |
| 10086 | lm   |   61 |
+-------+------+------+


Insert, delete and update data in MySQL:

mysql> INSERT INTO s_user VALUES(12345,'SR',61);

mysql> DELETE FROM s_user WHERE id = 10010;

mysql> UPDATE s_user SET `name`='No.1' WHERE id = 10086;

Then check the data in the StarRocks table directly:

mysql> select * from s_user;


+-------+------+------+
| id    | name | p_id |
+-------+------+------+
| 12345 | SR   |   61 |
| 10086 | No.1 |   61 |
+-------+------+------+


This confirms that changes caused by inserts, updates and deletes on the MySQL source table are synchronized to the StarRocks target table.

Scenario 5: Second-level synchronization of multiple MySQL tables with CDC + SMT

Scenario 4 synchronizes a single table, and that method synchronizes only data, not table structures: we must first create the corresponding table in the target database and then run the synchronization task. If many tables, or a whole database, need to be synchronized, creating the tables one by one in StarRocks is tedious, and so is writing the Flink jobs one by one.

To make multi-table synchronization easier, StarRocks released StarRocks-migrate-tools (smt for short), which quickly generates StarRocks table schemas together with the Flink SQL mapping tables and synchronization statements. smt currently works with MySQL, PostgreSQL, Oracle and Hive; support for the last three is still in public beta. We demonstrate with MySQL here and will cover the others after their official releases.

StarRocks-migrate-tools download address:


Official documentation:

Design background @ Flink-connector-starrocks @ StarRocks Docs: https://docs.starrocks.com/zh-cn/main/loading/Flink-connector-starrocks#%E4%BD%BF%E7%94%A8-flink-connector-%E5%86%99%E5%85%A5%E5%AE%9E%E7%8E%B0-mysql-%E6%95%B0%E6%8D%AE%E5%90%8C%E6%AD%A5

5.1 Prepare MySQL

In the binlog-enabled MySQL on node02, create the database CDC, create the tables departments and jobs in it, and then insert a small amount of data.

Log in to MySQL on node02:

mysql -uroot -proot

Create the table departments:


mysql> CREATE DATABASE CDC;

mysql> USE CDC;

mysql> CREATE TABLE `departments` (
   `department_id` int(4) NOT NULL AUTO_INCREMENT,
   `department_name` varchar(3) DEFAULT NULL,
   `manager_id` int(6) DEFAULT NULL,
   `location_id` int(4) DEFAULT NULL,
   PRIMARY KEY (`department_id`)
);


Insert data into departments:

mysql> insert  into `departments`(`department_id`,`department_name`,`manager_id`,`location_id`) values (10,'Adm',200,1700),(20,'Mar',201,1800),(30,'Pur',114,1700),(40,'Hum',203,2400),(50,'Shi',121,1500),(60,'IT',103,1400),(70,'Pub',204,2700),(80,'Sal',145,2500),(90,'Exe',100,1700),(100,'Fin',108,1700),(110,'Acc',205,1700),(120,'Tre',NULL,1700),(130,'Cor',NULL,1700),(140,'Con',NULL,1700),(150,'Sha',NULL,1700),(160,'Ben',NULL,1700),(170,'Man',NULL,1700),(180,'Con',NULL,1700),(190,'Con',NULL,1700),(200,'Ope',NULL,1700),(210,'IT ',NULL,1700),(220,'NOC',NULL,1700),(230,'IT ',NULL,1700),(240,'Gov',NULL,1700),(250,'Ret',NULL,1700),(260,'Rec',NULL,1700),(270,'Pay',NULL,1700);

Create the table jobs:

mysql> CREATE TABLE `jobs` (
   `job_id` varchar(10) NOT NULL,
   `job_title` varchar(35) DEFAULT NULL,
   `min_salary` int(6) DEFAULT NULL,
   `max_salary` int(6) DEFAULT NULL,
   PRIMARY KEY (`job_id`)
);


Insert data into jobs:

mysql> insert  into `jobs`(`job_id`,`job_title`,`min_salary`,`max_salary`) values ('AC_ACCOUNT','Public Accountant',4200,9000),('AC_MGR','Accounting Manager',8200,16000),('AD_ASST','Administration Assistant',3000,6000),('AD_PRES','President',20000,40000),('AD_VP','Administration Vice President',15000,30000),('FI_ACCOUNT','Accountant',4200,9000),('FI_MGR','Finance Manager',8200,16000),('HR_REP','Human Resources Representative',4000,9000),('IT_PROG','Programmer',4000,10000),('MK_MAN','Marketing Manager',9000,15000),('MK_REP','Marketing Representative',4000,9000),('PR_REP','Public Relations Representative',4500,10500),('PU_CLERK','Purchasing Clerk',2500,5500),('PU_MAN','Purchasing Manager',8000,15000),('SA_MAN','Sales Manager',10000,20000),('SA_REP','Sales Representative',6000,12000),('SH_CLERK','Shipping Clerk',2500,5500),('ST_CLERK','Stock Clerk',2000,5000),('ST_MAN','Stock Manager',5500,8500);

5.2 Configure the SMT tool

Download the smt tool, extract it, and edit the configuration file:

vi conf/config_prod.conf

First, configure the MySQL part:


[db]
# MySQL server IP
host =
# MySQL service port
port = 3306
# user name
user = root
# password
password = root
# currently available types: `mysql`, `pgsql`, `oracle`, `hive`
# choose MySQL; PostgreSQL, Oracle and Hive are still in public beta
type = mysql
# # only takes effect on `type == hive`.
# # Available values: kerberos, none, nosasl, kerberos_http, none_http, zk, ldap
# authentication = kerberos

[other]
# number of backends in StarRocks
# (number of StarRocks BE nodes, used to generate a sensible bucket count in the CREATE TABLE statements)
be_num = 1
# `decimal_v3` is supported since StarRocks-1.18.1
# (use the higher-precision DECIMAL type)
use_decimal_v3 = true
# file to save the converted DDL SQL
# (directory where the generated SQL files are written)
output_dir = ./result

# !!!`database` `table` `schema` are case sensitive in `oracle`!!!
[table-rule.1]
# pattern to match databases for setting properties
# !!! database should be a `whole instance(or pdb) name` but not a regex when it comes with an `oracle db` !!!
# (database to synchronize, written as a regular expression)
database = CDC
# pattern to match tables for setting properties
# (tables to synchronize, written as a regular expression)
table = departments|jobs
# `schema` only takes effect on `postgresql` and `oracle`
# (can be ignored when synchronizing MySQL)
schema = ^public$
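database and table are regular expressions. Whether SMT matches them against the whole name or against any substring is not stated here, so the Python sketch below shows both readings for table = departments|jobs; the schema = ^public$ line suggests writing explicit ^...$ anchors when an exact match is intended. The function name and the extra table names are hypothetical:

```python
import re
from typing import Iterable, List


def match_tables(pattern: str, tables: Iterable[str], anchored: bool) -> List[str]:
    """Return the table names selected by a regex.

    anchored=False uses re.search (any substring may match);
    anchored=True uses re.fullmatch (the whole name must match).
    """
    regex = re.compile(pattern)
    test = regex.fullmatch if anchored else regex.search
    return [t for t in tables if test(t)]
```

Under substring semantics a hypothetical jobs_archive table would also be picked up by departments|jobs, while ^(departments|jobs)$ selects exactly the two tables under either reading.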

Then configure the StarRocks cluster information:


### flink sink configurations
### (these options mirror the WITH options of the Flink sink tables above)
### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
flink.starrocks.jdbc-url=
flink.starrocks.load-url=
flink.starrocks.username=root
flink.starrocks.password=root
# batch the rows in JSON format
flink.starrocks.sink.properties.format=json
# strip the outer JSON array so that each element is loaded as one row
flink.starrocks.sink.properties.strip_outer_array=true
# flush (import) a batch every 10 seconds
flink.starrocks.sink.buffer-flush.interval-ms=10000
# # used to set the server-id for mysql-cdc jobs instead of using a random server-id
# flink.cdc.server-id = 5000
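With format=json and strip_outer_array=true, each flushed batch is sent as a single JSON array; StarRocks strips the outer [ ] and loads each element as one row. A minimal Python sketch of such a batch body (the function name is hypothetical; the sample row comes from the jobs data above):

```python
import json
from typing import Dict, List


def to_json_batch(rows: List[Dict[str, object]]) -> str:
    """Serialize a batch as one compact JSON array (one object per row)."""
    return json.dumps(rows, ensure_ascii=False, separators=(",", ":"))
```

JSON also removes the need to pick column and row separators, because field boundaries are defined by the JSON syntax itself.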

5.3 Run the SMT tool

With the configuration in place, run the smt tool:

./starrocks-migrate-tool

It generates the SQL statement files in the configured ./result directory:

ll
total 24
-rw-r--r-- 1 root root 2229 Jan 19 21:48 flink-create.1.sql
-rw-r--r-- 1 root root 2229 Jan 19 21:48 flink-create.all.sql
-rw-r--r-- 1 root root  732 Jan 19 21:48 starrocks-create.1.sql
-rw-r--r-- 1 root root  732 Jan 19 21:48 starrocks-create.all.sql
-rw-r--r-- 1 root root  838 Jan 19 21:48 starrocks-external-create.1.sql
-rw-r--r-- 1 root root  838 Jan 19 21:48 starrocks-external-create.all.sql

5.4 Create the tables and start synchronization

mysql -h192.168.110.101 -P9030 -uroot -proot < /opt/module/smt/result/starrocks-create.all.sql

./sql-client.sh -f /opt/module/smt/result/flink-create.all.sql

5.5 Check the job status

./flink list
Waiting for response...
------------------ Running/Restarting Jobs -------------------
19.01.2022 21:55:30 : 80c4e81de2d0d7e34c8f1aac1c22a8c4 : insert-into_default_catalog.CDC.departments_sink (RUNNING)
19.01.2022 21:55:34 : b2b76afe7d33196a09a274142d9128cf : insert-into_default_catalog.CDC.jobs_sink (RUNNING)
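To check the synchronization jobs from a script, the text printed by flink list can be parsed. A Python sketch based on the output format shown above; the regex assumes that exact line layout, which may differ between Flink versions, and the names are hypothetical:

```python
import re
from typing import List, NamedTuple


class FlinkJob(NamedTuple):
    job_id: str
    name: str
    status: str


# "<dd.mm.yyyy hh:mm:ss> : <32-hex job id> : <job name> (<STATUS>)"
JOB_LINE = re.compile(
    r"^\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2} : ([0-9a-f]{32}) : (.+) \((\w+)\)$"
)


def parse_flink_list(output: str) -> List[FlinkJob]:
    """Extract job id, name and status from `flink list` text output."""
    jobs = []
    for line in output.splitlines():
        m = JOB_LINE.match(line.strip())
        if m:  # header and separator lines simply do not match
            jobs.append(FlinkJob(*m.groups()))
    return jobs
```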

5.6 Observe the data

We will not demonstrate data changes again. As in Scenario 4, when the data in the source changes, the data in StarRocks changes accordingly, giving near-real-time synchronization.

This scenario is particularly well suited to synchronizing dimension tables. Because StarRocks did not yet support the UPDATE syntax at the time of writing, we can keep dimension tables whose data is updated frequently in MySQL and use Flink CDC + SMT to synchronize them to StarRocks in real time, enabling flexible multi-table join queries.


This article was written by [Driftwood follows the wind]. Please include a link to the original when reposting. Thanks.
