【TiDB】Practical application of TiCDC canal-json
2022-06-30 18:42:00 【Fish is not fish】
Background knowledge
Before diving in, let me briefly introduce two things:
1. Our current implementation architecture for cache/DB consistency:
The basic flow is as follows:
MySQL writes (insert/update/delete) --> Canal (masquerading as a slave) captures the changes --> changes are written to a Kafka topic --> the API layer consumes the topic --> cache entries are invalidated
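The consumer end of this flow can be sketched in a few lines of Python. The payload layout follows the canal-style JSON fields used throughout this post (`database`, `table`, `pkNames`, `data`); the cache key scheme is hypothetical and not from our actual setup.

```python
# Sketch of the "consume topic -> invalidate cache" step for canal-style JSON
# change events. The "cache:{db.table}:{pk}" key scheme is a made-up example;
# adapt it to your own cache layout.
import json

def cache_keys_to_invalidate(message: bytes) -> list:
    """Given one canal-style change event, return the cache keys to delete."""
    event = json.loads(message)
    if event.get("isDdl") or event.get("data") is None:
        return []  # DDL and heartbeat-style events carry no row data
    table = f'{event["database"]}.{event["table"]}'
    pk_names = event.get("pkNames") or []
    keys = []
    for row in event["data"]:
        # build one cache key per changed row from its primary-key values
        pk = ":".join(str(row[c]) for c in pk_names)
        keys.append(f"cache:{table}:{pk}")
    return keys

msg = b'{"database":"shop","table":"user","pkNames":["id"],"isDdl":false,"data":[{"id":"7","name":"a"}],"old":null}'
print(cache_keys_to_invalidate(msg))  # ['cache:shop.user:7']
```

In the real pipeline the `message` bytes would come from a Kafka consumer loop, and the returned keys would be deleted from the cache (e.g. Redis).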
2. Canal
Canal is a log-based incremental change capture and synchronization tool developed by Alibaba, originally to meet the need for data synchronization between two geographically separate data centers; a large number of incremental database subscription and consumption use cases have grown out of it.
Official documentation: otter
Architecture diagram: ![Canal architecture diagram](/img/58/3322a45cae8c0d2be75214964cfda4.png)
In short, canal pretends to be a MySQL slave, so that to the upstream, capturing changes looks like ordinary master-slave replication.
Canal is generally used in two scenarios:
1. Implementing cache/DB consistency.
2. Synchronizing data to a data warehouse platform.
TiCDC Canal-JSON
What is Canal-JSON?
Canal-JSON is a data exchange format (protocol) defined by Alibaba, originally designed for MySQL.
Why use the TiCDC Canal-JSON protocol?
Our database used to run on MySQL and was recently migrated to TiDB. The business logic that keeps the cache consistent was built on canal, but canal does not support TiDB, so we needed a replacement component that captures TiDB changes and writes them to the backend message queue.
After some research, I found that TiDB offers two ways to meet this need:
1. Use the TiDB Binlog tool, which can write changes to MQ.
2. Create a TiCDC replication task with the output format set to Canal-JSON.
I also compared the TiCDC canal_json implementation against canal itself; the differences are summarized below (see the official documentation for details): TiCDC Canal-JSON Protocol
| Difference | TiCDC | Canal |
|---|---|---|
| update | the `old` block contains the original values of all columns | the `old` block contains only the columns that were updated |
| data type | no type parameters, e.g. char(16) is shown as just char | shows the full type, e.g. char(16) |
| commit_ts unique identifier | when enable-tidb-extension is enabled in the sink-uri, an extra `_tidb` field carries the commit_ts | none |
| delete | before v5.4, `old` and `data` have the same content; since v5.4, `old` is null | n/a |
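The `update` difference in the table is worth internalizing: TiCDC's canal-json puts the previous values of all columns in the `old` block, while canal includes only the changed columns. If your consumer was written against canal's behavior, you can derive the canal-style minimal `old` block from a TiCDC message with a small diff; a sketch:

```python
# Derive a canal-style minimal `old` block (changed columns only) from a
# TiCDC update message, where `old` carries every column's previous value.
def changed_columns(data_row: dict, old_row: dict) -> dict:
    """Return only the columns whose value differs between old and new."""
    return {col: old for col, old in old_row.items() if data_row.get(col) != old}

data_row = {"id": "1", "name": "tidb"}    # new values from the `data` block
old_row = {"id": "1", "name": "numer1"}   # TiCDC: every column is present
print(changed_columns(data_row, old_row)) # {'name': 'numer1'}, canal-style
```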
After the research above, we finally settled on the second option, TiCDC, for two reasons:
1. canal and TiCDC canal_json behave the same in most cases, with only small differences.
2. It is development-friendly: our developers were already familiar with canal, so they could get started quickly with very few code changes.
How to use TiCDC canal_json?
First, a key point: canal_json is implemented by TiCDC, so you must have a TiCDC cluster.
The following operations are based on TiDB 6.0.
1. First create a configuration file
We create one because we do not actually need every table in the business database; only a few important hot tables matter.
In canal, you can configure whitelists and blacklists in the configuration file, as follows:
# Whitelist: all tables in the dbname database
canal.instance.filter.regex=dbname\\..*
# Blacklist: avoid syncing useless incremental data
canal.instance.filter.black.regex=
TiCDC cannot specify directly on the command line which tables of which databases to sync, so we specify this in the configuration file:
# For example, here we only sync the tables of the ticdc_canal_test database in this TiDB cluster; everything else is excluded
$ cat /root/ticdc_canal_test.toml
[filter]
rules = ['ticdc_canal_test.*']
This is just a simple demonstration; for more detailed filter rules, refer to the official documentation: configuration file settings
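For intuition, a `'db.table'` rule with wildcards selects tables roughly like shell-style glob matching. TiCDC's real matcher is the tidb table-filter library with richer syntax; `fnmatch` below is only a rough approximation:

```python
# Rough illustration of how a 'db.table' wildcard rule selects tables.
# NOT TiCDC's actual matcher (that is the tidb table-filter library);
# fnmatch is used here only to show the matching idea.
from fnmatch import fnmatch

def rule_matches(rule: str, db: str, table: str) -> bool:
    """Split the rule on the first '.' and glob-match db and table parts."""
    rdb, _, rtable = rule.partition(".")
    return fnmatch(db, rdb) and fnmatch(table, rtable)

print(rule_matches("ticdc_canal_test.*", "ticdc_canal_test", "jiawei_test2"))  # True
print(rule_matches("ticdc_canal_test.*", "other_db", "jiawei_test2"))          # False
```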
2. Create the replication task
With the configuration file ready, we can configure the changefeed. First, some common parameters:
--changefeed-id: the task ID
--sink-uri: the downstream address of the replication task; mysql/tidb/kafka/pulsar are currently supported
--start-ts: the TSO at which replication starts; defaults to the current time when the task is created, similar to setting the starting binlog position for MySQL replication
--target-ts: the TSO at which replication ends; empty by default, i.e. replicate indefinitely
--config: the configuration file with the filter rules, etc.
Once we understand what these mean, we can create the task:
# 1. Create a changefeed that replicates to Kafka
$ cdc cli changefeed create --pd=http://pd_ip:42379 --changefeed-id="ticdc-canal-json-test" --config="/root/ticdc_canal_test.toml" --sink-uri="kafka://kafka_ip:9092/ticdc_canal_test_topic?kafka-version=2.6.0&protocol=canal-json&enable-tidb-extension=true"
[2022/06/02 16:23:27.994 +08:00] [WARN] [kafka.go:451] ["broker's `message.max.bytes` less than the `max-message-bytes`,use broker's `message.max.bytes` to initialize the Kafka producer"] [message.max.bytes=1048588] [max-message-bytes=10485760]
[2022/06/02 16:23:27.994 +08:00] [WARN] [kafka.go:461] ["partition-num is not set, use the default partition count"] [topic=ticdc_canal_test_topic] [partitions=3]
Create changefeed successfully!
ID: ticdc-canal-json-test
Info: {"sink-uri":"kafka://kafka_ip:9092/ticdc_canal_test_topic?kafka-version=2.6.0\u0026protocol=canal-json\u0026enable-tidb-extension=true","opts":{},"create-time":"2022-06-02T16:23:27.974959914+08:00","start-ts":433627649254096897,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["ticdc_canal_test.*"],"ignore-txn-start-ts":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"canal-json","column-selectors":null},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1},"consistent":{"level":"none","max-log-size":64,"flush-interval":1000,"storage":""}},"state":"normal","error":null,"sync-point-enabled":false,"sync-point-interval":600000000000,"creator-version":"v6.0.0"}
# 2. Query the task we just created to see the details
$ cdc cli changefeed query --pd=http://pd_ip:42379 --changefeed-id="ticdc-canal-json-test"
{
"info": {
"sink-uri": "kafka://kafka_ip:9092/ticdc_canal_test_topic?kafka-version=2.6.0\u0026protocol=canal-json\u0026enable-tidb-extension=true",
"opts": {
},
"create-time": "2022-06-02T16:23:27.974959914+08:00",
"start-ts": 433627649254096897,
"target-ts": 0,
"admin-job-type": 0,
"sort-engine": "unified",
"sort-dir": "",
"config": {
"case-sensitive": true,
"enable-old-value": true,
"force-replicate": false,
"check-gc-safe-point": true,
"filter": {
"rules": [
"ticdc_canal_test.*"
],
"ignore-txn-start-ts": null
},
"mounter": {
"worker-num": 16
},
"sink": {
"dispatchers": null,
"protocol": "canal-json",
"column-selectors": null
},
"cyclic-replication": {
"enable": false,
"replica-id": 0,
"filter-replica-ids": null,
"id-buckets": 0,
"sync-ddl": false
},
"scheduler": {
"type": "table-number",
"polling-time": -1
},
"consistent": {
"level": "none",
"max-log-size": 64,
"flush-interval": 1000,
"storage": ""
}
},
"state": "normal",
"error": null,
"sync-point-enabled": false,
"sync-point-interval": 600000000000,
"creator-version": "v6.0.0"
},
"status": {
"resolved-ts": 433627686504759297,
"checkpoint-ts": 433627686504759297,
"admin-job-type": 0
},
"count": 0,
"task-status": [
{
"capture-id": "48f5f942-025d-48c3-a7c3-e06c70334ef2",
"status": {
"tables": null,
"operation": null,
"admin-job-type": 0
}
}
]
}
3. Make some changes in the database
MySQL [(none)]> use ticdc_canal_test
Database changed
MySQL [ticdc_canal_test]> show tables;
Empty set (0.00 sec)
MySQL [ticdc_canal_test]> CREATE TABLE `jiawei_test2` (
-> `id` int(11) NOT NULL,
-> `name` varchar(10) COLLATE utf8mb4_general_ci DEFAULT NULL,
-> PRIMARY KEY (`id`));
Query OK, 0 rows affected (0.09 sec)
MySQL [ticdc_canal_test]> show create table jiawei_test2
-> ;
+--------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table |
+--------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| jiawei_test2 | CREATE TABLE `jiawei_test2` (
`id` int(11) NOT NULL,
`name` varchar(10) COLLATE utf8mb4_general_ci DEFAULT NULL,
PRIMARY KEY (`id`) /*T![clustered_index] CLUSTERED */
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin |
+--------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)
MySQL [ticdc_canal_test]> insert into jiawei_test2(id,name) values(1,'numer1'),(2,'number2'),(3,'number3');
Query OK, 3 rows affected (0.01 sec)
Records: 3 Duplicates: 0 Warnings: 0
MySQL [ticdc_canal_test]> select * from jiawei_test2;
+----+---------+
| id | name |
+----+---------+
| 1 | numer1 |
| 2 | number2 |
| 3 | number3 |
+----+---------+
3 rows in set (0.01 sec)
MySQL [ticdc_canal_test]> update jiawei_test2 set name='tidb';
Query OK, 3 rows affected (0.01 sec)
Rows matched: 3 Changed: 3 Warnings: 0
MySQL [ticdc_canal_test]> select * from jiawei_test2;
+----+------+
| id | name |
+----+------+
| 1 | tidb |
| 2 | tidb |
| 3 | tidb |
+----+------+
3 rows in set (0.01 sec)
MySQL [ticdc_canal_test]> delete from jiawei_test2 where id=1;
Query OK, 1 row affected (0.01 sec)
MySQL [ticdc_canal_test]> select * from jiawei_test2;
+----+------+
| id | name |
+----+------+
| 2 | tidb |
| 3 | tidb |
+----+------+
2 rows in set (0.00 sec)
MySQL [ticdc_canal_test]> alter table jiawei_test2 add column c1 int;
Query OK, 0 rows affected (0.27 sec)
MySQL [ticdc_canal_test]> select * from jiawei_test2;
+----+------+------+
| id | name | c1 |
+----+------+------+
| 2 | tidb | NULL |
| 3 | tidb | NULL |
+----+------+------+
2 rows in set (0.00 sec)
MySQL [ticdc_canal_test]> alter table jiawei_test2 add index idx_name(name);
Query OK, 0 rows affected (2.80 sec)
MySQL [ticdc_canal_test]> alter table jiawei_test2 drop column c1;
Query OK, 0 rows affected (0.28 sec)
MySQL [ticdc_canal_test]> select * from jiawei_test2;
+----+------+
| id | name |
+----+------+
| 2 | tidb |
| 3 | tidb |
+----+------+
2 rows in set (0.01 sec)
4. Check the topic content
# Consume the messages for testing
$ ./bin/kafka-console-consumer.sh --bootstrap-server kafka_ip:9092 --topic test_ticdc_canal_json --from-beginning
# Table creation message
{"id":0,"database":"ticdc_canal_json_test","table":"jiawei_test2","pkNames":null,"isDdl":true,"type":"CREATE","es":1654163280810,"ts":1654163282102,"sql":"CREATE TABLE `jiawei_test2` (`id` INT(11) NOT NULL,`name` VARCHAR(10) COLLATE utf8mb4_general_ci DEFAULT NULL,PRIMARY KEY(`id`))","sqlType":null,"mysqlType":null,"data":null,"old":null}
# Insert messages
{"id":0,"database":"ticdc_canal_json_test","table":"jiawei_test2","pkNames":["id"],"isDdl":false,"type":"INSERT","es":1654163303159,"ts":1654163305460,"sql":"","sqlType":{"id":4,"name":12},"mysqlType":{"id":"int","name":"varchar"},"data":[{"id":"1","name":"numer1"}],"old":null}
{"id":0,"database":"ticdc_canal_json_test","table":"jiawei_test2","pkNames":["id"],"isDdl":false,"type":"INSERT","es":1654163303159,"ts":1654163305461,"sql":"","sqlType":{"id":4,"name":12},"mysqlType":{"id":"int","name":"varchar"},"data":[{"id":"2","name":"number2"}],"old":null}
{"id":0,"database":"ticdc_canal_json_test","table":"jiawei_test2","pkNames":["id"],"isDdl":false,"type":"INSERT","es":1654163303159,"ts":1654163305461,"sql":"","sqlType":{"id":4,"name":12},"mysqlType":{"id":"int","name":"varchar"},"data":[{"id":"3","name":"number3"}],"old":null}
# Update messages: note that `old` contains the original values of all columns
{"id":0,"database":"ticdc_canal_json_test","table":"jiawei_test2","pkNames":["id"],"isDdl":false,"type":"UPDATE","es":1654163312959,"ts":1654163314469,"sql":"","sqlType":{"id":4,"name":12},"mysqlType":{"id":"int","name":"varchar"},"data":[{"id":"1","name":"tidb"}],"old":[{"id":"1","name":"numer1"}]}
{"id":0,"database":"ticdc_canal_json_test","table":"jiawei_test2","pkNames":["id"],"isDdl":false,"type":"UPDATE","es":1654163312959,"ts":1654163314469,"sql":"","sqlType":{"id":4,"name":12},"mysqlType":{"id":"int","name":"varchar"},"data":[{"id":"2","name":"tidb"}],"old":[{"id":"2","name":"number2"}]}
{"id":0,"database":"ticdc_canal_json_test","table":"jiawei_test2","pkNames":["id"],"isDdl":false,"type":"UPDATE","es":1654163312959,"ts":1654163314470,"sql":"","sqlType":{"id":4,"name":12},"mysqlType":{"id":"int","name":"varchar"},"data":[{"id":"3","name":"tidb"}],"old":[{"id":"3","name":"number3"}]}
# Delete message: note that on v6.0, `old` is null
{"id":0,"database":"ticdc_canal_json_test","table":"jiawei_test2","pkNames":["id"],"isDdl":false,"type":"DELETE","es":1654163329309,"ts":1654163330485,"sql":"","sqlType":{"id":4,"name":12},"mysqlType":{"id":"int","name":"varchar"},"data":[{"id":"1","name":"tidb"}],"old":null}
# DDL test: add an index, add and drop a column
{"id":0,"database":"ticdc_canal_json_test","table":"jiawei_test2","pkNames":null,"isDdl":true,"type":"ALTER","es":1654163338309,"ts":1654163340302,"sql":"ALTER TABLE `jiawei_test2` ADD COLUMN `c1` INT","sqlType":null,"mysqlType":null,"data":null,"old":null}
{"id":0,"database":"ticdc_canal_json_test","table":"jiawei_test2","pkNames":null,"isDdl":true,"type":"CINDEX","es":1654163357509,"ts":1654163360101,"sql":"ALTER TABLE `jiawei_test2` ADD INDEX `idx_name`(`name`)","sqlType":null,"mysqlType":null,"data":null,"old":null}
{"id":0,"database":"ticdc_canal_json_test","table":"jiawei_test2","pkNames":null,"isDdl":true,"type":"ALTER","es":1654163366509,"ts":1654163369502,"sql":"ALTER TABLE `jiawei_test2` DROP COLUMN `c1`","sqlType":null,"mysqlType":null,"data":null,"old":null}
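A minimal consumer-side dispatcher for the message shapes captured above can look like the sketch below. The field names (`isDdl`, `type`, `sql`, `data`, `old`) come from the captured canal-json messages; the summary strings themselves are just illustrative.

```python
# Dispatch one canal-json event by its type, matching the messages seen
# in the captured topic output (CREATE/ALTER/CINDEX DDL, INSERT/UPDATE/DELETE DML).
import json

def handle(event: dict) -> str:
    """Route one canal-json event and return a short summary string."""
    if event.get("isDdl"):
        # DDL events (type CREATE / ALTER / CINDEX ...) carry the SQL, no row data
        return f'ddl {event["type"]}: {event["sql"]}'
    rows = event.get("data") or []
    if event["type"] in ("INSERT", "UPDATE", "DELETE"):
        # on UPDATE, old holds the previous values of ALL columns (TiCDC behavior);
        # on DELETE since v5.4, old is null and the deleted row sits in data
        return f'{event["type"].lower()} {len(rows)} row(s) in {event["database"]}.{event["table"]}'
    return f'skip {event["type"]}'

raw = ('{"id":0,"database":"ticdc_canal_json_test","table":"jiawei_test2",'
       '"pkNames":["id"],"isDdl":false,"type":"DELETE","es":1654163329309,'
       '"ts":1654163330485,"sql":"","data":[{"id":"1","name":"tidb"}],"old":null}')
print(handle(json.loads(raw)))  # delete 1 row(s) in ticdc_canal_json_test.jiawei_test2
```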
Notes
We recommend not enabling the enable-tidb-extension option unless you actually need the extra `_tidb` field, because it generates a large number of WATERMARK events, which makes the topic inconvenient to observe and consume.
![WATERMARK events flooding the topic](/img/11/26f3b66c794129378d93a91bfe696f.png)
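If you do keep enable-tidb-extension on, a consumer can simply skip the watermark events. Per the TiCDC docs these arrive with type `"TIDB_WATERMARK"`; the event dicts below are hand-written for illustration, no Kafka client involved:

```python
# Filter out TiDB-extension watermark events, keep row-change and DDL events.
def is_row_or_ddl(event: dict) -> bool:
    """True for events a cache-invalidation consumer actually cares about."""
    return event.get("type") != "TIDB_WATERMARK"

events = [
    {"type": "INSERT", "data": [{"id": "1"}]},
    {"type": "TIDB_WATERMARK", "_tidb": {"watermarkTs": 433627649254096897}},
    {"type": "DELETE", "data": [{"id": "1"}]},
]
print([e["type"] for e in events if is_row_or_ddl(e)])  # ['INSERT', 'DELETE']
```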
Summary
This setup has been running on our project for quite a while now and is very stable; the above is only a brief introduction to basic usage.
As for more fine-grained partition dispatch options: what canal supports, TiCDC canal_json basically supports as well.
For more detailed parameter settings, refer to the official documentation: parameter settings