Flink Real-Time Data Warehouse - DWD Layer (Handling Complex Data: Converting Between Streams and Tables) Template Code
2022-07-29 05:54:00 【顶尖高手养成计划】
Introduction
When the data processing logic is too complex to express conveniently in SQL, we can convert the table into a stream, apply the processing there, and then convert the result back into a table.
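Before walking through the full job, here is a minimal, self-contained sketch of that table-to-stream-to-table round trip, assuming a Flink 1.13-style dependency set. The class name TableStreamRoundTrip, the inline sample data, and the view names src and processed are made up purely for illustration; the real job below applies the same pattern to the Kafka-backed topic_db table.
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class TableStreamRoundTrip {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Hypothetical in-memory source standing in for the Kafka-backed table
        DataStream<Tuple2<String, String>> source = env.fromElements(
                Tuple2.of("1", "{\"using_time\":null}"),
                Tuple2.of("2", "{\"coupon_status\":\"1401\"}"));

        // DataStream -> Table: register the stream so it can be queried with SQL
        tableEnv.createTemporaryView("src", source);
        Table queried = tableEnv.sqlQuery("select f0 id, f1 `old` from src");

        // Table -> DataStream: apply Java logic that is awkward to express in SQL
        DataStream<Row> rows = tableEnv.toAppendStream(queried, Row.class);
        SingleOutputStreamOperator<Row> filtered =
                rows.filter(r -> String.valueOf(r.getField(1)).contains("using_time"));

        // DataStream -> Table again: downstream SQL (e.g. an insert into a sink) can continue
        tableEnv.createTemporaryView("processed", tableEnv.fromDataStream(filtered));
        tableEnv.executeSql("select * from processed").print();
    }
}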
Utility class
KafkaUtil
public class KafkaUtil {
    private final static String BOOTSTRAP_SERVERS = "master:9092";

    /**
     * Kafka source DDL clause.
     *
     * @param topic   source topic
     * @param groupId consumer group
     * @return the assembled Kafka source DDL (WITH clause)
     */
    public static String getKafkaDDL(String topic, String groupId) {
        return " with ('connector' = 'kafka', " +
                " 'topic' = '" + topic + "'," +
                " 'properties.bootstrap.servers' = '" + BOOTSTRAP_SERVERS + "', " +
                " 'properties.group.id' = '" + groupId + "', " +
                " 'format' = 'json', " +
                " 'scan.startup.mode' = 'group-offsets')";
    }

    /**
     * Kafka sink (upsert-kafka) DDL clause.
     *
     * @param topic target Kafka topic to write to
     * @return the assembled upsert-kafka sink DDL (WITH clause)
     */
    public static String getUpsertKafkaDDL(String topic) {
        return "WITH ( " +
                " 'connector' = 'upsert-kafka', " +
                " 'topic' = '" + topic + "', " +
                " 'properties.bootstrap.servers' = '" + BOOTSTRAP_SERVERS + "', " +
                " 'key.format' = 'json', " +
                " 'value.format' = 'json' " +
                ")";
    }
}
Entity class for the Flink SQL conversion
import lombok.Data;

@Data
public class CouponUseOrderBean {
    // Field names use snake_case to match the SQL column aliases,
    // so the table-to-stream conversion can map columns by name.
    // coupon use record id
    String id;
    // coupon id
    String coupon_id;
    // user id
    String user_id;
    // order id
    String order_id;
    // coupon use date (order placed)
    String date_id;
    // coupon use time (order placed)
    String using_time;
    // historical data (previous values of the changed fields)
    String old;
    // timestamp
    String ts;
}
Implementation
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.Map;
import java.util.Set;

public class DwdTradeOrderPreProcess {
    public static void main(String[] args) throws Exception {
        // TODO 1. Environment setup
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // TODO 2. Checkpointing and state backend configuration
        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
        );
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                3, Time.days(1), Time.minutes(1)
        ));
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage(
                "hdfs://hadoop102:8020/ck"
        );
        System.setProperty("HADOOP_USER_NAME", "atguigu");

        // TODO 3. Read the raw business data from Kafka and expose it as a Flink SQL table
        tableEnv.executeSql("create table `topic_db` ( " +
                "`database` string, " +
                "`table` string, " +
                "`data` map<string, string>, " +
                "`type` string, " +
                "`old` string, " +
                "`ts` string " +
                ")" + KafkaUtil.getKafkaDDL("topic_db", "dwd_tool_coupon_order"));

        // TODO 4. Query the coupon_use update records and convert the result table into a stream
        Table couponUseOrder = tableEnv.sqlQuery("select " +
                "data['id'] id, " +
                "data['coupon_id'] coupon_id, " +
                "data['user_id'] user_id, " +
                "data['order_id'] order_id, " +
                "date_format(data['using_time'],'yyyy-MM-dd') date_id, " +
                "data['using_time'] using_time, " +
                "`old`, " +
                "ts " +
                "from topic_db " +
                "where `table` = 'coupon_use' " +
                "and `type` = 'update' ");
        DataStream<CouponUseOrderBean> couponUseOrderDS =
                tableEnv.toAppendStream(couponUseOrder, CouponUseOrderBean.class);

        // TODO 5. Keep only the updates whose using_time changed (coupon used to place an order),
        //         then convert the filtered stream back into a table
        SingleOutputStreamOperator<CouponUseOrderBean> filteredDS = couponUseOrderDS.filter(
                couponUseOrderBean -> {
                    String old = couponUseOrderBean.getOld();
                    if (old != null) {
                        // `old` holds the previous values of the changed fields;
                        // keep the record only if using_time is among them
                        Map oldMap = JSON.parseObject(old, Map.class);
                        Set changeKeys = oldMap.keySet();
                        return changeKeys.contains("using_time");
                    }
                    return true;
                }
        );
        Table resultTable = tableEnv.fromDataStream(filteredDS);
        tableEnv.createTemporaryView("result_table", resultTable);

        // TODO 6. Create the upsert-kafka table dwd_tool_coupon_order
        tableEnv.executeSql("create table dwd_tool_coupon_order( " +
                "id string, " +
                "coupon_id string, " +
                "user_id string, " +
                "order_id string, " +
                "date_id string, " +
                "order_time string, " +
                "ts string, " +
                "primary key(id) not enforced " +
                ")" + KafkaUtil.getUpsertKafkaDDL("dwd_tool_coupon_order"));

        // TODO 7. Write the result into the upsert-kafka table
        tableEnv.executeSql("" +
                "insert into dwd_tool_coupon_order select " +
                "id, " +
                "coupon_id, " +
                "user_id, " +
                "order_id, " +
                "date_id, " +
                "using_time order_time, " +
                "ts from result_table");

        env.execute();
    }
}