Massive Data, Second-Level Analysis: Building a Real-Time Data Warehouse with Flink + Doris
2022-06-27 13:51:00 【InfoQ】
1. Background

2. How Doris works

3. Technical architecture

4. In practice
4.1 Creating the table
CREATE TABLE IF NOT EXISTS table_1
(
    key1 varchar(32),
    key2 varchar(32),
    key3 varchar(32),
    value1 int,
    value2 varchar(128),
    value3 Decimal(20, 6),
    data_deal_datetime DateTime COMMENT 'Data processing time',
    data_status INT COMMENT 'Deletion flag: 1 = normal, -1 = deleted'
)
ENGINE=OLAP
UNIQUE KEY(`key1`,`key2`,`key3`)
COMMENT "xxx"
DISTRIBUTED BY HASH(`key2`) BUCKETS 32
PROPERTIES (
    "storage_type" = "column",
    "replication_num" = "3",
    "function_column.sequence_type" = 'DateTime'
);
4.2 Data import job
-- The target table is table_1, the table created in 4.1.
CREATE ROUTINE LOAD database.table1 ON table_1
COLUMNS(key1,key2,key3,value1,value2,value3,data_deal_datetime,data_status),
ORDER BY data_deal_datetime
PROPERTIES
(
    "desired_concurrent_number" = "3",
    "max_batch_interval" = "10",
    "max_batch_rows" = "500000",
    "max_batch_size" = "209715200",
    "format" = "json",
    "json_root" = "$.data",
    "jsonpaths" = "[\"$.key1\",\"$.key2\",\"$.key3\",\"$.value1\",\"$.value2\",\"$.value3\",\"$.data_deal_datetime\",\"$.data_status\"]"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1_ip:port1,broker2_ip:port2,broker3_ip:port3",
    "kafka_topic" = "topic_name",
    "property.group.id" = "group_id",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
4.3 Job monitoring and alerting
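import json

import pymysql
import requests

# Open a connection to Doris over the MySQL protocol.
# host, user, password and database are placeholders.
port = 9030  # placeholder; 9030 is the default FE query port
db = pymysql.connect(host="host", user="user",
                     password="passwd", database="database", port=port)
# Get a cursor for executing statements
cur = db.cursor()
# 1. Query the state of all routine load jobs
sql = "show routine load"
cur.execute(sql)
results = cur.fetchall()  # fetch every returned row
for row in results:
    name = row[1]
    state = row[7]
    if state != 'RUNNING':
        err_log_urls = row[16]
        reason_state_changed = row[15]
        msg = ("Doris routine load job is abnormal:\n name=%s \n state=%s \n"
               " reason_state_changed=%s \n err_log_urls=%s \n"
               "The job will be resumed automatically; please check the error details"
               % (name, state, reason_state_changed, err_log_urls))
        payload_message = {
            "msg_type": "text",
            "content": {
                "text": msg
            }
        }
        url = 'lark alert webhook url'  # placeholder for the Lark alert URL
        s = json.dumps(payload_message)
        # Send the alert as JSON
        r = requests.post(url, data=s,
                          headers={"Content-Type": "application/json"})
        # 2. Resume the job after the alert has been sent
        cur.execute("resume routine load for " + name)
cur.close()
db.close()
4.4 Data models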
4.4.1 Aggregate model
CREATE TABLE tmp_table_1
(
    user_id varchar(64) COMMENT "User ID",
    channel varchar(64) COMMENT "Channel the user came from",
    city_code varchar(64) COMMENT "Code of the user's city",
    last_visit_date DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "Time of the user's last visit",
    total_cost BIGINT SUM DEFAULT "0" COMMENT "Total amount spent by the user"
)
ENGINE=OLAP
AGGREGATE KEY(user_id, channel, city_code)
DISTRIBUTED BY HASH(user_id) BUCKETS 6
PROPERTIES("storage_type"="column","replication_num" = "1");
insert into tmp_table_1 values('suh_001','JD','001','2022-01-01 00:00:01','57');
insert into tmp_table_1 values('suh_001','JD','001','2022-02-01 00:00:01','76');
insert into tmp_table_1 values('suh_001','JD','001','2022-03-01 00:00:01','107');
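The three inserts above share the same key (user_id, channel, city_code), so the Aggregate model merges them into one row. The following is a minimal in-memory sketch of that merge, not Doris code: the function name `aggregate` and the tuple layout are hypothetical, and it assumes each insert is a separate load, so that REPLACE keeps the value from the latest load while SUM adds the values up.

```python
# Hypothetical in-memory model of tmp_table_1; it only illustrates the merge rule.
rows = [
    ("suh_001", "JD", "001", "2022-01-01 00:00:01", 57),
    ("suh_001", "JD", "001", "2022-02-01 00:00:01", 76),
    ("suh_001", "JD", "001", "2022-03-01 00:00:01", 107),
]


def aggregate(rows):
    """Merge rows that share a key: REPLACE for the date, SUM for the cost."""
    table = {}
    for user_id, channel, city_code, visit_date, cost in rows:
        key = (user_id, channel, city_code)
        if key in table:
            _, total = table[key]
            # REPLACE: the later load wins; SUM: costs accumulate
            table[key] = (visit_date, total + cost)
        else:
            table[key] = (visit_date, cost)
    return table


result = aggregate(rows)
print(result)
# {('suh_001', 'JD', '001'): ('2022-03-01 00:00:01', 240)}
```

Under these assumptions a query on tmp_table_1 would return a single row for suh_001 with last_visit_date 2022-03-01 00:00:01 and total_cost 240.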
4.4.2 Unique model
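The table in section 4.1 is a Unique model table with a sequence column of type DateTime, and the import job in 4.2 maps that column with ORDER BY data_deal_datetime. A rough in-memory sketch of the resulting behavior follows, not Doris code: the function `load` and the sample records are hypothetical, and resolving ties in favor of the later arrival is an assumption of the sketch.

```python
def load(table, record):
    """Keep, per unique key, the record with the largest sequence value."""
    key = (record["key1"], record["key2"], record["key3"])
    current = table.get(key)
    # 'YYYY-MM-DD HH:MM:SS' strings sort in chronological order
    if current is None or record["data_deal_datetime"] >= current["data_deal_datetime"]:
        table[key] = record


table = {}
base = {"key1": "a", "key2": "b", "key3": "c"}
load(table, {**base, "value1": 1, "data_deal_datetime": "2022-06-01 10:00:00"})
load(table, {**base, "value1": 2, "data_deal_datetime": "2022-06-01 10:05:00"})
# A late-arriving, older record must not overwrite the newer one
load(table, {**base, "value1": 3, "data_deal_datetime": "2022-06-01 10:02:00"})

print(table[("a", "b", "c")]["value1"])
# 2
```

This is why the sequence column matters for Kafka-based imports: messages can arrive out of order, and the row with the latest processing time should still win.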
4.4.3 Duplicate model
CREATE TABLE tmp_table_2
(
    user_id varchar(64) COMMENT "User ID",
    channel varchar(64) COMMENT "Channel the user came from",
    city_code varchar(64) COMMENT "Code of the user's city",
    visit_date DATETIME COMMENT "User login time",
    cost BIGINT COMMENT "Amount spent by the user"
)
ENGINE=OLAP
DUPLICATE KEY(user_id, channel, city_code)
DISTRIBUTED BY HASH(user_id) BUCKETS 6
PROPERTIES("storage_type"="column","replication_num" = "1");
insert into tmp_table_2 values('suh_001','JD','001','2022-01-01 00:00:01','57');
insert into tmp_table_2 values('suh_001','JD','001','2022-02-01 00:00:01','76');
insert into tmp_table_2 values('suh_001','JD','001','2022-03-01 00:00:01','107');
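In the Duplicate model the key columns are only used for sorting, so all three inserted rows are stored as they are and any aggregation happens at query time. A small in-memory sketch of that contrast follows; it is not Doris code, and the variable names are hypothetical.

```python
# Hypothetical in-memory model of tmp_table_2: rows are stored without merging.
rows = [
    ("suh_001", "JD", "001", "2022-01-01 00:00:01", 57),
    ("suh_001", "JD", "001", "2022-02-01 00:00:01", 76),
    ("suh_001", "JD", "001", "2022-03-01 00:00:01", 107),
]

table = list(rows)  # Duplicate model: every loaded row is kept

# Aggregation is computed when querying, comparable to
# SELECT user_id, SUM(cost) FROM tmp_table_2 GROUP BY user_id
total_cost = sum(cost for *_, cost in table if _[0] == "suh_001")

print(len(table), total_cost)
# 3 240
```

The stored detail rows stay available for ad hoc analysis, at the price of doing the aggregation on every query.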

4.4.4 Recommendations for choosing a data model
5. Summary
6. References
About Advance Intelligence Group