Practice of Flink CDC + Hudi for Massive Data Lake Ingestion at SF Express
2022-06-12 00:45:00 [Alibaba Cloud Developers]
Brief introduction: Qin Lihui's talk at the May 21 Flink CDC Meetup.
This article is compiled from the talk given by Qin Lihui, R&D engineer at SF Express Big Data, at the Flink CDC Meetup on May 21. The main contents include:
- Data integration background at SF Express
- Practical problems with Flink CDC and optimizations
- Future plans
1. Data Integration Background at SF Express

SF Express is an express logistics service provider whose main businesses include time-definite express, economy express, intra-city delivery, and cold-chain transportation.
Behind the transportation process lies a series of systems, such as the order management system and the smart property system, as well as many transit hubs and sensors on trucks and aircraft, all of which generate large amounts of data. Analyzing this data makes data integration a very important step.

Data integration at SF Express has evolved over several years and is divided into two parts: offline data integration and real-time data integration. Offline integration is based mainly on DataX; this article focuses on the real-time data integration solution.
In 2017, the first version of the real-time data integration solution was implemented based on JStorm + Canal. However, this solution had many problems: for example, data consistency could not be guaranteed, throughput was low, and it was difficult to maintain. In 2019, as the Flink community continued to develop and added many important features, the second version was implemented based on Flink + Canal. This solution was still not perfect, so after internal research and practice, we started migrating to Flink CDC at the beginning of 2022.

The figure above shows the Flink + Canal architecture for real-time data lake ingestion.
After Flink starts, it first reads the current binlog position and marks it as StartOffset, then collects the full data with SELECT queries and sends it downstream to Kafka. After the full collection is finished, it collects the incremental binlog from StartOffset and sends it to Kafka as well. Finally, the data in Kafka is consumed by Spark and written into Hudi.
However, this architecture has three problems:
- Duplication between full and incremental data: because tables are not locked during collection, if data changes during the full snapshot and those changes are collected, they will duplicate records in the binlog;
- The downstream must perform Upsert or Merge writes to eliminate the duplicates and guarantee eventual consistency of the data;
- Two computing engines plus a Kafka message queue are needed to write the data into the Hudi data lake; the pipeline involves many components, a long link, and high resource consumption.
Based on these problems, we summarized the core requirements for data integration:

- Automatic switching from full to incremental collection with guaranteed data accuracy. The Flink + Canal architecture can switch automatically, but it cannot guarantee the accuracy of the data;
- Minimal impact on the source database, for example avoiding locks during synchronization and applying flow control;
- The ability to add new tables to a running collection task. This is a core requirement because, in a complex production environment, waiting until all tables are ready before integrating is inefficient. Moreover, if tasks cannot be merged, many tasks are needed and the same binlog is collected many times, which may saturate the bandwidth of the database machines;
- The ability to collect full and incremental logs at the same time. Adding a new table must not pause binlog collection in order to guarantee data accuracy, because pausing would delay the log collection of the other tables;
- A guarantee that records with the same primary key ID arrive downstream in the order in which they occurred; an event that happened later must never be sent downstream before an earlier one.

Flink CDC solves these business pain points well, performs well in scalability and stability, and has a very active community.
- First, it connects seamlessly with the Flink ecosystem, reusing Flink's many sink implementations as well as its data cleansing and transformation capabilities;
- Second, it can switch automatically between full and incremental collection while guaranteeing data accuracy;
- Third, it supports lock-free reading, resumable collection, and horizontal scaling. Horizontal scaling in particular means that, given enough resources, the performance bottleneck is generally not on the CDC side but in whether the source and target systems can read or write that much data.
2. Practical Problems with Flink CDC and Optimizations

The figure above shows the architecture of Flink CDC 2.0, which is implemented based on FLIP-27. The core steps are as follows (a minimal source-setup sketch follows the list):
- The Enumerator first splits the full data into multiple SnapshotSplits and then, as shown in step 1 of the figure, sends the SnapshotSplits to the SourceReaders for execution. The data is corrected during execution to ensure consistency;
- When a SnapshotSplit has been read, the SourceReader reports the completed chunk information to the Enumerator;
- Steps (1) and (2) are repeated until all the full data has been read;
- After the full data has been read, the Enumerator constructs a BinlogSplit from the previously completed split information and sends it to a SourceReader, which reads the incremental binlog data.
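To make the FLIP-27 source concrete, here is a minimal sketch of building an incremental-snapshot MySQL source with the open-source Flink CDC 2.x connector. The hostname, credentials, database, and table names are placeholders, and exact builder options may differ slightly between connector versions.

```java
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MySqlCdcQuickStart {
    public static void main(String[] args) throws Exception {
        // Incremental-snapshot source: the full phase is split into chunks read in
        // parallel, then the source switches to binlog reading automatically.
        MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname("localhost")                 // placeholder connection info
                .port(3306)
                .databaseList("order_db")              // hypothetical database
                .tableList("order_db.orders")          // hypothetical table
                .username("flink")
                .password("flink_pw")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000);               // checkpoints persist split/offset progress

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
           .print();

        env.execute("mysql-cdc-demo");
    }
}
```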
Problem 1: Adding a new table stops the binlog stream

Adding new tables to an existing task is a very important requirement, and Flink CDC 2.0 already supports it. However, to guarantee data consistency, Flink CDC 2.0 has to stop reading the binlog stream while it reads the full data of the new table, and only restarts the binlog task afterwards. This means that adding a new table affects the log-collection progress of the other tables. We wanted the full and incremental tasks to run at the same time, so we extended Flink CDC to support reading the full data and the binlog stream in parallel. The steps are as follows (a small illustrative sketch of the assignment order follows the list):

- After the program starts, a BinlogSplit is created in the Enumerator, placed at the head of the assignment list, and assigned to a SourceReader to start incremental data collection;
- As in the original full-data collection, the Enumerator splits the tables into multiple SnapshotSplit chunks and assigns them to SourceReaders for full data collection;
- When a chunk has been collected, the SourceReader reports the completed chunk information to the Enumerator;
- Steps (2) and (3) are repeated until the full data of all tables has been collected.
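The following is a purely illustrative model of the assignment order described above (not the actual Flink CDC or SF Express enumerator code): the binlog split is placed at the head of the list so incremental reading starts immediately, and the snapshot chunks follow.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

public class ParallelAssignmentSketch {

    interface Split {}
    record BinlogSplit() implements Split {}
    record SnapshotSplit(String table, int chunkId) implements Split {}

    // Build the order in which splits would be handed to SourceReaders.
    static Deque<Split> buildAssignmentQueue(List<String> tables, int chunksPerTable) {
        Deque<Split> queue = new ArrayDeque<>();
        queue.addLast(new BinlogSplit());                    // step 1: binlog split goes first
        for (String table : tables) {                        // step 2: snapshot chunks follow
            for (int chunk = 0; chunk < chunksPerTable; chunk++) {
                queue.addLast(new SnapshotSplit(table, chunk));
            }
        }
        return queue;
    }

    public static void main(String[] args) {
        buildAssignmentQueue(List.of("tableA", "tableB"), 3).forEach(System.out::println);
    }
}
```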
The above describes parallel reading of full and incremental data when the task starts for the first time. After a table is added, the steps of parallel reading are as follows:

- When the task is resumed, Flink CDC obtains the table configuration information stored in the state;
- By comparing the user's configuration with the state information, it identifies the tables to be added. For the BinlogSplit task, the new tables are added to the binlog collection; for the Enumerator, the new tables are split into full-data chunks;
- The Enumerator assigns the SnapshotSplits to SourceReaders for full data collection;
- Step (3) is repeated until all the full data has been read.
However, once the full and incremental data are read in parallel, a data-conflict problem arises.

As shown in the figure above, before reading the full data, Flink CDC first reads the current binlog position and marks it as the low watermark (LW). It then reads the full data with SELECT queries, obtaining the four records s1, s2, s3, and s4 in the figure. Next it reads the current binlog position again and marks it as the high watermark (HW), and merges the data changed between LW and HW into the previously collected full data. After this series of operations, the full data finally collected is s1, s2, s3, s4, and s5.
The incremental collection also reads the binlog and sends downstream the four records between LW and HW: s2, s2, s4, and s5.
There are two problems in this process. First, the data is read more than once, so there are duplicates; the red marks in the figure indicate the duplicated records. Second, the full and the incremental collection run in two different threads, possibly even in two different JVMs, so a record sent downstream may come from either path. This means records with the same primary key ID may not arrive downstream in historical order, which violates the core requirements.
To handle these data conflicts, we implemented a processing scheme based on GTIDs.

First, full data is tagged with a Snapshot label and incremental data with a Binlog label. Second, the full data is enriched with the high-watermark GTID; the incremental data already carries its own GTID, so nothing needs to be added. When the data is distributed, it passes through a KeyBy operator downstream and then into the data-conflict-handling operator. The core of conflict handling is to guarantee that the data sent downstream is not duplicated and arrives in historical order.
If a record comes from the full collection and no binlog record has been distributed yet, its GTID is stored in the state and the record is sent downstream; if the state is not empty and the record's GTID is greater than or equal to the GTID in the state, the record's GTID is likewise stored in the state and the record is sent downstream.
In this way, the data-conflict problem is solved: the data finally sent downstream is free of duplicates and ordered historically. A sketch of such a de-duplication operator is shown below.
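The following is a simplified sketch of the conflict-handling operator, assuming the GTID has already been reduced to a comparable sequence number and that each record carries a primary key and a label. The class and field names are hypothetical; the real operator at SF Express may differ.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical record shape: primary key, GTID reduced to a comparable number,
// and a label such as "SNAPSHOT" or "BINLOG".
class ChangeRecord {
    public String key;
    public long gtid;
    public String label;
}

// Keyed by primary key: emit a record only if its GTID is not older than the last
// GTID emitted for this key, and remember the emitted GTID in keyed state.
public class GtidDedupFunction
        extends KeyedProcessFunction<String, ChangeRecord, ChangeRecord> {

    private transient ValueState<Long> lastGtid;

    @Override
    public void open(Configuration parameters) {
        lastGtid = getRuntimeContext().getState(
                new ValueStateDescriptor<>("last-emitted-gtid", Long.class));
    }

    @Override
    public void processElement(ChangeRecord value, Context ctx, Collector<ChangeRecord> out)
            throws Exception {
        Long last = lastGtid.value();
        if (last == null || value.gtid >= last) {
            lastGtid.update(value.gtid);    // remember the newest GTID for this key
            out.collect(value);             // forward: de-duplicated and in order
        }
        // otherwise the record is older than one already emitted for this key -> drop it
    }
}
```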

However, a new problem appears. As the algorithm shows, in order to guarantee that records are de-duplicated and distributed in historical order, a GTID is stored in the state for every key, so the state keeps growing.
The usual first choice for cleaning up state is TTL, but with TTL the timing is hard to control and the state cannot be cleaned up completely. The second approach is manual cleanup: after the full collection of a table is finished, a record can be emitted to tell the downstream operator to clean up the state for that table.
With all of the above problems solved, the final parallel-reading scheme is shown in the figure below.

First, the data is tagged with four labels that represent different states:
- SNAPSHOT: data collected during the full phase.
- STATE_BINLOG: the full collection of this table has not finished yet, and the binlog task has collected a change record of this table.
- BINLOG: the full collection of this table has finished, and the binlog task continues to collect the change data of this table.
- TABLE_FINISHED: sent after the full data collection of a table finishes, to notify the downstream that the corresponding state can be cleaned.
The concrete implementation steps are as follows (a wiring sketch of the cleanup broadcast follows the list):
- The BinlogSplit is distributed; at this point all data collected from the binlog carries the STATE_BINLOG label;
- The SnapshotSplit tasks are distributed; the full data collected at this stage carries the SNAPSHOT label;
- The Enumerator monitors the state of each table in real time. When a table has been fully collected and a checkpoint has completed, it notifies the binlog task. From then on, the binlog task labels the change data of this table with BINLOG; in addition, it constructs a TABLE_FINISHED record and sends it downstream for processing;
- After data collection, in addition to the data-conflict-handling operator, a new step is added: the TABLE_FINISHED event records are filtered out of the main stream and broadcast downstream, and the downstream clears the state of the corresponding table according to the information they carry.
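The wiring below is a hedged sketch of the last step, reusing the hypothetical ChangeRecord shape from the earlier de-duplication sketch: TABLE_FINISHED records are filtered out of the main stream, broadcast, and connected back to the keyed stream so that every parallel instance can clear the GTID state it holds. In a real job the cleanup would be restricted to the keys of the finished table, for example by encoding the table name in the key.

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class TableFinishedCleanup {

    // "records" is assumed to be the labelled change stream produced by the CDC source.
    static DataStream<ChangeRecord> wire(DataStream<ChangeRecord> records) {
        MapStateDescriptor<String, Boolean> finishedTables =
                new MapStateDescriptor<>("finished-tables", String.class, Boolean.class);

        // TABLE_FINISHED events are broadcast to all parallel instances.
        BroadcastStream<ChangeRecord> finished = records
                .filter(r -> "TABLE_FINISHED".equals(r.label))
                .broadcast(finishedTables);

        return records
                .filter(r -> !"TABLE_FINISHED".equals(r.label))
                .keyBy(r -> r.key)
                .connect(finished)
                .process(new KeyedBroadcastProcessFunction<String, ChangeRecord, ChangeRecord, ChangeRecord>() {

                    // same state descriptor as in the de-duplication sketch above
                    private final ValueStateDescriptor<Long> gtidDesc =
                            new ValueStateDescriptor<>("last-emitted-gtid", Long.class);

                    @Override
                    public void processElement(ChangeRecord value, ReadOnlyContext ctx,
                                               Collector<ChangeRecord> out) {
                        out.collect(value); // de-duplication logic omitted, see previous sketch
                    }

                    @Override
                    public void processBroadcastElement(ChangeRecord finishedEvent, Context ctx,
                                                        Collector<ChangeRecord> out) throws Exception {
                        // Clear the per-key GTID state; a real job would only clear
                        // the keys belonging to the finished table.
                        ctx.applyToKeyedState(gtidDesc, (key, state) -> state.clear());
                    }
                });
    }
}
```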
Problem 2: Data skew when writing to Hudi

As shown in the figure above, when Flink CDC collects data from three tables, it first reads all of tableA's full data and then all of tableB's. While tableA is being read, only tableA's sink receives data downstream.

We solve the data-skew problem with mixed reading across multiple tables.
Before mixed reading was introduced, Flink CDC read all chunks of tableA and then all chunks of tableB. With mixed reading of multiple tables, the reading order becomes chunk1 of tableA, chunk1 of tableB, chunk1 of tableC, then chunk2 of tableA, and so on. This solves the data-skew problem on the downstream sinks and ensures that every sink receives data (a simple illustration follows).
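As a purely illustrative sketch (not the actual enumerator change), the interleaved order can be thought of as a round-robin over each table's chunk list:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class InterleavedChunkOrder {

    // Round-robin over the chunk lists so every table contributes data early.
    static List<String> interleave(Map<String, List<String>> chunksByTable) {
        List<String> order = new ArrayList<>();
        boolean remaining = true;
        for (int round = 0; remaining; round++) {
            remaining = false;
            for (Map.Entry<String, List<String>> e : chunksByTable.entrySet()) {
                if (round < e.getValue().size()) {
                    order.add(e.getKey() + "/" + e.getValue().get(round));
                    remaining = true;
                }
            }
        }
        return order;
    }

    public static void main(String[] args) {
        Map<String, List<String>> chunks = new LinkedHashMap<>();
        chunks.put("tableA", List.of("chunk1", "chunk2"));
        chunks.put("tableB", List.of("chunk1", "chunk2"));
        chunks.put("tableC", List.of("chunk1"));
        // prints tableA/chunk1, tableB/chunk1, tableC/chunk1, tableA/chunk2, tableB/chunk2
        System.out.println(interleave(chunks));
    }
}
```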

We tested the performance of mixed multi-table reading with data generated by the TPC-C tool, reading 4 tables with a total parallelism of 8 and a parallelism of 2 per sink. The write time dropped from the original 46 minutes to 20 minutes, a 2.3x performance improvement.
Note that if the sink parallelism equals the total parallelism, the performance gain is not significant; the main benefit of mixed multi-table reading is that every table's data is dispatched sooner.
Problem 3: Schema information must be specified manually

Users had to maintain the mapping between the DB schema and the sink schema by hand, which is inefficient, time-consuming, and error-prone. The sketch below illustrates how verbose this manual mapping is.
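To illustrate the pain point, here is a hedged sketch of the manual approach: every column of the source table has to be re-declared by hand in both the CDC source DDL and the Hudi sink DDL. Table names, columns, and connector options are placeholders, and exact option names depend on the connector versions in use.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ManualSchemaMapping {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Source: every column of the MySQL table is re-declared by hand.
        tEnv.executeSql(
            "CREATE TABLE orders_src (" +
            "  id BIGINT, user_id BIGINT, amount DECIMAL(10,2), updated_at TIMESTAMP(3)," +
            "  PRIMARY KEY (id) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'mysql-cdc'," +
            "  'hostname' = 'localhost', 'port' = '3306'," +
            "  'username' = 'flink', 'password' = 'flink_pw'," +
            "  'database-name' = 'order_db', 'table-name' = 'orders')");

        // Sink: the same columns are declared a second time for the Hudi table.
        tEnv.executeSql(
            "CREATE TABLE orders_hudi (" +
            "  id BIGINT, user_id BIGINT, amount DECIMAL(10,2), updated_at TIMESTAMP(3)," +
            "  PRIMARY KEY (id) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'hudi'," +
            "  'path' = 'hdfs:///warehouse/orders_hudi'," +
            "  'table.type' = 'MERGE_ON_READ')");

        tEnv.executeSql("INSERT INTO orders_hudi SELECT * FROM orders_src");
    }
}
```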

To lower the barrier for users and improve development efficiency, we implemented an Oracle catalog, so that users can write data into Hudi through Flink CDC in a low-code way, without specifying the DB schema information or its mapping to the sink schema.
3. Future Plans

First, support synchronizing schema changes: for example, when the schema of the data source changes, the change can be synchronized into Kafka and Hudi. We also plan to support more data source types on the platform, improve stability, and land more application scenarios.
Second, support data transformation with SQL when synchronizing data into Hudi with Flink CDC, to further lower the barrier for users.
Third, be more open technically, grow together with the community, and contribute back to the community.
Q&A
Q: How is resumable (breakpoint) collection handled?
A: There are two kinds of resumable collection, full and binlog, but both rely on Flink state. During synchronization, the progress is stored in state; if the job fails, it recovers from the state the next time it starts (see the sketch below).
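As a minimal sketch of what this answer relies on (the checkpoint storage path is a placeholder): once checkpointing is enabled, the source's chunk and binlog-offset progress is checkpointed, and the job can be resumed from the latest checkpoint or a savepoint, e.g. with `flink run -s <path> ...`.

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointedIngestion {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Collection progress lives in Flink state; checkpoints make it recoverable.
        env.enableCheckpointing(30_000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints"); // placeholder
        // ... build the CDC source and the Hudi sink as in the earlier sketches ...
        // env.execute("cdc-ingestion");
    }
}
```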
Q: When SQL is used to monitor multiple MySQL tables and write them into Hudi tables, there are multiple jobs, which is troublesome to maintain. How can a whole database be synchronized with a single job?
A: We extended Flink CDC based on GTIDs to support adding tables to a running task without affecting the collection progress of the other tables. If it is acceptable for a new table to affect the progress of the other tables, the table-adding capability of Flink CDC 2.2 can also be used.
Q: Will SF Express contribute these features to the open-source version of Flink CDC?
A: Our solution still has some limitations; for example, it requires MySQL GTIDs and a data-conflict-handling operator downstream, so it is difficult to open-source it in the community for now.
Q: Does a newly added table in Flink CDC 2.0 support full + incremental collection?
A: Yes.
Q: Does the GTID de-duplication operator become a performance bottleneck?
A: In practice, no performance bottleneck has appeared; the operator only performs some data comparison and filtering.
Original link: https://developer.aliyun.com/article/949402?