Building the DWD Layer of a Real-Time Data Warehouse with Flink SQL
2022-08-02 16:30:00 【大数据研习社】
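1. The DWD layer of the real-time data warehouse
DWD is the detail data layer. Its tables keep the same structure and granularity as the source tables, but the ODS-layer data is cleaned, dimension-degenerated, and desensitized so that the resulting data is clean, complete, and consistent. The main tasks are:
(1) Parse the user behavior data.
(2) Filter out null values in the core data.
(3) Re-model the business data with a dimensional model, i.e. dimension degeneration.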
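Tasks (2) and (3) can be sketched in plain Java, without Flink, to show what they do to each row. This is a minimal illustration under stated assumptions: the class `DwdCleaningSketch`, its methods, the choice of `id`, `opTime`, and `carCode` as the "core" fields, and the sample car codes are all hypothetical and not part of the original article. Only the column names are borrowed from the article's tables.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: null filtering plus dimension degeneration on in-memory rows.
public class DwdCleaningSketch {

    /** One fact row; field names follow the ods_cars_log columns. */
    public static final class CarEvent {
        public final String id;
        public final String opTime;
        public final String carCode;
        public final Integer owerId; // null until the dimension lookup has run

        public CarEvent(String id, String opTime, String carCode, Integer owerId) {
            this.id = id;
            this.opTime = opTime;
            this.carCode = carCode;
            this.owerId = owerId;
        }
    }

    /** Task (2): a row is kept only if every core field is present (assumed core set). */
    public static boolean hasCoreFields(CarEvent e) {
        return e.id != null && e.opTime != null && e.carCode != null;
    }

    /** Task (3): copy the owner id from the dimension data onto the fact row. */
    public static CarEvent degenerate(CarEvent e, Map<String, Integer> ownerByCarCode) {
        // A missing car code yields a null owerId, like the unmatched side of a left join.
        return new CarEvent(e.id, e.opTime, e.carCode, ownerByCarCode.get(e.carCode));
    }

    /** Applies both tasks to a batch of ODS rows and returns the DWD rows. */
    public static List<CarEvent> toDwd(List<CarEvent> ods, Map<String, Integer> ownerByCarCode) {
        List<CarEvent> out = new ArrayList<>();
        for (CarEvent e : ods) {
            if (hasCoreFields(e)) {
                out.add(degenerate(e, ownerByCarCode));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Integer> dim = new HashMap<>();
        dim.put("A-001", 10022759); // made-up car code for illustration

        List<CarEvent> ods = new ArrayList<>();
        ods.add(new CarEvent("e1", "2022-07-15 11:59:55.443", "A-001", null));
        ods.add(new CarEvent("e2", null, "A-001", null));                      // dropped: opTime is null
        ods.add(new CarEvent("e3", "2022-07-15 11:59:57.443", "B-002", null)); // kept: owner unknown

        for (CarEvent e : toDwd(ods, dim)) {
            System.out.println(e.id + " owerId=" + e.owerId);
        }
    }
}
```

In a Flink SQL job the same filtering would more likely be a `WHERE ... IS NOT NULL` clause and the degeneration a join against the dimension table.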
2. Dimensional modeling for vehicle trips

3. Building the DWD layer of the real-time data warehouse with Flink SQL
package com.bigdata.warehouse.dwd;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class DwdCarsLog {
    public static void main(String[] args) {
        // 1. Get the stream execution environment
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set the parallelism
        //senv.setParallelism(1);

        // Enable checkpointing for fault tolerance.
        // Uncommenting these lines also requires importing
        // org.apache.flink.streaming.api.CheckpointingMode.
        //senv.enableCheckpointing(60000);
        //senv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //senv.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
        //senv.getCheckpointConfig().setCheckpointTimeout(10000);
        //senv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // Set the state backend.
        // Uncommenting these lines also requires importing
        // org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend and
        // org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage.
        // (1) Use RocksDB
        //senv.setStateBackend(new EmbeddedRocksDBStateBackend());
        // (2) Set the checkpoint storage
        //senv.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://mycluster/flink/checkpoints"));

        // 2. Create the table environment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv);

        // 3. Read the vehicle entry/exit fact table (Kafka source).
        //    proc_time is a processing-time attribute used by the lookup join below.
        tEnv.executeSql("CREATE TABLE ods_cars_log (" +
                " id STRING," +
                " opTime STRING," +
                " ctype SMALLINT," +
                " carCode STRING," +
                " cId BIGINT," +
                " proc_time as PROCTIME() " +
                ") WITH (" +
                " 'connector' = 'kafka'," +
                " 'topic' = 'ods_cars_log'," +
                " 'properties.bootstrap.servers' = 'hadoop1:9092'," +
                " 'properties.group.id' = 'ods_cars_log'," +
                " 'scan.startup.mode' = 'earliest-offset'," +
                " 'format' = 'json'" +
                ")");

        // 4. Read the vehicle dimension table (MySQL via JDBC)
        tEnv.executeSql("CREATE TABLE dim_base_cars ( " +
                " id INT, " +
                " owerId INT, " +
                " carCode STRING, " +
                " carColor STRING, " +
                " type TINYINT, " +
                " remark STRING, " +
                " PRIMARY KEY(id) NOT ENFORCED " +
                ") WITH ( " +
                " 'connector' = 'jdbc', " +
                " 'url' = 'jdbc:mysql://hadoop1:3306/sca?useUnicode=true&characterEncoding=utf8', " +
                " 'table-name' = 'dim_base_cars', " +
                " 'username' = 'hive', " +
                " 'password' = 'hive' " +
                ")");

        // 5. Join the fact table with the dimension table (lookup join on carCode)
        //    to produce the vehicle entry/exit detail rows
        Table resultTable = tEnv.sqlQuery("select " +
                "cl.id, " +
                "c.owerId, " +
                "cl.opTime, " +
                "cl.cId, " +
                "cl.carCode, " +
                "cl.ctype " +
                "from ods_cars_log cl " +
                "left join dim_base_cars for system_time as of cl.proc_time as c " +
                "on cl.carCode=c.carCode");
        tEnv.createTemporaryView("resultTable", resultTable);

        // 6. Create the dwd_cars_log table (upsert-kafka sink keyed by id)
        tEnv.executeSql("CREATE TABLE dwd_cars_log ( " +
                " id STRING, " +
                " owerId INT, " +
                " opTime STRING, " +
                " cId BIGINT, " +
                " carCode STRING, " +
                " ctype SMALLINT, " +
                " PRIMARY KEY (id) NOT ENFORCED " +
                ") WITH ( " +
                " 'connector' = 'upsert-kafka', " +
                " 'topic' = 'dwd_cars_log', " +
                " 'properties.bootstrap.servers' = 'hadoop1:9092', " +
                " 'key.format' = 'json', " +
                " 'value.format' = 'json' " +
                ")");

        // 7. Write the join result into the dwd_cars_log table
        tEnv.executeSql("insert into dwd_cars_log select * from resultTable");
    }
}
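The sink in step 6 uses the `upsert-kafka` connector, which is why `dwd_cars_log` declares `PRIMARY KEY (id)`: the key columns become the Kafka message key, and a reader treats a later message with the same key as an update and a message with a null value as a delete. The following plain-Java sketch models that interpretation with an in-memory map. It uses neither Flink nor Kafka, and the class `UpsertSemanticsSketch`, the `apply` method, and the sample keys and values are hypothetical names introduced only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: how a keyed changelog collapses into a current-state table.
public class UpsertSemanticsSketch {

    /**
     * Applies one keyed record to the materialized table.
     * A null value is treated as a delete; any other value inserts or overwrites.
     */
    public static void apply(Map<String, String> table, String key, String value) {
        if (value == null) {
            table.remove(key);
        } else {
            table.put(key, value);
        }
    }

    public static void main(String[] args) {
        Map<String, String> table = new LinkedHashMap<>();
        apply(table, "id-1", "{\"ctype\":1}"); // insert
        apply(table, "id-2", "{\"ctype\":2}"); // insert
        apply(table, "id-1", "{\"ctype\":2}"); // same key: the earlier value is overwritten
        apply(table, "id-2", null);            // null value: the row is deleted
        System.out.println(table);             // only id-1 remains, with its latest value
    }
}
```

Because each `id` in this pipeline is a unique event identifier, most records act as plain inserts. The upsert behavior only shows when the same `id` is written more than once, for example when the source topic is replayed.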
4. Creating the DWD-layer topic in Kafka
# Create the Kafka topic
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic dwd_cars_log --replication-factor 3 --partitions 1
5. Viewing the DWD-layer results
# Consume the Kafka topic
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dwd_cars_log --from-beginning
If the console prints the expected results, the DWD layer of the real-time data warehouse has been built successfully.
{"id":"3bfe7e59-4771-4aa8-ab90-80c98010c4ea","owerId":10022759,"opTime":"2022-07-15 11:59:55.443","cId":10000095,"carCode":"青I·PY2MR","ctype":2}
{"id":"36208b62-739b-4eea-abf4-9f26b85b85d1","owerId":10075672,"opTime":"2022-07-15 11:59:56.443","cId":10000311,"carCode":"渝Z·C0AFY","ctype":1}
{"id":"2a5df539-4668-4a42-8013-978b82b3c318","owerId":10126156,"opTime":"2022-07-15 11:59:57.443","cId":10000526,"carCode":"晋B·1RPVV","ctype":1}
{"id":"2bd0ce39-1c39-4db5-9376-68e297fda4b0","owerId":10206773,"opTime":"2022-07-15 11:59:58.443","cId":10000843,"carCode":"冀D·FX3IJ","ctype":2}
{"id":"2959544d-53f9-43e4-9101-96629fecdcc6","owerId":10153485,"opTime":"2022-07-15 11:59:59.443","cId":10000631,"carCode":"晋D·8OWIR","ctype":2}
{"id":"2fd665f9-ea27-44fd-a8cd-1f204ab2d5fc","owerId":10152560,"opTime":"2022-07-15 12:00:00.099","cId":10000627,"carCode":"贵A·MVO77","ctype":2}
{"id":"3c283bc5-5616-43cf-87b2-c94396ced64f","owerId":10103872,"opTime":"2022-07-15 12:00:01.037","cId":10000425,"carCode":"辽L·3C5DU","ctype":1}
{"id":"3634862d-c824-4829-a017-0082b7514471","owerId":10234908,"opTime":"2022-07-15 12:00:02.376","cId":10000961,"carCode":"沪T·QNNXP","ctype":1}
{"id":"2b4a4d0f-4441-4e75-8437-008dfea5c03c","owerId":10228881,"opTime":"2022-07-15 12:00:03.33","cId":10000938,"carCode":"闽E·GZKRQ","ctype":2}
{"id":"2ce336bc-2b31-4089-ae85-a76921c6a306","owerId":10144509,"opTime":"2022-07-15 12:00:04.819","cId":10000596,"carCode