当前位置:网站首页>Practice of Flink CDC + Hudi massive data entering the lake in SF

Practice of Flink CDC + Hudi massive data entering the lake in SF

2022-06-12 00:45:00 Alibaba cloud developers

brief introduction : Qinlihui in 5.21 Flink CDC Meetup The share of .

This article is compiled from tanlihui, R & D Engineer of Shunfeng big data, in 5 month 21 Japan Flink CDC Meetup Speech . The main contents include :

  1. SF data integration background
  2. Flink CDC Practical problems and optimization
  3. The future planning

Click to view live playback & speech PDF

One 、 SF data integration background

img

SF is an express logistics service provider , The main business includes time express 、 Economic express 、 Intra city distribution and cold chain transportation .

A series of systems are needed behind the transportation process , Such as order management system 、 Smart property system 、 And many transitions 、 Many sensors in cars or airplanes , Will produce a lot of data . If you need to analyze these data , So data integration is a very important step .

img

SF data integration has experienced several years of development , Mainly divided into two parts , One is offline data integration , One is real-time data integration . Offline data integration to DataX Mainly , This paper mainly introduces the real-time data integration scheme .

2017 year , be based on Jstorm + Canal The first version of the real-time data integration scheme is implemented . But there are many problems in this scheme , For example, data consistency cannot be guaranteed 、 Low throughput 、 Difficult to maintain . 2019 year , With Flink The continuous development of the community , It complements many important features , Therefore, based on Flink + Canal The second version of the real-time data integration scheme is implemented . But this plan is still not perfect , Experienced internal research and practice ,2022 Beginning of the year , We are turning to Flink CDC .

img

The picture above shows Flink + Canal Real time data into the lake architecture .

Flink After starting , First read the current Binlog Information , Marked as StartOffset , adopt select Method to collect full data , Send it downstream Kafka. After the complete collection , Again from startOffset Collect incremental log information , Sent to Kafka. Final Kafka Data from Spark Write to after consumption Hudi.

But there are three problems with this architecture :

  • There is duplication between full and incremental data : Because the table will not be locked during the collection process , If there is any data change during the full volume acquisition , And collected these data , Then these data will be consistent with Binlog Duplicate data in ;
  • It needs to be done downstream Upsert or Merge Write to eliminate duplicate data , Ensure final consistency of data ;
  • Two sets of computing engines are required , Plus the message queue Kafka To write data to the data lake Hudi in , The process involves many components 、 Link length , And it consumes a lot of resources .

Based on the above questions , We sorted out the core requirements of data integration :

img

  1. Full increment automatic switching , And ensure the accuracy of the data .Flink + Canal The architecture can realize full and incremental automatic switching , But the accuracy of the data cannot be guaranteed ;
  2. Minimize the impact on the source database , For example, try not to use locks during synchronization 、 Flow control, etc ;
  3. Data collection that can add new tables to existing tasks , This is a very core requirement , Because in a complex production environment , Data integration after all tables are ready will lead to inefficiency . Besides , If you can't merge tasks , It takes many missions , Collect many times Binlog The data of , May lead to DB The machine bandwidth is full ;
  4. It can collect full and incremental logs at the same time , Adding a new table cannot pause log collection to ensure data accuracy , This method will delay the collection of other table logs ;
  5. Ensure that the data is in the same primary key ID Next occurs in historical order , Events that do not occur after the occurrence are sent to the downstream first .

img

Flink CDC Well solved the business pain points , And in scalability 、 stability 、 The community is very active .

  • First , It can connect seamlessly Flink ecology , Reuse Flink A number of sink Ability , Use Flink The ability to clean up and transform data ;
  • secondly , It can automatically switch between full quantity and increment , And ensure the accuracy of the data ;
  • Third , It can support lock free reading 、 Breakpoint continuation 、 Horizontal expansion , Especially in horizontal expansion , In theory , When enough resources are given , Performance bottlenecks generally do not occur in CDC Side , It's the data source / Whether the target source can support reading / Write so much data .

Two 、Flink CDC Practical problems and optimization

img

The picture above shows Flink CDC 2.0 Architecture principle of . It's based on FLIP-27 Realization , The core steps are as follows :

  1. Enumerator First, split the full data into multiple SnapshotSplit, Then follow the first step in the figure above to SnapshotSplit Send to SourceReader perform . Data will be modified during execution to ensure data consistency ;
  2. SnapshotSplit When the reading is completed, send a message to Enumerator Report the read completed block information ;
  3. repeat (1) (2) Two steps , Until the full amount of data is read ;
  4. After reading the full amount of data ,Enumerator It will be completed in full according to the previous split Information , Construct a BinlogSplit. Send to SourceRead perform , Read incremental log data .

Question 1 : Adding a new table will stop Binlog Log stream

img

Adding new tables to existing tasks is a very important requirement , Flink CDC 2.0 This function is also supported . But to ensure data consistency ,Flink CDC 2.0 In the process of adding a new table , Need to stop Binlog Log stream reading , Then read the full data of the new table . After reading the full data of the new table , And then put the Binlog Task restart . This also means that adding new tables will affect the log collection progress of other tables . However, we hope that both full and incremental tasks can be carried out at the same time , To solve this problem , We are right. Flink CDC Expanded , It supports parallel reading of full and incremental log streams , Steps are as follows :

img

  1. After the program starts , stay Enumerator Created in BinlogSplit , At the top of the allocation list , Assigned to SourceReader Perform incremental data collection ;
  2. It is the same as the original full data acquisition ,Enumerator Cut the whole collection into multiple pieces split block , Then assign the segmented blocks to SourceReader To perform full data collection ;
  3. After full data collection ,SourceReader towards Enumerator Report the information of the completed full data acquisition block ;
  4. repeat (2) (3) Step , Complete the collection of the full meter .

The above is the first start task , The process of reading full and incremental logs in parallel . After adding a table , The implementation steps of parallel reading are as follows :

img

  1. When resuming a mission ,Flink CDC From state Get the configuration information of the user's new table in ;
  2. By comparing user configuration information with status information , Capture the table to be added . about BinlogSplit Mission , New tables will be added binlog Data collection ; about Enumerator Mission , Will perform full segmentation on the new table ;
  3. Enumerator Cut the good SnapshotSplit Assigned to SourceReader Perform full data collection ;
  4. Repeat step (3), Until all the full data is read .

However , After the full and incremental logs are read in parallel , There is a data conflict problem again .

img

As shown in the figure above , Flink CDC Before reading the full amount of data , The current... Will be read first Binlog Location information for , Mark it as LW, Then passed select To read the full amount of data , Read into the above figure s1、s2、 s3、s4 Four pieces of data . Then read the current Binlog Location , Marked as HW, And then LW and HW Data changed in merge To the data collected in full before . After a series of operations , Finally, the full amount of data collected is s1、s2、s3、s4 and s5.

The process of incremental collection will also read Binlog Log information in , Will LW and HW Medium s2、s2、s4、s5 Four pieces of data are sent to the downstream .

There are two problems in the whole process : First , Multiple data access , There is data duplication , The red mark in the above figure indicates that there is duplicate data ; secondly , Full and incremental in two different threads , It may also be in two different JVM in , Therefore, the data sent to the downstream may be full data , It may also be incremental data , It means the same primary key ID The order of arrival downstream is not in historical order , Not in line with core requirements .

For data conflicts , We offer based on GTID Implementation of the processing scheme .

img

First , Mark the full data Snapshot label , Incremental data is marked with Binlog label ; secondly , Add a high water level to the full data GTID Information , The incremental data itself carries GTID Information , Therefore, there is no need to add . Distribute the data , There will be one downstream KeyBy operator , Then connect the data conflict handling operator , The core of data conflict is to ensure that the data sent to the downstream is not repeated , And in historical order .

If all the data collected is distributed , And there was no Binlog Data distribution , Of this data GTID Store in state And distribute this data ; If state Is not empty and this record's GTID Greater than or equal to... In the state GTID , This data is also GTID Store in state And distribute this data ;

In this way , The problem of data conflict is well solved , The data finally output to the downstream is not repeated and occurs in historical order .

img

However , New problems arise again . It can be seen from the processing algorithm that , To ensure that the data is not repeated and distributed in historical order , All records will be mapped to GTID Information is stored in the state , Causes the status to increase continuously .

Cleaning status is generally preferred TTL, but TTL Difficult to control time , And the data cannot be completely cleaned up . The second method is manual cleaning , After the full scale is completed , A record can be issued to inform downstream cleanup state Data in .

Solved all the above problems , The final scheme of parallel reading is shown in the following figure .

img

First , Put four labels on the data , They represent different states :

  • SNAPSHOT: Full amount of collected data information .
  • STATE_BINLOG: Full collection has not been completed yet , Binlog The change data of this table has been collected .
  • BINLOG: After full data collection ,Binlog Then collect the change data of this table .
  • TABLE_FINISHED: Notify downstream after full data collection , Can be cleaned state.

The specific implementation steps are as follows :

  1. Distribute Binlog , here Binlog All the data collected are STATE_BINLOG label ;
  2. Distribute SnapshotSplit Mission , At this time, the full amount of data collected is SNAPSHOT label ;
  3. Enumerator Monitor the status of the table in real time , The execution of a certain table is completed and checkpoint after , notice Binlog Mission .Binlog After the task receives the notification , Collect the following data from this table Binlog The information is marked with BINLOG label ; Besides , It also constructs a TABLE_FINISHED The records are sent to the downstream for processing ;
  4. When the data collection is complete , In addition to the data conflict handling operator , A new step has been added here : Screened from the mainstream TABLE_FINISHED Incident records , Send it to the downstream by broadcasting , The downstream clears the status information of the corresponding table according to the specific information .

Question two : Write Hudi There is data skew

img

Pictured above ,Flink CDC When collecting data from three tables , Will read first tableA The full amount of data , Read again tableB The full amount of data . Read tableA In the process of , Downstream only tableA Of sink There is data inflow .

img

We solve the problem of data skew by mixed reading of multiple tables .

Before introducing multi table mixing ,Flink CDC Finished reading tableA All of the chunk, Read again tableB All of the chunk. After realizing mixed reading of multiple tables , The order of reading becomes read tableA Of chunk1、tableB Of chunk1、tableC Of chunk1, Read again tableA Of chunk2, And so on , Finally, the problem of downstream sink The problem of data skew , Make sure that each sink There is data inflow .

img

We test the performance of multi table mixed reading , from TPCC Test data constructed by the tool , Read out 4. A watch , The total parallelism is 8, Every sink The parallelism of is 2, The writing time is changed from the original 46 Minutes fall to 20 minute , Performance improvement 2.3 times .

It should be noted that , If sink The parallelism of is equal to the total parallelism , The performance will not be significantly improved , The main function of multi table mixed reading is to obtain the data distributed by each table faster .

Question 3 : You need to manually specify schema Information

img

The user manually executes DB schema And sink Between schema The mapping relationship , Low development efficiency , Time consuming and error prone .

img

In order to lower the user's threshold , Improve development efficiency , We did Oracle catalog , So that users can in a low code way 、 Do not need to specify DB schema Information and sink schema Mapping relationship of information , You can pass Flink CDC Writes data to Hudi.

3、 ... and 、 The future planning

img

First of all , Support schema Information change synchronization . For example, the data source has schema Information changes , It can be synchronized to Kafka and Hudi in ; Support the platform to access more data source types , Enhance stability , Realize the landing of more application scenarios .

second , Support SQL The way of transformation , Use Flink CDC Synchronize data to Hudi in , Reduce the threshold of users .

Third , I hope the technology is more open , Grow with the community , Contribute to the community .

Question and answer

Q: How to deal with breakpoint continuous transmission collection ?

A: There are two types of breakpoint continuation , It is divided into total quantity and Binlog. But they are all based on Flink state The ability of , During synchronization, the progress will be stored in state in . If it fails , Next time I will start from state Medium recovery .

Q:MySQL Use in monitoring multiple tables SQL write in Hudi When in the watch , There are multiple job, Maintenance is troublesome , How to pass a single job Synchronize the entire library ?

A: We are based on GTID In the right way Flink CDC Expanded , Support adding tables in tasks , And does not affect the collection progress of other tables . Do not consider that the new table will affect the progress of other tables , It can also be based on Flink CDC 2.2 Ability to create new tables .

Q: SF will have these features in CDC In the open source version ?

A: At present, our scheme still has some limitations , For example, you must use MySQL Of GTID, Operators with data conflict processing downstream are required , Therefore, it is difficult to open source in the community .

Q:Flink CDC 2.0 The new table supports the full quantity + Incremental ?

A: Yes .

Q:GTID Will the de duplication operator become a performance bottleneck ?

A: After practice , There are no performance bottlenecks , It just does some data judgment and filtering .

Click to view live playback & speech PDF

Link to the original text :https://developer.aliyun.com/article/949402?

Copyright notice : The content of this article is contributed by alicloud real name registered users , The copyright belongs to the original author , Alicloud developer community does not own its copyright , It also does not bear the corresponding legal liability . Please check the specific rules 《 Alicloud developer community user service agreement 》 and 《 Alibaba cloud developer community intellectual property protection guidelines 》. If you find any suspected plagiarism in this community , Fill in the infringement complaint form to report , Once verified , The community will immediately delete the suspected infringement content .
原网站

版权声明
本文为[Alibaba cloud developers]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/163/202206120034216207.html