当前位置:网站首页 > Flink real-time data warehouse, DWD layer (trade domain: multi-table join for order placement) — template code
Flink real-time data warehouse, DWD layer (trade domain: multi-table join for order placement) — template code
2022-07-29 07:03:00 【Top master cultivation plan】
Brief introduction
This template demonstrates how to join multiple business tables (order detail, order info, activity, coupon, and a dictionary lookup table) for the order-placement process in the DWD layer.
Tool class
KafkaUtil
/**
 * Utility for building the {@code WITH (...)} connector clauses of Flink SQL
 * DDL statements that read from / write to Kafka.
 *
 * <p>Thread-safe: the class is stateless and non-instantiable.
 */
public final class KafkaUtil {
    // Kafka broker address shared by all generated DDL clauses.
    private static final String BOOTSTRAP_SERVERS = "master:9092";

    /** Non-instantiable utility class. */
    private KafkaUtil() {
    }

    /**
     * Builds the Kafka-source {@code WITH} clause of a DDL statement.
     *
     * @param topic   data source topic to consume
     * @param groupId Kafka consumer group id
     * @return the spliced Kafka data-source DDL clause
     */
    public static String getKafkaDDL(String topic, String groupId) {
        return " with ('connector' = 'kafka', " +
                " 'topic' = '" + topic + "'," +
                " 'properties.bootstrap.servers' = '" + BOOTSTRAP_SERVERS + "', " +
                " 'properties.group.id' = '" + groupId + "', " +
                " 'format' = 'json', " +
                " 'scan.startup.mode' = 'group-offsets')";
    }

    /**
     * Builds the Upsert-Kafka-sink {@code WITH} clause of a DDL statement.
     * The upsert-kafka connector requires both key and value formats; the
     * target table must declare a primary key.
     *
     * @param topic target topic the sink writes to
     * @return the spliced Kafka-sink DDL clause
     */
    public static String getUpsertKafkaDDL(String topic) {
        return "WITH ( " +
                " 'connector' = 'upsert-kafka', " +
                " 'topic' = '" + topic + "', " +
                " 'properties.bootstrap.servers' = '" + BOOTSTRAP_SERVERS + "', " +
                " 'key.format' = 'json', " +
                " 'value.format' = 'json' " +
                ")";
    }
}
MysqlUtils
/**
 * Utility for building Flink SQL DDL statements for MySQL JDBC lookup tables.
 *
 * <p>Thread-safe: the class is stateless and non-instantiable.
 */
public final class MysqlUtils {

    /** Non-instantiable utility class. */
    private MysqlUtils() {
    }

    /**
     * Builds the full {@code CREATE TABLE} DDL for the {@code base_dic}
     * dictionary lookup table (used for temporal/lookup joins).
     *
     * @return complete DDL including the JDBC {@code WITH} clause
     */
    public static String getBaseDicLookUpDDL() {
        return "create table `base_dic`( " +
                "`dic_code` string, " +
                "`dic_name` string, " +
                "`parent_code` string, " +
                "`create_time` timestamp, " +
                "`operate_time` timestamp, " +
                "primary key(`dic_code`) not enforced " +
                ")" + MysqlUtils.mysqlLookUpTableDDL("base_dic");
    }

    /**
     * Builds the JDBC-connector {@code WITH} clause for a MySQL lookup table,
     * with a small lookup cache (10 rows, 1 hour TTL) to reduce DB round trips.
     *
     * <p>SECURITY NOTE(review): the database credentials are hard-coded here;
     * in production they should come from configuration or a secret store.
     *
     * @param tableName MySQL table to expose as a lookup table
     * @return the spliced JDBC {@code WITH} clause
     */
    public static String mysqlLookUpTableDDL(String tableName) {
        return "WITH ( " +
                "'connector' = 'jdbc', " +
                "'url' = 'jdbc:mysql://hadoop102:3306/gmall', " +
                "'table-name' = '" + tableName + "', " +
                "'lookup.cache.max-rows' = '10', " +
                "'lookup.cache.ttl' = '1 hour', " +
                "'username' = 'root', " +
                "'password' = '000000', " +
                "'driver' = 'com.mysql.cj.jdbc.Driver' " +
                ")";
    }
}
Code implementation example
/**
 * DWD-layer pre-processing job for the trade domain (order placement).
 *
 * <p>Reads CDC-style business data from the {@code topic_db} Kafka topic,
 * splits it into order-detail, order-info, activity and coupon views, joins
 * them (plus a MySQL dictionary lookup table) into one wide order-detail
 * record, and writes the result to an Upsert-Kafka table
 * {@code dwd_trade_order_pre_process}.
 */
public class DwdTradeOrderPreProcess {
    public static void main(String[] args) throws Exception {
        // TODO 1. Environment preparation
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // TODO 2. State backend / checkpointing (disabled in this template;
        // uncomment and adjust for production use).
        // env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
        // env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
        // env.getCheckpointConfig().enableExternalizedCheckpoints(
        //         CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
        // );
        // env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        // env.setRestartStrategy(
        //         RestartStrategies.failureRateRestart(3, Time.days(1L), Time.minutes(3L))
        // );
        // env.setStateBackend(new HashMapStateBackend());
        // env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/ck");
        // System.setProperty("HADOOP_USER_NAME", "atguigu");

        tableEnv.getConfig().setLocalTimeZone(ZoneId.of("GMT+8"));
        // Expire idle join state after 3 days; the retention window is
        // business-driven (how long an order can still receive updates).
        tableEnv.getConfig().setIdleStateRetention(Duration.ofDays(3));

        // TODO 3. Read business data from Kafka and map it to a Flink SQL table.
        // `data`/`old` hold the row image as string maps; proc_time is needed
        // for the lookup join below.
        tableEnv.executeSql("create table topic_db(" +
                "`database` String, " +
                "`table` String, " +
                "`type` String, " +
                "`data` map<String, String>, " +
                "`old` map<String, String>, " +
                "`proc_time` as PROCTIME(), " +
                "`ts` string " +
                ")" + KafkaUtil.getKafkaDDL("topic_db", "dwd_trade_order_pre_process"));

        // TODO 4. Read order-detail data (insert events only).
        // split_original_amount is derived as sku_num * order_price.
        Table orderDetail = tableEnv.sqlQuery("select " +
                "data['id'] id, " +
                "data['order_id'] order_id, " +
                "data['sku_id'] sku_id, " +
                "data['sku_name'] sku_name, " +
                "data['create_time'] create_time, " +
                "data['source_id'] source_id, " +
                "data['source_type'] source_type, " +
                "data['sku_num'] sku_num, " +
                "cast(cast(data['sku_num'] as decimal(16,2)) * " +
                "cast(data['order_price'] as decimal(16,2)) as String) split_original_amount, " +
                "data['split_total_amount'] split_total_amount, " +
                "data['split_activity_amount'] split_activity_amount, " +
                "data['split_coupon_amount'] split_coupon_amount, " +
                "ts od_ts, " +
                "proc_time " +
                "from `topic_db` where `table` = 'order_detail' " +
                "and `type` = 'insert' ");
        tableEnv.createTemporaryView("order_detail", orderDetail);

        // TODO 5. Read order-table data (inserts plus status updates).
        Table orderInfo = tableEnv.sqlQuery("select " +
                "data['id'] id, " +
                "data['user_id'] user_id, " +
                "data['province_id'] province_id, " +
                "data['operate_time'] operate_time, " +
                "data['order_status'] order_status, " +
                "`type`, " +
                "`old`, " +
                "ts oi_ts " +
                "from `topic_db` " +
                "where `table` = 'order_info' " +
                "and (`type` = 'insert' or `type` = 'update')");
        tableEnv.createTemporaryView("order_info", orderInfo);

        // TODO 6. Read order-detail/activity association data.
        Table orderDetailActivity = tableEnv.sqlQuery("select " +
                "data['order_detail_id'] order_detail_id, " +
                "data['activity_id'] activity_id, " +
                "data['activity_rule_id'] activity_rule_id " +
                "from `topic_db` " +
                "where `table` = 'order_detail_activity' " +
                "and `type` = 'insert' ");
        tableEnv.createTemporaryView("order_detail_activity", orderDetailActivity);

        // TODO 7. Read order-detail/coupon association data.
        Table orderDetailCoupon = tableEnv.sqlQuery("select " +
                "data['order_detail_id'] order_detail_id, " +
                "data['coupon_id'] coupon_id " +
                "from `topic_db` " +
                "where `table` = 'order_detail_coupon' " +
                "and `type` = 'insert' ");
        tableEnv.createTemporaryView("order_detail_coupon", orderDetailCoupon);

        // TODO 8. Create the MySQL-LookUp dictionary table.
        tableEnv.executeSql(MysqlUtils.getBaseDicLookUpDDL());

        // TODO 9. Join the five tables into the wide order-detail record.
        // Inner join on order_info (every detail has an order); left joins on
        // activity/coupon (optional); temporal lookup join on base_dic keyed
        // by od.proc_time so dictionary lookups use the processing-time image.
        Table resultTable = tableEnv.sqlQuery("select " +
                "od.id, " +
                "od.order_id, " +
                "oi.user_id, " +
                "oi.order_status, " +
                "od.sku_id, " +
                "od.sku_name, " +
                "oi.province_id, " +
                "act.activity_id, " +
                "act.activity_rule_id, " +
                "cou.coupon_id, " +
                "date_format(od.create_time, 'yyyy-MM-dd') date_id, " +
                "od.create_time, " +
                "date_format(oi.operate_time, 'yyyy-MM-dd') operate_date_id, " +
                "oi.operate_time, " +
                "od.source_id, " +
                "od.source_type, " +
                "dic.dic_name source_type_name, " +
                "od.sku_num, " +
                "od.split_original_amount, " +
                "od.split_activity_amount, " +
                "od.split_coupon_amount, " +
                "od.split_total_amount, " +
                "oi.`type`, " +
                "oi.`old`, " +
                "od.od_ts, " +
                "oi.oi_ts, " +
                "current_row_timestamp() row_op_ts " +
                "from order_detail od " +
                "join order_info oi " +
                "on od.order_id = oi.id " +
                "left join order_detail_activity act " +
                "on od.id = act.order_detail_id " +
                "left join order_detail_coupon cou " +
                "on od.id = cou.order_detail_id " +
                "left join `base_dic` for system_time as of od.proc_time as dic " +
                "on od.source_type = dic.dic_code");
        tableEnv.createTemporaryView("result_table", resultTable);

        // TODO 10. Create the Upsert-Kafka sink table
        // dwd_trade_order_pre_process. The primary key is required by the
        // upsert-kafka connector and carries retractions from the left joins.
        tableEnv.executeSql("" +
                "create table dwd_trade_order_pre_process( " +
                "id string, " +
                "order_id string, " +
                "user_id string, " +
                "order_status string, " +
                "sku_id string, " +
                "sku_name string, " +
                "province_id string, " +
                "activity_id string, " +
                "activity_rule_id string, " +
                "coupon_id string, " +
                "date_id string, " +
                "create_time string, " +
                "operate_date_id string, " +
                "operate_time string, " +
                "source_id string, " +
                "source_type string, " +
                "source_type_name string, " +
                "sku_num string, " +
                "split_original_amount string, " +
                "split_activity_amount string, " +
                "split_coupon_amount string, " +
                "split_total_amount string, " +
                "`type` string, " +
                "`old` map<string,string>, " +
                "od_ts string, " +
                "oi_ts string, " +
                "row_op_ts timestamp_ltz(3), " +
                "primary key(id) not enforced " +
                ")" + KafkaUtil.getUpsertKafkaDDL("dwd_trade_order_pre_process"));

        // TODO 11. Write the join result to the Upsert-Kafka table.
        tableEnv.executeSql("" +
                "insert into dwd_trade_order_pre_process " +
                "select * from result_table")
                .print();

        // NOTE: env.execute() is intentionally NOT called. The INSERT above is
        // submitted as its own job by executeSql(); since this program defines
        // no DataStream operators, env.execute() would fail with
        // "No operators defined in streaming topology. Cannot execute."
    }
}
边栏推荐
- Overview of database system
- 吴恩达老师机器学习课程笔记 04 多元线性回归
- [CF1054H] Epic Convolution——数论,卷积,任意模数NTT
- 二次元卡通渲染——进阶技巧
- SS command details
- 【CryoEM】FSC, Fourier Shell Correlation简介
- Leetcode-1331: array ordinal conversion
- Flink实时仓库-DWD层(交易域-加购维度退化处理)模板代码
- 基于C语言实现图书借阅管理系统
- LDAP brief description and unified authentication description
猜你喜欢
随机推荐
城市花样精~侬好!DESIGN#可视化电台即将开播
模拟卷Leetcode【普通】172. 阶乘后的零
IDEA中实现Mapper接口到映射文件xml的跳转
Sword finger offer II 115: reconstruction sequence
【解决方案】ERROR: lib/bridge_generated.dart:837:9: Error: The parameter ‘ptr‘ of the method ‘FlutterRustB
王树尧老师运筹学课程笔记 08 线性规划与单纯形法(单纯形法)
Database multi table query joint query add delete modify query
Some tips of vim text editor
Flink实时仓库-DWD层(流量域)模板代码
Flink实时仓库-DWD层(交易域-加购维度退化处理)模板代码
Cvpr2022oral special series (I): low light enhancement
【冷冻电镜】Relion4.0——subtomogram教程
pytest合集(7)— 参数化
[cf1054h] epic Revolution -- number theory, convolution, arbitrary modulus NTT
王树尧老师运筹学课程笔记 07 线性规划与单纯形法(标准型、基、基解、基可行解、可行基)
Simulation volume leetcode [normal] 222. number of nodes of complete binary tree
Decompilation of wechat applet
数据库系统概述
崔雪婷老师最优化理论与方法课程笔记 00 写在前面
2022年SQL经典面试题总结(带解析)