当前位置:网站首页>Flink实时仓库-DWD层(下单-多张表实现join操作)模板代码
Flink实时仓库-DWD层(下单-多张表实现join操作)模板代码
2022-07-29 05:54:00 【顶尖高手养成计划】
简介
多张表的join操作
工具类
KafkaUtil
/**
 * Builds the {@code WITH (...)} option clauses that are appended to Flink SQL
 * {@code CREATE TABLE} statements for Kafka-backed tables.
 *
 * <p>All values supplied by callers are validated and escaped before they are
 * embedded in the single-quoted SQL string literals of the clause.
 */
public final class KafkaUtil {
    /** Kafka broker list written into every generated clause. */
    private final static String BOOTSTRAP_SERVERS = "master:9092";

    /** Static utility class; not meant to be instantiated. */
    private KafkaUtil() {
    }

    /**
     * Builds the option clause for a Kafka source table ({@code 'connector' = 'kafka'}).
     *
     * <p>The clause uses the JSON format and {@code 'scan.startup.mode' = 'group-offsets'}.
     *
     * @param topic   source topic; must not be null or blank
     * @param groupId consumer group id; must not be null or blank
     * @return the {@code with (...)} clause to append after the column list
     * @throws IllegalArgumentException if {@code topic} or {@code groupId} is null or blank
     */
    public static String getKafkaDDL(String topic, String groupId) {
        return " with ('connector' = 'kafka', " +
                " 'topic' = '" + sqlLiteral("topic", topic) + "'," +
                " 'properties.bootstrap.servers' = '" + BOOTSTRAP_SERVERS + "', " +
                " 'properties.group.id' = '" + sqlLiteral("groupId", groupId) + "', " +
                " 'format' = 'json', " +
                " 'scan.startup.mode' = 'group-offsets')";
    }

    /**
     * Builds the option clause for an upsert-kafka sink table
     * ({@code 'connector' = 'upsert-kafka'}) with JSON key and value formats.
     *
     * @param topic target topic; must not be null or blank
     * @return the {@code WITH (...)} clause to append after the column list
     * @throws IllegalArgumentException if {@code topic} is null or blank
     */
    public static String getUpsertKafkaDDL(String topic) {
        return "WITH ( " +
                " 'connector' = 'upsert-kafka', " +
                " 'topic' = '" + sqlLiteral("topic", topic) + "', " +
                " 'properties.bootstrap.servers' = '" + BOOTSTRAP_SERVERS + "', " +
                " 'key.format' = 'json', " +
                " 'value.format' = 'json' " +
                ")";
    }

    /**
     * Validates a caller-supplied option value and makes it safe to place between
     * single quotes in the generated SQL.
     *
     * @param name  parameter name used in the error message
     * @param value raw option value
     * @return {@code value} with every single quote doubled
     */
    private static String sqlLiteral(String name, String value) {
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException(name + " must not be null or blank");
        }
        // A single quote inside a SQL string literal is written as two single quotes;
        // without this a quote in the value would terminate the literal early.
        return value.replace("'", "''");
    }
}
// Heading of the next section of the original article: MysqlUtils
/**
 * Builds Flink SQL DDL for MySQL tables accessed through the JDBC connector
 * and used as lookup (dimension) tables.
 */
public final class MysqlUtils {

    /** Static utility class; not meant to be instantiated. */
    private MysqlUtils() {
    }

    /**
     * Builds the complete {@code create table} statement for the {@code base_dic}
     * lookup table, with {@code dic_code} declared as a non-enforced primary key.
     *
     * @return the DDL statement, including the JDBC {@code WITH (...)} clause
     */
    public static String getBaseDicLookUpDDL() {
        return "create table `base_dic`( " +
                "`dic_code` string, " +
                "`dic_name` string, " +
                "`parent_code` string, " +
                "`create_time` timestamp, " +
                "`operate_time` timestamp, " +
                "primary key(`dic_code`) not enforced " +
                ")" + MysqlUtils.mysqlLookUpTableDDL("base_dic");
    }

    /**
     * Builds the JDBC connector {@code WITH (...)} clause for a MySQL table,
     * with a lookup cache of at most 10 rows and a 1 hour TTL.
     *
     * @param tableName name of the MySQL table; must not be null or blank
     * @return the {@code WITH (...)} clause to append after the column list
     * @throws IllegalArgumentException if {@code tableName} is null or blank
     */
    public static String mysqlLookUpTableDDL(String tableName) {
        if (tableName == null || tableName.trim().isEmpty()) {
            throw new IllegalArgumentException("tableName must not be null or blank");
        }
        // A single quote inside a SQL string literal is written as two single quotes;
        // without this a quote in the name would terminate the literal early.
        String escapedTableName = tableName.replace("'", "''");

        // NOTE(review): the JDBC URL and credentials are hard-coded; consider loading
        // them from configuration instead of keeping them in source.
        return "WITH ( " +
                "'connector' = 'jdbc', " +
                "'url' = 'jdbc:mysql://hadoop102:3306/gmall', " +
                "'table-name' = '" + escapedTableName + "', " +
                "'lookup.cache.max-rows' = '10', " +
                "'lookup.cache.ttl' = '1 hour', " +
                "'username' = 'root', " +
                "'password' = '000000', " +
                "'driver' = 'com.mysql.cj.jdbc.Driver' " +
                ")";
    }
}
// Heading of the next section of the original article: 代码实现例子 (code implementation example)
public class DwdTradeOrderPreProcess {
public static void main(String[] args) throws Exception {
// TODO 1. 环境准备
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// TODO 2. 启用状态后端
// env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
// env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
// env.getCheckpointConfig().enableExternalizedCheckpoints(
// CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
// );
// env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
// env.setRestartStrategy(
// RestartStrategies.failureRateRestart(3, Time.days(1L), Time.minutes(3L))
// );
// env.setStateBackend(new HashMapStateBackend());
// env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/ck");
// System.setProperty("HADOOP_USER_NAME", "atguigu");
tableEnv.getConfig().setLocalTimeZone(ZoneId.of("GMT+8"));
//设置过期的天数,根据业务制定
tableEnv.getConfig().setIdleStateRetention(Duration.ofDays(3));
// TODO 3. 从 Kafka 读取业务数据,封装为 Flink SQL 表
tableEnv.executeSql("create table topic_db(" +
"`database` String, " +
"`table` String, " +
"`type` String, " +
"`data` map<String, String>, " +
"`old` map<String, String>, " +
"`proc_time` as PROCTIME(), " +
"`ts` string " +
")" + KafkaUtil.getKafkaDDL("topic_db", "dwd_trade_order_pre_process"));
// TODO 4. 读取订单明细表数据
Table orderDetail = tableEnv.sqlQuery("select " +
"data['id'] id, " +
"data['order_id'] order_id, " +
"data['sku_id'] sku_id, " +
"data['sku_name'] sku_name, " +
"data['create_time'] create_time, " +
"data['source_id'] source_id, " +
"data['source_type'] source_type, " +
"data['sku_num'] sku_num, " +
"cast(cast(data['sku_num'] as decimal(16,2)) * " +
"cast(data['order_price'] as decimal(16,2)) as String) split_original_amount, " +
"data['split_total_amount'] split_total_amount, " +
"data['split_activity_amount'] split_activity_amount, " +
"data['split_coupon_amount'] split_coupon_amount, " +
"ts od_ts, " +
"proc_time " +
"from `topic_db` where `table` = 'order_detail' " +
"and `type` = 'insert' ");
tableEnv.createTemporaryView("order_detail", orderDetail);
// TODO 5. 读取订单表数据
Table orderInfo = tableEnv.sqlQuery("select " +
"data['id'] id, " +
"data['user_id'] user_id, " +
"data['province_id'] province_id, " +
"data['operate_time'] operate_time, " +
"data['order_status'] order_status, " +
"`type`, " +
"`old`, " +
"ts oi_ts " +
"from `topic_db` " +
"where `table` = 'order_info' " +
"and (`type` = 'insert' or `type` = 'update')");
tableEnv.createTemporaryView("order_info", orderInfo);
// TODO 6. 读取订单明细活动关联表数据
Table orderDetailActivity = tableEnv.sqlQuery("select " +
"data['order_detail_id'] order_detail_id, " +
"data['activity_id'] activity_id, " +
"data['activity_rule_id'] activity_rule_id " +
"from `topic_db` " +
"where `table` = 'order_detail_activity' " +
"and `type` = 'insert' ");
tableEnv.createTemporaryView("order_detail_activity", orderDetailActivity);
// TODO 7. 读取订单明细优惠券关联表数据
Table orderDetailCoupon = tableEnv.sqlQuery("select " +
"data['order_detail_id'] order_detail_id, " +
"data['coupon_id'] coupon_id " +
"from `topic_db` " +
"where `table` = 'order_detail_coupon' " +
"and `type` = 'insert' ");
tableEnv.createTemporaryView("order_detail_coupon", orderDetailCoupon);
// TODO 8. 建立 MySQL-LookUp 字典表
tableEnv.executeSql(MysqlUtils.getBaseDicLookUpDDL());
// TODO 9. 关联五张表获得订单明细表
Table resultTable = tableEnv.sqlQuery("select " +
"od.id, " +
"od.order_id, " +
"oi.user_id, " +
"oi.order_status, " +
"od.sku_id, " +
"od.sku_name, " +
"oi.province_id, " +
"act.activity_id, " +
"act.activity_rule_id, " +
"cou.coupon_id, " +
"date_format(od.create_time, 'yyyy-MM-dd') date_id, " +
"od.create_time, " +
"date_format(oi.operate_time, 'yyyy-MM-dd') operate_date_id, " +
"oi.operate_time, " +
"od.source_id, " +
"od.source_type, " +
"dic.dic_name source_type_name, " +
"od.sku_num, " +
"od.split_original_amount, " +
"od.split_activity_amount, " +
"od.split_coupon_amount, " +
"od.split_total_amount, " +
"oi.`type`, " +
"oi.`old`, " +
"od.od_ts, " +
"oi.oi_ts, " +
"current_row_timestamp() row_op_ts " +
"from order_detail od " +
"join order_info oi " +
"on od.order_id = oi.id " +
"left join order_detail_activity act " +
"on od.id = act.order_detail_id " +
"left join order_detail_coupon cou " +
"on od.id = cou.order_detail_id " +
"left join `base_dic` for system_time as of od.proc_time as dic " +
"on od.source_type = dic.dic_code");
tableEnv.createTemporaryView("result_table", resultTable);
// TODO 10. 建立 Upsert-Kafka dwd_trade_order_pre_process 表
tableEnv.executeSql("" +
"create table dwd_trade_order_pre_process( " +
"id string, " +
"order_id string, " +
"user_id string, " +
"order_status string, " +
"sku_id string, " +
"sku_name string, " +
"province_id string, " +
"activity_id string, " +
"activity_rule_id string, " +
"coupon_id string, " +
"date_id string, " +
"create_time string, " +
"operate_date_id string, " +
"operate_time string, " +
"source_id string, " +
"source_type string, " +
"source_type_name string, " +
"sku_num string, " +
"split_original_amount string, " +
"split_activity_amount string, " +
"split_coupon_amount string, " +
"split_total_amount string, " +
"`type` string, " +
"`old` map<string,string>, " +
"od_ts string, " +
"oi_ts string, " +
"row_op_ts timestamp_ltz(3), " +
"primary key(id) not enforced " +
")" + KafkaUtil.getUpsertKafkaDDL("dwd_trade_order_pre_process"));
// TODO 11. 将关联结果写入 Upsert-Kafka 表
tableEnv.executeSql("" +
"insert into dwd_trade_order_pre_process " +
"select * from result_table")
.print();
env.execute();
}
}边栏推荐
- 【flask入门系列】Flask-SQLAlchemy的安装与配置
- SDN topology discovery principle
- 好文佳句摘录
- Share some tips for better code, smooth coding and improve efficiency
- 吴恩达老师机器学习课程笔记 00 写在前面
- ECCV 2022丨轻量级模型架ParC-Net 力压苹果MobileViT代码和论文下载
- 崔雪婷老师最优化理论与方法课程笔记 00 写在前面
- Junda technology | applicable to "riyueyuan" brand ups wechat cloud monitoring card
- Teacher Wu Enda's machine learning course notes 04 multiple linear regression
- [CF1054H] Epic Convolution——数论,卷积,任意模数NTT
猜你喜欢

【冷冻电镜|论文阅读】A feature-guided, focused 3D signal permutation method for subtomogram averaging

【CryoEM】FSC, Fourier Shell Correlation简介

How to write controller layer code gracefully?

基于C语言设计的学籍管理系统

IDEA中实现Mapper接口到映射文件xml的跳转

Cvpr2022oral special series (I): low light enhancement

IO流 - File - properties

实战!聊聊如何解决MySQL深分页问题

The core of openresty and cosocket

Software definition boundary SDP
随机推荐
【flask入门系列】Flask-SQLAlchemy的安装与配置
Teacher Cui Xueting's course notes on optimization theory and methods 00 are written in the front
Cvpr2022oral special series (I): low light enhancement
Difference between CNAME record and a record
游戏资产的革命
数据库系统概述
SDN topology discovery principle
矩阵分解与梯度下降
MySQL:当你CRUD时BufferPool中发生了什么?十张图就能说清楚
DBAsql面试题
王树尧老师运筹学课程笔记 09 线性规划与单纯形法(单纯形表的应用)
【冷冻电镜入门】加州理工公开课课程笔记 Part 3: Image Formation
The latest pycharm2018 cracking tutorial
5g service interface and reference point
Unity exploration plot access design analysis & process + code specific implementation
HJ37 统计每个月兔子的总数 斐波那契数列
Software definition boundary SDP
Talk about tcp/ip protocol? And the role of each layer?
王树尧老师运筹学课程笔记 00 写在前面
2022年SQL经典面试题总结(带解析)