当前位置:网站首页>Flink实时仓库-DWD层(处理复杂数据-流和表的转换处理)模板代码
Flink实时仓库-DWD层(处理复杂数据-流和表的转换处理)模板代码
2022-07-29 05:54:00 【顶尖高手养成计划】
简介
当有复杂的数据处理的时候,我们可以把表转换成流来处理
工具类
KafkaUtil
public class KafkaUtil {
private final static String BOOTSTRAP_SERVERS="master:9092";
/**
 * Builds the connector-options ({@code with (...)}) clause for a Kafka source table.
 * The clause uses the JSON format and the {@code group-offsets} startup mode.
 *
 * @param topic   topic the source table reads from
 * @param groupId consumer group id
 * @return the assembled clause, meant to be appended to a {@code create table} statement
 */
public static String getKafkaDDL(String topic, String groupId) {
    StringBuilder ddl = new StringBuilder();
    ddl.append(" with ('connector' = 'kafka', ");
    ddl.append(" 'topic' = '").append(topic).append("',");
    ddl.append(" 'properties.bootstrap.servers' = '").append(BOOTSTRAP_SERVERS).append("', ");
    ddl.append(" 'properties.group.id' = '").append(groupId).append("', ");
    ddl.append(" 'format' = 'json', ");
    ddl.append(" 'scan.startup.mode' = 'group-offsets')");
    return ddl.toString();
}
/**
 * Builds the connector-options ({@code WITH (...)}) clause for an upsert-kafka sink table.
 * Both the key and the value are written in JSON format.
 *
 * @param topic target Kafka topic the sink table writes to
 * @return the assembled clause, meant to be appended to a {@code create table} statement
 */
public static String getUpsertKafkaDDL(String topic) {
    return String.join("",
            "WITH ( ",
            " 'connector' = 'upsert-kafka', ",
            " 'topic' = '", topic, "', ",
            " 'properties.bootstrap.servers' = '", BOOTSTRAP_SERVERS, "', ",
            " 'key.format' = 'json', ",
            " 'value.format' = 'json' ",
            ")");
}
}
Flink SQL 转换的实体类
/**
 * Bean that one row of the coupon-usage query is converted into when the table is
 * turned into a stream. Field names match the column aliases selected by that query.
 * All values are carried as strings.
 */
@Data
public class CouponUseOrderBean {
// Coupon-usage record id
String id;
// Coupon id
String coupon_id;
// User id
String user_id;
// Order id
String order_id;
// Date the coupon was used (order placed), formatted as yyyy-MM-dd by the query
String date_id;
// Time the coupon was used (order placed)
String using_time;
// Previous ("old") data of the change record, as a JSON string; may be null
String old;
// Timestamp
String ts;
}
实现
/**
 * Template job: reads change records from the Kafka topic {@code topic_db} as a Flink SQL
 * table, converts the {@code coupon_use} update rows into a DataStream so they can be
 * filtered with ordinary Java code, converts the result back into a table and writes it
 * to the upsert-kafka table {@code dwd_tool_coupon_order}.
 */
public class DwdTradeOrderPreProcess {

    /**
     * Builds and runs the pipeline.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if building the pipeline or waiting for the INSERT job fails
     */
    public static void main(String[] args) throws Exception {
        // TODO 1. Environment setup
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // TODO 2. Checkpointing, restart strategy and state backend
        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
        );
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                3, Time.days(1), Time.minutes(1)
        ));
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage(
                "hdfs://hadoop102:8020/ck"
        );
        System.setProperty("HADOOP_USER_NAME", "atguigu");

        // TODO 3. Read business data from Kafka and expose it as a Flink SQL table
        tableEnv.executeSql("create table `topic_db` ( " +
                "`database` string, " +
                "`table` string, " +
                "`data` map<string, string>, " +
                "`type` string, " +
                "`old` string, " +
                "`ts` string " +
                ")" + KafkaUtil.getKafkaDDL("topic_db", "dwd_tool_coupon_order"));

        // TODO 4. Select the coupon_use update rows and convert them into a stream
        Table couponUseOrder = tableEnv.sqlQuery("select " +
                "data['id'] id, " +
                "data['coupon_id'] coupon_id, " +
                "data['user_id'] user_id, " +
                "data['order_id'] order_id, " +
                "date_format(data['using_time'],'yyyy-MM-dd') date_id, " +
                "data['using_time'] using_time, " +
                "`old`, " +
                "ts " +
                "from topic_db " +
                "where `table` = 'coupon_use' " +
                "and `type` = 'update' ");
        DataStream<CouponUseOrderBean> couponUseOrderDS =
                tableEnv.toAppendStream(couponUseOrder, CouponUseOrderBean.class);

        // TODO 5. Keep only the qualifying records and turn the stream back into a table
        SingleOutputStreamOperator<CouponUseOrderBean> filteredDS =
                couponUseOrderDS.filter(DwdTradeOrderPreProcess::isCouponOrderRecord);
        Table resultTable = tableEnv.fromDataStream(filteredDS);
        tableEnv.createTemporaryView("result_table", resultTable);

        // TODO 6. Create the upsert-kafka table dwd_tool_coupon_order
        tableEnv.executeSql("create table dwd_tool_coupon_order( " +
                "id string, " +
                "coupon_id string, " +
                "user_id string, " +
                "order_id string, " +
                "date_id string, " +
                "order_time string, " +
                "ts string, " +
                "primary key(id) not enforced " +
                ")" + KafkaUtil.getUpsertKafkaDDL("dwd_tool_coupon_order"));

        // TODO 7. Write the data into the upsert-kafka table.
        // executeSql() submits the INSERT job itself, so the program waits on that job's
        // result instead of calling env.execute(): the stream environment has no sink of
        // its own, and executing it would not be what runs this statement.
        tableEnv.executeSql(
                "insert into dwd_tool_coupon_order select " +
                "id, " +
                "coupon_id, " +
                "user_id, " +
                "order_id, " +
                "date_id, " +
                "using_time order_time, " +
                "ts from result_table")
                .await();
    }

    /**
     * Filter predicate for the coupon-usage stream: a record is kept when its {@code old}
     * JSON object contains the key {@code using_time}; records without an {@code old}
     * value are kept as well.
     */
    private static boolean isCouponOrderRecord(CouponUseOrderBean bean) {
        String old = bean.getOld();
        if (old == null) {
            return true;
        }
        // NOTE(review): presumably `old` holds the pre-update column values, so the key
        // being present means using_time was changed by this update - confirm against
        // the producer of topic_db.
        Map<?, ?> oldMap = JSON.parseObject(old, Map.class);
        return oldMap.containsKey("using_time");
    }
}
边栏推荐
- 王树尧老师运筹学课程笔记 04 线性代数基础
- 【论文阅读】TomoAlign: A novel approach to correcting sample motion and 3D CTF in CryoET
- Overview of database system
- [CF1054H] Epic Convolution——数论,卷积,任意模数NTT
- 吴恩达老师机器学习课程笔记 02 单变量线性回归
- 数仓建模,什么是宽表?如何设计?好处与不足
- 数据库多表查询 联合查询 增删改查
- 王树尧老师运筹学课程笔记 08 线性规划与单纯形法(单纯形法)
- 10 frequently asked JVM questions in interviews
- 基于C语言设计的学籍管理系统
猜你喜欢

C language memory stack and heap usage

2D cartoon rendering - advanced skills

5g service interface and reference point

Leetcode-592: fraction addition and subtraction

【论文阅读 | 冷冻电镜】RELION 4.0 中新的 subtomogram averaging 方法解读

Ali gave several SQL messages and asked how many tree search operations need to be performed?

Teacher wangshuyao's operations research course notes 07 linear programming and simplex method (standard form, base, base solution, base feasible solution, feasible base)

LDAP brief description and unified authentication description

vim文本编辑器的一些使用小技巧

IDEA中实现Mapper接口到映射文件xml的跳转
随机推荐
leetcode-1331:数组序号转换
Teacher wangshuyao's notes on operations research 04 fundamentals of linear algebra
Shallow reading of condition object source code
LDAP brief description and unified authentication description
吴恩达老师机器学习课程笔记 03 线性代数回顾
Basic knowledge of MySQL (high frequency interview questions)
10 frequently asked JVM questions in interviews
数据库使用psql及jdbc进行远程连接,不定时自动断开的解决办法
Teacher Cui Xueting's course notes on optimization theory and methods 00 are written in the front
二次元卡通渲染——进阶技巧
2D cartoon rendering - advanced skills
【冷冻电镜|论文阅读】A feature-guided, focused 3D signal permutation method for subtomogram averaging
Apisik health check test
模拟卷Leetcode【普通】150. 逆波兰表达式求值
【干货备忘】50种Matplotlib科研论文绘图合集,含代码实现
Teacher wangshuyao's notes on operations research 02 fundamentals of advanced mathematics
MySQL:当你CRUD时BufferPool中发生了什么?十张图就能说清楚
Teacher Wu Enda's machine learning course notes 03 review of linear algebra
Simulation volume leetcode [normal] 222. number of nodes of complete binary tree
王树尧老师运筹学课程笔记 07 线性规划与单纯形法(标准型、基、基解、基可行解、可行基)