Flink real-time warehouse DWD layer (Kafka joined with MySQL via lookup join): template code
2022-07-29 07:04:00 【Top master cultivation plan】
Brief introduction
Enrich Kafka stream data with MySQL dimension data via a lookup join.
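A lookup join queries an external dimension table once per incoming stream record at processing time, using the FOR SYSTEM_TIME AS OF syntax. A minimal sketch of the pattern follows; the orders table and its columns are hypothetical and for illustration only:

-- Every row of the stream table `orders` triggers a keyed point query
-- against the dimension table `base_dic`; `proc_time` must be a
-- processing-time attribute of `orders`.
SELECT o.id, o.dic_code, d.dic_name
FROM orders AS o
JOIN base_dic FOR SYSTEM_TIME AS OF o.proc_time AS d
    ON o.dic_code = d.dic_code;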
Utility classes
KafkaUtil
public class KafkaUtil {

    private final static String BOOTSTRAP_SERVERS = "master:9092";

    /**
     * Kafka source DDL statement
     *
     * @param topic   source topic
     * @param groupId consumer group
     * @return the assembled connector WITH clause, appended to a CREATE TABLE statement
     */
    public static String getKafkaDDL(String topic, String groupId) {
        return " with ('connector' = 'kafka', " +
                " 'topic' = '" + topic + "'," +
                " 'properties.bootstrap.servers' = '" + BOOTSTRAP_SERVERS + "', " +
                " 'properties.group.id' = '" + groupId + "', " +
                " 'format' = 'json', " +
                " 'scan.startup.mode' = 'group-offsets')";
    }

    /**
     * Kafka sink DDL statement
     *
     * @param topic target Kafka topic to write to
     * @return the assembled Upsert-Kafka connector WITH clause
     */
    public static String getUpsertKafkaDDL(String topic) {
        return "WITH ( " +
                " 'connector' = 'upsert-kafka', " +
                " 'topic' = '" + topic + "', " +
                " 'properties.bootstrap.servers' = '" + BOOTSTRAP_SERVERS + "', " +
                " 'key.format' = 'json', " +
                " 'value.format' = 'json' " +
                ")";
    }
}
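One caveat with 'scan.startup.mode' = 'group-offsets': on the very first run the consumer group has no committed offsets yet, and the connector then relies on the Kafka consumer's auto.offset.reset behaviour. If that situation is possible, a pass-through property can be added. The variant below is a sketch under that assumption, not part of the original utility:

    // Hypothetical variant of getKafkaDDL for first runs, when the consumer
    // group has not committed any offsets yet:
    public static String getKafkaDDLWithReset(String topic, String groupId) {
        return " with ('connector' = 'kafka', " +
                " 'topic' = '" + topic + "'," +
                " 'properties.bootstrap.servers' = '" + BOOTSTRAP_SERVERS + "', " +
                " 'properties.group.id' = '" + groupId + "', " +
                // assumption: start from the latest records if no committed offsets exist
                " 'properties.auto.offset.reset' = 'latest', " +
                " 'format' = 'json', " +
                " 'scan.startup.mode' = 'group-offsets')";
    }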
MysqlUtils
public class MysqlUtils {

    public static String getBaseDicLookUpDDL() {
        return "create table `base_dic`( " +
                "`dic_code` string, " +
                "`dic_name` string, " +
                "`parent_code` string, " +
                "`create_time` timestamp, " +
                "`operate_time` timestamp, " +
                "primary key(`dic_code`) not enforced " +
                ")" + MysqlUtils.mysqlLookUpTableDDL("base_dic");
    }

    public static String mysqlLookUpTableDDL(String tableName) {
        return "WITH ( " +
                "'connector' = 'jdbc', " +
                "'url' = 'jdbc:mysql://hadoop102:3306/gmall', " +
                "'table-name' = '" + tableName + "', " +
                "'lookup.cache.max-rows' = '10', " +
                "'lookup.cache.ttl' = '1 hour', " +
                "'username' = 'root', " +
                "'password' = '000000', " +
                "'driver' = 'com.mysql.cj.jdbc.Driver' " +
                ")";
    }
}
Implementation
public class DwdTradeOrderRefund {

    public static void main(String[] args) throws Exception {

        // TODO 1. Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // TODO 2. Checkpointing and state backend settings
        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
        );
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                3, Time.days(1), Time.minutes(1)
        ));
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage(
                "hdfs://hadoop102:8020/ck"
        );
        System.setProperty("HADOOP_USER_NAME", "atguigu");
        // TODO 3. Read topic_db data from Kafka and expose it as a Flink SQL table
        tableEnv.executeSql("create table topic_db(" +
                "`database` string, " +
                "`table` string, " +
                "`type` string, " +
                "`data` map<string, string>, " +
                "`old` map<string, string>, " +
                // Processing-time attribute required by the lookup joins below
                "`proc_time` as PROCTIME(), " +
                "`ts` string " +
                ")" + KafkaUtil.getKafkaDDL("topic_db", "dwd_trade_order_refund"));
        // TODO 4. Read the order refund records
        Table orderRefundInfo = tableEnv.sqlQuery("select " +
                "data['id'] id, " +
                "data['user_id'] user_id, " +
                "data['order_id'] order_id, " +
                "data['sku_id'] sku_id, " +
                "data['refund_type'] refund_type, " +
                "data['refund_num'] refund_num, " +
                "data['refund_amount'] refund_amount, " +
                "data['refund_reason_type'] refund_reason_type, " +
                "data['refund_reason_txt'] refund_reason_txt, " +
                "data['create_time'] create_time, " +
                "proc_time, " +
                "ts " +
                "from topic_db " +
                "where `table` = 'order_refund_info' " +
                "and `type` = 'insert' ");
        tableEnv.createTemporaryView("order_refund_info", orderRefundInfo);
        // TODO 5. Read the order table data and keep only the refund-related changes
        Table orderInfoRefund = tableEnv.sqlQuery("select " +
                "data['id'] id, " +
                "data['province_id'] province_id, " +
                "`old` " +
                "from topic_db " +
                "where `table` = 'order_info' " +
                "and `type` = 'update' " +
                "and data['order_status']='1005' " +
                "and `old`['order_status'] is not null");
        tableEnv.createTemporaryView("order_info_refund", orderInfoRefund);
        // TODO 6. Create the MySQL lookup dictionary table
        tableEnv.executeSql(MysqlUtils.getBaseDicLookUpDDL());
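        // base_dic uses the JDBC connector in lookup mode. With
        // 'lookup.cache.max-rows' = '10' and 'lookup.cache.ttl' = '1 hour',
        // dictionary rows are cached on the Flink side, so a value updated in
        // MySQL may be served stale for up to one hour.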
        // TODO 7. Join the three tables into the refund wide table
        Table resultTable = tableEnv.sqlQuery("select " +
                "ri.id, " +
                "ri.user_id, " +
                "ri.order_id, " +
                "ri.sku_id, " +
                "oi.province_id, " +
                "date_format(ri.create_time,'yyyy-MM-dd') date_id, " +
                "ri.create_time, " +
                "ri.refund_type, " +
                // Alias the two dic_name columns so the view has unique field
                // names matching the sink schema in step 8
                "type_dic.dic_name refund_type_name, " +
                "ri.refund_reason_type, " +
                "reason_dic.dic_name refund_reason_type_name, " +
                "ri.refund_reason_txt, " +
                "ri.refund_num, " +
                "ri.refund_amount, " +
                "ri.ts, " +
                "current_row_timestamp() row_op_ts " +
                "from order_refund_info ri " +
                "left join " +
                "order_info_refund oi " +
                "on ri.order_id = oi.id " +
                "left join " +
                "base_dic for system_time as of ri.proc_time as type_dic " +
                "on ri.refund_type = type_dic.dic_code " +
                "left join " +
                "base_dic for system_time as of ri.proc_time as reason_dic " +
                "on ri.refund_reason_type=reason_dic.dic_code");
        tableEnv.createTemporaryView("result_table", resultTable);
        // TODO 8. Create the Upsert-Kafka table dwd_trade_order_refund
        tableEnv.executeSql("create table dwd_trade_order_refund( " +
                "id string, " +
                "user_id string, " +
                "order_id string, " +
                "sku_id string, " +
                "province_id string, " +
                "date_id string, " +
                "create_time string, " +
                "refund_type_code string, " +
                "refund_type_name string, " +
                "refund_reason_type_code string, " +
                "refund_reason_type_name string, " +
                "refund_reason_txt string, " +
                "refund_num string, " +
                "refund_amount string, " +
                "ts string, " +
                "row_op_ts timestamp_ltz(3), " +
                "primary key(id) not enforced " +
                ")" + KafkaUtil.getUpsertKafkaDDL("dwd_trade_order_refund"));
        // TODO 9. Write the join result to the Upsert-Kafka table
        tableEnv.executeSql(
                "insert into dwd_trade_order_refund select * from result_table");
    }
}