Building the DWD Layer of a Real-Time Data Warehouse with Flink SQL
2022-08-02 19:03:00 【Big data study club】
1. The DWD Layer of the Real-Time Data Warehouse
DWD is the detail data layer. Its table structure and granularity stay the same as the source tables, but the ODS-layer data needs to be cleansed, dimensionally degraded, desensitized, and so on, so that the resulting data is clean, complete, and consistent.
(1) Parse the user behavior data.
(2) Filter out nulls in the core fields (a minimal sketch follows this list).
(3) Re-model the collected business data into a dimensional model, i.e. dimension degradation.
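Null filtering of the core fields, for example, can be expressed directly in Flink SQL. The snippet below is only a minimal sketch: the job in section 3 does not apply such a filter, and treating id and carCode as the "core fields" of the vehicle entry/exit data is an assumption made for illustration.

-- Minimal cleansing sketch (assumed predicate, not part of the job in section 3):
-- keep only records whose core fields id and carCode are present.
SELECT id, opTime, ctype, carCode, cId
FROM ods_cars_log
WHERE id IS NOT NULL
  AND carCode IS NOT NULL;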
2. Dimensional Modeling of Vehicle Entry and Exit
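The model used in this example can be summarized from the tables defined in section 3; the sketch below is only a summary of that model, not additional DDL.

-- Sketch of the dimensional model (derived from the table definitions in section 3):
-- Fact table:      ods_cars_log  (id, opTime, ctype, carCode, cId)             -- vehicle entry/exit events, read from Kafka
-- Dimension table: dim_base_cars (id, owerId, carCode, carColor, type, remark) -- vehicle dimension, read from MySQL
-- Join key:        carCode; owerId is degraded from the dimension into the DWD detail record.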
3. Building the DWD Layer of the Real-Time Data Warehouse with Flink SQL
package com.bigdata.warehouse.dwd;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class DwdCarsLog {
    public static void main(String[] args) {
        // 1. Get the stream execution environment
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism
        //senv.setParallelism(1);
        // Enable checkpointing for fault tolerance (requires the corresponding imports if enabled)
        //senv.enableCheckpointing(60000);
        //senv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //senv.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
        //senv.getCheckpointConfig().setCheckpointTimeout(10000);
        //senv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // Set the state backend
        // (1) Enable RocksDB
        //senv.setStateBackend(new EmbeddedRocksDBStateBackend());
        // (2) Set the checkpoint storage
        //senv.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://mycluster/flink/checkpoints"));

        // 2. Create the table execution environment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv);

        // 3. Read the vehicle entry/exit fact table from Kafka;
        //    proc_time is a processing-time attribute used by the temporal join below
        tEnv.executeSql("CREATE TABLE ods_cars_log (" +
                "  id STRING," +
                "  opTime STRING," +
                "  ctype SMALLINT," +
                "  carCode STRING," +
                "  cId BIGINT," +
                "  proc_time AS PROCTIME()" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'ods_cars_log'," +
                "  'properties.bootstrap.servers' = 'hadoop1:9092'," +
                "  'properties.group.id' = 'ods_cars_log'," +
                "  'scan.startup.mode' = 'earliest-offset'," +
                "  'format' = 'json'" +
                ")");

        // 4. Read the vehicle dimension table from MySQL via the JDBC connector
        tEnv.executeSql("CREATE TABLE dim_base_cars (" +
                "  id INT," +
                "  owerId INT," +
                "  carCode STRING," +
                "  carColor STRING," +
                "  type TINYINT," +
                "  remark STRING," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:mysql://hadoop1:3306/sca?useUnicode=true&characterEncoding=utf8'," +
                "  'table-name' = 'dim_base_cars'," +
                "  'username' = 'hive'," +
                "  'password' = 'hive'" +
                ")");

        // 5. Join the fact table with the dimension table (processing-time temporal join)
        //    to get the vehicle entry/exit detail records
        Table resultTable = tEnv.sqlQuery("SELECT " +
                "  cl.id, " +
                "  c.owerId, " +
                "  cl.opTime, " +
                "  cl.cId, " +
                "  cl.carCode, " +
                "  cl.ctype " +
                "FROM ods_cars_log cl " +
                "LEFT JOIN dim_base_cars FOR SYSTEM_TIME AS OF cl.proc_time AS c " +
                "ON cl.carCode = c.carCode");
        tEnv.createTemporaryView("resultTable", resultTable);

        // 6. Create the dwd_cars_log table; the upsert-kafka connector requires a PRIMARY KEY
        tEnv.executeSql("CREATE TABLE dwd_cars_log (" +
                "  id STRING," +
                "  owerId INT," +
                "  opTime STRING," +
                "  cId BIGINT," +
                "  carCode STRING," +
                "  ctype SMALLINT," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'upsert-kafka'," +
                "  'topic' = 'dwd_cars_log'," +
                "  'properties.bootstrap.servers' = 'hadoop1:9092'," +
                "  'key.format' = 'json'," +
                "  'value.format' = 'json'" +
                ")");

        // 7. Write the join result into the dwd_cars_log table
        tEnv.executeSql("INSERT INTO dwd_cars_log SELECT * FROM resultTable");
    }
}
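Because the temporal join in step 5 looks up dim_base_cars in MySQL for every incoming record, it can be worth enabling the lookup cache of the Flink JDBC connector on the dimension table. The sketch below uses the lookup.cache.max-rows and lookup.cache.ttl options; the values are assumptions to be tuned, and the option names should be verified against the Flink version in use. Cached dimension rows may be slightly stale until the TTL expires.

CREATE TABLE dim_base_cars (
  id INT,
  owerId INT,
  carCode STRING,
  carColor STRING,
  type TINYINT,
  remark STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://hadoop1:3306/sca?useUnicode=true&characterEncoding=utf8',
  'table-name' = 'dim_base_cars',
  'username' = 'hive',
  'password' = 'hive',
  'lookup.cache.max-rows' = '5000',  -- assumed cache size; size it to the dimension table
  'lookup.cache.ttl' = '10min'       -- assumed TTL; trades dimension freshness against MySQL load
);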
4. Creating the DWD-Layer Topic in Kafka
#Create the Kafka topic
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic dwd_cars_log --replication-factor 3 --partitions 1
5. Viewing the DWD-Layer Results of the Real-Time Data Warehouse
#Consume the Kafka topic
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dwd_cars_log --from-beginning
If the console prints results like the following, the DWD layer of the real-time data warehouse has been built successfully.
{"id":"3bfe7e59-4771-4aa8-ab90-80c98010c4ea","owerId":10022759,"opTime":"2022-07-15 11:59:55.443","cId":10000095,"carCode":"青I·PY2MR","ctype":2}
{"id":"36208b62-739b-4eea-abf4-9f26b85b85d1","owerId":10075672,"opTime":"2022-07-15 11:59:56.443","cId":10000311,"carCode":"渝Z·C0AFY","ctype":1}
{"id":"2a5df539-4668-4a42-8013-978b82b3c318","owerId":10126156,"opTime":"2022-07-15 11:59:57.443","cId":10000526,"carCode":"晋B·1RPVV","ctype":1}
{"id":"2bd0ce39-1c39-4db5-9376-68e297fda4b0","owerId":10206773,"opTime":"2022-07-15 11:59:58.443","cId":10000843,"carCode":"冀D·FX3IJ","ctype":2}
{"id":"2959544d-53f9-43e4-9101-96629fecdcc6","owerId":10153485,"opTime":"2022-07-15 11:59:59.443","cId":10000631,"carCode":"晋D·8OWIR","ctype":2}
{"id":"2fd665f9-ea27-44fd-a8cd-1f204ab2d5fc","owerId":10152560,"opTime":"2022-07-15 12:00:00.099","cId":10000627,"carCode":"贵A·MVO77","ctype":2}
{"id":"3c283bc5-5616-43cf-87b2-c94396ced64f","owerId":10103872,"opTime":"2022-07-15 12:00:01.037","cId":10000425,"carCode":"辽L·3C5DU","ctype":1}
{"id":"3634862d-c824-4829-a017-0082b7514471","owerId":10234908,"opTime":"2022-07-15 12:00:02.376","cId":10000961,"carCode":"沪T·QNNXP","ctype":1}
{"id":"2b4a4d0f-4441-4e75-8437-008dfea5c03c","owerId":10228881,"opTime":"2022-07-15 12:00:03.33","cId":10000938,"carCode":"闽E·GZKRQ","ctype":2}
{"id":"2ce336bc-2b31-4089-ae85-a76921c6a306","owerId":10144509,"opTime":"2022-07-15 12:00:04.819","cId":10000596,"carCode