【TiDB】Practical application of TiCDC canal-json
2022-06-30 18:42:00 【Fish is not fish】
Background knowledge
Before diving in, here is a brief introduction to two things:
1. The current implementation architecture of cache/DB consistency:
The basic flow is as follows:
MySQL DML changes --> Canal (posing as a slave) captures the changes --> the changes are written to a Kafka topic --> an API consumes the Kafka topic to get the changes --> the corresponding cache entries are invalidated
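The consumer side of this flow can be made concrete with a small helper. Below is a minimal Python sketch (not our production code): given one canal-JSON message, it derives the cache keys to invalidate. The `database:table:pk` key scheme is purely illustrative.

```python
import json

def cache_keys_to_invalidate(raw_message):
    """Given one canal-JSON message, return the cache keys that should be
    invalidated, using an illustrative '<database>:<table>:<pk>' scheme."""
    msg = json.loads(raw_message)
    if msg.get("isDdl"):
        # DDL events do not map to row-level cache keys
        return []
    pk_names = msg.get("pkNames") or []
    # "data" carries the row image; fall back to "old" for older delete formats
    rows = msg.get("data") or msg.get("old") or []
    keys = []
    for row in rows:
        pk = "-".join(str(row[c]) for c in pk_names)
        keys.append(f'{msg["database"]}:{msg["table"]}:{pk}')
    return keys

# Example: an UPDATE touching the row with id=1 invalidates exactly one key
example = json.dumps({
    "database": "ticdc_canal_test", "table": "jiawei_test2", "isDdl": False,
    "type": "UPDATE", "pkNames": ["id"],
    "data": [{"id": "1", "name": "tidb"}], "old": [{"id": "1", "name": "numer1"}],
})
print(cache_keys_to_invalidate(example))
```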
2. Canal
Canal is a log-based incremental change capture and synchronization tool developed by Alibaba, originally to meet the need for data synchronization between two overseas data centers; a large number of incremental database subscription and consumption businesses have since been built on it.
Official documentation: otter
Architecture diagram: (original image unavailable)
In short, canal pretends to be a MySQL slave, so that from the master's point of view it looks like ordinary master-slave replication, and captures the changes that way.
There are generally two usage scenarios for canal:
1. Implementing cache/DB consistency.
2. Synchronizing data to a data warehouse platform.
TiCDC Canal-JSON
What is Canal-JSON?
Canal-JSON is a data exchange format protocol defined by Alibaba, originally designed for MySQL.
Why use the TiCDC Canal-JSON protocol?
Our database originally ran on MySQL and was recently migrated to TiDB. The business logic that keeps the cache consistent was built around canal, but canal does not support TiDB, so we needed a component to replace canal: something that captures TiDB changes and writes them to the backend message queue.
After some research, TiDB offers two ways to meet our needs:
1. Use the TiDB Binlog tool, which can write changes to MQ.
2. Use TiCDC to create a replication task and specify Canal-JSON as the data format.
Comparing the TiCDC canal-json implementation with canal gives the table below; see the official documentation for details: TiCDC Canal-JSON Protocol
| Difference | TiCDC | Canal |
|---|---|---|
| update | the old block contains the original values of all columns | the old block contains only the columns that were updated |
| data types | no type parameters; e.g. char(16) is shown as just char | char(16) is shown in full |
| commit_ts unique identifier | when enable-tidb-extension is enabled in the sink-uri, an extra _tidb field carries commit_ts | none |
| delete | before v5.4, old had the same content as data; since v5.4, old is null | none |
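The first row of this table matters most in practice: because TiCDC's old block carries every column, a consumer that wants to know which columns actually changed has to diff data against old itself. A minimal Python sketch of that diff, using the standard canal-JSON field names:

```python
def changed_columns(update_msg):
    """For a canal-JSON UPDATE event from TiCDC, return {column: (old, new)}
    for only the columns whose value actually changed. TiCDC puts the full
    old row image in "old", so we diff it against "data" row by row."""
    changes = {}
    for new_row, old_row in zip(update_msg["data"], update_msg["old"]):
        for col, new_val in new_row.items():
            if old_row.get(col) != new_val:
                changes[col] = (old_row.get(col), new_val)
    return changes

# An UPDATE where only "name" changed; "id" stayed the same
msg = {"type": "UPDATE",
       "data": [{"id": "1", "name": "tidb"}],
       "old":  [{"id": "1", "name": "numer1"}]}
print(changed_columns(msg))  # {'name': ('numer1', 'tidb')}
```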
After the research above, we finally chose the second option, TiCDC, for two reasons:
1. canal and TiCDC canal-json are mostly identical in behavior, with only small differences.
2. It is development-friendly: our developers were already familiar with canal, so they could get started quickly with very few code changes.
How to use TiCDC canal-json?
First, a key point: canal-json here is implemented by TiCDC, so a TiCDC cluster is required.
The following operations are based on TiDB 6.0.
1. First, create a configuration file
We create a configuration file because we do not actually need all the tables under a given business database, only a few important hot tables.
In canal, the whitelist and blacklist can be configured in the configuration file, as follows:
# Whitelist: all tables in the dbname database
canal.instance.filter.regex=dbname\\..*
# Blacklist: avoid synchronizing useless incremental data
canal.instance.filter.black.regex=
TiCDC cannot specify which tables of which databases to replicate directly on the command line, so we specify them in the configuration file:
# For example, here we only replicate the tables of the ticdc_canal_test database in this TiDB cluster; everything else is skipped
$ cat /root/ticdc_canal_test.toml
[filter]
rules = ['ticdc_canal_test.*']
This is just a simple demonstration; for more detailed filtering rules, refer to the official documentation: Configuration file settings
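Before creating the changefeed, it can be handy to sanity-check which tables a rule set would match. The sketch below approximates the documented `db.table` wildcard semantics with Python's `fnmatch`, and assumes `!`-prefixed rules mean exclusion with later rules overriding earlier ones, as in the TiDB table-filter docs. It is an approximation for checking intent, not TiCDC's actual matcher.

```python
from fnmatch import fnmatch

def table_matches(rules, db, table):
    """Approximate TiCDC table-filter semantics: rules are 'db.table' wildcard
    patterns, later rules override earlier ones, and a leading '!' excludes."""
    name = f"{db}.{table}"
    matched = False
    for rule in rules:
        negate = rule.startswith("!")
        pattern = rule[1:] if negate else rule
        if fnmatch(name, pattern):
            matched = not negate
    return matched

rules = ["ticdc_canal_test.*"]
print(table_matches(rules, "ticdc_canal_test", "jiawei_test2"))  # True
print(table_matches(rules, "otherdb", "jiawei_test2"))           # False
```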
2. Create the replication task
With the configuration in place, we can configure the replication task. First, some common parameters:
--changefeed-id   task ID
--sink-uri        downstream address of the replication task; mysql/tidb/kafka/pulsar are currently supported
--start-ts        where the replication task starts; this is a TSO, defaulting to the current time when the task is created, similar to setting the starting binlog position for MySQL replication
--target-ts       where the replication task ends; empty by default, i.e. replicate forever
--config          specify the configuration file, filter rules, etc.
Once these are understood, we can create the task:
#1. Create a replication task that writes to Kafka
$ cdc cli changefeed create --pd=http://pd_ip:42379 --changefeed-id="ticdc-canal-json-test" --config="/root/ticdc_canal_test.toml" --sink-uri="kafka://kafka_ip:9092/ticdc_canal_test_topic?kafka-version=2.6.0&protocol=canal-json&enable-tidb-extension=true"
[2022/06/02 16:23:27.994 +08:00] [WARN] [kafka.go:451] ["broker's `message.max.bytes` less than the `max-message-bytes`,use broker's `message.max.bytes` to initialize the Kafka producer"] [message.max.bytes=1048588] [max-message-bytes=10485760]
[2022/06/02 16:23:27.994 +08:00] [WARN] [kafka.go:461] ["partition-num is not set, use the default partition count"] [topic=ticdc_canal_test_topic] [partitions=3]
Create changefeed successfully!
ID: ticdc-canal-json-test
Info: {"sink-uri":"kafka://kafka_ip:9092/ticdc_canal_test_topic?kafka-version=2.6.0\u0026protocol=canal-json\u0026enable-tidb-extension=true","opts":{},"create-time":"2022-06-02T16:23:27.974959914+08:00","start-ts":433627649254096897,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["ticdc_canal_test.*"],"ignore-txn-start-ts":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"canal-json","column-selectors":null},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1},"consistent":{"level":"none","max-log-size":64,"flush-interval":1000,"storage":""}},"state":"normal","error":null,"sync-point-enabled":false,"sync-point-interval":600000000000,"creator-version":"v6.0.0"}
#2. Query the task we just created to see the details
$ cdc cli changefeed query --pd=http://pd_ip:42379 --changefeed-id="ticdc-canal-json-test"
{
"info": {
"sink-uri": "kafka://kafka_ip:9092/ticdc_canal_test_topic?kafka-version=2.6.0\u0026protocol=canal-json\u0026enable-tidb-extension=true",
"opts": {
},
"create-time": "2022-06-02T16:23:27.974959914+08:00",
"start-ts": 433627649254096897,
"target-ts": 0,
"admin-job-type": 0,
"sort-engine": "unified",
"sort-dir": "",
"config": {
"case-sensitive": true,
"enable-old-value": true,
"force-replicate": false,
"check-gc-safe-point": true,
"filter": {
"rules": [
"ticdc_canal_test.*"
],
"ignore-txn-start-ts": null
},
"mounter": {
"worker-num": 16
},
"sink": {
"dispatchers": null,
"protocol": "canal-json",
"column-selectors": null
},
"cyclic-replication": {
"enable": false,
"replica-id": 0,
"filter-replica-ids": null,
"id-buckets": 0,
"sync-ddl": false
},
"scheduler": {
"type": "table-number",
"polling-time": -1
},
"consistent": {
"level": "none",
"max-log-size": 64,
"flush-interval": 1000,
"storage": ""
}
},
"state": "normal",
"error": null,
"sync-point-enabled": false,
"sync-point-interval": 600000000000,
"creator-version": "v6.0.0"
},
"status": {
"resolved-ts": 433627686504759297,
"checkpoint-ts": 433627686504759297,
"admin-job-type": 0
},
"count": 0,
"task-status": [
{
"capture-id": "48f5f942-025d-48c3-a7c3-e06c70334ef2",
"status": {
"tables": null,
"operation": null,
"admin-job-type": 0
}
}
]
}
3. Make changes in the database
MySQL [(none)]> use ticdc_canal_test
Database changed
MySQL [ticdc_canal_test]> show tables;
Empty set (0.00 sec)
MySQL [ticdc_canal_test]> CREATE TABLE `jiawei_test2` (
-> `id` int(11) NOT NULL,
-> `name` varchar(10) COLLATE utf8mb4_general_ci DEFAULT NULL,
-> PRIMARY KEY (`id`));
Query OK, 0 rows affected (0.09 sec)
MySQL [ticdc_canal_test]> show create table jiawei_test2
-> ;
+--------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table |
+--------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| jiawei_test2 | CREATE TABLE `jiawei_test2` (
`id` int(11) NOT NULL,
`name` varchar(10) COLLATE utf8mb4_general_ci DEFAULT NULL,
PRIMARY KEY (`id`) /*T![clustered_index] CLUSTERED */
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin |
+--------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)
MySQL [ticdc_canal_test]> insert into jiawei_test2(id,name) values(1,'numer1'),(2,'number2'),(3,'number3');
Query OK, 3 rows affected (0.01 sec)
Records: 3 Duplicates: 0 Warnings: 0
MySQL [ticdc_canal_test]> select * from jiawei_test2;
+----+---------+
| id | name |
+----+---------+
| 1 | numer1 |
| 2 | number2 |
| 3 | number3 |
+----+---------+
3 rows in set (0.01 sec)
MySQL [ticdc_canal_test]> update jiawei_test2 set name='tidb';
Query OK, 3 rows affected (0.01 sec)
Rows matched: 3 Changed: 3 Warnings: 0
MySQL [ticdc_canal_test]> select * from jiawei_test2;
+----+------+
| id | name |
+----+------+
| 1 | tidb |
| 2 | tidb |
| 3 | tidb |
+----+------+
3 rows in set (0.01 sec)
MySQL [ticdc_canal_test]> delete from jiawei_test2 where id=1;
Query OK, 1 row affected (0.01 sec)
MySQL [ticdc_canal_test]> select * from jiawei_test2;
+----+------+
| id | name |
+----+------+
| 2 | tidb |
| 3 | tidb |
+----+------+
2 rows in set (0.00 sec)
MySQL [ticdc_canal_test]> alter table jiawei_test2 add column c1 int;
Query OK, 0 rows affected (0.27 sec)
MySQL [ticdc_canal_test]> select * from jiawei_test2;
+----+------+------+
| id | name | c1 |
+----+------+------+
| 2 | tidb | NULL |
| 3 | tidb | NULL |
+----+------+------+
2 rows in set (0.00 sec)
MySQL [ticdc_canal_test]> alter table jiawei_test2 add index idx_name(name);
Query OK, 0 rows affected (2.80 sec)
MySQL [ticdc_canal_test]> alter table jiawei_test2 drop column c1;
Query OK, 0 rows affected (0.28 sec)
MySQL [ticdc_canal_test]> select * from jiawei_test2;
+----+------+
| id | name |
+----+------+
| 2 | tidb |
| 3 | tidb |
+----+------+
2 rows in set (0.01 sec)
4. Inspect the topic contents
# Consume the messages to verify
$ ./bin/kafka-console-consumer.sh --bootstrap-server kafka_ip:9092 --topic test_ticdc_canal_json --from-beginning
# Create-table message
{"id":0,"database":"ticdc_canal_json_test","table":"jiawei_test2","pkNames":null,"isDdl":true,"type":"CREATE","es":1654163280810,"ts":1654163282102,"sql":"CREATE TABLE `jiawei_test2` (`id` INT(11) NOT NULL,`name` VARCHAR(10) COLLATE utf8mb4_general_ci DEFAULT NULL,PRIMARY KEY(`id`))","sqlType":null,"mysqlType":null,"data":null,"old":null}
# Insert messages
{"id":0,"database":"ticdc_canal_json_test","table":"jiawei_test2","pkNames":["id"],"isDdl":false,"type":"INSERT","es":1654163303159,"ts":1654163305460,"sql":"","sqlType":{"id":4,"name":12},"mysqlType":{"id":"int","name":"varchar"},"data":[{"id":"1","name":"numer1"}],"old":null}
{"id":0,"database":"ticdc_canal_json_test","table":"jiawei_test2","pkNames":["id"],"isDdl":false,"type":"INSERT","es":1654163303159,"ts":1654163305461,"sql":"","sqlType":{"id":4,"name":12},"mysqlType":{"id":"int","name":"varchar"},"data":[{"id":"2","name":"number2"}],"old":null}
{"id":0,"database":"ticdc_canal_json_test","table":"jiawei_test2","pkNames":["id"],"isDdl":false,"type":"INSERT","es":1654163303159,"ts":1654163305461,"sql":"","sqlType":{"id":4,"name":12},"mysqlType":{"id":"int","name":"varchar"},"data":[{"id":"3","name":"number3"}],"old":null}
# Update messages; note that old contains the original values of all columns
{"id":0,"database":"ticdc_canal_json_test","table":"jiawei_test2","pkNames":["id"],"isDdl":false,"type":"UPDATE","es":1654163312959,"ts":1654163314469,"sql":"","sqlType":{"id":4,"name":12},"mysqlType":{"id":"int","name":"varchar"},"data":[{"id":"1","name":"tidb"}],"old":[{"id":"1","name":"numer1"}]}
{"id":0,"database":"ticdc_canal_json_test","table":"jiawei_test2","pkNames":["id"],"isDdl":false,"type":"UPDATE","es":1654163312959,"ts":1654163314469,"sql":"","sqlType":{"id":4,"name":12},"mysqlType":{"id":"int","name":"varchar"},"data":[{"id":"2","name":"tidb"}],"old":[{"id":"2","name":"number2"}]}
{"id":0,"database":"ticdc_canal_json_test","table":"jiawei_test2","pkNames":["id"],"isDdl":false,"type":"UPDATE","es":1654163312959,"ts":1654163314470,"sql":"","sqlType":{"id":4,"name":12},"mysqlType":{"id":"int","name":"varchar"},"data":[{"id":"3","name":"tidb"}],"old":[{"id":"3","name":"number3"}]}
# Delete message; note that in version 6.0, old is null
{"id":0,"database":"ticdc_canal_json_test","table":"jiawei_test2","pkNames":["id"],"isDdl":false,"type":"DELETE","es":1654163329309,"ts":1654163330485,"sql":"","sqlType":{"id":4,"name":12},"mysqlType":{"id":"int","name":"varchar"},"data":[{"id":"1","name":"tidb"}],"old":null}
# DDL tests: adding an index, adding and dropping a column
{"id":0,"database":"ticdc_canal_json_test","table":"jiawei_test2","pkNames":null,"isDdl":true,"type":"ALTER","es":1654163338309,"ts":1654163340302,"sql":"ALTER TABLE `jiawei_test2` ADD COLUMN `c1` INT","sqlType":null,"mysqlType":null,"data":null,"old":null}
{"id":0,"database":"ticdc_canal_json_test","table":"jiawei_test2","pkNames":null,"isDdl":true,"type":"CINDEX","es":1654163357509,"ts":1654163360101,"sql":"ALTER TABLE `jiawei_test2` ADD INDEX `idx_name`(`name`)","sqlType":null,"mysqlType":null,"data":null,"old":null}
{"id":0,"database":"ticdc_canal_json_test","table":"jiawei_test2","pkNames":null,"isDdl":true,"type":"ALTER","es":1654163366509,"ts":1654163369502,"sql":"ALTER TABLE `jiawei_test2` DROP COLUMN `c1`","sqlType":null,"mysqlType":null,"data":null,"old":null}
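Putting the message shapes above together, a consumer typically dispatches on the `type` and `isDdl` fields. A minimal Python sketch (the summaries it returns are illustrative; DELETE handling assumes the v5.4+ behavior shown above, where old is null and the deleted row image is in data):

```python
import json

def handle_event(raw):
    """Dispatch one canal-JSON message by its "type" field and return a short
    human-readable summary of what happened."""
    msg = json.loads(raw)
    db, table, typ = msg["database"], msg["table"], msg["type"]
    if msg.get("isDdl"):
        return f"DDL on {db}.{table}: {msg['sql']}"
    rows = msg.get("data") or []
    if typ == "INSERT":
        return f"inserted {len(rows)} row(s) into {db}.{table}"
    if typ == "UPDATE":
        return f"updated {len(rows)} row(s) in {db}.{table}"
    if typ == "DELETE":
        return f"deleted {len(rows)} row(s) from {db}.{table}"
    return f"ignored event type {typ}"

# The DELETE message from the walkthrough: old is null, data holds the row
delete_msg = json.dumps({"database": "ticdc_canal_test", "table": "jiawei_test2",
                         "isDdl": False, "type": "DELETE",
                         "data": [{"id": "1", "name": "tidb"}], "old": None})
print(handle_event(delete_msg))
```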
Notes
Unless you need the extra _tidb field, it is recommended not to enable enable-tidb-extension during replication, because it generates a large number of WATERMARK events, which makes observing and consuming the topic inconvenient.
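If enable-tidb-extension does need to stay on, consumers can simply filter the watermark events out. In my understanding these arrive with "type":"TIDB_WATERMARK" (verify against the TiCDC docs for your version); a small Python filter:

```python
import json

def is_row_change(raw):
    """Return True for DML/DDL messages, filtering out the WATERMARK events
    that enable-tidb-extension emits (observed type: TIDB_WATERMARK)."""
    msg = json.loads(raw)
    return msg.get("type") != "TIDB_WATERMARK"

# A watermark event (the _tidb payload shape here is illustrative) vs. an INSERT
watermark = json.dumps({"id": 0, "database": "", "table": "",
                        "type": "TIDB_WATERMARK", "data": None,
                        "_tidb": {"watermarkTs": 433627649254096897}})
insert = json.dumps({"database": "d", "table": "t", "type": "INSERT",
                     "data": [{"id": "1"}]})
print([is_row_change(m) for m in (watermark, insert)])  # [False, True]
```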
Summary
This setup has been running in our project for quite a while now and has been very stable. This post only covered the basic usage;
for more operations, whatever canal supports, TiCDC canal-json basically supports as well.
For more detailed parameter settings, refer to the official documentation: Parameter settings