Flink secondary development: implementing a batch lookup join (with source code)
2022-07-02 11:01:00 【Direction_Wind】
Reprinted from an article on the "big data sheep" official account, for learning purposes only, with no commercial intent.
- 1. Preface
- 2. A practical case
- 3. batch lookup join implementation
- 4. How can you use this feature?
1. Preface
Picking up where we left off: the previous post described a performance problem the author found when a Flink SQL lookup join accesses an external dimension table.
This gave rise to an idea. Take a Redis dimension table as an example. Redis supports batched access through pipelining, so could a Flink SQL lookup join do what a DataStream job can: buffer a batch of records first, then query external storage in bulk with a Redis pipeline? The author calls this feature flink sql batch lookup join, and this post describes how it was implemented by modifying the Flink source code.
Without further ado, here are the contents and conclusions of this post up front, so you can quickly see what it offers:
- A practical case: using the example of joining an exposure log stream with a user profile (age, gender) dimension table, show the basic capabilities of batch lookup join: how to configure the parameters, how to write the SQL, and what the final effect is.
- batch lookup join implementation: starting from Flink's transformations, identify the changes batch lookup join requires, along with the ideas and principles behind them. It also shares some approaches for modifying the source code to build the features you want.
- Summary and outlook: the current batch lookup join implementation does not match the original semantics of SQL; you can extend it at the SQL level yourself if you need that.
2. A practical case
2.1. Expected input and output data
First, let's look at the concrete scenario: for a given input, what the output should look like.
Requirement: join the exposure log stream (show_log) with the user profile dimension table (user_profile) to attach each user's profile data (gender, age group).
Sample input data:
Exposure log stream (show_log) data (stored in Kafka):
| log_id | timestamp | user_id |
|---|---|---|
| 1 | 2021-11-01 00:01:03 | a |
| 2 | 2021-11-01 00:03:00 | b |
| 3 | 2021-11-01 00:05:00 | c |
| 4 | 2021-11-01 00:06:00 | b |
| 5 | 2021-11-01 00:07:00 | c |
User profile dimension table (user_profile) data (stored in Redis):
| user_id (primary key) | age | sex |
|---|---|---|
| a | 12-18 | male |
| b | 18-24 | female |
| c | 18-24 | male |
Note: Redis stores the data as key/value pairs, where the key is user_id and the value is a JSON string containing age and sex, as shown in the figure below:

The expected output data is as follows:
| log_id | timestamp | user_id | age | sex |
|---|---|---|---|---|
| 1 | 2021-11-01 00:01:03 | a | 12-18 | male |
| 2 | 2021-11-01 00:03:00 | b | 18-24 | female |
| 3 | 2021-11-01 00:05:00 | c | 18-24 | male |
| 4 | 2021-11-01 00:06:00 | b | 18-24 | female |
| 5 | 2021-11-01 00:07:00 | c | 18-24 | male |
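To make the expected result concrete, here is a small plain-Java sketch (not Flink code) of the LEFT JOIN semantics the tables above describe. Each show_log row is looked up by user_id in the profile data, and age and sex are appended, or padded with nulls when no profile exists. The class and method names (`ExpectedOutputDemo`, `leftJoin`) are hypothetical, and in-memory collections stand in for Kafka and Redis.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: in-memory collections stand in for the
// Kafka stream (show_log) and the Redis dimension table (user_profile).
class ExpectedOutputDemo {

    // Enriches each row {log_id, timestamp, user_id} with {age, sex}.
    // A user_id missing from the profile map yields nulls (LEFT JOIN semantics).
    static List<List<String>> leftJoin(List<List<String>> showLog,
                                       Map<String, List<String>> userProfile) {
        List<List<String>> out = new ArrayList<>();
        for (List<String> log : showLog) {
            List<String> joined = new ArrayList<>(log);
            List<String> dim = userProfile.get(log.get(2)); // look up by user_id
            if (dim != null) {
                joined.addAll(dim);
            } else {
                joined.add(null); // age
                joined.add(null); // sex
            }
            out.add(joined);
        }
        return out;
    }

    // Sample input from the show_log table above.
    static List<List<String>> sampleShowLog() {
        return List.of(
            List.of("1", "2021-11-01 00:01:03", "a"),
            List.of("2", "2021-11-01 00:03:00", "b"),
            List.of("3", "2021-11-01 00:05:00", "c"),
            List.of("4", "2021-11-01 00:06:00", "b"),
            List.of("5", "2021-11-01 00:07:00", "c"));
    }

    // Sample dimension data from the user_profile table above.
    static Map<String, List<String>> sampleUserProfile() {
        return Map.of(
            "a", List.of("12-18", "male"),
            "b", List.of("18-24", "female"),
            "c", List.of("18-24", "male"));
    }
}
```

Calling `leftJoin(sampleShowLog(), sampleUserProfile())` reproduces the five rows of the expected output table.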
2.2. batch lookup join SQL code
The batch lookup join SQL is identical to the original lookup join SQL, as shown below.
```sql
CREATE TABLE show_log (
    log_id BIGINT,
    `timestamp` as cast(CURRENT_TIMESTAMP as timestamp(3)),
    user_id STRING,
    proctime AS PROCTIME()
)
WITH (
    'connector' = 'datagen',
    'rows-per-second' = '10',
    'fields.user_id.length' = '1',
    'fields.log_id.min' = '1',
    'fields.log_id.max' = '10'
);

CREATE TABLE user_profile (
    user_id STRING,
    age STRING,
    sex STRING
) WITH (
    'connector' = 'redis',
    'hostname' = '127.0.0.1',
    'port' = '6379',
    'format' = 'json',
    'lookup.cache.max-rows' = '500',
    'lookup.cache.ttl' = '3600',
    'lookup.max-retries' = '1'
);

CREATE TABLE sink_table (
    log_id BIGINT,
    `timestamp` TIMESTAMP(3),
    user_id STRING,
    proctime TIMESTAMP(3),
    age STRING,
    sex STRING
) WITH (
    'connector' = 'print'
);

-- lookup join query logic
-- INSERT INTO maps columns by position, so age must come before sex
-- to match sink_table's column order
INSERT INTO sink_table
SELECT
    s.log_id as log_id
    , s.`timestamp` as `timestamp`
    , s.user_id as user_id
    , s.proctime as proctime
    , u.age as age
    , u.sex as sex
FROM show_log AS s
LEFT JOIN user_profile FOR SYSTEM_TIME AS OF s.proctime AS u
ON s.user_id = u.user_id;
```
As you can see, the lookup join and batch lookup join SQL are exactly the same. The only difference is that batch lookup join requires setting a table config parameter (is.dim.batch.mode = true), as shown in the figure below:

2.3. batch lookup join effect
Comparing the original lookup join with batch lookup join:
Native lookup join: for every input record, it accesses the external dimension table, gets the result and outputs one record, as shown in the figure below.
The author's batch lookup join: every time 30 records have been buffered, or every 5 s (so that a low-volume stream does not go a long time without output), it accesses external storage once using a Redis pipeline, then outputs the results as a batch, as shown in the figure below. Because many lookups share one round trip to Redis, throughput improves considerably.
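As a rough back-of-the-envelope sketch, the difference can be expressed as the number of Redis round trips. This is a hypothetical model, not a benchmark. It assumes one access per record for the native join and one pipelined access per batch of 30 for the batch variant, and it ignores the lookup cache and the 5 s trigger.

```java
// Hypothetical estimate only: counts external accesses, not wall-clock throughput.
// Assumes no lookup-cache hits and ignores the 5 s time-based flush.
class RoundTripEstimate {

    // Native lookup join: one Redis access per input record.
    static long nativeRoundTrips(long records) {
        return records;
    }

    // Batch lookup join: one pipelined access per batch of up to batchSize records.
    static long batchRoundTrips(long records, int batchSize) {
        return (records + batchSize - 1) / batchSize; // ceiling division
    }
}
```

For example, 100 records cost 100 round trips natively but 4 with batches of 30. With the datagen source in the SQL above producing 10 rows per second, a batch of 30 fills in about 3 s, so the count trigger would normally fire before the 5 s one.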
3. batch lookup join implementation
3.1. How do you know which part of the source code to change?
The author explains how to modify the source code to implement your own features by answering the following questions.
- What is a good approach to changing the source code?
- Conclusion: first, refer to the implementation of a similar module (if you can't write it from scratch, you can at least copy!). For example, to implement batch lookup join here, the natural reference is the original lookup join implementation.
- Where should the change go? The Flink codebase has so many modules and is so large that, when modifying it, the first problem is often not how to implement a feature but where to make the change!
- Conclusion: everything that matters in a Flink job (DataStream, Table or SQL) is concentrated in its transformations! Anything that involves an operator's implementation can be traced from the transformation. Set breakpoints in each operator's constructor or open method to see exactly where it is constructed and initialized, then trace back along the call stack to determine which code to change.
3.2. How lookup join works
3.2.1. transformation
Before implementing batch lookup join, we naturally start from the original lookup join implementation to see how the Flink project implemented it. The concrete transformation is shown in the figure below:

The implementation logic lives in org.apache.flink.streaming.api.operators.ProcessOperator and org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.
3.2.2. LookupJoinRunner
The data processing logic of LookupJoinRunner is concentrated in processElement.

As the figure above shows, LookupJoinRunner wraps another layer, the fetcher, which carries out the actual lookup logic.
- fetcher: an instance of code generated from the Flink SQL lookup join logic;
- collector: merges the original RowData with the RowData returned by the lookup into a JoinedRowData result, then outputs it.
Next, let's look at the fetcher and collector in detail.
3.2.3. fetcher

Here is the fetcher code copied out:

The fetcher wraps RedisRowDataLookupFunction as the function that ultimately accesses the external dimension table.
3.2.4. RedisRowDataLookupFunction
Accesses Redis to get the data.
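The per-key behaviour of such a lookup function can be sketched in plain Java. `SingleKeyLookup` is a hypothetical stand-in for RedisRowDataLookupFunction, not the connector's code. A `Map` plays the role of Redis (key = user_id, value = JSON string), and a regex extracts flat string fields as a simplification of a real JSON format. The counter makes explicit that every key costs one round trip.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for a per-key Redis lookup function.
// A Map simulates Redis; a real connector would use a Redis client and a JSON format.
class SingleKeyLookup {

    private final Map<String, String> redis; // simulated GET target: user_id -> JSON
    int roundTrips = 0;                      // one simulated round trip per eval() call

    SingleKeyLookup(Map<String, String> redis) {
        this.redis = redis;
    }

    // Mirrors "GET user_id": returns [age, sex], or null when the key is absent.
    List<String> eval(String userId) {
        roundTrips++;
        String json = redis.get(userId);
        if (json == null) {
            return null;
        }
        return Arrays.asList(field(json, "age"), field(json, "sex"));
    }

    // Extracts "name":"value" from a flat JSON object (no nesting or escapes handled).
    static String field(String json, String name) {
        Pattern p = Pattern.compile("\"" + Pattern.quote(name) + "\"\\s*:\\s*\"([^\"]*)\"");
        Matcher m = p.matcher(json);
        return m.find() ? m.group(1) : null;
    }
}
```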

3.2.5. collector

Here is the collector code copied out:

3.3. lookup join operator call chain
The lookup join call chain may feel rather complicated.
Because batch lookup join is implemented entirely by reference to lookup join, the author next introduces the overall call chain. This makes it easier to decide which code to modify when designing the batch lookup join implementation.

The overall call logic is as follows:
- ProcessOperator passes the original RowData to LookupJoinRunner
- LookupJoinRunner passes the original RowData to the SQL-generated fetcher
- the fetcher passes the original RowData to RedisRowDataLookupFunction, which looks up the dimension table; the lookup result is the lookup RowData
- the collector merges the original RowData and the lookup RowData into a JoinedRowData, then outputs it.
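The four steps above can be modelled in a few lines of plain Java. This is a hypothetical sketch, not Flink's actual classes: `List<String>` stands in for RowData, a `Function` stands in for the fetcher plus RedisRowDataLookupFunction, and a `Consumer` stands in for the downstream output. It shows that every single input row triggers its own lookup.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical model of the per-record lookup join call chain; not Flink API.
class PerRecordChain {

    // Plays the role of LookupJoinRunner.processElement for ONE original row.
    static void processElement(List<String> original,
                               Function<String, List<String>> lookupFunction,
                               Consumer<List<String>> out) {
        // "fetcher": extract the join key (user_id at index 2) and call the lookup function
        List<String> lookup = lookupFunction.apply(original.get(2));
        // "collector": merge the original row with the lookup row and emit the result
        collect(original, lookup, out);
    }

    // Concatenates original + lookup fields; pads with nulls on a miss (LEFT JOIN).
    static void collect(List<String> original, List<String> lookup,
                        Consumer<List<String>> out) {
        List<String> joined = new ArrayList<>(original);
        if (lookup != null) {
            joined.addAll(lookup);
        } else {
            joined.add(null); // age
            joined.add(null); // sex
        }
        out.accept(joined);
    }
}
```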
3.4. batch lookup join design
As before, let's start with the design and the final conclusion. The batch lookup join operator call chain is designed as follows:

The design reasoning in detail:
- To access external storage (Redis) in batches, RedisRowDataLookupFunction's input must be List<original RowData> and its output must be List<lookup RowData>. Once the input reaches RedisRowDataLookupFunction, a Redis pipeline queries external storage in bulk, and the results are output as a List<lookup RowData>.
- Since RedisRowDataLookupFunction outputs List<lookup RowData>, the collector's input must be List<original RowData> together with List<lookup RowData>. In lookup join, the collector merges the original RowData and lookup RowData into a JoinedRowData and outputs the result; so here the collector must iterate over List<original RowData> and List<lookup RowData>, merge them pairwise, and output JoinedRowData one by one.
- Likewise, RedisRowDataLookupFunction's input comes from the fetcher, so the fetcher's input must be List<original RowData>.
- Since the fetcher's input is List<original RowData>, LookupJoinRunner must also pass List<original RowData> to the fetcher. But ProcessOperator gives LookupJoinRunner only one original RowData at a time, so the logic of buffering every 30 records or every 5 s has to be implemented in LookupJoinRunner.
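The buffering that the last bullet assigns to LookupJoinRunner could look roughly like the following. `BatchBuffer` is a hypothetical class, not Flink API, and the current time is passed in so the logic stays deterministic. As in the author's version, the 5 s check is data-driven: it is evaluated only when a new row arrives. A real operator would keep the buffer in Flink state (the article uses ListState) rather than a plain list so it survives failures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of count-or-time batching; not Flink API.
// The time check runs only on arrival of a new row (data-driven flush).
class BatchBuffer<T> {

    private final int maxSize;                   // e.g. 30 records
    private final long maxWaitMillis;            // e.g. 5000 ms
    private final Consumer<List<T>> flushTarget; // e.g. the batched fetcher
    private final List<T> buffer = new ArrayList<>();
    private long batchStartMillis;

    BatchBuffer(int maxSize, long maxWaitMillis, Consumer<List<T>> flushTarget) {
        this.maxSize = maxSize;
        this.maxWaitMillis = maxWaitMillis;
        this.flushTarget = flushTarget;
    }

    void processElement(T row, long nowMillis) {
        if (buffer.isEmpty()) {
            batchStartMillis = nowMillis; // first row of a new batch starts the clock
        }
        buffer.add(row);
        boolean full = buffer.size() >= maxSize;
        boolean expired = nowMillis - batchStartMillis >= maxWaitMillis;
        if (full || expired) {
            flush();
        }
    }

    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flushTarget.accept(new ArrayList<>(buffer)); // hand over a copy of the batch
        buffer.clear();
    }

    int pending() {
        return buffer.size();
    }
}
```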
With the design in place, the changes batch lookup join requires are also clear:
- Create a new BatchLookupJoinRunner: it implements the batch buffering logic (every 30 records or every 5 s) and keeps the buffered records in ListState so they are not lost. Setting is.dim.batch.mode to true in the table config selects this BatchLookupJoinRunner.
- Code-generated fetcher: change the input from a single original RowData to List<original RowData>.
- Create a new RedisRowDataBatchLookupFunction: once it receives the input List<original RowData>, it queries external storage in bulk with a Redis pipeline and passes the resulting List<lookup RowData> to the collector.
- Code-generated collector: change the original lookup join inputs (original RowData, lookup RowData) to List<original RowData> and List<lookup RowData>, and add a loop that walks both lists in order, merging each original RowData with its lookup RowData and outputting a JoinedRowData.
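The loop added to the generated collector depends on one invariant: the i-th lookup result belongs to the i-th original row. A hypothetical plain-Java version might look like this. It uses `List<String>` in place of RowData and concatenation in place of JoinedRowData. Padding LEFT JOIN misses with nulls is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of a batched collector loop; not the generated Flink code.
class BatchCollector {

    static void collectAll(List<List<String>> originals,
                           List<List<String>> lookups,
                           int lookupArity,
                           Consumer<List<String>> out) {
        // Pairing by index only works if the batch lookup preserved input order.
        if (originals.size() != lookups.size()) {
            throw new IllegalArgumentException("lookup results must line up 1:1 with inputs");
        }
        for (int i = 0; i < originals.size(); i++) {
            List<String> joined = new ArrayList<>(originals.get(i));
            List<String> lookup = lookups.get(i);
            if (lookup != null) {
                joined.addAll(lookup);
            } else {
                for (int k = 0; k < lookupArity; k++) {
                    joined.add(null); // LEFT JOIN: pad the missing dimension fields
                }
            }
            out.accept(joined); // still emits one joined row per original row
        }
    }
}
```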
3.5. Final result of batch lookup join
3.5.1. transformation
When is.dim.batch.mode is set to true, the transformation is as follows. Its key processing logic is in BatchLookupJoinRunner.

3.5.2. BatchLookupJoinRunner

3.5.3. fetcher

3.5.4. RedisRowDataBatchLookupFunction
RedisRowDataBatchLookupFunction takes the input List of records and uses a Redis pipeline to query external storage in bulk.
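Here is a hypothetical plain-Java stand-in for the batched lookup; it is not the author's RedisRowDataBatchLookupFunction. A `Map` plays the role of Redis, and the whole key list is resolved in one simulated round trip, which is what pipelining provides. With a real client such as Jedis, the rough equivalent would be queuing a `get` per key on `jedis.pipelined()`, calling `sync()` once, then reading each `Response`. That mapping is for orientation only. Results keep input order, with null for missing keys, so a collector can pair them with the inputs by index.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a pipelined batch lookup; a Map simulates Redis.
class PipelinedBatchLookup {

    private final Map<String, String> redis; // simulated store: key -> value
    int roundTrips = 0;                      // counts simulated network round trips

    PipelinedBatchLookup(Map<String, String> redis) {
        this.redis = redis;
    }

    // Returns one reply per key, in input order; null where the key is missing.
    List<String> evalBatch(List<String> keys) {
        List<String> replies = new ArrayList<>(keys.size());
        if (keys.isEmpty()) {
            return replies; // nothing to send, no round trip
        }
        roundTrips++; // the whole batch shares a single round trip
        for (String key : keys) {
            replies.add(redis.get(key));
        }
        return replies;
    }
}
```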

3.5.5. collector
The SQL-generated collector code is as follows:

3.6. Room for improvement
The current scheme has the following shortcomings:
- The batch execution logic is inconsistent with the original SQL semantics, since SQL itself has no notion of a batch lookup join.
- The author implemented the every-5 s flush very simply: it is purely data-driven (the 5 s check runs only when a record arrives) rather than driven by onTimer. If one record arrives and no further data comes in for 5 minutes, that record will not be output during that time.
- The code was not abstracted; the focus was on getting the feature working, so many of the source changes are direct copies of existing code adapted into a new implementation.
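The second shortcoming could be addressed by letting a periodic callback, rather than the next record, decide when a partial batch is old enough. Below is a hypothetical plain-Java sketch in which `onTimer` is simply a method the caller invokes with the current time. How such a callback would be wired in Flink is left open. For instance, timers registered through a ProcessFunction's timer service are only supported on keyed streams, so a non-keyed operator would need another mechanism, such as the operator's processing-time service.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: size trigger on arrival, time trigger from a periodic callback.
class TimerFlushBuffer<T> {

    private final int maxSize;
    private final long maxWaitMillis;
    private final Consumer<List<T>> flushTarget;
    private final List<T> buffer = new ArrayList<>();
    private long batchStartMillis;

    TimerFlushBuffer(int maxSize, long maxWaitMillis, Consumer<List<T>> flushTarget) {
        this.maxSize = maxSize;
        this.maxWaitMillis = maxWaitMillis;
        this.flushTarget = flushTarget;
    }

    // Called for every incoming row: only the size trigger is checked here.
    void processElement(T row, long nowMillis) {
        if (buffer.isEmpty()) {
            batchStartMillis = nowMillis;
        }
        buffer.add(row);
        if (buffer.size() >= maxSize) {
            flush();
        }
    }

    // Called periodically, independent of data arrival, so a lone row is not stranded.
    void onTimer(long nowMillis) {
        if (!buffer.isEmpty() && nowMillis - batchStartMillis >= maxWaitMillis) {
            flush();
        }
    }

    private void flush() {
        flushTarget.accept(new ArrayList<>(buffer));
        buffer.clear();
    }

    int pending() {
        return buffer.size();
    }
}
```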
4. How can you use this feature?
Package the project: install the following two modules into your local Maven repository (mvn clean install).
Then reference the two blink packages in your project. To use the feature, set is.dim.batch.mode to true in the table config and write the SQL exactly as for a normal lookup join.