Flink real-time data warehouse DWD layer: multi-table join for order placement (template code)
2022-07-29 07:03:00 【Top master cultivation plan】
Brief introduction
This template pre-processes order placement data in the DWD layer with a multi-table join: order detail, order info, order-detail activity, and order-detail coupon change-log records are read from the topic_db Kafka topic, joined with the base_dic dictionary table through a MySQL lookup join, and the result is written to an Upsert-Kafka table (dwd_trade_order_pre_process).
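Before diving into the full template, here is a minimal sketch of the regular (unbounded) stream-stream join that the code below relies on; the template additionally uses a processing-time lookup join (FOR SYSTEM_TIME AS OF) against MySQL for the dictionary. Everything in this sketch is hypothetical: the class name, demo table names, and datagen settings are made up so it can run without Kafka or MySQL, and it assumes the Flink Table API planner and connector dependencies are on the classpath.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class RegularJoinSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Two throwaway datagen sources standing in for the Kafka-backed tables;
        // the join keys are bounded to a tiny range so matches actually occur.
        tableEnv.executeSql("create table order_detail_demo (" +
                " id bigint, order_id bigint" +
                ") with ('connector' = 'datagen', 'rows-per-second' = '1'," +
                " 'fields.order_id.min' = '1', 'fields.order_id.max' = '3')");
        tableEnv.executeSql("create table order_info_demo (" +
                " id bigint, user_id bigint" +
                ") with ('connector' = 'datagen', 'rows-per-second' = '1'," +
                " 'fields.id.min' = '1', 'fields.id.max' = '3')");
        tableEnv.executeSql("create table print_sink (detail_id bigint, user_id bigint)" +
                " with ('connector' = 'print')");

        // Regular inner join: each incoming row is matched against the other side's
        // state, which is why the template sets an idle state retention (TTL).
        // await() blocks so the unbounded demo job keeps printing results.
        tableEnv.executeSql("insert into print_sink " +
                "select od.id, oi.user_id " +
                "from order_detail_demo od " +
                "join order_info_demo oi on od.order_id = oi.id").await();
    }
}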
Tool classes
KafkaUtil
public class KafkaUtil {
    private final static String BOOTSTRAP_SERVERS = "master:9092";

    /**
     * Builds the WITH clause for a Kafka source table.
     *
     * @param topic   source topic
     * @param groupId consumer group id
     * @return the concatenated Kafka source DDL fragment
     */
    public static String getKafkaDDL(String topic, String groupId) {
        return " with ('connector' = 'kafka', " +
                " 'topic' = '" + topic + "'," +
                " 'properties.bootstrap.servers' = '" + BOOTSTRAP_SERVERS + "', " +
                " 'properties.group.id' = '" + groupId + "', " +
                " 'format' = 'json', " +
                " 'scan.startup.mode' = 'group-offsets')";
    }

    /**
     * Builds the WITH clause for an Upsert-Kafka sink table.
     *
     * @param topic target Kafka topic to write to
     * @return the concatenated Upsert-Kafka sink DDL fragment
     */
    public static String getUpsertKafkaDDL(String topic) {
        return "WITH ( " +
                " 'connector' = 'upsert-kafka', " +
                " 'topic' = '" + topic + "', " +
                " 'properties.bootstrap.servers' = '" + BOOTSTRAP_SERVERS + "', " +
                " 'key.format' = 'json', " +
                " 'value.format' = 'json' " +
                ")";
    }
}
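These helpers only return the connector WITH clause; the caller concatenates it onto a column list. A throwaway snippet like the following (the class name, topic, and group id are just placeholders) prints the resulting DDL so it can be checked by eye:

public class KafkaUtilPreview {
    public static void main(String[] args) {
        // Hypothetical smoke test: print the full CREATE TABLE statement that would be executed.
        String ddl = "create table demo_source(`data` string)"
                + KafkaUtil.getKafkaDDL("topic_db", "preview_group");
        System.out.println(ddl);
    }
}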
MysqlUtils
public class MysqlUtils {
    /**
     * DDL for the base_dic dictionary table backed by MySQL.
     */
    public static String getBaseDicLookUpDDL() {
        return "create table `base_dic`( " +
                "`dic_code` string, " +
                "`dic_name` string, " +
                "`parent_code` string, " +
                "`create_time` timestamp, " +
                "`operate_time` timestamp, " +
                "primary key(`dic_code`) not enforced " +
                ")" + MysqlUtils.mysqlLookUpTableDDL("base_dic");
    }

    /**
     * Builds the WITH clause for a MySQL (JDBC) lookup table.
     *
     * @param tableName MySQL table name
     * @return the concatenated JDBC lookup DDL fragment
     */
    public static String mysqlLookUpTableDDL(String tableName) {
        String ddl = "WITH ( " +
                "'connector' = 'jdbc', " +
                "'url' = 'jdbc:mysql://hadoop102:3306/gmall', " +
                "'table-name' = '" + tableName + "', " +
                "'lookup.cache.max-rows' = '10', " +
                "'lookup.cache.ttl' = '1 hour', " +
                "'username' = 'root', " +
                "'password' = '000000', " +
                "'driver' = 'com.mysql.cj.jdbc.Driver' " +
                ")";
        return ddl;
    }
}
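Note the design choice in the lookup options: cached base_dic rows may be up to one hour stale ('lookup.cache.ttl'), which is normally acceptable for a slowly changing dictionary. Before wiring the lookup join, one might verify the JDBC settings with a throwaway bounded scan such as the sketch below; the class name is hypothetical, and it assumes the same MySQL instance, credentials, and Flink JDBC connector dependency as the utility class.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class BaseDicPreview {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Register base_dic with the JDBC connector and do a one-off scan of it;
        // the main job reuses the same registration as a lookup (temporal) join.
        tableEnv.executeSql(MysqlUtils.getBaseDicLookUpDDL());
        tableEnv.sqlQuery("select dic_code, dic_name from base_dic").execute().print();
    }
}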
Code implementation example
import java.time.Duration;
import java.time.ZoneId;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class DwdTradeOrderPreProcess {
    public static void main(String[] args) throws Exception {
        // TODO 1. Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // TODO 2. Checkpointing and state backend (commented out here; enable in production)
        // env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
        // env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
        // env.getCheckpointConfig().enableExternalizedCheckpoints(
        //         CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
        // );
        // env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        // env.setRestartStrategy(
        //         RestartStrategies.failureRateRestart(3, Time.days(1L), Time.minutes(3L))
        // );
        // env.setStateBackend(new HashMapStateBackend());
        // env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/ck");
        // System.setProperty("HADOOP_USER_NAME", "atguigu");

        tableEnv.getConfig().setLocalTimeZone(ZoneId.of("GMT+8"));
        // Idle state retention (TTL) for the regular joins; set it according to business needs
        tableEnv.getConfig().setIdleStateRetention(Duration.ofDays(3));

        // TODO 3. Read business change-log data from Kafka and wrap it in a Flink SQL table
        tableEnv.executeSql("create table topic_db(" +
                "`database` String, " +
                "`table` String, " +
                "`type` String, " +
                "`data` map<String, String>, " +
                "`old` map<String, String>, " +
                "`proc_time` as PROCTIME(), " +
                "`ts` string " +
                ")" + KafkaUtil.getKafkaDDL("topic_db", "dwd_trade_order_pre_process"));
        // TODO 4. Read order detail data
        Table orderDetail = tableEnv.sqlQuery("select " +
                "data['id'] id, " +
                "data['order_id'] order_id, " +
                "data['sku_id'] sku_id, " +
                "data['sku_name'] sku_name, " +
                "data['create_time'] create_time, " +
                "data['source_id'] source_id, " +
                "data['source_type'] source_type, " +
                "data['sku_num'] sku_num, " +
                "cast(cast(data['sku_num'] as decimal(16,2)) * " +
                "cast(data['order_price'] as decimal(16,2)) as String) split_original_amount, " +
                "data['split_total_amount'] split_total_amount, " +
                "data['split_activity_amount'] split_activity_amount, " +
                "data['split_coupon_amount'] split_coupon_amount, " +
                "ts od_ts, " +
                "proc_time " +
                "from `topic_db` where `table` = 'order_detail' " +
                "and `type` = 'insert' ");
        tableEnv.createTemporaryView("order_detail", orderDetail);

        // TODO 5. Read order info data
        Table orderInfo = tableEnv.sqlQuery("select " +
                "data['id'] id, " +
                "data['user_id'] user_id, " +
                "data['province_id'] province_id, " +
                "data['operate_time'] operate_time, " +
                "data['order_status'] order_status, " +
                "`type`, " +
                "`old`, " +
                "ts oi_ts " +
                "from `topic_db` " +
                "where `table` = 'order_info' " +
                "and (`type` = 'insert' or `type` = 'update')");
        tableEnv.createTemporaryView("order_info", orderInfo);

        // TODO 6. Read order-detail activity association data
        Table orderDetailActivity = tableEnv.sqlQuery("select " +
                "data['order_detail_id'] order_detail_id, " +
                "data['activity_id'] activity_id, " +
                "data['activity_rule_id'] activity_rule_id " +
                "from `topic_db` " +
                "where `table` = 'order_detail_activity' " +
                "and `type` = 'insert' ");
        tableEnv.createTemporaryView("order_detail_activity", orderDetailActivity);

        // TODO 7. Read order-detail coupon association data
        Table orderDetailCoupon = tableEnv.sqlQuery("select " +
                "data['order_detail_id'] order_detail_id, " +
                "data['coupon_id'] coupon_id " +
                "from `topic_db` " +
                "where `table` = 'order_detail_coupon' " +
                "and `type` = 'insert' ");
        tableEnv.createTemporaryView("order_detail_coupon", orderDetailCoupon);
        // TODO 8. Create the MySQL lookup dictionary table
        tableEnv.executeSql(MysqlUtils.getBaseDicLookUpDDL());

        // TODO 9. Join the five tables to obtain the detailed order data
        Table resultTable = tableEnv.sqlQuery("select " +
                "od.id, " +
                "od.order_id, " +
                "oi.user_id, " +
                "oi.order_status, " +
                "od.sku_id, " +
                "od.sku_name, " +
                "oi.province_id, " +
                "act.activity_id, " +
                "act.activity_rule_id, " +
                "cou.coupon_id, " +
                "date_format(od.create_time, 'yyyy-MM-dd') date_id, " +
                "od.create_time, " +
                "date_format(oi.operate_time, 'yyyy-MM-dd') operate_date_id, " +
                "oi.operate_time, " +
                "od.source_id, " +
                "od.source_type, " +
                "dic.dic_name source_type_name, " +
                "od.sku_num, " +
                "od.split_original_amount, " +
                "od.split_activity_amount, " +
                "od.split_coupon_amount, " +
                "od.split_total_amount, " +
                "oi.`type`, " +
                "oi.`old`, " +
                "od.od_ts, " +
                "oi.oi_ts, " +
                "current_row_timestamp() row_op_ts " +
                "from order_detail od " +
                "join order_info oi " +
                "on od.order_id = oi.id " +
                "left join order_detail_activity act " +
                "on od.id = act.order_detail_id " +
                "left join order_detail_coupon cou " +
                "on od.id = cou.order_detail_id " +
                "left join `base_dic` for system_time as of od.proc_time as dic " +
                "on od.source_type = dic.dic_code");
        tableEnv.createTemporaryView("result_table", resultTable);
        // TODO 10. Create the Upsert-Kafka table dwd_trade_order_pre_process
        tableEnv.executeSql("" +
                "create table dwd_trade_order_pre_process( " +
                "id string, " +
                "order_id string, " +
                "user_id string, " +
                "order_status string, " +
                "sku_id string, " +
                "sku_name string, " +
                "province_id string, " +
                "activity_id string, " +
                "activity_rule_id string, " +
                "coupon_id string, " +
                "date_id string, " +
                "create_time string, " +
                "operate_date_id string, " +
                "operate_time string, " +
                "source_id string, " +
                "source_type string, " +
                "source_type_name string, " +
                "sku_num string, " +
                "split_original_amount string, " +
                "split_activity_amount string, " +
                "split_coupon_amount string, " +
                "split_total_amount string, " +
                "`type` string, " +
                "`old` map<string,string>, " +
                "od_ts string, " +
                "oi_ts string, " +
                "row_op_ts timestamp_ltz(3), " +
                "primary key(id) not enforced " +
                ")" + KafkaUtil.getUpsertKafkaDDL("dwd_trade_order_pre_process"));

        // TODO 11. Write the join result to the Upsert-Kafka table.
        // executeSql submits the INSERT job itself, so a separate env.execute() is not
        // needed here and would fail because no DataStream operators are defined.
        tableEnv.executeSql("" +
                "insert into dwd_trade_order_pre_process " +
                "select * from result_table")
                .print();
    }
}
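With the job running, the easiest sanity check is to read the Upsert-Kafka table back and print it. The following is a rough sketch, not part of the template: the class name is made up, the schema is abbreviated to a few columns (the JSON format ignores undeclared fields), and it assumes the same Kafka cluster and the KafkaUtil class above.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class DwdTradeOrderPreProcessCheck {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Re-register the Upsert-Kafka table with a subset of the columns and
        // continuously print whatever changelog records arrive on the topic.
        tableEnv.executeSql("create table dwd_trade_order_pre_process( " +
                "id string, " +
                "order_id string, " +
                "user_id string, " +
                "sku_id string, " +
                "split_total_amount string, " +
                "primary key(id) not enforced " +
                ")" + KafkaUtil.getUpsertKafkaDDL("dwd_trade_order_pre_process"));
        tableEnv.sqlQuery("select * from dwd_trade_order_pre_process").execute().print();
    }
}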