Flink Real-Time Data Warehouse, DWD Layer (Kafka lookup join against MySQL): Template Code
2022-07-29 05:54:00 【顶尖高手养成计划】
Overview

Join business data read from Kafka with a MySQL dimension table via a Flink SQL lookup join.
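At the core is Flink SQL's temporal lookup join, FOR SYSTEM_TIME AS OF, driven by a processing-time attribute on the Kafka-backed fact table. A minimal sketch of the pattern used throughout this post (the orders table and its columns are illustrative placeholders; base_dic and proc_time correspond to the tables defined below):

-- lookup join sketch: each incoming row probes base_dic in MySQL at processing time
SELECT o.id,
       o.dic_code,
       dic.dic_name
FROM orders AS o
LEFT JOIN base_dic FOR SYSTEM_TIME AS OF o.proc_time AS dic
    ON o.dic_code = dic.dic_code;

The fact table must expose a processing-time column declared as PROCTIME(), and base_dic must be declared with a lookup-capable connector such as JDBC; both pieces are handled by the utility classes below.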
Utility classes
KafkaUtil
public class KafkaUtil {

    private static final String BOOTSTRAP_SERVERS = "master:9092";

    /**
     * Kafka source DDL clause.
     *
     * @param topic   source topic
     * @param groupId consumer group id
     * @return the assembled WITH clause for a Kafka source table
     */
    public static String getKafkaDDL(String topic, String groupId) {
        return " with ('connector' = 'kafka', " +
                " 'topic' = '" + topic + "'," +
                " 'properties.bootstrap.servers' = '" + BOOTSTRAP_SERVERS + "', " +
                " 'properties.group.id' = '" + groupId + "', " +
                " 'format' = 'json', " +
                " 'scan.startup.mode' = 'group-offsets')";
    }

    /**
     * Kafka sink (Upsert-Kafka) DDL clause.
     *
     * @param topic target Kafka topic
     * @return the assembled WITH clause for an Upsert-Kafka sink table
     */
    public static String getUpsertKafkaDDL(String topic) {
        return "WITH ( " +
                " 'connector' = 'upsert-kafka', " +
                " 'topic' = '" + topic + "', " +
                " 'properties.bootstrap.servers' = '" + BOOTSTRAP_SERVERS + "', " +
                " 'key.format' = 'json', " +
                " 'value.format' = 'json' " +
                ")";
    }
}
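For context, with the arguments used later in this post, KafkaUtil.getKafkaDDL("topic_db", "dwd_trade_order_refund") produces a WITH clause equivalent to the following (reconstructed from the code above):

with (
  'connector' = 'kafka',
  'topic' = 'topic_db',
  'properties.bootstrap.servers' = 'master:9092',
  'properties.group.id' = 'dwd_trade_order_refund',
  'format' = 'json',
  'scan.startup.mode' = 'group-offsets'
)

'scan.startup.mode' = 'group-offsets' means the source resumes from the committed offsets of that consumer group.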
MysqlUtils
public class MysqlUtils {

    public static String getBaseDicLookUpDDL() {
        return "create table `base_dic`( " +
                "`dic_code` string, " +
                "`dic_name` string, " +
                "`parent_code` string, " +
                "`create_time` timestamp, " +
                "`operate_time` timestamp, " +
                "primary key(`dic_code`) not enforced " +
                ")" + MysqlUtils.mysqlLookUpTableDDL("base_dic");
    }

    public static String mysqlLookUpTableDDL(String tableName) {
        String ddl = "WITH ( " +
                "'connector' = 'jdbc', " +
                "'url' = 'jdbc:mysql://hadoop102:3306/gmall', " +
                "'table-name' = '" + tableName + "', " +
                "'lookup.cache.max-rows' = '10', " +
                "'lookup.cache.ttl' = '1 hour', " +
                "'username' = 'root', " +
                "'password' = '000000', " +
                "'driver' = 'com.mysql.cj.jdbc.Driver' " +
                ")";
        return ddl;
    }
}
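For reference, getBaseDicLookUpDDL() above expands to roughly the following lookup-table DDL (reconstructed from the helper; host, credentials, and cache settings are the ones hard-coded above):

create table `base_dic` (
  `dic_code`     string,
  `dic_name`     string,
  `parent_code`  string,
  `create_time`  timestamp,
  `operate_time` timestamp,
  primary key (`dic_code`) not enforced
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://hadoop102:3306/gmall',
  'table-name' = 'base_dic',
  'lookup.cache.max-rows' = '10',
  'lookup.cache.ttl' = '1 hour',
  'username' = 'root',
  'password' = '000000',
  'driver' = 'com.mysql.cj.jdbc.Driver'
)

The lookup.cache.* options enable the JDBC connector's lookup cache: at most 10 dictionary rows are kept in memory and each cached row expires after an hour, so the join does not query MySQL for every incoming record.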
Implementation

public class DwdTradeOrderPreProcess {

    public static void main(String[] args) throws Exception {
        // TODO 1. Environment setup
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // TODO 2. Checkpointing and state backend configuration
        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
        );
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                3, Time.days(1), Time.minutes(1)
        ));
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage(
                "hdfs://hadoop102:8020/ck"
        );
        System.setProperty("HADOOP_USER_NAME", "atguigu");

        // TODO 3. Read topic_db data from Kafka and register it as a Flink SQL table
        tableEnv.executeSql("create table topic_db(" +
                "`database` string, " +
                "`table` string, " +
                "`type` string, " +
                "`data` map<string, string>, " +
                "`old` map<string, string>, " +
                // TODO Important: processing-time attribute (PROCTIME), required by the lookup join
                "`proc_time` as PROCTIME(), " +
                "`ts` string " +
                ")" + KafkaUtil.getKafkaDDL("topic_db", "dwd_trade_order_refund"));

        // TODO 4. Read the order refund table (order_refund_info)
        Table orderRefundInfo = tableEnv.sqlQuery("select " +
                "data['id'] id, " +
                "data['user_id'] user_id, " +
                "data['order_id'] order_id, " +
                "data['sku_id'] sku_id, " +
                "data['refund_type'] refund_type, " +
                "data['refund_num'] refund_num, " +
                "data['refund_amount'] refund_amount, " +
                "data['refund_reason_type'] refund_reason_type, " +
                "data['refund_reason_txt'] refund_reason_txt, " +
                "data['create_time'] create_time, " +
                "proc_time, " +
                "ts " +
                "from topic_db " +
                "where `table` = 'order_refund_info' " +
                "and `type` = 'insert' ");
        tableEnv.createTemporaryView("order_refund_info", orderRefundInfo);

        // TODO 5. Read the order table (order_info) and keep only refund records
        Table orderInfoRefund = tableEnv.sqlQuery("select " +
                "data['id'] id, " +
                "data['province_id'] province_id, " +
                "`old` " +
                "from topic_db " +
                "where `table` = 'order_info' " +
                "and `type` = 'update' " +
                "and data['order_status']='1005' " +
                "and `old`['order_status'] is not null");
        tableEnv.createTemporaryView("order_info_refund", orderInfoRefund);

        // TODO 6. Create the MySQL lookup dictionary table
        tableEnv.executeSql(MysqlUtils.getBaseDicLookUpDDL());

        // TODO 7. Join the three tables to build the refund wide table
        Table resultTable = tableEnv.sqlQuery("select " +
                "ri.id, " +
                "ri.user_id, " +
                "ri.order_id, " +
                "ri.sku_id, " +
                "oi.province_id, " +
                "date_format(ri.create_time,'yyyy-MM-dd') date_id, " +
                "ri.create_time, " +
                "ri.refund_type, " +
                "type_dic.dic_name, " +
                "ri.refund_reason_type, " +
                "reason_dic.dic_name, " +
                "ri.refund_reason_txt, " +
                "ri.refund_num, " +
                "ri.refund_amount, " +
                "ri.ts, " +
                "current_row_timestamp() row_op_ts " +
                "from order_refund_info ri " +
                "left join " +
                "order_info_refund oi " +
                "on ri.order_id = oi.id " +
                "left join " +
                "base_dic for system_time as of ri.proc_time as type_dic " +
                "on ri.refund_type = type_dic.dic_code " +
                "left join " +
                "base_dic for system_time as of ri.proc_time as reason_dic " +
                "on ri.refund_reason_type=reason_dic.dic_code");
        tableEnv.createTemporaryView("result_table", resultTable);

        // TODO 8. Create the Upsert-Kafka table dwd_trade_order_refund
        tableEnv.executeSql("create table dwd_trade_order_refund( " +
                "id string, " +
                "user_id string, " +
                "order_id string, " +
                "sku_id string, " +
                "province_id string, " +
                "date_id string, " +
                "create_time string, " +
                "refund_type_code string, " +
                "refund_type_name string, " +
                "refund_reason_type_code string, " +
                "refund_reason_type_name string, " +
                "refund_reason_txt string, " +
                "refund_num string, " +
                "refund_amount string, " +
                "ts string, " +
                "row_op_ts timestamp_ltz(3), " +
                "primary key(id) not enforced " +
                ")" + KafkaUtil.getUpsertKafkaDDL("dwd_trade_order_refund"));

        // TODO 9. Write the join result to the Upsert-Kafka table
        tableEnv.executeSql("" +
                "insert into dwd_trade_order_refund select * from result_table");
    }
}
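Once the job is running, one quick way to check the output is to query the dwd_trade_order_refund table from a Flink SQL client session, since Upsert-Kafka tables can also be read as sources. This is only a sketch; it assumes the same Upsert-Kafka DDL shown above has been executed in that client session:

-- inspect a few fields of the refund wide table
SELECT id, order_id, sku_id, refund_amount, refund_type_name, row_op_ts
FROM dwd_trade_order_refund;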