Application Practice | Massive Data, Analysis in Seconds! Building a Real-Time Data Warehouse with Flink + Doris
2022-06-24 19:25:00 [InfoQ]
Business Background
Doris Basic Principles

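Doris runs two kinds of processes: Frontend (FE) and Backend (BE). The FE is responsible for:
- Accepting user connection requests (MySQL protocol layer)
- Storing and managing metadata
- Parsing query statements and dispatching execution plans
- Cluster management and control
The BE is responsible for:
- Data storage and management
- Executing query plans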
Technical Architecture

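- Use FlinkCDC to capture the MySQL Binlog into Topic1 in Kafka
- Develop a Flink job that consumes this Binlog, builds subject-specific wide tables, and writes them to Topic2
- Configure a Doris Routine Load job that imports the data in Topic2 into Doris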
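The middle step of this pipeline is not shown in the article. That step is the Flink job that turns Binlog records into wide-table messages for Topic2. The sketch below shows the per-record mapping that such a job might perform, under a few assumptions:

- The change events are Debezium-style, with `op`, `before` and `after` fields. This is an assumption, since the real FlinkCDC output depends on how the job is configured.
- The output has the `{"data": {...}}` shape that the Routine Load job later reads through `"json_root" = "$.data"`.
- A delete is turned into a soft delete by setting `data_status = -1`.

The helper name `to_topic2_message` is hypothetical.

```python
import json
from datetime import datetime

# Hypothetical mapping from one Debezium-style change event (assumed format)
# to the message layout read by the Routine Load job ("json_root" = "$.data").
COLUMNS = ["key1", "key2", "key3", "value1", "value2", "value3"]


def to_topic2_message(event: dict, processed_at: datetime) -> str:
    op = event["op"]  # assumed codes: "c" insert, "u" update, "d" delete, "r" snapshot read
    if op == "d":
        # A delete carries the last row image in "before"; emit it as a soft delete
        row, status = event["before"], -1
    else:
        row, status = event["after"], 1
    data = {col: row.get(col) for col in COLUMNS}
    # Processing time is also the value Routine Load uses in ORDER BY data_deal_datetime
    data["data_deal_datetime"] = processed_at.strftime("%Y-%m-%d %H:%M:%S")
    data["data_status"] = status
    return json.dumps({"data": data})


if __name__ == "__main__":
    delete_event = {"op": "d", "after": None,
                    "before": {"key1": "a", "key2": "b", "key3": "c",
                               "value1": 1, "value2": "x", "value3": "1.5"}}
    print(to_topic2_message(delete_event, datetime(2022, 6, 24, 19, 25, 0)))
```

value3 is passed as a string in this sketch so the Decimal(20, 6) value does not lose precision in transit.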
Application Practice
Creating the Table
CREATE TABLE IF NOT EXISTS table_1
(
key1 varchar(32),
key2 varchar(32),
key3 varchar(32),
value1 int,
value2 varchar(128),
value3 Decimal(20, 6),
data_deal_datetime DateTime COMMENT 'data processing time',
data_status INT COMMENT 'deletion flag: 1 = normal, -1 = deleted'
)
ENGINE=OLAP
UNIQUE KEY(`key1`,`key2`,`key3`)
COMMENT "xxx"
DISTRIBUTED BY HASH(`key2`) BUCKETS 32
PROPERTIES (
"storage_type"="column",
"replication_num" = "3",
"function_column.sequence_type" = 'DateTime'
);
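- data_deal_datetime is the main basis for deciding which row overwrites the other when rows share the same key. It feeds the sequence column declared by "function_column.sequence_type".
- data_status accommodates delete operations in the business database. Instead of physically deleting a row, a row with data_status = -1 is written.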
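The sketch below is an in-memory Python model of the intended semantics; it is not Doris code. It describes a Unique Key table with a sequence column:

- For each key (key1, key2, key3), the row with the largest data_deal_datetime is kept, whatever order the rows arrive in.
- Rows with data_status = -1 are treated as deleted and filtered out at query time.

Assumptions and names:

- The function names `upsert` and `visible_rows` are hypothetical.
- When two rows carry equal sequence values, the later arrival wins in this sketch. This tie-breaking rule is an assumption.

```python
from typing import Dict, List, Tuple

Key = Tuple[str, str, str]


def upsert(table: Dict[Key, dict], row: dict) -> None:
    """Apply one incoming row the way a sequence column is meant to work."""
    key = (row["key1"], row["key2"], row["key3"])
    current = table.get(key)
    # "YYYY-MM-DD HH:MM:SS" strings compare correctly as plain strings.
    # Ties go to the later arrival in this sketch (an assumption).
    if current is None or row["data_deal_datetime"] >= current["data_deal_datetime"]:
        table[key] = row


def visible_rows(table: Dict[Key, dict]) -> List[dict]:
    """Rows a query would see after filtering out soft deletes (data_status = -1)."""
    return [r for r in table.values() if r["data_status"] == 1]


if __name__ == "__main__":
    t: Dict[Key, dict] = {}
    base = {"key1": "a", "key2": "b", "key3": "c"}
    # A delete arrives before an older update; the older update must not bring the row back
    upsert(t, {**base, "value1": 0, "data_deal_datetime": "2022-06-24 10:00:02", "data_status": -1})
    upsert(t, {**base, "value1": 5, "data_deal_datetime": "2022-06-24 10:00:01", "data_status": 1})
    print(visible_rows(t))  # []
```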
Data Import Job
CREATE ROUTINE LOAD database.table1 ON table_1
COLUMNS(key1,key2,key3,value1,value2,value3,data_deal_datetime,data_status),
ORDER BY data_deal_datetime
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "10",
"max_batch_rows" = "500000",
"max_batch_size" = "209715200",
"format" = "json",
"json_root" = "$.data",
"jsonpaths"="[\"$.key1\",\"$.key2\",\"$.key3\",\"$.value1\",\"$.value2\",\"$.value3\",\"$.data_deal_datetime\",\"$.data_status\"]"
) FROM KAFKA
(
"kafka_broker_list"="broker1_ip:port1,broker2_ip:port2,broker3_ip:port3",
"kafka_topic"="topic_name",
"property.group.id"="group_id",
"property.kafka_default_offsets"="OFFSET_BEGINNING"
);
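- ORDER BY data_deal_datetime: when rows share the same key, the data_deal_datetime value decides which row overwrites the other.
- desired_concurrent_number: the desired concurrency of the job.
- max_batch_interval: the maximum execution time of each subtask, in seconds.
- max_batch_rows: the maximum number of rows each subtask reads.
- max_batch_size: the maximum number of bytes each subtask reads (209715200 bytes = 200 MB).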
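The three batch limits work together: a subtask stops consuming as soon as any one of max_batch_interval, max_batch_rows or max_batch_size is reached. The Python sketch below models only that rule; real Doris scheduling has more detail.

Assumptions:

- The input is a simulated stream of (seconds since task start, message size in bytes) pairs.
- The function `cut_batch` is hypothetical.

```python
from typing import Iterable, List, Tuple

# Limits copied from the CREATE ROUTINE LOAD statement above
MAX_BATCH_INTERVAL_S = 10
MAX_BATCH_ROWS = 500_000
MAX_BATCH_SIZE_BYTES = 209_715_200  # 200 MB


def cut_batch(messages: Iterable[Tuple[float, int]],
              max_interval: float = MAX_BATCH_INTERVAL_S,
              max_rows: int = MAX_BATCH_ROWS,
              max_bytes: int = MAX_BATCH_SIZE_BYTES) -> Tuple[List[Tuple[float, int]], str]:
    """Consume (seconds_since_task_start, size_bytes) messages until a limit is hit.

    Returns the rows taken by this subtask and the name of the limit that ended it.
    """
    batch: List[Tuple[float, int]] = []
    total_bytes = 0
    for ts, size in messages:
        if ts >= max_interval:  # time limit reached: this message is not taken
            return batch, "max_batch_interval"
        batch.append((ts, size))
        total_bytes += size
        if len(batch) >= max_rows:
            return batch, "max_batch_rows"
        if total_bytes >= max_bytes:
            return batch, "max_batch_size"
    return batch, "no more messages"


if __name__ == "__main__":
    # 100-byte messages every 10 microseconds: the row limit is reached first (after ~5 s)
    stream = ((i * 0.00001, 100) for i in range(1_000_000))
    batch, reason = cut_batch(stream)
    print(len(batch), reason)  # 500000 max_batch_rows
```

With a slow topic, max_batch_interval is what usually ends a subtask. That limit bounds how long imported data waits before it becomes visible.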
Job Monitoring and Alerting
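import pymysql  # Doris FE speaks the MySQL protocol, so a MySQL client works
import requests

# Open a connection to the Doris FE (9030 is the default FE query port)
db = pymysql.connect(host="host", user="user",
                     password="passwd", database="database", port=9030)
# Get a cursor with cursor()
cur = db.cursor()
# 1. Query: list the routine load jobs of the current database
sql = "show routine load"
cur.execute(sql)  # execute the SQL statement
results = cur.fetchall()  # fetch all returned records
for row in results:
    # Positional SHOW ROUTINE LOAD columns in the Doris version used here:
    # 1 = Name, 7 = State, 15 = ReasonOfStateChanged, 16 = ErrorLogUrls
    # (the column list can change between versions, so verify before relying on it)
    name = row[1]
    state = row[7]
    if state != 'RUNNING':
        err_log_urls = row[16]
        reason_state_changed = row[15]
        msg = ("Doris import job error:\n name=%s \n state=%s \n reason_state_changed=%s \n"
               " err_log_urls=%s \nAttempting automatic recovery, please check the error details"
               % (name, state, reason_state_changed, err_log_urls))
        payload_message = {
            "msg_type": "text",
            "content": {
                "text": msg
            }
        }
        url = 'lark alert webhook url'
        # json= serializes the payload and sets the JSON Content-Type header
        requests.post(url, json=payload_message)
        # RESUME is meant for PAUSED jobs; transient states such as NEED_SCHEDULE are skipped
        if state == 'PAUSED':
            cur.execute("resume routine load for " + name)
cur.close()
db.close()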
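The script above reads SHOW ROUTINE LOAD columns by position. That is fragile, because the column list can change between Doris versions. A more robust variant:

- Look columns up by name through `cursor.description`, which is part of the standard Python DB-API and supported by pymysql.
- Move the decision logic into a pure function, so it can be tested without a cluster.

Assumptions:

- The column names `Name`, `State`, `ReasonOfStateChanged` and `ErrorLogUrls` are assumed to match your Doris version's output; verify them first.
- The function names `rows_as_dicts` and `jobs_to_recover` are hypothetical.

```python
from typing import Any, Dict, List, Sequence, Tuple


def rows_as_dicts(description: Sequence[Sequence[Any]],
                  rows: Sequence[Sequence[Any]]) -> List[Dict[str, Any]]:
    """Turn DB-API rows into dicts keyed by column name (description[i][0])."""
    names = [col[0] for col in description]
    return [dict(zip(names, row)) for row in rows]


def jobs_to_recover(jobs: List[Dict[str, Any]]) -> List[Tuple[str, str]]:
    """Return (job name, alert text) for every job in the PAUSED state."""
    out = []
    for job in jobs:
        if job["State"] == "PAUSED":
            text = ("Doris import job error:\n name=%s\n state=%s\n"
                    " reason_state_changed=%s\n err_log_urls=%s"
                    % (job["Name"], job["State"],
                       job.get("ReasonOfStateChanged"), job.get("ErrorLogUrls")))
            out.append((job["Name"], text))
    return out


# Usage with the cursor and webhook url from the script above (needs a live connection):
#   cur.execute("show routine load")
#   for name, text in jobs_to_recover(rows_as_dicts(cur.description, cur.fetchall())):
#       requests.post(url, json={"msg_type": "text", "content": {"text": text}})
#       cur.execute("resume routine load for " + name)
```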
Data Models
Aggregate Model
CREATE TABLE tmp_table_1
(
user_id varchar(64) COMMENT "user id",
channel varchar(64) COMMENT "channel the user came from",
city_code varchar(64) COMMENT "code of the user's city",
last_visit_date DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "time of the user's last visit",
total_cost BIGINT SUM DEFAULT "0" COMMENT "user's total spend"
)
ENGINE=OLAP
AGGREGATE KEY(user_id, channel, city_code)
DISTRIBUTED BY HASH(user_id) BUCKETS 6
PROPERTIES("storage_type"="column","replication_num" = "1");
insert into tmp_table_1 values('suh_001','JD','001','2022-01-01 00:00:01',57);
insert into tmp_table_1 values('suh_001','JD','001','2022-02-01 00:00:01',76);
insert into tmp_table_1 values('suh_001','JD','001','2022-03-01 00:00:01',107);

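All three inserts above share the key ('suh_001', 'JD', '001'), so the Aggregate model collapses them into a single row:

- REPLACE keeps last_visit_date from the most recently imported row.
- SUM adds up the costs: 57 + 76 + 107 = 240.

The sketch below is a minimal Python model of that merge, not Doris code. The function `aggregate_load` is hypothetical.

```python
from typing import Dict, List, Tuple

Row = Tuple[str, str, str, str, int]  # user_id, channel, city_code, last_visit_date, cost


def aggregate_load(rows: List[Row]) -> Dict[Tuple[str, str, str], Tuple[str, int]]:
    """Merge rows in import order: REPLACE for last_visit_date, SUM for total_cost."""
    table: Dict[Tuple[str, str, str], Tuple[str, int]] = {}
    for user_id, channel, city_code, visit, cost in rows:
        key = (user_id, channel, city_code)
        if key in table:
            _, total = table[key]
            table[key] = (visit, total + cost)  # REPLACE: newest import wins; SUM: add
        else:
            table[key] = (visit, cost)
    return table


if __name__ == "__main__":
    loads = [
        ("suh_001", "JD", "001", "2022-01-01 00:00:01", 57),
        ("suh_001", "JD", "001", "2022-02-01 00:00:01", 76),
        ("suh_001", "JD", "001", "2022-03-01 00:00:01", 107),
    ]
    print(aggregate_load(loads))
    # {('suh_001', 'JD', '001'): ('2022-03-01 00:00:01', 240)}
```

REPLACE follows import order, not the largest value. If an older visit were imported last, it would overwrite a newer one.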
Unique Model
Duplicate Model
CREATE TABLE tmp_table_2
(
user_id varchar(64) COMMENT "user id",
channel varchar(64) COMMENT "channel the user came from",
city_code varchar(64) COMMENT "code of the user's city",
visit_date DATETIME COMMENT "user login time",
cost BIGINT COMMENT "amount spent by the user"
)
ENGINE=OLAP
DUPLICATE KEY(user_id, channel, city_code)
DISTRIBUTED BY HASH(user_id) BUCKETS 6
PROPERTIES("storage_type"="column","replication_num" = "1");
insert into tmp_table_2 values('suh_001','JD','001','2022-01-01 00:00:01',57);
insert into tmp_table_2 values('suh_001','JD','001','2022-02-01 00:00:01',76);
insert into tmp_table_2 values('suh_001','JD','001','2022-03-01 00:00:01',107);

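In the Duplicate model, the same three inserts are all kept as separate rows. The DUPLICATE KEY columns only determine the sort order, and nothing is merged, so totals have to be computed at query time (for example with SUM(cost)).

The minimal Python comparison below illustrates this. The functions `duplicate_load` and `total_cost` are hypothetical.

```python
from typing import List, Tuple

Row = Tuple[str, str, str, str, int]  # user_id, channel, city_code, visit_date, cost


def duplicate_load(rows: List[Row]) -> List[Row]:
    """Keep every row; order by the DUPLICATE KEY columns (user_id, channel, city_code)."""
    return sorted(rows, key=lambda r: (r[0], r[1], r[2]))


def total_cost(rows: List[Row], user_id: str) -> int:
    """Query-time equivalent of SELECT SUM(cost) ... WHERE user_id = ?"""
    return sum(r[4] for r in rows if r[0] == user_id)


if __name__ == "__main__":
    stored = duplicate_load([
        ("suh_001", "JD", "001", "2022-01-01 00:00:01", 57),
        ("suh_001", "JD", "001", "2022-02-01 00:00:01", 76),
        ("suh_001", "JD", "001", "2022-03-01 00:00:01", 107),
    ])
    print(len(stored), total_cost(stored, "suh_001"))  # 3 240
```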
Recommendations for Choosing a Data Model
Summary

