Massive data! Analysis in seconds! Building a real-time data warehouse with Flink + Doris
2022-06-27 14:05:00 【InfoQ】
1. Background

2. Doris basic principles

3. Technical architecture

4. Practice
4.1 Creating the table
CREATE TABLE IF NOT EXISTS table_1
(
key1 varchar(32),
key2 varchar(32),
key3 varchar(32),
value1 int,
value2 varchar(128),
value3 Decimal(20, 6),
data_deal_datetime DateTime COMMENT 'Data processing time',
data_status INT COMMENT 'Deletion flag: 1 means normal, -1 means the data has been deleted'
)
ENGINE=OLAP
UNIQUE KEY(`key1`,`key2`,`key3`)
COMMENT "xxx"
DISTRIBUTED BY HASH(`key2`) BUCKETS 32
PROPERTIES (
"storage_type"="column",
"replication_num" = "3",
"function_column.sequence_type" = 'DateTime'
);
4.2 Data import task
CREATE ROUTINE LOAD database.table1 ON table1
COLUMNS(key1,key2,key3,value1,value2,value3,data_deal_datetime,data_status),
ORDER BY data_deal_datetime
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "10",
"max_batch_rows" = "500000",
"max_batch_size" = "209715200",
"format" = "json",
"json_root" = "$.data",
"jsonpaths"="[\"$.key1\",\"$.key2\",\"$.key3\",\"$.value1\",\"$.value2\",
\"$.value3\",\"$.data_deal_datetime\",\"$.data_status\"]"
)
FROM KAFKA
(
"kafka_broker_list"="broker1_ip:port1,broker2_ip:port2,broker3_ip:port3",
"kafka_topic"="topic_name",
"property.group.id"="group_id",
"property.kafka_default_offsets"="OFFSET_BEGINNING"
);
4.3 Task monitoring and alerting
import json

import pymysql
import requests

# Open the database connection.
# Replace host, user, password, db and port with real values (port is an int).
db = pymysql.connect(host="host", user="user",
                     password="passwd", db="database", port=port)
# Get a cursor for executing statements.
cur = db.cursor()
# List all routine load jobs.
sql = "show routine load"
cur.execute(sql)
results = cur.fetchall()
for row in results:
    name = row[1]    # Name
    state = row[7]   # State
    if state != 'RUNNING':
        err_log_urls = row[16]          # ErrorLogUrls
        reason_state_changed = row[15]  # ReasonOfStateChanged
        msg = "Doris data import task is abnormal:\n name=%s \n state=%s \n reason_state_changed=%s \n err_log_urls=%s \n About to resume automatically; please check the error message" % (
            name, state, reason_state_changed, err_log_urls)
        payload_message = {
            "msg_type": "text",
            "content": {
                "text": msg
            }
        }
        url = 'Lark alert webhook URL'  # placeholder
        s = json.dumps(payload_message)
        r = requests.post(url, data=s)
        # After sending the alert, try to resume the job.
        cur.execute("resume routine load for " + name)
cur.close()
db.close()
4.4 Data model
4.4.1 Aggregate Model
CREATE TABLE tmp_table_1
(
user_id varchar(64) COMMENT "User ID",
channel varchar(64) COMMENT "User source channel",
city_code varchar(64) COMMENT "User's city code",
last_visit_date DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "Time of the user's last visit",
total_cost BIGINT SUM DEFAULT "0" COMMENT "Total user spending"
)
ENGINE=OLAP
AGGREGATE KEY(user_id, channel, city_code)
DISTRIBUTED BY HASH(user_id) BUCKETS 6
PROPERTIES("storage_type"="column","replication_num" = "1");

insert into tmp_table_1 values('suh_001','JD','001','2022-01-01 00:00:01','57');
insert into tmp_table_1 values('suh_001','JD','001','2022-02-01 00:00:01','76');
insert into tmp_table_1 values('suh_001','JD','001','2022-03-01 00:00:01','107');
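The effect of these three inserts can be sketched in plain Python. This is a minimal simulation rather than Doris code, and it assumes the three statements are loaded in the order written: rows sharing the same AGGREGATE KEY are merged, the `REPLACE` column keeps the value from the most recently loaded row, and the `SUM` column is added up. The function name `aggregate_rows` is hypothetical.

```python
from typing import Dict, List, Tuple

# user_id, channel, city_code, last_visit_date, total_cost
Row = Tuple[str, str, str, str, int]
Key = Tuple[str, str, str]


def aggregate_rows(rows: List[Row]) -> Dict[Key, Tuple[str, int]]:
    """Merge rows by (user_id, channel, city_code), as tmp_table_1 declares."""
    merged: Dict[Key, Tuple[str, int]] = {}
    for user_id, channel, city_code, visit, cost in rows:
        key = (user_id, channel, city_code)
        if key in merged:
            _, total = merged[key]
            # REPLACE: take the newer visit time; SUM: accumulate the cost.
            merged[key] = (visit, total + cost)
        else:
            merged[key] = (visit, cost)
    return merged


rows = [
    ("suh_001", "JD", "001", "2022-01-01 00:00:01", 57),
    ("suh_001", "JD", "001", "2022-02-01 00:00:01", 76),
    ("suh_001", "JD", "001", "2022-03-01 00:00:01", 107),
]
result = aggregate_rows(rows)
print(result)
# {('suh_001', 'JD', '001'): ('2022-03-01 00:00:01', 240)}
```

Under these assumptions, a query on tmp_table_1 would see a single row for the key, with the latest visit time and a total cost of 240.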
4.4.2 Unique Model
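In the Unique model only one row is kept per UNIQUE KEY. When a sequence column is configured, as in the table from 4.1 (`function_column.sequence_type`, with `ORDER BY data_deal_datetime` in the load job), a row with a larger sequence value replaces one with a smaller value, and not the other way round. The following is a minimal Python simulation of that rule, not Doris code; the function name `upsert_by_sequence`, the sample rows, and the handling of equal sequence values are assumptions made for illustration.

```python
from typing import Dict, Iterable, Tuple

Key = Tuple[str, str, str]             # key1, key2, key3
Row = Tuple[str, str, str, int, str]   # key1, key2, key3, value1, data_deal_datetime


def upsert_by_sequence(rows: Iterable[Row]) -> Dict[Key, Tuple[int, str]]:
    """Keep, for each key, the row with the largest sequence value."""
    latest: Dict[Key, Tuple[int, str]] = {}
    for key1, key2, key3, value1, seq in rows:
        key = (key1, key2, key3)
        current = latest.get(key)
        # 'YYYY-MM-DD HH:MM:SS' strings sort chronologically as plain text.
        # Assumption of this sketch: on equal sequence values the later row wins.
        if current is None or seq >= current[1]:
            latest[key] = (value1, seq)
    return latest


rows = [
    ("a", "b", "c", 1, "2022-06-01 10:00:00"),
    ("a", "b", "c", 3, "2022-06-01 10:00:10"),
    ("a", "b", "c", 2, "2022-06-01 10:00:05"),  # arrives late with an older sequence value
]
table = upsert_by_sequence(rows)
print(table)
# {('a', 'b', 'c'): (3, '2022-06-01 10:00:10')}
```

The late-arriving third row is discarded because its sequence value is smaller than the one already stored, which is what makes out-of-order Kafka messages safe to load.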
4.4.3 Duplicate Model
CREATE TABLE tmp_table_2
(
user_id varchar(64) COMMENT "User ID",
channel varchar(64) COMMENT "User source channel",
city_code varchar(64) COMMENT "User's city code",
visit_date DATETIME COMMENT "User login time",
cost BIGINT COMMENT "User spending amount"
)
ENGINE=OLAP
DUPLICATE KEY(user_id, channel, city_code)
DISTRIBUTED BY HASH(user_id) BUCKETS 6
PROPERTIES("storage_type"="column","replication_num" = "1");

insert into tmp_table_2 values('suh_001','JD','001','2022-01-01 00:00:01','57');
insert into tmp_table_2 values('suh_001','JD','001','2022-02-01 00:00:01','76');
insert into tmp_table_2 values('suh_001','JD','001','2022-03-01 00:00:01','107');
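For comparison, the Duplicate model stores every row as loaded; the DUPLICATE KEY columns only determine the sort order, and any aggregation happens at query time. A minimal Python sketch of that behavior, not Doris code, using the same three rows (the variable names are hypothetical):

```python
from collections import defaultdict

rows = [
    ("suh_001", "JD", "001", "2022-01-01 00:00:01", 57),
    ("suh_001", "JD", "001", "2022-02-01 00:00:01", 76),
    ("suh_001", "JD", "001", "2022-03-01 00:00:01", 107),
]

# Storage: all rows are kept, ordered by the key columns
# (user_id, channel, city_code). Nothing is merged.
stored = sorted(rows, key=lambda r: r[:3])

# Query time: the equivalent of
#   SELECT user_id, SUM(cost) FROM tmp_table_2 GROUP BY user_id
total_by_user = defaultdict(int)
for user_id, _channel, _city_code, _visit_date, cost in stored:
    total_by_user[user_id] += cost

print(len(stored))          # 3
print(dict(total_by_user))  # {'suh_001': 240}
```

The table holds three rows where the Aggregate model would hold one; the detail is preserved at the cost of doing the aggregation on every query.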

4.4.4 Suggestions for choosing a data model
5. Summary
6. References
About Lingchuang Group (Advance Intelligence Group)