Flink Real-Time Data Warehouse - DWS Layer Template Code (State Programming, Using windowAll, Saving Data to ClickHouse)
2022-07-30 14:29:00 【顶尖高手养成计划】
Introduction
This template shows how to use keyed state together with windowAll, and how to save the windowed results to ClickHouse.
Utility classes
public class MyKafkaUtil {
static String BOOTSTRAP_SERVERS = "master:9092, node1:9092, node2:9092";
static String DEFAULT_TOPIC = "default_topic";
/**
 * Build a Kafka consumer for the given topic and consumer group.
 *
 * @param topic
 * @param groupId
 * @return
 */
public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic, String groupId) {
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topic,
//The default deserializer fails when the record value is null (e.g. tombstone messages), so use a custom schema that returns an empty string instead
new KafkaDeserializationSchema<String>() {
@Override
public boolean isEndOfStream(String nextElement) {
return false;
}
@Override
public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
if (record == null || record.value() == null) {
return "";
}
return new String(record.value());
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}, prop);
return consumer;
}
/**
 * Build a Kafka producer for the given topic.
 *
 * @param topic
 * @return
 */
public static FlinkKafkaProducer<String> getKafkaProducer(String topic) {
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
prop.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60 * 15 * 1000 + "");
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>(DEFAULT_TOPIC, new KafkaSerializationSchema<String>() {
@Override
public ProducerRecord<byte[], byte[]> serialize(String jsonStr, @Nullable Long timestamp) {
return new ProducerRecord<byte[], byte[]>(topic, jsonStr.getBytes());
}
}, prop,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
return producer;
}
}

public class ClickHouseUtil {
// ClickHouse driver
public static final String CLICKHOUSE_DRIVER = "ru.yandex.clickhouse.ClickHouseDriver";
// ClickHouse connection URL; gmall_rebuild is the target database (create database gmall_rebuild;)
public static final String CLICKHOUSE_URL = "jdbc:clickhouse://master:8123/gmall_rebuild";
public static <T> SinkFunction<T> getJdbcSink(String sql) {
return JdbcSink.<T>sink(
sql,
new JdbcStatementBuilder<T>() {
@Override
public void accept(PreparedStatement preparedStatement, T obj) throws SQLException {
Field[] declaredFields = obj.getClass().getDeclaredFields();
int skipNum = 0;
for (int i = 0; i < declaredFields.length; i++) {
Field declaredField = declaredFields[i];
//Fields marked with the custom @TransientSink annotation are skipped, so they are not written to the sink
TransientSink transientSink = declaredField.getAnnotation(TransientSink.class);
if (transientSink != null) {
skipNum++;
continue;
}
declaredField.setAccessible(true);
try {
Object value = declaredField.get(obj);
preparedStatement.setObject(i + 1 - skipNum, value);
} catch (IllegalAccessException e) {
System.out.println("ClickHouse 数据插入 SQL 占位符传参异常 ~");
e.printStackTrace();
}
}
}
},
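// Flush a batch once 5 records have accumulated or every 5 seconds, whichever comes first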
JdbcExecutionOptions.builder()
.withBatchIntervalMs(5000L)
.withBatchSize(5)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withDriverName(CLICKHOUSE_DRIVER)
.withUrl(CLICKHOUSE_URL)
.build()
);
}
}

@Data
@AllArgsConstructor
public class TrafficHomeDetailPageViewBean {
// Window start time
String stt;
// Window end time
String edt;
// Unique visitor count for the home page
Long homeUvCt;
// Unique visitor count for the product detail page
Long goodDetailUvCt;
// Timestamp
Long ts;
}
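ClickHouseUtil above relies on a custom @TransientSink annotation that is not shown in this post. A minimal sketch of what it might look like, assuming it only needs to mark bean fields and be readable via reflection at runtime:

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Hypothetical sketch: marks bean fields that should NOT be written to ClickHouse.
 */
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TransientSink {
}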
Application implementation
public class DwsTrafficPageViewWindow {
public static void main(String[] args) throws Exception {
// TODO 1. Prepare the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
// TODO 2. Configure checkpointing and the state backend
env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
env.setRestartStrategy(
RestartStrategies.failureRateRestart(
3, Time.days(1), Time.minutes(1)
)
);
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(
"hdfs://hadoop102:8020/ck"
);
System.setProperty("HADOOP_USER_NAME", "atguigu");
// TODO 3. Read the Kafka dwd_traffic_page_log topic into a stream
String topic = "dwd_traffic_page_log";
String groupId = "dws_traffic_page_view_window";
FlinkKafkaConsumer<String> kafkaConsumer = MyKafkaUtil.getKafkaConsumer(topic, groupId);
DataStreamSource<String> source = env.addSource(kafkaConsumer);
// TODO 4. Convert the data structure: String -> JSONObject
SingleOutputStreamOperator<JSONObject> mappedStream = source.map(JSON::parseObject);
// TODO 5. Filter out records whose page_id is neither home nor good_detail
SingleOutputStreamOperator<JSONObject> filteredStream = mappedStream.filter(
jsonObj -> {
JSONObject page = jsonObj.getJSONObject("page");
String pageId = page.getString("page_id");
return pageId.equals("home") || pageId.equals("good_detail");
});
// TODO 6. Assign watermarks
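// forMonotonousTimestamps assumes timestamps arrive in order; use forBoundedOutOfOrderness instead if late events are possible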
SingleOutputStreamOperator<JSONObject> withWatermarkDS = filteredStream.assignTimestampsAndWatermarks(
WatermarkStrategy
.<JSONObject>forMonotonousTimestamps()
.withTimestampAssigner(
new SerializableTimestampAssigner<JSONObject>() {
@Override
public long extractTimestamp(JSONObject element, long recordTimestamp) {
return element.getLong("ts");
}
}
)
);
// TODO 7. Key the stream by mid
KeyedStream<JSONObject, String> keyedStream = withWatermarkDS.keyBy(r -> r.getJSONObject("common").getString("mid"));
// TODO 8. Identify unique visitors and convert the data structure
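// Per-mid ValueState keeps the last visit date for each page; a device counts as a unique visitor only on its first visit of the day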
SingleOutputStreamOperator<TrafficHomeDetailPageViewBean> uvStream = keyedStream.process(
new KeyedProcessFunction<String, JSONObject, TrafficHomeDetailPageViewBean>() {
private ValueState<String> homeLastVisitDt;
private ValueState<String> detailLastVisitDt;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
homeLastVisitDt = getRuntimeContext().getState(
new ValueStateDescriptor<String>("home_last_visit_dt", String.class)
);
detailLastVisitDt = getRuntimeContext().getState(
new ValueStateDescriptor<String>("detail_last_visit_dt", String.class)
);
}
@Override
public void processElement(JSONObject jsonObj, Context ctx, Collector<TrafficHomeDetailPageViewBean> out) throws Exception {
String homeLastDt = homeLastVisitDt.value();
String detailLastDt = detailLastVisitDt.value();
JSONObject page = jsonObj.getJSONObject("page");
String pageId = page.getString("page_id");
Long ts = jsonObj.getLong("ts");
String visitDt = DateFormatUtil.toDate(ts);
Long homeUvCt = 0L;
Long detailUvCt = 0L;
if (pageId.equals("home")) {
if (homeLastDt == null || !homeLastDt.equals(visitDt)) {
homeUvCt = 1L;
homeLastVisitDt.update(visitDt);
}
}
if (pageId.equals("good_detail")) {
if (detailLastDt == null || !detailLastDt.equals(visitDt)) {
detailUvCt = 1L;
detailLastVisitDt.update(visitDt);
}
}
if (homeUvCt != 0 || detailUvCt != 0) {
out.collect(new TrafficHomeDetailPageViewBean(
"",
"",
homeUvCt,
detailUvCt,
0L
));
}
}
}
);
// TODO 9. Open a window (windowAll)
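// windowAll produces a non-keyed window, so this operator runs with a parallelism of 1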
AllWindowedStream<TrafficHomeDetailPageViewBean, TimeWindow> windowStream = uvStream.windowAll(TumblingEventTimeWindows.of(
org.apache.flink.streaming.api.windowing.time.Time.seconds(10L)));
// TODO 10. Aggregate within the window
SingleOutputStreamOperator<TrafficHomeDetailPageViewBean> reducedStream = windowStream.reduce(
new ReduceFunction<TrafficHomeDetailPageViewBean>() {
@Override
public TrafficHomeDetailPageViewBean reduce(TrafficHomeDetailPageViewBean value1, TrafficHomeDetailPageViewBean value2) throws Exception {
value1.setGoodDetailUvCt(
value1.getGoodDetailUvCt() + value2.getGoodDetailUvCt()
);
value1.setHomeUvCt(
value1.getHomeUvCt() + value2.getHomeUvCt()
);
return value1;
}
},
new AllWindowFunction<TrafficHomeDetailPageViewBean, TrafficHomeDetailPageViewBean, TimeWindow>() {
@Override
public void apply(TimeWindow window, Iterable<TrafficHomeDetailPageViewBean> values, Collector<TrafficHomeDetailPageViewBean> out) throws Exception {
String stt = DateFormatUtil.toYmdHms(window.getStart());
String edt = DateFormatUtil.toYmdHms(window.getEnd());
for (TrafficHomeDetailPageViewBean value : values) {
value.setStt(stt);
value.setEdt(edt);
value.setTs(System.currentTimeMillis());
out.collect(value);
}
}
}
);
// TODO 11. Write the result to the OLAP database (ClickHouse)
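// The SQL placeholders are filled from the bean's fields in declaration order: stt, edt, homeUvCt, goodDetailUvCt, ts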
SinkFunction<TrafficHomeDetailPageViewBean> jdbcSink = ClickHouseUtil.<TrafficHomeDetailPageViewBean>getJdbcSink(
"insert into dws_traffic_page_view_window values(?,?,?,?,?)"
);
reducedStream.addSink(jdbcSink);
env.execute();
}
}
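The application also calls a DateFormatUtil helper that is not included in this post. A minimal sketch, assuming toDate produces a yyyy-MM-dd string and toYmdHms produces a yyyy-MM-dd HH:mm:ss string, both from epoch-millisecond timestamps:

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/**
 * Hypothetical sketch of the date helper used above.
 */
public class DateFormatUtil {
    private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd");
    private static final DateTimeFormatter DATETIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Epoch milliseconds -> "yyyy-MM-dd"
    public static String toDate(Long ts) {
        return DATE_FORMATTER.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneId.systemDefault()));
    }

    // Epoch milliseconds -> "yyyy-MM-dd HH:mm:ss"
    public static String toYmdHms(Long ts) {
        return DATETIME_FORMATTER.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneId.systemDefault()));
    }
}

The insert statement in step 11 also assumes that a ClickHouse table named dws_traffic_page_view_window already exists in gmall_rebuild, with five columns that line up, in order, with the bean fields stt, edt, homeUvCt, goodDetailUvCt, and ts.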