Building the DWD Layer of a Real-Time Data Warehouse with Flink SQL
2022-08-02 19:03:00 【Big data study club】
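1. The DWD layer of a real-time data warehouse
DWD is the detail data layer. Its table structure and granularity stay the same as those of the original tables, but the ODS-layer data must be cleaned, dimension-degenerated, and desensitized so that the resulting data is clean, complete, and consistent.
(1) Parse the user behavior data.
(2) Filter out null values in the core data.
(3) Remodel the business data into a dimensional model, i.e. dimension degeneration.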
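The three steps above can be sketched in plain Java without any Flink dependency. This is only an illustration under stated assumptions: the class `DwdCleaningSketch`, the `CarLog` type, and the comma-separated input format `id,opTime,carCode` are hypothetical, and an in-memory map stands in for the vehicle dimension table.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class DwdCleaningSketch {

    /** One detail row. owerId (spelled as in the article's schema) is filled in by the dimension lookup. */
    static final class CarLog {
        final String id;
        final String opTime;
        final String carCode;
        Integer owerId;

        CarLog(String id, String opTime, String carCode) {
            this.id = id;
            this.opTime = opTime;
            this.carCode = carCode;
        }
    }

    static String blankToNull(String s) {
        String t = s.trim();
        return t.isEmpty() ? null : t;
    }

    /** Step (1): parse a raw line. The "id,opTime,carCode" layout is an assumption for this sketch. */
    static CarLog parse(String line) {
        String[] parts = line.split(",", -1);
        if (parts.length != 3) {
            return null; // malformed line
        }
        return new CarLog(blankToNull(parts[0]), blankToNull(parts[1]), blankToNull(parts[2]));
    }

    /** Step (2): a row is usable only if every core field is present. */
    static boolean hasCoreFields(CarLog log) {
        return log != null && log.id != null && log.opTime != null && log.carCode != null;
    }

    /** Step (3): dimension degeneration, copying the owner id from the dimension data into the fact row. */
    static List<CarLog> clean(List<String> rawLines, Map<String, Integer> owerIdByCarCode) {
        List<CarLog> out = new ArrayList<>();
        for (String line : rawLines) {
            CarLog log = parse(line);
            if (!hasCoreFields(log)) {
                continue; // drop incomplete rows
            }
            log.owerId = owerIdByCarCode.get(log.carCode); // stays null when there is no match
            out.add(log);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Integer> dim = new HashMap<>();
        dim.put("A1", 7);
        List<String> raw = Arrays.asList(
                "1,2022-07-15 11:59:55.443,A1",
                "2,,A1",
                "3,2022-07-15 12:00:00.099,B2");
        for (CarLog log : clean(raw, dim)) {
            System.out.println(log.id + " " + log.carCode + " " + log.owerId);
        }
    }
}
```

In this sketch the row with the empty `opTime` is dropped, and the row whose `carCode` has no dimension entry is kept with a null `owerId`, which is the behavior one would expect from a left join against the dimension table.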
2. Dimensional modeling of vehicle travel

3. Building the DWD layer of the real-time data warehouse with Flink SQL
package com.bigdata.warehouse.dwd;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class DwdCarsLog {
    public static void main(String[] args) {
        // 1. Get the stream execution environment
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism
        //senv.setParallelism(1);
        // Enable checkpointing for fault tolerance
        //senv.enableCheckpointing(60000);
        //senv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //senv.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
        //senv.getCheckpointConfig().setCheckpointTimeout(10000);
        //senv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // Set the state backend
        // (1) Enable RocksDB
        //senv.setStateBackend(new EmbeddedRocksDBStateBackend());
        // (2) Set the checkpoint storage
        //senv.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://mycluster/flink/checkpoints"));

        // 2. Create the table execution environment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv);

        // 3. Read the vehicle entry and exit fact table
        tEnv.executeSql("CREATE TABLE ods_cars_log (" +
                " id STRING," +
                " opTime STRING," +
                " ctype SMALLINT," +
                " carCode STRING," +
                " cId BIGINT," +
                " proc_time as PROCTIME() " +
                ") WITH (" +
                " 'connector' = 'kafka'," +
                " 'topic' = 'ods_cars_log'," +
                " 'properties.bootstrap.servers' = 'hadoop1:9092'," +
                " 'properties.group.id' = 'ods_cars_log'," +
                " 'scan.startup.mode' = 'earliest-offset'," +
                " 'format' = 'json'" +
                ")");

        // 4. Read the vehicle dimension table
        tEnv.executeSql("CREATE TABLE dim_base_cars ( " +
                " id INT, " +
                " owerId INT, " +
                " carCode STRING, " +
                " carColor STRING, " +
                " type TINYINT, " +
                " remark STRING, " +
                " PRIMARY KEY(id) NOT ENFORCED " +
                ") WITH ( " +
                " 'connector' = 'jdbc', " +
                " 'url' = 'jdbc:mysql://hadoop1:3306/sca?useUnicode=true&characterEncoding=utf8', " +
                " 'table-name' = 'dim_base_cars', " +
                " 'username' = 'hive', " +
                " 'password' = 'hive' " +
                ")");

        // 5. Join the fact table with the dimension table (processing-time lookup join)
        //    to get the vehicle entry and exit details
        Table resultTable = tEnv.sqlQuery("select " +
                "cl.id, " +
                "c.owerId, " +
                "cl.opTime, " +
                "cl.cId, " +
                "cl.carCode, " +
                "cl.ctype " +
                "from ods_cars_log cl " +
                "left join dim_base_cars for system_time as of cl.proc_time as c " +
                "on cl.carCode=c.carCode");
        tEnv.createTemporaryView("resultTable", resultTable);

        // 6. Create the dwd_cars_log table
        tEnv.executeSql("CREATE TABLE dwd_cars_log ( " +
                " id STRING, " +
                " owerId INT, " +
                " opTime STRING, " +
                " cId BIGINT, " +
                " carCode STRING, " +
                " ctype SMALLINT, " +
                " PRIMARY KEY (id) NOT ENFORCED " +
                ") WITH ( " +
                " 'connector' = 'upsert-kafka', " +
                " 'topic' = 'dwd_cars_log', " +
                " 'properties.bootstrap.servers' = 'hadoop1:9092', " +
                " 'key.format' = 'json', " +
                " 'value.format' = 'json' " +
                ")");

        // 7. Write the join result into the dwd_cars_log table
        tEnv.executeSql("insert into dwd_cars_log select * from resultTable");
    }
}
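The `dwd_cars_log` sink uses the `upsert-kafka` connector with `id` as its primary key, so each Kafka message is keyed by `id`. A downstream reader is expected to treat a later message with the same key as an update and a message with a null value as a delete. The following plain-Java sketch shows that interpretation only; the class `UpsertViewSketch` and the sample keys and values are hypothetical, and no Kafka or Flink API is involved.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class UpsertViewSketch {

    /**
     * Applies one keyed changelog record to a materialized view:
     * a null value is a delete (tombstone), anything else is an insert or update.
     */
    static void apply(Map<String, String> view, String key, String value) {
        if (value == null) {
            view.remove(key);
        } else {
            view.put(key, value);
        }
    }

    public static void main(String[] args) {
        Map<String, String> view = new LinkedHashMap<>();
        apply(view, "a1", "{\"ctype\":1}");
        apply(view, "b2", "{\"ctype\":2}");
        apply(view, "a1", "{\"ctype\":2}"); // same key: replaces the earlier value
        apply(view, "b2", null);            // tombstone: removes the key
        System.out.println(view);
    }
}
```

Note that a plain console consumer prints every message in the topic in order; collapsing them by key, as the sketch does, is up to the consumer.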
4. Creating the DWD-layer topic in Kafka
# Create the Kafka topic
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic dwd_cars_log --replication-factor 3 --partitions 1
5. Viewing the DWD-layer results of the real-time data warehouse
# Consume the Kafka topic
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dwd_cars_log --from-beginning
If the console prints the expected result, the DWD layer of the real-time data warehouse has been built successfully.
{"id":"3bfe7e59-4771-4aa8-ab90-80c98010c4ea","owerId":10022759,"opTime":"2022-07-15 11:59:55.443","cId":10000095,"carCode":"青I·PY2MR","ctype":2}
{"id":"36208b62-739b-4eea-abf4-9f26b85b85d1","owerId":10075672,"opTime":"2022-07-15 11:59:56.443","cId":10000311,"carCode":"渝Z·C0AFY","ctype":1}
{"id":"2a5df539-4668-4a42-8013-978b82b3c318","owerId":10126156,"opTime":"2022-07-15 11:59:57.443","cId":10000526,"carCode":"晋B·1RPVV","ctype":1}
{"id":"2bd0ce39-1c39-4db5-9376-68e297fda4b0","owerId":10206773,"opTime":"2022-07-15 11:59:58.443","cId":10000843,"carCode":"冀D·FX3IJ","ctype":2}
{"id":"2959544d-53f9-43e4-9101-96629fecdcc6","owerId":10153485,"opTime":"2022-07-15 11:59:59.443","cId":10000631,"carCode":"晋D·8OWIR","ctype":2}
{"id":"2fd665f9-ea27-44fd-a8cd-1f204ab2d5fc","owerId":10152560,"opTime":"2022-07-15 12:00:00.099","cId":10000627,"carCode":"贵A·MVO77","ctype":2}
{"id":"3c283bc5-5616-43cf-87b2-c94396ced64f","owerId":10103872,"opTime":"2022-07-15 12:00:01.037","cId":10000425,"carCode":"辽L·3C5DU","ctype":1}
{"id":"3634862d-c824-4829-a017-0082b7514471","owerId":10234908,"opTime":"2022-07-15 12:00:02.376","cId":10000961,"carCode":"沪T·QNNXP","ctype":1}
{"id":"2b4a4d0f-4441-4e75-8437-008dfea5c03c","owerId":10228881,"opTime":"2022-07-15 12:00:03.33","cId":10000938,"carCode":"闽E·GZKRQ","ctype":2}
{"id":"2ce336bc-2b31-4089-ae85-a76921c6a306","owerId":10144509,"opTime":"2022-07-15 12:00:04.819","cId":10000596,"carCode
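One rough way to decide whether a consumed line matches the "expected result" is to check that it carries all six columns of `dwd_cars_log`. The sketch below does this by simple substring matching; it is not a JSON parser, and the class `DwdOutputCheck` and its method name are hypothetical. For anything beyond a quick smoke test, a real JSON library would be the safer choice.

```java
final class DwdOutputCheck {

    // Column names taken from the dwd_cars_log table definition.
    private static final String[] EXPECTED_FIELDS =
            {"id", "owerId", "opTime", "cId", "carCode", "ctype"};

    /** Returns true if the line contains a "name": key for every expected column. */
    static boolean hasAllFields(String jsonLine) {
        for (String field : EXPECTED_FIELDS) {
            if (!jsonLine.contains("\"" + field + "\":")) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        String complete = "{\"id\":\"3bfe7e59-4771-4aa8-ab90-80c98010c4ea\",\"owerId\":10022759,"
                + "\"opTime\":\"2022-07-15 11:59:55.443\",\"cId\":10000095,"
                + "\"carCode\":\"青I·PY2MR\",\"ctype\":2}";
        String cutOff = "{\"id\":\"2ce336bc-2b31-4089-ae85-a76921c6a306\",\"owerId\":10144509";
        System.out.println(hasAllFields(complete));
        System.out.println(hasAllFields(cutOff));
    }
}
```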