当前位置:网站首页>Flink real-time data warehouse DWD layer (processing complex data - converting between streams and tables) template code
Flink real-time data warehouse DWD layer (processing complex data - converting between streams and tables) template code
2022-07-29 07:04:00 【Top master cultivation plan】
brief introduction
When the data needs complex processing, we can convert the table into a stream, process it there, and convert the result back into a table.
Utility class
KafkaUtil
public class KafkaUtil {
private final static String BOOTSTRAP_SERVERS="master:9092";
/**
 * Builds the connector-options clause for a Flink SQL table that uses the
 * {@code kafka} connector with JSON-formatted records.
 *
 * @param topic   Kafka topic to use as the data source
 * @param groupId consumer group id passed to the connector
 * @return the {@code with (...)} clause, to be appended after a table's column list
 */
public static String getKafkaDDL(String topic, String groupId) {
    StringBuilder ddl = new StringBuilder(" with ('connector' = 'kafka', ");
    ddl.append(" 'topic' = '").append(topic).append("',");
    ddl.append(" 'properties.bootstrap.servers' = '").append(BOOTSTRAP_SERVERS).append("', ");
    ddl.append(" 'properties.group.id' = '").append(groupId).append("', ");
    ddl.append(" 'format' = 'json', ");
    // The startup mode is fixed to 'group-offsets' for every table built here.
    ddl.append(" 'scan.startup.mode' = 'group-offsets')");
    return ddl.toString();
}
/**
 * Builds the connector-options clause for a Flink SQL table that uses the
 * {@code upsert-kafka} connector, with JSON for both key and value.
 *
 * @param topic Kafka topic the table writes to
 * @return the {@code WITH (...)} clause, to be appended after a table's column list
 */
public static String getUpsertKafkaDDL(String topic) {
    String topicOption = " 'topic' = '" + topic + "', ";
    String serversOption = " 'properties.bootstrap.servers' = '" + BOOTSTRAP_SERVERS + "', ";
    return String.join("",
            "WITH ( ",
            " 'connector' = 'upsert-kafka', ",
            topicOption,
            serversOption,
            " 'key.format' = 'json', ",
            " 'value.format' = 'json' ",
            ")");
}
}
Entity class used for the Flink SQL table/stream conversion
/**
 * Row type for coupon_use update records when the query result in
 * DwdTradeOrderPreProcess is converted from a Table into a DataStream.
 *
 * The field names mirror the column aliases selected by that query, so they
 * intentionally stay in snake_case. The job reads the `old` field through
 * getOld(); accessors are presumably generated by Lombok's @Data.
 */
@Data
public class CouponUseOrderBean {
// Coupon collection record id (data['id'])
String id;
// Coupon id (data['coupon_id'])
String coupon_id;
// User id (data['user_id'])
String user_id;
// Order id (data['order_id'])
String order_id;
// Coupon usage date (order placed): using_time formatted as yyyy-MM-dd
String date_id;
// Coupon usage time (order placed), taken unchanged from data['using_time']
String using_time;
// Raw `old` column of the change record; the job parses it as a JSON object
// and inspects its keys
String old;
// `ts` column of the change record, carried as a string
String ts;
}
Implementation
/**
 * Flink job that reads change records from the Kafka topic {@code topic_db},
 * keeps the {@code coupon_use} updates in which {@code using_time} changed,
 * and writes them to the upsert-kafka table {@code dwd_tool_coupon_order}.
 *
 * <p>The selection of columns is done in SQL; the "did using_time change?"
 * check needs to look inside the JSON text of the {@code old} column, so the
 * table is converted to a DataStream for that step and back to a table
 * afterwards.
 */
public class DwdTradeOrderPreProcess {

    /** Key that must appear in the {@code old} JSON object for a record to be kept. */
    private static final String USING_TIME_COLUMN = "using_time";

    /**
     * Builds and submits the pipeline.
     *
     * @param args unused
     * @throws Exception if the environment cannot be set up or the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        // Step 1. Execution environments (stream + table) with parallelism 4.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Step 2. Checkpointing, restart strategy and state backend.
        // Exactly-once checkpoints every 3 s, 60 s timeout, at least 3 s between checkpoints.
        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        // Keep externalized checkpoints when the job is cancelled.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
        );
        // Failure-rate restart: at most 3 failures per day, 1 minute delay between restarts.
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                3, Time.days(1), Time.minutes(1)
        ));
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage(
                "hdfs://hadoop102:8020/ck"
        );
        System.setProperty("HADOOP_USER_NAME", "atguigu");

        // Step 3. Expose the Kafka topic of change records as the Flink SQL table `topic_db`.
        tableEnv.executeSql("create table `topic_db` ( " +
                "`database` string, " +
                "`table` string, " +
                "`data` map<string, string>, " +
                "`type` string, " +
                "`old` string, " +
                "`ts` string " +
                ")" + KafkaUtil.getKafkaDDL("topic_db", "dwd_tool_coupon_order"));

        // Step 4. Select the coupon_use update records and convert the result to a stream.
        Table couponUseOrder = tableEnv.sqlQuery("select " +
                "data['id'] id, " +
                "data['coupon_id'] coupon_id, " +
                "data['user_id'] user_id, " +
                "data['order_id'] order_id, " +
                "date_format(data['using_time'],'yyyy-MM-dd') date_id, " +
                "data['using_time'] using_time, " +
                "`old`, " +
                "ts " +
                "from topic_db " +
                "where `table` = 'coupon_use' " +
                "and `type` = 'update' ");
        DataStream<CouponUseOrderBean> couponUseOrderDS =
                tableEnv.toAppendStream(couponUseOrder, CouponUseOrderBean.class);

        // Step 5. Keep only the updates whose `old` JSON names using_time, then
        // register the filtered stream as the view `result_table`.
        SingleOutputStreamOperator<CouponUseOrderBean> filteredDS = couponUseOrderDS.filter(
                couponUseOrderBean -> isUsingTimeChanged(couponUseOrderBean.getOld())
        );
        Table resultTable = tableEnv.fromDataStream(filteredDS);
        tableEnv.createTemporaryView("result_table", resultTable);

        // Step 6. Create the upsert-kafka sink table dwd_tool_coupon_order, keyed by id.
        tableEnv.executeSql("create table dwd_tool_coupon_order( " +
                "id string, " +
                "coupon_id string, " +
                "user_id string, " +
                "order_id string, " +
                "date_id string, " +
                "order_time string, " +
                "ts string, " +
                "primary key(id) not enforced " +
                ")" + KafkaUtil.getUpsertKafkaDDL("dwd_tool_coupon_order"));

        // Step 7. Write the filtered records to the sink; using_time is exposed as order_time.
        tableEnv.executeSql("" +
                "insert into dwd_tool_coupon_order select " +
                "id, " +
                "coupon_id, " +
                "user_id, " +
                "order_id, " +
                "date_id, " +
                "using_time order_time, " +
                "ts from result_table");

        // NOTE(review): the INSERT above is already submitted by executeSql(); this
        // call presumably exists to keep local runs alive. Verify that it does not
        // launch a second job from the DataStream operators registered in step 4/5.
        env.execute();
    }

    /**
     * Tells whether the {@code old} column of an update record names
     * {@code using_time} among its keys.
     *
     * <p>A record with no {@code old} value (or one that parses to nothing)
     * gives no evidence that {@code using_time} changed, so it is rejected
     * instead of being passed downstream.
     *
     * @param old JSON object text of the record's {@code old} column, may be null;
     *            presumably the changed columns with their previous values — confirm
     *            against the producer of topic_db
     * @return {@code true} only when {@code old} is a JSON object containing the key
     *         {@code using_time}
     */
    private static boolean isUsingTimeChanged(String old) {
        if (old == null) {
            return false;
        }
        Map<?, ?> previousValues = JSON.parseObject(old, Map.class);
        // Guard against a null parse result so the operator does not fail with an NPE.
        return previousValues != null && previousValues.containsKey(USING_TIME_COLUMN);
    }
}
边栏推荐
- Leetcode-592: fraction addition and subtraction
- Teacher wangshuyao's operations research course notes 07 linear programming and simplex method (standard form, base, base solution, base feasible solution, feasible base)
- Overview of database system
- C language memory stack and heap usage
- 模拟卷Leetcode【普通】222. 完全二叉树的节点个数
- Idea cannot find a database solution
- Unity exploration plot access design analysis & process + code specific implementation
- 数据库多表查询 联合查询 增删改查
- 【论文阅读 | cryoET】Gum-Net:快速准确的3D Subtomo图像对齐和平均的无监督几何匹配
- Teacher Wu Enda machine learning course notes 01 introduction
猜你喜欢

线程同步—— 生产者与消费者、龟兔赛跑、双线程打印

我的创业邻居们

IO流 - File - properties

Etcd principle

Teacher Wu Enda's machine learning course notes 02 univariate linear regression

【论文阅读 | cryoET】Gum-Net:快速准确的3D Subtomo图像对齐和平均的无监督几何匹配

Unity探索地块通路设计分析 & 流程+代码具体实现

Teacher wangshuyao's notes on operations research course 10 linear programming and simplex method (discussion on detection number and degradation)

leetcode-592:分数加减运算

Idea cannot find a database solution
随机推荐
SDN topology discovery principle
leetcode-1331:数组序号转换
Salesforce中过滤器Filter使用的相对日期
【论文阅读 | 冷冻电镜】RELION 4.0 中新的 subtomogram averaging 方法解读
Teacher wangshuyao's notes on operations research 02 fundamentals of advanced mathematics
模拟卷Leetcode【普通】093. 复原 IP 地址
基于C语言设计的学籍管理系统
吴恩达老师机器学习课程笔记 03 线性代数回顾
Unity exploration plot access design analysis & process + code specific implementation
线程 - 线程安全 - 线程优化
谷歌零碎笔记之JWT(草稿)
buck电路boot和ph引脚实测
【冷冻电镜】Relion4.0——subtomogram教程
王树尧老师运筹学课程笔记 04 线性代数基础
Windows 上 php 7.4 连接 oracle 配置
DM数据守护集群搭建
Teacher wangshuyao's notes on operations research 05 linear programming and simplex method (concept, modeling, standard type)
Some tips of vim text editor
Teacher Wang Shuyao's notes on operations research 09 linear programming and simplex method (Application of simplex table)
MySQL: what happens in the bufferpool when you crud? Ten pictures can make it clear