Pitfall Notes - Real-Time Data Warehouse Development - doris/pg/flink
2022-07-29 07:04:00 【cg6】
- Business scenario description - real-time data warehouse
- Environment description
- debezium - flink - doris implementation 【pitfalls】
- debezium - flink - PostgreSQL implementation
- doris usage summary
- Minimize delete/insert operations
- Key fields cannot be null
- ROLLUP support in UNIQUE KEY mode
- ROLLUP support in AGGREGATE KEY mode
- AGGREGATE KEY mode - verifying insert/delete
- doris UNIQUE KEY mode - inaccurate query results
- Avoid frequent alter operations
- string/text/float/double cannot be keys; a date key corrupts data
- Parsing special strings
- Inaccurate sql error messages
- datediff parameter notes
- Addendum - flink-related
Business scenario description - real-time data warehouse
- Query performance must be at the millisecond level
- Business data can be modified or deleted at will
- Metric calculations involve joins across multiple tables and 4 UNION ALLs [i.e. 5 SELECTs], including JSON-string parsing and computation on large tables
- JSON strings are parsed, the parsed data is aggregated, and differences are then computed
Environment description
doris 1.1
debezium - flink - doris implementation 【pitfalls】
doris - UNIQUE KEY deployment mode
Implementation idea:
Flink consumes the detailed data from Kafka into doris; the table model is UNIQUE KEY mode, the ETL logic is wrapped in an ordinary view, and queries are served from that view. Verification, however, showed query performance at the second level, so optimization was attempted.
Optimization attempts for some of the slow sql:
- Use ROLLUP to pre-aggregate single tables; however, a ROLLUP on a subset of the key columns is not supported
- Screenshot - to be added - Optimized the table's prefix index; query performance improved, but stayed in the second range
- Moved the JSON-string parsing upstream and dropped the table's unused fields, then re-ran the query: single-query performance improved significantly, reaching the millisecond level; the combined sql also executed at the millisecond level. After re-optimizing the model, re-running the task, and re-executing the sql, performance dishearteningly fell back to the second level [many mixed services are deployed on the server, so the environment is unstable and sql query performance fluctuates]
Note: troubleshooting and optimization of this mode stopped here; change of approach: let flink do not only the synchronization but also the ETL, which leads to the mode below
doris - AGGREGATE KEY deployment mode
Implementation idea:
Flink consumes the detailed data from Kafka and also takes over the calculation logic that used to live in doris; the flink job then synchronizes the result data to doris, where the table model is AGGREGATE KEY mode, again wrapped in an ordinary view with the ETL logic for external queries. The data volume is now small, so the query-performance requirement is certainly met.
Note: after deploying the job, doris showed problems with data retraction and data writes 【the data doubles, and retractions may fail】
Analysis of why the data doubles:
Flink is a stateful computation: each emission is the result of the logical calculation over the historical state plus the current data, sinked to the target table. Because doris's AGGREGATE KEY mode aggregates by key, every re-emitted total is added on top of the previous one, and the doris table's data doubles.
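The doubling can be reproduced with a toy model. This is a minimal sketch (hypothetical numbers, not the actual job code), assuming a SUM value column and a stateful job that emits the full running total on every update:

```python
# Simulate a Doris AGGREGATE KEY table whose value column uses SUM:
# every arriving row for the same key is merged by addition.
def doris_sum_table(emissions):
    table = {}
    for key, value in emissions:
        table[key] = table.get(key, 0) + value  # SUM merges rows per key
    return table

# The Flink job sees pig_num events for tenant 1: first 10, later 5 more.
# A stateful aggregation re-emits the FULL running total each time: 10, then 15.
emissions = [(1, 10), (1, 15)]

result = doris_sum_table(emissions)
print(result)  # {1: 25} -- 25 instead of the correct 15: the totals pile up
```

A REPLACE value column (or UNIQUE KEY mode) would keep only the latest emission per key, which is why the re-validated UNIQUE KEY deployment below works.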
Analysis of the abnormal data retraction: to be verified
doris - UNIQUE KEY deployment mode 【re-validation】
After deploying the flink-sink-to-pg job above and verifying that the data was accurate, it occurred to me that the principle of doris UNIQUE KEY mode should be similar to pg. After redeploying and validating the doris UNIQUE KEY schema, this mode also met the requirements; I had previously overlooked how doris UNIQUE KEY mode works because of time pressure. 【The implementation idea is the same as debezium - flink - PostgreSQL】
Verified as follows:
1. The table-creation statement is basically the same as the pg one, with the key fields set
2. Rewrote the flink ETL aggregation code to sink to doris and verified that the data matched the data sinked to the pg table
Note: see the key-field problems in the summary below
debezium - flink - PostgreSQL implementation
Implementation idea:
Flink consumes the detailed data from Kafka, with the ETL calculation logic inside the flink job, and synchronizes the result data to pg; the ETL logic is then wrapped in an ordinary view for external queries. The data volume is now small, so pg also meets the query-performance requirement. 【Just replace the doris database with a PostgreSQL database; everything else is the same】
doris usage summary
Minimize delete/insert operations
1. delete/insert operations are slow; it is recommended to reduce delete/insert operations or to write in larger batches.
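Batching amortizes the per-operation cost. A minimal sketch of the idea (the class and names are illustrative, not a real Doris connector API): buffer rows and flush them in groups instead of issuing one write per record.

```python
BATCH_SIZE = 3  # illustrative; real connectors expose this as configuration

class BatchedSink:
    def __init__(self, flush):
        self.buffer = []
        self.flush = flush          # callable that writes one batch downstream

    def write(self, row):
        self.buffer.append(row)
        if len(self.buffer) >= BATCH_SIZE:
            self.flush(self.buffer)
            self.buffer = []

    def close(self):                # flush the remaining tail on shutdown
        if self.buffer:
            self.flush(self.buffer)
            self.buffer = []

batches = []
sink = BatchedSink(lambda rows: batches.append(list(rows)))
for i in range(7):
    sink.write(i)
sink.close()
print(len(batches))  # 3 -- seven rows produced three writes instead of seven
```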
Key fields cannot be null
- Note that a key field cannot be null; in some business scenarios the field used as a key may be null, and the write will then fail.
The error when a key field is null is as follows:
Column 'birth_date' is NOT NULL, however, a null value is being written into it. You can set job configuration 'table.exec.sink.not-null-enforcer'='drop' to suppress this exception and drop such records silently.
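The error message itself names the escape hatch ('table.exec.sink.not-null-enforcer'='drop'). As a language-neutral sketch of the two usual choices before the sink, dropping the offending records versus substituting a placeholder (field names hypothetical):

```python
rows = [
    {"id": 1, "birth_date": "2020-01-01"},
    {"id": 2, "birth_date": None},        # would fail the NOT NULL key column
]

# Option 1: drop records whose key field is null (what 'drop' does).
kept = [r for r in rows if r["birth_date"] is not None]

# Option 2: substitute a sentinel so the record survives, if the business allows.
patched = [dict(r, birth_date=r["birth_date"] or "1970-01-01") for r in rows]

print(len(kept), patched[1]["birth_date"])  # 1 1970-01-01
```

Option 1 silently loses data; option 2 keeps it but pollutes the key space, so the choice depends on the business scenario.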
ROLLUP support in UNIQUE KEY mode
CREATE TABLE XXXX (
event_id bigint(20) NULL COMMENT "",
animal_id bigint(20) NULL COMMENT "",
org_id bigint(20) NULL COMMENT "",
pig_num bigint(20)
) ENGINE=OLAP
UNIQUE KEY(event_id, animal_id, org_id)
COMMENT "OLAP"
DISTRIBUTED BY HASH(org_id) BUCKETS 32
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"in_memory" = "false",
"storage_format" = "V2"
);
Execute:
ALTER TABLE DB.XXX ADD ROLLUP r1(org_id, pig_num);
Error message: SQL error [1105] [HY000]: errCode = 2, detailMessage = Rollup should contains all unique keys in basetable
Including all the key columns works:
ALTER TABLE test_bigdata_realtime_metrics.dwd_fpf_anc_event_list0727 ADD ROLLUP r1(event_id, org_id, animal_id, pig_num);
The official docs say ROLLUP is supported for this model; if interested, see: https://doris.apache.org/zh-CN/docs/data-table/hit-the-rollup/#aggregate-%E5%92%8C-unique-%E6%A8%A1%E5%9E%8B%E4%B8%AD%E7%9A%84-rollup
Conclusion: UNIQUE KEY mode does not support a ROLLUP on a subset of the key columns; all unique-key columns of the base table must be included
ROLLUP support in AGGREGATE KEY mode
CREATE TABLE db.XXX (
event_id bigint(20) NULL COMMENT "",
animal_id bigint(20) NULL COMMENT "",
org_id bigint(20) NULL COMMENT "",
pig_num bigint(20) SUM
) ENGINE=OLAP
AGGREGATE KEY(event_id, animal_id, org_id)
COMMENT "OLAP"
DISTRIBUTED BY HASH(org_id) BUCKETS 32
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"in_memory" = "false",
"storage_format" = "V2"
);
Create a ROLLUP with a single key column:
ALTER TABLE db.XXX ADD ROLLUP r1(org_id, pig_num);
Conclusion: executed successfully; AGGREGATE KEY mode supports a ROLLUP on a single key column
AGGREGATE KEY mode - verifying insert/delete
Table-creation statement:
CREATE TABLE `dws_fmc_piglet_got_confirm_agg` (
`tenant_id` bigint(20) NULL COMMENT "",
`got_date` varchar(255) NULL COMMENT "",
`pig_types` varchar(255) NULL COMMENT "",
`pig_type` varchar(255) NULL COMMENT "",
`pig_num` int(11) REPLACE NULL COMMENT "",
`compute_time` datetime MAX NULL COMMENT ""
) ENGINE=OLAP
AGGREGATE KEY(`tenant_id`, `got_date`, `pig_types`, `pig_type`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`tenant_id`) BUCKETS 32
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"in_memory" = "false",
"storage_format" = "V2"
);
**Conclusion:** data can be retracted normally; the principle is the same as UNIQUE KEY mode. 【UNIQUE KEY mode is just a special case of AGGREGATE KEY mode】
doris UNIQUE KEY mode - inaccurate query results
1. Query results were inaccurate: it is unclear whether this was caused by instability of the background service processes; after a few minutes the data became consistent.
2. The official FAQ describes the inconsistent-query problem and its solution: https://doris.apache.org/zh-CN/docs/faq/sql-faq#q4-unique-key-%E6%A8%A1%E5%9E%8B%E6%9F%A5%E8%AF%A2%E7%BB%93%E6%9E%9C%E4%B8%8D%E4%B8%80%E8%87%B4
Avoid frequent alter operations
Creating multiple ROLLUPs in quick succession:
ALTER TABLE db.XXX ADD ROLLUP r1(col1, col4);
ALTER TABLE db.XXX ADD ROLLUP r2(col3, col5);
Error message:
SQL error [1105] [HY000]: errCode = 2, detailMessage = Table[db.XXX]'s state is not NORMAL. Do not allow doing ALTER ops
string/text/float/double cannot be keys; a date key corrupts data
doris version: 1.1
1. A key field cannot be of type text or string; otherwise table creation fails
2. When some key fields are of type date, after flink sinks to the doris table, most of the data in the date-type field becomes null
Table-creation statement:
CREATE TABLE XXXX (
t_id bigint(20) NULL COMMENT "",
f_id bigint(20) NULL COMMENT "",
a_id bigint(20) NULL COMMENT "",
birth_str varchar(255) NULL COMMENT "",
birth_date date NULL COMMENT "",
pig_types varchar(255) NULL COMMENT "",
pig_type varchar(255) NULL COMMENT "",
pig_num int(11) NULL COMMENT "",
compute_time datetime NULL COMMENT ""
) ENGINE=OLAP
UNIQUE KEY(t_id, f_id, a_id, birth_str, birth_date, pig_types, pig_type)
COMMENT "OLAP"
DISTRIBUTED BY HASH(t_id) BUCKETS 32
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"in_memory" = "false",
"storage_format" = "V2"
);
flink ETL sql:
SELECT
basic.t_id ,
basic.f_id,
basic.a_id,
cast(basic.birth_date as varchar) as birth_str,
basic.birth_date,
'a' AS pig_types,
'b' AS pig_type,
1 as pig_num,
localtimestamp as compute_time
FROM basic
WHERE
basic.birth_date is not null
Query results: 【screenshot not preserved; as described above, most values in the date-type key field came back null】
Parsing special strings
1. JSON-string parsing is not particularly fast; for higher performance it is recommended to parse the JSON upstream so that doris only does the logical calculation 【roughly: a direct calculation runs in milliseconds, while parsing a special string during the calculation can rise to seconds; not absolute】
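A sketch of the idea: parse the JSON once upstream (e.g. in the flink job) and store flat columns, so the query layer never calls a JSON function on the hot path. Field names here are hypothetical.

```python
import json

raw = [{"id": 1, "payload": '{"pig_num": 3}'},
       {"id": 2, "payload": '{"pig_num": 4}'}]

# Upstream: parse the JSON string ONCE per record into a flat column.
flat = [{"id": r["id"], "pig_num": json.loads(r["payload"])["pig_num"]}
        for r in raw]

# Query layer: plain column arithmetic, no string parsing at query time.
total = sum(r["pig_num"] for r in flat)
print(total)  # 7
```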
Inaccurate sql error messages
The erroneous sql is as follows:
SELECT
basic.tenant_id AS tenant_id,
basic.farm_id AS farm_id,
CASE
WHEN datediff(now(), cast(birth_time as datetime)) between 0 and 30 THEN '0-30 days old'
WHEN datediff(now(), cast(birth_time as datetime)) between 31 and 60 THEN '31-60 days old'
WHEN datediff(now(), cast(birth_time as datetime)) between 61 and 90 THEN '61-90 days old'
WHEN datediff(now(), cast(birth_time as datetime)) between 91 and 120 THEN '91-120 days old'
WHEN datediff(now(), cast(birth_time as datetime)) between 121 and 150 THEN '121-150 days old'
ELSE 'over 150 days old'
END AS pig_sub_type
,sum(1)
FROM basic
group by
basic.tenant_id,
basic.farm_id,
CASE
WHEN datediff(now(), cast(birth_time as datetime)) between 0 and 30 THEN '0-30 days old'
WHEN datediff(now(), cast(birth_time as datetime)) between 31 and 60 THEN '31-60 days old'
WHEN datediff(now(), cast(birth_time as datetime)) between 61 and 90 THEN '61-90 days old'
WHEN datediff(now(), cast(birth_time as datetime)) between 91 and 120 THEN '91-120 days old'
WHEN datediff(now(), cast(birth_time as datetime)) between 121 and 150 THEN '121-150 days old'
ELSE 'over 150 days old'
END
Error message: select list expression not produced by aggregation output (missing from GROUP BY clause?): CASE WHEN datediff('2022-07-27 10:06:44', CAST(birth_time AS DATETIME)) >= 0
Actual cause of the error: the parameter types passed to the datediff function are wrong; both parameters must be of type datetime.
E.g.: select datediff(CAST('2007-12-31 23:59:59' AS DATETIME), CAST('2007-12-30' AS DATETIME));
The corrected sql:
SELECT
basic.tenant_id AS tenant_id,
basic.farm_id AS farm_id,
CASE
WHEN datediff(CURRENT_DATE(), cast(birth_time as datetime)) between 0 and 30 THEN '0-30 days old'
WHEN datediff(CURRENT_DATE(), cast(birth_time as datetime)) between 31 and 60 THEN '31-60 days old'
WHEN datediff(CURRENT_DATE(), cast(birth_time as datetime)) between 61 and 90 THEN '61-90 days old'
WHEN datediff(CURRENT_DATE(), cast(birth_time as datetime)) between 91 and 120 THEN '91-120 days old'
WHEN datediff(CURRENT_DATE(), cast(birth_time as datetime)) between 121 and 150 THEN '121-150 days old'
ELSE 'over 150 days old'
END AS pig_sub_type
,sum(1)
FROM basic
group by
basic.tenant_id,
basic.farm_id,
CASE
WHEN datediff(CURRENT_DATE(), cast(birth_time as datetime)) between 0 and 30 THEN '0-30 days old'
WHEN datediff(CURRENT_DATE(), cast(birth_time as datetime)) between 31 and 60 THEN '31-60 days old'
WHEN datediff(CURRENT_DATE(), cast(birth_time as datetime)) between 61 and 90 THEN '61-90 days old'
WHEN datediff(CURRENT_DATE(), cast(birth_time as datetime)) between 91 and 120 THEN '91-120 days old'
WHEN datediff(CURRENT_DATE(), cast(birth_time as datetime)) between 121 and 150 THEN '121-150 days old'
ELSE 'over 150 days old'
END
datediff parameter notes
- DATETIME DATEDIFF(DATETIME expr1, DATETIME expr2); mind the parameter types: the error message does not necessarily say that a function-parameter type is wrong
- Computes expr1 - expr2; the result is expressed in days.
- expr1 and expr2 must be valid date or date/time expressions.
- Note: only the date parts of the values participate in the calculation.
- Example: select datediff(CAST('2007-12-31 23:59:59' AS DATETIME), CAST('2007-12-30' AS DATETIME));
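The "only the date part participates" rule can be mimicked like this (a sketch of the semantics, not Doris internals):

```python
from datetime import datetime

def datediff(expr1: datetime, expr2: datetime) -> int:
    """Mimic Doris DATEDIFF: expr1 - expr2 in days, time-of-day ignored."""
    return (expr1.date() - expr2.date()).days

d1 = datetime(2007, 12, 31, 23, 59, 59)
d2 = datetime(2007, 12, 30)
print(datediff(d1, d2))  # 1 -- the 23:59:59 time component does not matter
```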
Addendum - flink-related
The current-date function in a flink ETL job is not re-evaluated
Problem description: a flink etl sql computes the difference between the current time/date and a time/date field in the table record. If the record for some id was already processed at a historical time, then querying the difference today (or on any later day) should give the gap up to now; in fact the stored value is the gap from the record's field to the time the record was processed, which leads to inconsistent data.
Solution: the fix is actually implied by the problem description. For a freshly processed record the computed date difference is correct; but as wall-clock time moves on, the flink etl job's "current time" stays frozen at the historical processing time, and the calculation is never re-triggered 【unless the record is modified】. You therefore need: **(current time - record's compute time) + stored date difference = the true difference between the current time and the record's date**
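The correction formula above can be sketched as follows (variable names hypothetical; a model of the idea, not the job code):

```python
from datetime import date

def true_age_days(today: date, compute_date: date, stored_diff: int) -> int:
    """(current time - record's compute time) + stored date difference."""
    return (today - compute_date).days + stored_diff

# The job computed a 10-day age on 2022-07-01 and never re-ran for this record.
compute_date = date(2022, 7, 1)
stored_diff = 10

# Querying on 2022-07-20, the stored value (10) is stale; the corrected value:
print(true_age_days(date(2022, 7, 20), compute_date, stored_diff))  # 29
```

On the day the record is processed the correction term is zero, so the formula also reproduces the fresh value.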