Flink Real-Time Data Warehouse - DWD Layer (Kafka joined with MySQL via lookup join) Template Code
2022-07-29 05:54:00 【顶尖高手养成计划】
Introduction
Join change-log data read from Kafka with a MySQL dimension table through a Flink SQL lookup join.
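The pattern used throughout this job: the Kafka-backed table declares a processing-time attribute with PROCTIME(), and the MySQL dimension table is joined with FOR SYSTEM_TIME AS OF that attribute, so each streaming row looks up the dictionary value at its own processing time. Below is a minimal sketch of just the join shape; fact_table, dim_table and their columns are placeholders for illustration, not part of the project code.
// Minimal lookup-join sketch; tableEnv is a StreamTableEnvironment.
// fact_table must expose a processing-time column, e.g. "proc_time AS PROCTIME()",
// and dim_table is a JDBC-backed dimension table. All names here are hypothetical.
Table joined = tableEnv.sqlQuery(
        "select f.id, d.dic_name " +
        "from fact_table f " +
        "left join dim_table for system_time as of f.proc_time as d " +
        "on f.dic_code = d.dic_code");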
Utility classes
KafkaUtil
public class KafkaUtil {
private final static String BOOTSTRAP_SERVERS="master:9092";
/**
* Kafka source DDL fragment (WITH clause)
*
* @param topic   source topic
* @param groupId consumer group id
* @return the assembled WITH clause for a Kafka source table
*/
public static String getKafkaDDL(String topic, String groupId) {
return " with ('connector' = 'kafka', " +
" 'topic' = '" + topic + "'," +
" 'properties.bootstrap.servers' = '" + BOOTSTRAP_SERVERS + "', " +
" 'properties.group.id' = '" + groupId + "', " +
" 'format' = 'json', " +
" 'scan.startup.mode' = 'group-offsets')";
}
/**
* Kafka sink DDL fragment (WITH clause)
*
* @param topic target Kafka topic to write to
* @return the assembled WITH clause for an Upsert-Kafka sink table
*/
public static String getUpsertKafkaDDL(String topic) {
return "WITH ( " +
" 'connector' = 'upsert-kafka', " +
" 'topic' = '" + topic + "', " +
" 'properties.bootstrap.servers' = '" + BOOTSTRAP_SERVERS + "', " +
" 'key.format' = 'json', " +
" 'value.format' = 'json' " +
")";
}
}
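For reference, the strings returned above are only the WITH clause; they are meant to be appended to a column list, exactly as the implementation below does in steps 3 and 8. A short hypothetical usage sketch (example_source, example_topic and example_group are placeholder names, not part of the project):
// Hypothetical usage of KafkaUtil; table, topic and group names are examples only.
tableEnv.executeSql(
        "create table example_source(`id` string, `ts` string)"
                + KafkaUtil.getKafkaDDL("example_topic", "example_group"));
// An upsert-kafka table additionally needs a primary key declaration:
tableEnv.executeSql(
        "create table example_sink(`id` string, `name` string, primary key(`id`) not enforced)"
                + KafkaUtil.getUpsertKafkaDDL("example_sink_topic"));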
MysqlUtils
public class MysqlUtils {
public static String getBaseDicLookUpDDL() {
return "create table `base_dic`( " +
"`dic_code` string, " +
"`dic_name` string, " +
"`parent_code` string, " +
"`create_time` timestamp, " +
"`operate_time` timestamp, " +
"primary key(`dic_code`) not enforced " +
")" + MysqlUtils.mysqlLookUpTableDDL("base_dic");
}
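/*
 * Note on the lookup options used below: 'lookup.cache.max-rows' and 'lookup.cache.ttl'
 * enable the JDBC connector's lookup cache, so repeated dictionary lookups are served
 * from memory instead of querying MySQL for every record. A larger cache / longer TTL
 * lowers database load but delays visibility of dictionary changes; the values below
 * (10 rows, 1 hour) are simply the ones used in this tutorial.
 */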
public static String mysqlLookUpTableDDL(String tableName) {
String ddl = "WITH ( " +
"'connector' = 'jdbc', " +
"'url' = 'jdbc:mysql://hadoop102:3306/gmall', " +
"'table-name' = '" + tableName + "', " +
"'lookup.cache.max-rows' = '10', " +
"'lookup.cache.ttl' = '1 hour', " +
"'username' = 'root', " +
"'password' = '000000', " +
"'driver' = 'com.mysql.cj.jdbc.Driver' " +
")";
return ddl;
}
}
Implementation
public class DwdTradeOrderPreProcess {
public static void main(String[] args) throws Exception {
// TODO 1. Set up the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// TODO 2. Checkpointing and state backend configuration
env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, Time.days(1), Time.minutes(1)
));
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(
"hdfs://hadoop102:8020/ck"
);
System.setProperty("HADOOP_USER_NAME", "atguigu");
// TODO 3. Read topic_db from Kafka and wrap it as a Flink SQL table
tableEnv.executeSql("create table topic_db(" +
"`database` string, " +
"`table` string, " +
"`type` string, " +
"`data` map<string, string>, " +
"`old` map<string, string>, " +
// Important: PROCTIME() declares the processing-time attribute required by the lookup join
"`proc_time` as PROCTIME(), " +
"`ts` string " +
")" + KafkaUtil.getKafkaDDL("topic_db", "dwd_trade_order_refund"));
// TODO 4. Read order refund (order_refund_info) data
Table orderRefundInfo = tableEnv.sqlQuery("select " +
"data['id'] id, " +
"data['user_id'] user_id, " +
"data['order_id'] order_id, " +
"data['sku_id'] sku_id, " +
"data['refund_type'] refund_type, " +
"data['refund_num'] refund_num, " +
"data['refund_amount'] refund_amount, " +
"data['refund_reason_type'] refund_reason_type, " +
"data['refund_reason_txt'] refund_reason_txt, " +
"data['create_time'] create_time, " +
"proc_time, " +
"ts " +
"from topic_db " +
"where `table` = 'order_refund_info' " +
"and `type` = 'insert' ");
tableEnv.createTemporaryView("order_refund_info", orderRefundInfo);
// TODO 5. Read order_info data and keep only orders entering the refund state
Table orderInfoRefund = tableEnv.sqlQuery("select " +
"data['id'] id, " +
"data['province_id'] province_id, " +
"`old` " +
"from topic_db " +
"where `table` = 'order_info' " +
"and `type` = 'update' " +
"and data['order_status']='1005' " +
"and `old`['order_status'] is not null");
tableEnv.createTemporaryView("order_info_refund", orderInfoRefund);
// TODO 6. Create the MySQL lookup dictionary table
tableEnv.executeSql(MysqlUtils.getBaseDicLookUpDDL());
// TODO 7. Join the three tables into a wide order-refund table
Table resultTable = tableEnv.sqlQuery("select " +
"ri.id, " +
"ri.user_id, " +
"ri.order_id, " +
"ri.sku_id, " +
"oi.province_id, " +
"date_format(ri.create_time,'yyyy-MM-dd') date_id, " +
"ri.create_time, " +
"ri.refund_type, " +
"type_dic.dic_name, " +
"ri.refund_reason_type, " +
"reason_dic.dic_name, " +
"ri.refund_reason_txt, " +
"ri.refund_num, " +
"ri.refund_amount, " +
"ri.ts, " +
"current_row_timestamp() row_op_ts " +
"from order_refund_info ri " +
"left join " +
"order_info_refund oi " +
"on ri.order_id = oi.id " +
"left join " +
"base_dic for system_time as of ri.proc_time as type_dic " +
"on ri.refund_type = type_dic.dic_code " +
"left join " +
"base_dic for system_time as of ri.proc_time as reason_dic " +
"on ri.refund_reason_type=reason_dic.dic_code");
tableEnv.createTemporaryView("result_table", resultTable);
// TODO 8. Create the Upsert-Kafka dwd_trade_order_refund table
tableEnv.executeSql("create table dwd_trade_order_refund( " +
"id string, " +
"user_id string, " +
"order_id string, " +
"sku_id string, " +
"province_id string, " +
"date_id string, " +
"create_time string, " +
"refund_type_code string, " +
"refund_type_name string, " +
"refund_reason_type_code string, " +
"refund_reason_type_name string, " +
"refund_reason_txt string, " +
"refund_num string, " +
"refund_amount string, " +
"ts string, " +
"row_op_ts timestamp_ltz(3), " +
"primary key(id) not enforced " +
")" + KafkaUtil.getUpsertKafkaDDL("dwd_trade_order_refund"));
// TODO 9. Write the join result into the Upsert-Kafka table
tableEnv.executeSql("" +
"insert into dwd_trade_order_refund select * from result_table");
}
}
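A downstream job (for example in the DWS layer) can read this table back by declaring the same upsert-kafka table. The following is a minimal sketch of one way to consume it, assuming KafkaUtil is on the classpath; it is not necessarily how the rest of the series reads DWD data, and only a subset of columns is declared for brevity (the JSON format simply ignores fields that are not declared).
// Hypothetical downstream reader for dwd_trade_order_refund.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql("create table dwd_trade_order_refund( " +
        "id string, " +
        "user_id string, " +
        "refund_amount string, " +
        "ts string, " +
        "primary key(id) not enforced " +
        ")" + KafkaUtil.getUpsertKafkaDDL("dwd_trade_order_refund"));
// Print the changelog of the wide refund table (runs until cancelled).
tableEnv.sqlQuery("select user_id, refund_amount from dwd_trade_order_refund")
        .execute()
        .print();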