Flink SQL: Knowing the Why (XI): Deduplication Is Not Just COUNT DISTINCT, There Is Also the Powerful Deduplication
2022-07-03 13:10:00 【Big data sheep said】
1. Preface
To get the source code for this walkthrough of deduplication in Flink 1.13.2, reply "1.13.2 deduplication" to the author's WeChat official account.
Below is the table of contents. Each item also states the corresponding conclusion, so you can skim the conclusions first to see what this article aims to offer:
Background and application scenarios: Flink SQL deduplication is essentially ROW_NUMBER() ... = 1, so it removes duplicates while keeping all the original fields.
A practical case: a scenario where logs are reported more than once leads to the Flink SQL deduplication solution.
Deduplication-based solution and principle analysis: when ROW_NUMBER is ordered by proctime (processing time), deduplication maintains one value state per partition key. If the value state is not empty, the id has already been seen and the current record is not emitted. If it is empty, the id has not been seen yet, so the value state is marked and the current record is emitted.
Summary and outlook
2. Background and application scenarios
Have you ever run into scenarios like these:
Upstream sends duplicate data, or the log source reports records more than once, so downstream COUNT and SUM results come out too high.
You want to deduplicate, but still keep all fields of the original table and emit them downstream as usual.
What solutions can you think of?
Readers familiar with offline (batch) computing will quickly give the answer: ROW_NUMBER = 1 in Hive SQL. Exactly. Flink SQL provides the very same approach, and it solves this problem neatly.
Let's get into the main content.
3. A practical case
First, let's use a real case to see what the output should look like for concrete input values.
Scenario: tracking logs report the fields id (uniquely identifies a log record), timestamp (event timestamp), page (the page on which the event occurred), param1, param2, paramN…. However, due to some mechanism, logs are sometimes reported more than once. Since there are many downstream consumers, we need to deduplicate once so that downstream jobs consume deduplicated data.
A batch of input data:
id | timestamp | page | param1 | param2 | paramN |
---|---|---|---|---|---|
1 | 2021-11-01 00:01:00 | A | xxx1 | xxx2 | xxxN |
1 | 2021-11-01 00:01:00 | A | xxx1 | xxx2 | xxxN |
2 | 2021-11-01 00:01:00 | A | xxx3 | xxx2 | xxxN |
2 | 2021-11-01 00:01:00 | A | xxx3 | xxx2 | xxxN |
3 | 2021-11-01 00:03:00 | C | xxx5 | xxx2 | xxxN |
The second and fourth rows are repeated reports. The expected output is:
id | timestamp | page | param1 | param2 | paramN |
---|---|---|---|---|---|
1 | 2021-11-01 00:01:00 | A | xxx1 | xxx2 | xxxN |
2 | 2021-11-01 00:01:00 | A | xxx3 | xxx2 | xxxN |
3 | 2021-11-01 00:03:00 | C | xxx5 | xxx2 | xxxN |
4. Deduplication-based solution and principle analysis
4.1. How to write the SQL
Using the same case, here is the final SQL:
SELECT id,
       `timestamp`,  -- TIMESTAMP is a reserved keyword, so the column name is escaped
       page,
       param1,
       param2,
       paramN
FROM (
  SELECT
    id,
    `timestamp`,
    page,
    param1,
    param2,
    paramN,
    -- proctime is the processing-time attribute declared in source_table, i.e. proctime AS PROCTIME()
    ROW_NUMBER() OVER (PARTITION BY id ORDER BY proctime ASC) AS rn
  FROM source_table
)
WHERE rn = 1
The SQL above should be easy to follow. Since we don't care when the duplicates were reported, we can simply ORDER BY proctime and keep the first record for each id in arrival order.
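To make the query's semantics concrete, here is a plain-Python sketch (not Flink code) of what ROW_NUMBER() OVER (PARTITION BY id ORDER BY proctime) ... WHERE rn = 1 computes on the sample input. It assumes proctime is simply each row's arrival position in the list. The names `ROWS` and `row_number_eq_1` are hypothetical.

```python
from itertools import groupby

# Sample rows from the case above: (id, timestamp, page, param1, param2, paramN),
# listed in arrival order, so the list index stands in for proctime (an assumption).
ROWS = [
    (1, "2021-11-01 00:01:00", "A", "xxx1", "xxx2", "xxxN"),
    (1, "2021-11-01 00:01:00", "A", "xxx1", "xxx2", "xxxN"),
    (2, "2021-11-01 00:01:00", "A", "xxx3", "xxx2", "xxxN"),
    (2, "2021-11-01 00:01:00", "A", "xxx3", "xxx2", "xxxN"),
    (3, "2021-11-01 00:03:00", "C", "xxx5", "xxx2", "xxxN"),
]


def row_number_eq_1(rows):
    """Batch semantics of ROW_NUMBER() OVER (PARTITION BY id ORDER BY proctime) ... WHERE rn = 1."""
    # Attach proctime (the arrival index) to every row.
    indexed = list(enumerate(rows))
    # PARTITION BY id, then ORDER BY proctime inside each partition.
    indexed.sort(key=lambda pair: (pair[1][0], pair[0]))
    result = []
    for _, partition in groupby(indexed, key=lambda pair: pair[1][0]):
        for rn, (_, row) in enumerate(partition, start=1):
            if rn == 1:  # WHERE rn = 1: keep only the earliest row of each id
                result.append(row)
    return result


for row in row_number_eq_1(ROWS):
    print(row)
```

Every column of the surviving rows is preserved, which is the point of using ROW_NUMBER here: COUNT(DISTINCT id) would only return a number, not the deduplicated records themselves.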
4.2. Operator graph generated by Flink and operator semantics under proctime
The generated operator graph is as follows:
[Figure: operator graph of the deduplication job]
source operator: the source sends data to the deduplication operator via keyBy, where the keyBy key is the id from the SQL.
deduplication operator: the deduplication operator maintains a value state for each partition key and uses it for deduplication. For every incoming record, it reads the value from the value state of the record's partition key. If the value is not empty, data for this key has already arrived, so the current record is a duplicate and is not sent downstream. If it is empty, no data for this key has arrived before, so the current record is the first one: the value state is set to true and the record is sent to the downstream operator.
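The two steps above (keyBy on id, then a per-key value state) can be simulated in plain Python. This is a sketch under simplifying assumptions, not Flink's API: a dict stands in for the keyed ValueState<Boolean>, and `hash(key) % parallelism` stands in for Flink's real key-group routing. All names (`DedupSubtask`, `dedup_stream`) are hypothetical.

```python
from typing import Dict, Hashable, List, Optional, Tuple

Row = Tuple[object, ...]  # (id, timestamp, page, param1, param2, paramN)


class DedupSubtask:
    """One parallel instance of the deduplication operator (simulation only).

    `state` stands in for a keyed ValueState<Boolean>: one entry per partition key,
    and a missing entry models an empty (null) state.
    """

    def __init__(self) -> None:
        self.state: Dict[Hashable, bool] = {}

    def process(self, key: Hashable, row: Row) -> Optional[Row]:
        if self.state.get(key) is not None:
            return None          # state not empty: this id was seen before, drop the row
        self.state[key] = True   # state empty: mark the id as seen ...
        return row               # ... and send the current row downstream


def dedup_stream(rows: List[Row], parallelism: int = 2) -> List[Row]:
    subtasks = [DedupSubtask() for _ in range(parallelism)]
    emitted: List[Row] = []
    for row in rows:             # rows arrive in processing-time order
        key = row[0]             # keyBy(id), matching PARTITION BY id in the SQL
        # Simplified routing: Flink actually maps keys to key groups, but the property
        # that matters is the same: one key always reaches the same subtask and its state.
        subtask = subtasks[hash(key) % parallelism]
        out = subtask.process(key, row)
        if out is not None:
            emitted.append(out)
    return emitted


rows = [
    (1, "2021-11-01 00:01:00", "A", "xxx1", "xxx2", "xxxN"),
    (1, "2021-11-01 00:01:00", "A", "xxx1", "xxx2", "xxxN"),
    (2, "2021-11-01 00:01:00", "A", "xxx3", "xxx2", "xxxN"),
    (2, "2021-11-01 00:01:00", "A", "xxx3", "xxx2", "xxxN"),
    (3, "2021-11-01 00:03:00", "C", "xxx5", "xxx2", "xxxN"),
]
for r in dedup_stream(rows):
    print(r)
```

Because routing by key is deterministic, a duplicate always lands on the subtask that already holds state for its id. This is why per-key state is enough and no global lookup is needed.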
4.3. How deduplication works under proctime
The concrete deduplication operator is deduplication. Inspecting the transformations, the deduplication operator appears as shown in the figure below:
[Figure: transformation containing the deduplication operator]
The core deduplication logic is in processFirstRowOnProcTime, which is called from org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepFirstRowFunction, as shown in the figure below:
[Figure: source code of ProcTimeDeduplicateKeepFirstRowFunction]
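The following Python sketch mirrors the per-record keep-first-row logic described in 4.2. It is shaped like a Flink function: a ValueState-like object with `value()` and `update()`, plus an output collector. It is an illustrative transliteration, not Flink's source code. `FakeKeyedValueState` is a hypothetical stand-in, and a plain list plays the role of the Collector.

```python
from typing import Any, Dict, Hashable, List, Optional


class FakeKeyedValueState:
    """Hypothetical stand-in for a keyed ValueState<Boolean>.

    As with Flink keyed state, reads and writes are scoped to the current key,
    which the runtime sets before each record is processed.
    """

    def __init__(self) -> None:
        self._values: Dict[Hashable, Any] = {}
        self._current_key: Hashable = None

    def set_current_key(self, key: Hashable) -> None:
        self._current_key = key

    def value(self) -> Optional[Any]:
        return self._values.get(self._current_key)  # None models an empty (null) state

    def update(self, v: Any) -> None:
        self._values[self._current_key] = v


def process_first_row_on_proc_time(row: Any, state: FakeKeyedValueState, out: List[Any]) -> None:
    """Keep-first-row logic as described in 4.2 (Python sketch, not Flink source)."""
    if state.value() is not None:
        return            # not the first row for this key: ignore it
    state.update(True)    # remember that this key has been seen
    out.append(row)       # emit the first row downstream


state = FakeKeyedValueState()
out: List[Any] = []
for record in [("1", "a"), ("1", "b"), ("2", "c")]:
    state.set_current_key(record[0])  # what keyBy(id) arranges before each record
    process_first_row_on_proc_time(record, state, out)
print(out)  # [('1', 'a'), ('2', 'c')]
```

The function itself never sees the key. Scoping the state to the current key is what turns a single `value()` call into a per-id check.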
5. Summary and outlook
This article introduced the application scenarios of deduplication and how it works, covering the following parts:
Background and application scenarios: Flink SQL deduplication is essentially ROW_NUMBER() ... = 1, so it removes duplicates while keeping all the original fields.
A practical case: a scenario where logs are reported more than once leads to the Flink SQL deduplication solution.
Deduplication-based solution and principle analysis: when ROW_NUMBER is ordered by proctime (processing time), deduplication maintains one value state per partition key. If the value state is not empty, the id has already been seen and the current record is not emitted. If it is empty, the id has not been seen yet, so the value state is marked and the current record is emitted.