Pitfall notes - Real-time data warehouse development - doris/pg/flink
2022-07-29 07:04:00 【cg6】
- Business scenario - real-time data warehouse
- Environment
- debezium - flink - doris implementation [pitfalls]
- debezium - flink - PostgreSQL implementation
- Doris usage summary
- Minimize delete/insert operations
- Key fields cannot be null
- ROLLUP support in the UNIQUE KEY model
- ROLLUP support in the AGGREGATE KEY model
- AGGREGATE KEY model - verifying insert/delete
- Doris UNIQUE KEY model - inaccurate query results
- Avoid frequent ALTER operations
- string/text/float/double cannot be key columns; date keys cause data anomalies
- Parsing special strings
- Inaccurate SQL error messages
- datediff parameter notes
- Addendum - Flink notes
Business scenario - real-time data warehouse
- Query latency must be at the millisecond level.
- Business data can be modified or deleted at any time.
- Metric calculations involve joins across multiple tables and 4 UNION ALLs (i.e. 5 SELECTs), including JSON string parsing and computation over large tables.
- JSON strings must be parsed, the parsed data aggregated, and then differences computed.
Environment
Doris 1.1
debezium - flink - doris implementation [pitfalls]
doris - UNIQUE KEY model
Approach:
Flink consumes the detailed data from Kafka into Doris, with the table in the UNIQUE KEY model; the ETL logic is wrapped in an ordinary view for external queries. Verification showed query latency at the second level, so optimization was attempted.
Optimization attempts for some slow SQL:
- Use ROLLUP to pre-aggregate single tables; however, a ROLLUP over a subset of the key columns is not supported.
Screenshot - to be added - Optimizing the table's prefix index improved query performance, but it stayed in the second range.
- Completing the JSON string parsing in advance and dropping the table's unused fields, then re-running the query, improved single-statement performance markedly, down to the millisecond level; with all the SQL combined, execution time was also at the millisecond level. After re-optimizing the model, re-running the task, and re-executing the SQL, performance frustratingly fell back to the second level (the server hosts many mixed services and the environment is unstable, so SQL query performance fluctuates).
Note: troubleshooting and optimization of this mode stopped here, and the approach changed: let Flink do not only the synchronization but also the ETL, which leads to the following mode.
doris - AGGREGATE KEY model
Approach:
Flink consumes the detailed data from Kafka, the computation logic that used to live in Doris is moved into the Flink job, and the results are synchronized to Doris with the table in the AGGREGATE KEY model, again wrapped in an ordinary view holding the ETL logic for external queries. The data volume is now small, so the query performance requirement is easily met.
Note: after deploying the job, problems showed up in Doris with data retraction and data writes (values double, and retractions may fail to apply).
Analysis of the data doubling:
Flink performs stateful computation: each emission sinks the result of the historical state plus the current state, after the logic runs, to the target table. Because Doris's AGGREGATE KEY model aggregates by key, the table's values end up doubled.
Analysis of the retraction anomaly: to be verified.
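The doubling failure mode above can be sketched in a few lines of Python. The key and column names are made up, and the two helper "tables" only mimic Doris AGGREGATE KEY SUM semantics and UNIQUE KEY (latest-row-wins) semantics, not Doris itself:

```python
# Sketch of the doubling failure mode: a stateful job re-emits the full
# aggregated value on every update, while the sink table SUMs by key.
from collections import defaultdict

def stateful_job_emissions(events):
    """Simulate a stateful Flink-style aggregation that emits the
    cumulative count for a key on every incoming event (no retractions)."""
    state = defaultdict(int)
    emissions = []
    for key in events:
        state[key] += 1
        emissions.append((key, state[key]))  # emits 1, then 2, then 3...
    return emissions

def aggregate_key_sum_table(emissions):
    """Mimic an AGGREGATE KEY table with a SUM value column:
    rows with the same key are summed, not replaced."""
    table = defaultdict(int)
    for key, value in emissions:
        table[key] += value
    return dict(table)

def unique_key_table(emissions):
    """Mimic a UNIQUE KEY table: the latest row per key wins,
    so re-emitted aggregates stay correct."""
    table = {}
    for key, value in emissions:
        table[key] = value
    return table

events = ["farm_1", "farm_1", "farm_1"]  # three events for one key
emissions = stateful_job_emissions(events)

print(aggregate_key_sum_table(emissions))  # {'farm_1': 6} -> inflated (1+2+3)
print(unique_key_table(emissions))         # {'farm_1': 3} -> correct
```

This also previews why the UNIQUE KEY model revalidated below works: replacing by key absorbs re-emitted aggregates, while summing by key inflates them.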
doris - UNIQUE KEY model [revalidated]
After deploying the Flink-sink-to-pg job described above and verifying that its data was accurate, it occurred to me that the principle of the Doris UNIQUE KEY model should be similar to pg's. Redeploying and validating the Doris UNIQUE KEY model showed that it also meets the requirements; it had been dismissed earlier only because, under time pressure, the UNIQUE KEY model's principle was overlooked. (The approach is the same as debezium - flink - PostgreSQL.)
Verified as follows:
1. The table creation statement is basically the same as the pg one, with the key fields set.
2. The Flink ETL aggregation code was rewritten to sink to Doris, and its data was verified to be consistent with the pg table.
Note: see the key-field issues in the summary below.
debezium - flink - PostgreSQL implementation
Approach:
Flink consumes the detailed data from Kafka, the ETL computation logic runs in the Flink job, and the results are synchronized to pg, then wrapped in an ordinary view holding the ETL logic for external queries. The data volume is now small, and pg also meets the query performance requirement. (Simply replace the Doris database with PostgreSQL; everything else is the same.)
Doris usage summary
Minimize delete/insert operations
1. delete/insert operations are slow; it is recommended to reduce delete/insert operations or batch the writes.
Key fields cannot be null
- Note that key fields cannot be null. In some business scenarios a key field may be null, and those writes will fail.
The error when a key field is null:
Column 'birth_date' is NOT NULL, however, a null value is being written into it. You can set job configuration 'table.exec.sink.not-null-enforcer'='drop' to suppress this exception and drop such records silently.
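One upstream workaround, equivalent in spirit to the 'table.exec.sink.not-null-enforcer'='drop' setting mentioned in the error, is to filter out records with null key fields before the sink. A minimal Python sketch, with hypothetical field names:

```python
# Drop records whose key fields are null before they reach the sink,
# mirroring what 'table.exec.sink.not-null-enforcer'='drop' does in Flink.
# Field names (tenant_id, birth_date) are illustrative.
KEY_FIELDS = ["tenant_id", "birth_date"]

def drop_null_key_records(records, key_fields=KEY_FIELDS):
    """Keep only records where every key field is present and non-null."""
    return [r for r in records if all(r.get(f) is not None for f in key_fields)]

records = [
    {"tenant_id": 1, "birth_date": "2022-07-01", "pig_num": 5},
    {"tenant_id": 2, "birth_date": None, "pig_num": 3},  # would fail the write
]

print(drop_null_key_records(records))  # only the first record survives
```

The config option drops such rows silently; an explicit filter makes the drop visible and countable in the job.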
ROLLUP support in the UNIQUE KEY model
CREATE TABLE XXXX (
event_id bigint(20) NULL COMMENT "",
animal_id bigint(20) NULL COMMENT "",
org_id bigint(20) NULL COMMENT "",
pig_num bigint(20)
) ENGINE=OLAP
UNIQUE KEY(event_id, animal_id, org_id)
COMMENT "OLAP"
DISTRIBUTED BY HASH(org_id) BUCKETS 32
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"in_memory" = "false",
"storage_format" = "V2"
);
Execute:
ALTER TABLE DB.XXX ADD ROLLUP r1(org_id, pig_num);
Error message: SQL error [1105] [HY000]: errCode = 2, detailMessage = Rollup should contains all unique keys in basetable
Adding all the key columns works:
ALTER TABLE test_bigdata_realtime_metrics.dwd_fpf_anc_event_list0727 ADD ROLLUP r1(event_id, org_id, animal_id, pig_num);
The official docs say this is supported; details: https://doris.apache.org/zh-CN/docs/data-table/hit-the-rollup/#aggregate-%E5%92%8C-unique-%E6%A8%A1%E5%9E%8B%E4%B8%AD%E7%9A%84-rollup
Conclusion: the UNIQUE KEY model does not support a ROLLUP over a subset of the key columns; all of the table's key columns must be included.
ROLLUP support in the AGGREGATE KEY model
CREATE TABLE db.XXX (
event_id bigint(20) NULL COMMENT "",
animal_id bigint(20) NULL COMMENT "",
org_id bigint(20) NULL COMMENT "",
pig_num bigint(20) SUM
) ENGINE=OLAP
AGGREGATE KEY(event_id, animal_id, org_id)
COMMENT "OLAP"
DISTRIBUTED BY HASH(org_id) BUCKETS 32
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"in_memory" = "false",
"storage_format" = "V2"
);
Create a ROLLUP with a single key column:
ALTER TABLE db.XXX ADD ROLLUP r1(org_id, pig_num);
Conclusion: executes successfully; a ROLLUP over a single key column is supported.
AGGREGATE KEY model - verifying insert/delete
Table creation statement:
CREATE TABLE `dws_fmc_piglet_got_confirm_agg` (
`tenant_id` bigint(20) NULL COMMENT "",
`got_date` varchar(255) NULL COMMENT "",
`pig_types` varchar(255) NULL COMMENT "",
`pig_type` varchar(255) NULL COMMENT "",
`pig_num` int(11) REPLACE NULL COMMENT "",
`compute_time` datetime MAX NULL COMMENT ""
) ENGINE=OLAP
AGGREGATE KEY(`tenant_id`, `got_date`, `pig_types`, `pig_type`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`tenant_id`) BUCKETS 32
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"in_memory" = "false",
"storage_format" = "V2"
)
**Conclusion:** data is retracted correctly; the principle is the same as in the UNIQUE KEY model. (The UNIQUE KEY model is just a special case of the AGGREGATE KEY model.)
Doris UNIQUE KEY model - inaccurate query results
1. Query results can be inaccurate; whether this is caused by unstable background service processes is unconfirmed. After a few minutes the data becomes consistent.
2. The official FAQ documents the inconsistent-query issue and the fix: https://doris.apache.org/zh-CN/docs/faq/sql-faq#q4-unique-key-%E6%A8%A1%E5%9E%8B%E6%9F%A5%E8%AF%A2%E7%BB%93%E6%9E%9C%E4%B8%8D%E4%B8%80%E8%87%B4
Avoid frequent ALTER operations
Creating multiple ROLLUPs in quick succession:
ALTER TABLE db.XXX ADD ROLLUP r1(col1, col4);
ALTER TABLE db.XXX ADD ROLLUP r2(col3, col5);
Error message:
SQL error [1105] [HY000]: errCode = 2, detailMessage = Table[db.XXX]'s state is not NORMAL. Do not allow doing ALTER ops
string/text/float/double cannot be key columns; date keys cause data anomalies
Doris version: 1.1
1. Key fields cannot be of type text or string; otherwise table creation fails.
2. When some of the key fields are of type date and Flink sinks into the Doris table, most of the data in the date-typed field becomes null.
Table creation statement:
CREATE TABLE XXXX (
t_id bigint(20) NULL COMMENT "",
f_id bigint(20) NULL COMMENT "",
a_id bigint(20) NULL COMMENT "",
birth_str varchar(255) NULL COMMENT "",
birth_date date NULL COMMENT "",
pig_types varchar(255) NULL COMMENT "",
pig_type varchar(255) NULL COMMENT "",
pig_num int(11) NULL COMMENT "",
compute_time datetime NULL COMMENT ""
) ENGINE=OLAP
UNIQUE KEY(t_id, f_id, a_id, birth_str, birth_date, pig_types, pig_type)
COMMENT "OLAP"
DISTRIBUTED BY HASH(t_id) BUCKETS 32
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"in_memory" = "false",
"storage_format" = "V2"
)
Flink ETL SQL:
SELECT
basic.t_id ,
basic.f_id,
basic.a_id,
cast(basic.birth_date as varchar) as birth_str,
basic.birth_date,
'a' AS pig_types,
'b' AS pig_type,
1 as pig_num,
localtimestamp as compute_time
FROM basic
WHERE
basic.birth_date is not null
Query results:
Parsing special strings
1. JSON string parsing is not especially fast. For higher performance, parse the JSON in advance so that Doris only does the logical computation. (Roughly: a direct computation runs in milliseconds, while parsing a special string inside the computation can push it into seconds; this is not absolute.)
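As a rough illustration of "parse upstream", here is a minimal Python sketch that flattens the JSON payload once in the ETL step, so the sink table holds plain columns and queries never call JSON functions. The field names (detail_json, pig_type, pig_num) are made up for the example:

```python
# Flatten the embedded JSON string into plain columns once in the ETL job,
# leaving no JSON parsing for query time. Field names are illustrative.
import json

def flatten_event(row):
    """Parse the record's JSON payload once and emit flat columns."""
    payload = json.loads(row["detail_json"])
    return {
        "tenant_id": row["tenant_id"],
        "pig_type": payload.get("pig_type"),
        "pig_num": payload.get("pig_num", 0),
    }

row = {"tenant_id": 1, "detail_json": '{"pig_type": "piglet", "pig_num": 12}'}
print(flatten_event(row))  # flat columns, ready for a plain-column sink table
```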
Inaccurate SQL error messages
The failing SQL:
SELECT
basic.tenant_id AS tenant_id,
basic.farm_id AS farm_id,
CASE
WHEN datediff(now(), cast(birth_time as datetime)) between 0 and 30 THEN '0-30 days old'
WHEN datediff(now(), cast(birth_time as datetime)) between 31 and 60 THEN '31-60 days old'
WHEN datediff(now(), cast(birth_time as datetime)) between 61 and 90 THEN '61-90 days old'
WHEN datediff(now(), cast(birth_time as datetime)) between 91 and 120 THEN '91-120 days old'
WHEN datediff(now(), cast(birth_time as datetime)) between 121 and 150 THEN '121-150 days old'
ELSE 'over 150 days old'
END AS pig_sub_type
,sum(1)
FROM basic
group by
basic.tenant_id ,
basic.farm_id,
CASE
WHEN datediff(now(), cast(birth_time as datetime)) between 0 and 30 THEN '0-30 days old'
WHEN datediff(now(), cast(birth_time as datetime)) between 31 and 60 THEN '31-60 days old'
WHEN datediff(now(), cast(birth_time as datetime)) between 61 and 90 THEN '61-90 days old'
WHEN datediff(now(), cast(birth_time as datetime)) between 91 and 120 THEN '91-120 days old'
WHEN datediff(now(), cast(birth_time as datetime)) between 121 and 150 THEN '121-150 days old'
ELSE 'over 150 days old'
END
Error message: select list expression not produced by aggregation output (missing from GROUP BY clause?): CASE WHEN datediff('2022-07-27 10:06:44', CAST(birth_time AS DATETIME)) >= 0
Actual cause: the parameter types passed to datediff are wrong; both parameters must be of type datetime.
For example: select datediff(CAST('2007-12-31 23:59:59' AS DATETIME), CAST('2007-12-30' AS DATETIME));
Corrected SQL:
SELECT
basic.tenant_id AS tenant_id,
basic.farm_id AS farm_id,
CASE
WHEN datediff(CURRENT_DATE(), cast(birth_time as datetime)) between 0 and 30 THEN '0-30 days old'
WHEN datediff(CURRENT_DATE(), cast(birth_time as datetime)) between 31 and 60 THEN '31-60 days old'
WHEN datediff(CURRENT_DATE(), cast(birth_time as datetime)) between 61 and 90 THEN '61-90 days old'
WHEN datediff(CURRENT_DATE(), cast(birth_time as datetime)) between 91 and 120 THEN '91-120 days old'
WHEN datediff(CURRENT_DATE(), cast(birth_time as datetime)) between 121 and 150 THEN '121-150 days old'
ELSE 'over 150 days old'
END AS pig_sub_type
,sum(1)
FROM basic
group by
basic.tenant_id ,
basic.farm_id,
CASE
WHEN datediff(CURRENT_DATE(), cast(birth_time as datetime)) between 0 and 30 THEN '0-30 days old'
WHEN datediff(CURRENT_DATE(), cast(birth_time as datetime)) between 31 and 60 THEN '31-60 days old'
WHEN datediff(CURRENT_DATE(), cast(birth_time as datetime)) between 61 and 90 THEN '61-90 days old'
WHEN datediff(CURRENT_DATE(), cast(birth_time as datetime)) between 91 and 120 THEN '91-120 days old'
WHEN datediff(CURRENT_DATE(), cast(birth_time as datetime)) between 121 and 150 THEN '121-150 days old'
ELSE 'over 150 days old'
END
datediff parameter notes
- DATETIME DATEDIFF(DATETIME expr1, DATETIME expr2); note the parameter types: an error message does not necessarily point at a wrong function parameter type.
- Computes expr1 - expr2, with the result in days.
- expr1 and expr2 must be valid date or date/time expressions.
- Note: only the date part of each value participates in the calculation.
- Example: select datediff(CAST('2007-12-31 23:59:59' AS DATETIME), CAST('2007-12-30' AS DATETIME));
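The "only the date part participates" rule can be mimicked with plain Python dates; this models the documented DATEDIFF semantics rather than calling Doris itself:

```python
# Mimic Doris DATEDIFF semantics: truncate both operands to their date part,
# then subtract; the result is in whole days.
from datetime import datetime

def datediff(expr1: datetime, expr2: datetime) -> int:
    """expr1 - expr2 in days, using only the date parts."""
    return (expr1.date() - expr2.date()).days

# '2007-12-31 23:59:59' vs '2007-12-30': one day apart despite the 23:59:59,
# because the time-of-day is discarded before subtracting.
print(datediff(datetime(2007, 12, 31, 23, 59, 59), datetime(2007, 12, 30)))  # 1
```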
Addendum - Flink notes
The current-date function in a Flink ETL job is not re-evaluated as dates change
Problem description: the Flink ETL SQL computes the difference between the current time/date and a time/date field in the table record. Once a record for a given id has been computed at some historical time, the difference queried today (or on any later day) should be the time from the record's date to the current time; but the job still holds the historical "current time", which leads to inconsistent data.
Solution: the fix is already implied in the problem description. When a record is newly computed, its date difference is correct at that moment; afterwards the real current time moves on, but the Flink ETL job's "current time" stays at the historical value and the computation is not re-triggered (unless the record is modified). At query time, use **(current time - record's compute time) + computed date difference = current time - record date**.
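The correction formula above can be checked with a small sketch (dates and field names are illustrative; everything is reduced to plain dates for simplicity):

```python
# Check the correction formula from the text:
# (now - record's compute_time) + computed_date_diff == now - record_date
from datetime import date

def corrected_age_days(compute_time: date, computed_diff: int, today: date) -> int:
    """Age in days as of 'today', given a diff that was frozen at compute_time."""
    return (today - compute_time).days + computed_diff

record_date = date(2022, 6, 1)    # e.g. the birth date stored on the record
compute_time = date(2022, 7, 1)   # when the Flink job last computed the diff
computed_diff = (compute_time - record_date).days  # 30, frozen in the sink table

today = date(2022, 7, 29)
# 28 days since compute_time + the frozen 30-day diff = the true 58-day age.
print(corrected_age_days(compute_time, computed_diff, today))  # 58
```

This keeps the frozen value in the sink table usable: the query layer only needs the record's compute time to bring the difference up to date.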