Flink Real-Time Data Warehouse - DWS Layer (Keyed State, windowAll, Saving Results to ClickHouse) Template Code
2022-07-30 14:29:00 【顶尖高手养成计划】
Introduction
This post shows how to use keyed state, how to aggregate with windowAll, and how to save the results to ClickHouse.
Utility classes
public class MyKafkaUtil {

    static String BOOTSTRAP_SERVERS = "master:9092, node1:9092, node2:9092";
    static String DEFAULT_TOPIC = "default_topic";

    /**
     * Build a consumer from a topic and a consumer group.
     *
     * @param topic   Kafka topic to read from
     * @param groupId consumer group id
     * @return a FlinkKafkaConsumer for the topic
     */
    public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic, String groupId) {
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
        prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topic,
                // The default string deserializer fails when the record value is null,
                // so use a custom deserialization schema that returns an empty string instead.
                new KafkaDeserializationSchema<String>() {
                    @Override
                    public boolean isEndOfStream(String nextElement) {
                        return false;
                    }

                    @Override
                    public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
                        if (record == null || record.value() == null) {
                            return "";
                        }
                        return new String(record.value());
                    }

                    @Override
                    public TypeInformation<String> getProducedType() {
                        return BasicTypeInfo.STRING_TYPE_INFO;
                    }
                }, prop);
        return consumer;
    }

    /**
     * Build a producer for a topic.
     *
     * @param topic Kafka topic to write to
     * @return a FlinkKafkaProducer for the topic
     */
    public static FlinkKafkaProducer<String> getKafkaProducer(String topic) {
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
        // EXACTLY_ONCE uses Kafka transactions, so raise the transaction timeout (15 minutes here)
        prop.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60 * 15 * 1000 + "");
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>(DEFAULT_TOPIC,
                new KafkaSerializationSchema<String>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(String jsonStr, @Nullable Long timestamp) {
                        return new ProducerRecord<byte[], byte[]>(topic, jsonStr.getBytes());
                    }
                }, prop,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
        return producer;
    }
}
public class ClickHouseUtil {

    // ClickHouse driver
    public static final String CLICKHOUSE_DRIVER = "ru.yandex.clickhouse.ClickHouseDriver";
    // ClickHouse connection URL; gmall_rebuild is the database (created with: create database gmall_rebuild;)
    public static final String CLICKHOUSE_URL = "jdbc:clickhouse://master:8123/gmall_rebuild";

    public static <T> SinkFunction<T> getJdbcSink(String sql) {
        return JdbcSink.<T>sink(
                sql,
                new JdbcStatementBuilder<T>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, T obj) throws SQLException {
                        Field[] declaredFields = obj.getClass().getDeclaredFields();
                        int skipNum = 0;
                        for (int i = 0; i < declaredFields.length; i++) {
                            Field declaredField = declaredFields[i];
                            // The custom @TransientSink annotation marks fields that should not be written to the sink
                            TransientSink transientSink = declaredField.getAnnotation(TransientSink.class);
                            if (transientSink != null) {
                                skipNum++;
                                continue;
                            }
                            declaredField.setAccessible(true);
                            try {
                                Object value = declaredField.get(obj);
                                preparedStatement.setObject(i + 1 - skipNum, value);
                            } catch (IllegalAccessException e) {
                                System.out.println("Failed to bind a value to a ClickHouse insert placeholder");
                                e.printStackTrace();
                            }
                        }
                    }
                },
                JdbcExecutionOptions.builder()
                        .withBatchIntervalMs(5000L)
                        .withBatchSize(5)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName(CLICKHOUSE_DRIVER)
                        .withUrl(CLICKHOUSE_URL)
                        .build()
        );
    }
}
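The @TransientSink annotation used by getJdbcSink is a small custom marker annotation that is not shown in this post. Below is a minimal sketch consistent with how the reflection code above reads it (a field-level annotation retained at runtime); the actual project class may differ.
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Marker annotation: fields carrying it are skipped when binding JDBC placeholders in ClickHouseUtil
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TransientSink {
}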
@Data
@AllArgsConstructor
public class TrafficHomeDetailPageViewBean {
    // Window start time
    String stt;
    // Window end time
    String edt;
    // Unique visitor count for the home page
    Long homeUvCt;
    // Unique visitor count for the good_detail page
    Long goodDetailUvCt;
    // Timestamp
    Long ts;
}
Application implementation
public class DwsTrafficPageViewWindow {

    public static void main(String[] args) throws Exception {

        // TODO 1. Prepare the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // TODO 2. Checkpointing and state backend settings
        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
        );
        env.setRestartStrategy(
                RestartStrategies.failureRateRestart(
                        3, Time.days(1), Time.minutes(1)
                )
        );
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage(
                "hdfs://hadoop102:8020/ck"
        );
        System.setProperty("HADOOP_USER_NAME", "atguigu");

        // TODO 3. Read the Kafka dwd_traffic_page_log topic into a stream
        String topic = "dwd_traffic_page_log";
        String groupId = "dws_traffic_page_view_window";
        FlinkKafkaConsumer<String> kafkaConsumer = MyKafkaUtil.getKafkaConsumer(topic, groupId);
        DataStreamSource<String> source = env.addSource(kafkaConsumer);

        // TODO 4. Convert the data structure: String -> JSONObject
        SingleOutputStreamOperator<JSONObject> mappedStream = source.map(JSON::parseObject);

        // TODO 5. Keep only records whose page_id is home or good_detail
        SingleOutputStreamOperator<JSONObject> filteredStream = mappedStream.filter(
                jsonObj -> {
                    JSONObject page = jsonObj.getJSONObject("page");
                    String pageId = page.getString("page_id");
                    return pageId.equals("home") || pageId.equals("good_detail");
                });

        // TODO 6. Assign watermarks
        SingleOutputStreamOperator<JSONObject> withWatermarkDS = filteredStream.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<JSONObject>forMonotonousTimestamps()
                        .withTimestampAssigner(
                                new SerializableTimestampAssigner<JSONObject>() {
                                    @Override
                                    public long extractTimestamp(JSONObject element, long recordTimestamp) {
                                        return element.getLong("ts");
                                    }
                                }
                        )
        );

        // TODO 7. Key the stream by mid (device id)
        KeyedStream<JSONObject, String> keyedStream = withWatermarkDS.keyBy(r -> r.getJSONObject("common").getString("mid"));

        // TODO 8. Identify unique visitors with keyed state and convert the data structure
        SingleOutputStreamOperator<TrafficHomeDetailPageViewBean> uvStream = keyedStream.process(
                new KeyedProcessFunction<String, JSONObject, TrafficHomeDetailPageViewBean>() {

                    private ValueState<String> homeLastVisitDt;
                    private ValueState<String> detailLastVisitDt;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        homeLastVisitDt = getRuntimeContext().getState(
                                new ValueStateDescriptor<String>("home_last_visit_dt", String.class)
                        );
                        detailLastVisitDt = getRuntimeContext().getState(
                                new ValueStateDescriptor<String>("detail_last_visit_dt", String.class)
                        );
                    }

                    @Override
                    public void processElement(JSONObject jsonObj, Context ctx, Collector<TrafficHomeDetailPageViewBean> out) throws Exception {
                        String homeLastDt = homeLastVisitDt.value();
                        String detailLastDt = detailLastVisitDt.value();
                        JSONObject page = jsonObj.getJSONObject("page");
                        String pageId = page.getString("page_id");
                        Long ts = jsonObj.getLong("ts");
                        String visitDt = DateFormatUtil.toDate(ts);
                        Long homeUvCt = 0L;
                        Long detailUvCt = 0L;
                        // Count a home-page unique visitor only on the first visit of the day
                        if (pageId.equals("home")) {
                            if (homeLastDt == null || !homeLastDt.equals(visitDt)) {
                                homeUvCt = 1L;
                                homeLastVisitDt.update(visitDt);
                            }
                        }
                        // Count a good_detail unique visitor only on the first visit of the day
                        if (pageId.equals("good_detail")) {
                            if (detailLastDt == null || !detailLastDt.equals(visitDt)) {
                                detailUvCt = 1L;
                                detailLastVisitDt.update(visitDt);
                            }
                        }
                        if (homeUvCt != 0 || detailUvCt != 0) {
                            // Window start/end and ts are filled in later by the window function
                            out.collect(new TrafficHomeDetailPageViewBean(
                                    "",
                                    "",
                                    homeUvCt,
                                    detailUvCt,
                                    0L
                            ));
                        }
                    }
                }
        );

        // TODO 9. Open a window: windowAll collects all data into a single, non-parallel window
        AllWindowedStream<TrafficHomeDetailPageViewBean, TimeWindow> windowStream = uvStream.windowAll(TumblingEventTimeWindows.of(
                org.apache.flink.streaming.api.windowing.time.Time.seconds(10L)));

        // TODO 10. Aggregate
        SingleOutputStreamOperator<TrafficHomeDetailPageViewBean> reducedStream = windowStream.reduce(
                new ReduceFunction<TrafficHomeDetailPageViewBean>() {
                    @Override
                    public TrafficHomeDetailPageViewBean reduce(TrafficHomeDetailPageViewBean value1, TrafficHomeDetailPageViewBean value2) throws Exception {
                        value1.setGoodDetailUvCt(
                                value1.getGoodDetailUvCt() + value2.getGoodDetailUvCt()
                        );
                        value1.setHomeUvCt(
                                value1.getHomeUvCt() + value2.getHomeUvCt()
                        );
                        return value1;
                    }
                },
                new AllWindowFunction<TrafficHomeDetailPageViewBean, TrafficHomeDetailPageViewBean, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow window, Iterable<TrafficHomeDetailPageViewBean> values, Collector<TrafficHomeDetailPageViewBean> out) throws Exception {
                        String stt = DateFormatUtil.toYmdHms(window.getStart());
                        String edt = DateFormatUtil.toYmdHms(window.getEnd());
                        for (TrafficHomeDetailPageViewBean value : values) {
                            value.setStt(stt);
                            value.setEdt(edt);
                            value.setTs(System.currentTimeMillis());
                            out.collect(value);
                        }
                    }
                }
        );

        // TODO 11. Write out to the OLAP database (ClickHouse)
        SinkFunction<TrafficHomeDetailPageViewBean> jdbcSink = ClickHouseUtil.<TrafficHomeDetailPageViewBean>getJdbcSink(
                "insert into dws_traffic_page_view_window values(?,?,?,?,?)"
        );
        reducedStream.addSink(jdbcSink);

        env.execute();
    }
}
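The application also relies on a DateFormatUtil helper that is not included in this post. Below is a minimal sketch of the two methods it calls; the date patterns ("yyyy-MM-dd" and "yyyy-MM-dd HH:mm:ss") and the assumption that ts is a millisecond epoch timestamp are inferred from how the methods are used, so the actual project class may differ.
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class DateFormatUtil {

    private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd");
    private static final DateTimeFormatter DATETIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Millisecond epoch timestamp -> "yyyy-MM-dd", used to compare visit dates for UV deduplication
    public static String toDate(long ts) {
        return DATE_FORMAT.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneId.systemDefault()));
    }

    // Millisecond epoch timestamp -> "yyyy-MM-dd HH:mm:ss", used for window start/end times
    public static String toYmdHms(long ts) {
        return DATETIME_FORMAT.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneId.systemDefault()));
    }
}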