当前位置:网站首页>Flink实时仓库-DWS层(状态编程,windowall的使用,数据保存到clickhouse)模板代码
Flink实时仓库-DWS层(状态编程,windowall的使用,数据保存到clickhouse)模板代码
2022-07-30 14:29:00 【顶尖高手养成计划】
简介
本文演示键控状态(Keyed State)与 windowAll 的用法,并将窗口聚合结果保存到 ClickHouse。
工具类
/**
 * Factory helpers for the Flink Kafka source and sink used by the warehouse jobs.
 */
public class MyKafkaUtil {
    static String BOOTSTRAP_SERVERS = "master:9092, node1:9092, node2:9092";
    static String DEFAULT_TOPIC = "default_topic";

    /**
     * Builds a Kafka consumer for the given topic and consumer group.
     *
     * <p>Records are decoded as UTF-8 strings. A null record or a record with a null
     * value is turned into the empty string instead of being passed to a string
     * decoder, so callers must be prepared to skip empty strings.
     *
     * @param topic   topic to subscribe to
     * @param groupId Kafka consumer group id
     * @return a consumer producing one string per Kafka record
     */
    public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic, String groupId) {
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
        prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topic,
                // Custom schema: the original author found the default decoder
                // unsuitable for empty/null values, so null is handled explicitly here.
                new KafkaDeserializationSchema<String>() {
                    @Override
                    public boolean isEndOfStream(String nextElement) {
                        // The stream is unbounded; never signal end-of-stream.
                        return false;
                    }

                    @Override
                    public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
                        if (record == null || record.value() == null) {
                            return "";
                        }
                        // Decode explicitly as UTF-8 rather than the JVM's platform
                        // default charset, which differs between machines.
                        return new String(record.value(), java.nio.charset.StandardCharsets.UTF_8);
                    }

                    @Override
                    public TypeInformation<String> getProducedType() {
                        return BasicTypeInfo.STRING_TYPE_INFO;
                    }
                }, prop);
        return consumer;
    }

    /**
     * Builds a Kafka producer that writes every string to the given topic.
     *
     * <p>The producer is created with {@code EXACTLY_ONCE} semantics and a transaction
     * timeout of 15 minutes. {@code DEFAULT_TOPIC} is only the constructor's default
     * topic argument; the serialization schema addresses every record to {@code topic}.
     *
     * @param topic topic every record is written to
     * @return a producer that serializes strings as UTF-8 bytes
     */
    public static FlinkKafkaProducer<String> getKafkaProducer(String topic) {
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
        prop.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60 * 15 * 1000 + "");
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>(DEFAULT_TOPIC, new KafkaSerializationSchema<String>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(String jsonStr, @Nullable Long timestamp) {
                // Encode explicitly as UTF-8 so the bytes match what getKafkaConsumer decodes.
                return new ProducerRecord<byte[], byte[]>(
                        topic, jsonStr.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            }
        }, prop,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
        return producer;
    }
}
/**
 * Builds Flink JDBC sinks that write beans to ClickHouse.
 */
public class ClickHouseUtil {
    // ClickHouse JDBC driver class name
    public static final String CLICKHOUSE_DRIVER = "ru.yandex.clickhouse.ClickHouseDriver";
    // ClickHouse connection URL; gmall_rebuild is the database (create database gmall_rebuild;)
    public static final String CLICKHOUSE_URL = "jdbc:clickhouse://master:8123/gmall_rebuild";

    /**
     * Creates a JDBC sink that binds the declared fields of each bean to the
     * {@code ?} placeholders of {@code sql}.
     *
     * <p>Fields are bound in the order returned by {@code getDeclaredFields()};
     * fields annotated with {@code TransientSink} are skipped and do not consume a
     * placeholder. Writes are flushed every 5 rows or every 5000 ms.
     *
     * @param sql parameterized insert statement
     * @param <T> bean type written by the sink
     * @return a sink function targeting {@link #CLICKHOUSE_URL}
     */
    public static <T> SinkFunction<T> getJdbcSink(String sql) {
        return JdbcSink.<T>sink(
                sql,
                new JdbcStatementBuilder<T>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, T obj) throws SQLException {
                        // 1-based index of the next placeholder to bind; advanced only
                        // for fields that are actually written.
                        int paramIndex = 1;
                        for (Field declaredField : obj.getClass().getDeclaredFields()) {
                            // The custom annotation marks fields that must not be persisted.
                            if (declaredField.getAnnotation(TransientSink.class) != null) {
                                continue;
                            }
                            declaredField.setAccessible(true);
                            Object value;
                            try {
                                value = declaredField.get(obj);
                            } catch (IllegalAccessException e) {
                                // Fail the write instead of printing and leaving the
                                // placeholder unbound, which would hide the real cause.
                                throw new SQLException(
                                        "ClickHouse 数据插入 SQL 占位符传参异常 ~ field="
                                                + declaredField.getName(), e);
                            }
                            preparedStatement.setObject(paramIndex, value);
                            paramIndex++;
                        }
                    }
                },
                JdbcExecutionOptions.builder()
                        .withBatchIntervalMs(5000L)
                        .withBatchSize(5)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName(CLICKHOUSE_DRIVER)
                        .withUrl(CLICKHOUSE_URL)
                        .build()
        );
    }
}
// Per-window unique-visitor counts for the home page and the product detail page.
// ClickHouseUtil.getJdbcSink binds these fields to the SQL placeholders in the order
// reflection returns them, so keep the declaration order aligned with the table columns.
@Data
@AllArgsConstructor
public class TrafficHomeDetailPageViewBean {
// Window start time
String stt;
// Window end time
String edt;
// Number of unique visitors of the home page
Long homeUvCt;
// Number of unique visitors of the product detail page
Long goodDetailUvCt;
// Timestamp
Long ts;
}
应用实现
/**
 * DWS job: counts daily-unique visitors of the home page and the product detail
 * page in 10-second tumbling event-time windows and writes the result to ClickHouse.
 */
public class DwsTrafficPageViewWindow {
    /**
     * Builds and runs the pipeline:
     * Kafka (dwd_traffic_page_log) -> JSON -> page filter -> watermark -> keyBy(mid)
     * -> per-device daily de-duplication -> windowAll reduce -> ClickHouse.
     *
     * @param args unused
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        // Step 1. Set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // Step 2. Configure checkpointing and the state backend
        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
        );
        env.setRestartStrategy(
                RestartStrategies.failureRateRestart(
                        3, Time.days(1), Time.minutes(1)
                )
        );
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage(
                "hdfs://hadoop102:8020/ck"
        );
        System.setProperty("HADOOP_USER_NAME", "atguigu");

        // Step 3. Read the Kafka topic dwd_traffic_page_log as a stream
        String topic = "dwd_traffic_page_log";
        String groupId = "dws_traffic_page_view_window";
        FlinkKafkaConsumer<String> kafkaConsumer = MyKafkaUtil.getKafkaConsumer(topic, groupId);
        DataStreamSource<String> source = env.addSource(kafkaConsumer);

        // Step 4. Convert String -> JSONObject.
        // MyKafkaUtil emits "" for records with a null value; drop blank lines first,
        // otherwise parsing yields no object and the following operators hit an NPE.
        SingleOutputStreamOperator<JSONObject> mappedStream = source
                .filter(line -> line != null && !line.trim().isEmpty())
                .map(JSON::parseObject);

        // Step 5. Keep only records whose page_id is "home" or "good_detail".
        // Records without a "page" object or without a page_id are dropped.
        SingleOutputStreamOperator<JSONObject> filteredStream = mappedStream.filter(
                jsonObj -> {
                    if (jsonObj == null) {
                        return false;
                    }
                    JSONObject page = jsonObj.getJSONObject("page");
                    if (page == null) {
                        return false;
                    }
                    String pageId = page.getString("page_id");
                    return "home".equals(pageId) || "good_detail".equals(pageId);
                });

        // Step 6. Assign event-time timestamps (field "ts") and watermarks
        SingleOutputStreamOperator<JSONObject> withWatermarkDS = filteredStream.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<JSONObject>forMonotonousTimestamps()
                        .withTimestampAssigner(
                                new SerializableTimestampAssigner<JSONObject>() {
                                    @Override
                                    public long extractTimestamp(JSONObject element, long recordTimestamp) {
                                        return element.getLong("ts");
                                    }
                                }
                        )
        );

        // Step 7. Key the stream by device id (common.mid)
        KeyedStream<JSONObject, String> keyedStream = withWatermarkDS.keyBy(r -> r.getJSONObject("common").getString("mid"));

        // Step 8. Detect unique visitors and convert to the output bean
        SingleOutputStreamOperator<TrafficHomeDetailPageViewBean> uvStream = keyedStream.process(
                new KeyedProcessFunction<String, JSONObject, TrafficHomeDetailPageViewBean>() {
                    // Per key: date of the last counted visit to each page.
                    // NOTE(review): neither state has a TTL, so it is kept for every mid
                    // ever seen — consider a state TTL if the key space is large.
                    private ValueState<String> homeLastVisitDt;
                    private ValueState<String> detailLastVisitDt;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        homeLastVisitDt = getRuntimeContext().getState(
                                new ValueStateDescriptor<String>("home_last_visit_dt", String.class)
                        );
                        detailLastVisitDt = getRuntimeContext().getState(
                                new ValueStateDescriptor<String>("detail_last_visit_dt", String.class)
                        );
                    }

                    @Override
                    public void processElement(JSONObject jsonObj, Context ctx, Collector<TrafficHomeDetailPageViewBean> out) throws Exception {
                        String homeLastDt = homeLastVisitDt.value();
                        String detailLastDt = detailLastVisitDt.value();
                        JSONObject page = jsonObj.getJSONObject("page");
                        String pageId = page.getString("page_id");
                        Long ts = jsonObj.getLong("ts");
                        String visitDt = DateFormatUtil.toDate(ts);
                        Long homeUvCt = 0L;
                        Long detailUvCt = 0L;
                        // A visit counts once per key, page and date: only when the stored
                        // date is absent or differs from the date of this record.
                        if ("home".equals(pageId)) {
                            if (homeLastDt == null || !homeLastDt.equals(visitDt)) {
                                homeUvCt = 1L;
                                homeLastVisitDt.update(visitDt);
                            }
                        }
                        if ("good_detail".equals(pageId)) {
                            if (detailLastDt == null || !detailLastDt.equals(visitDt)) {
                                detailUvCt = 1L;
                                detailLastVisitDt.update(visitDt);
                            }
                        }
                        // Emit only when this record contributed a new unique visit;
                        // window bounds and ts are filled in after aggregation.
                        if (homeUvCt != 0 || detailUvCt != 0) {
                            out.collect(new TrafficHomeDetailPageViewBean(
                                    "",
                                    "",
                                    homeUvCt,
                                    detailUvCt,
                                    0L
                            ));
                        }
                    }
                }
        );

        // Step 9. Open a 10-second tumbling event-time window over the whole stream
        AllWindowedStream<TrafficHomeDetailPageViewBean, TimeWindow> windowStream = uvStream.windowAll(TumblingEventTimeWindows.of(
                org.apache.flink.streaming.api.windowing.time.Time.seconds(10L)));

        // Step 10. Aggregate: sum both counters, then stamp the window bounds
        SingleOutputStreamOperator<TrafficHomeDetailPageViewBean> reducedStream = windowStream.reduce(
                new ReduceFunction<TrafficHomeDetailPageViewBean>() {
                    @Override
                    public TrafficHomeDetailPageViewBean reduce(TrafficHomeDetailPageViewBean value1, TrafficHomeDetailPageViewBean value2) throws Exception {
                        value1.setGoodDetailUvCt(
                                value1.getGoodDetailUvCt() + value2.getGoodDetailUvCt()
                        );
                        value1.setHomeUvCt(
                                value1.getHomeUvCt() + value2.getHomeUvCt()
                        );
                        return value1;
                    }
                },
                new AllWindowFunction<TrafficHomeDetailPageViewBean, TrafficHomeDetailPageViewBean, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow window, Iterable<TrafficHomeDetailPageViewBean> values, Collector<TrafficHomeDetailPageViewBean> out) throws Exception {
                        String stt = DateFormatUtil.toYmdHms(window.getStart());
                        String edt = DateFormatUtil.toYmdHms(window.getEnd());
                        for (TrafficHomeDetailPageViewBean value : values) {
                            value.setStt(stt);
                            value.setEdt(edt);
                            // ts records when the row was produced (wall-clock time).
                            value.setTs(System.currentTimeMillis());
                            out.collect(value);
                        }
                    }
                }
        );

        // Step 11. Write the result to the OLAP database (ClickHouse)
        SinkFunction<TrafficHomeDetailPageViewBean> jdbcSink = ClickHouseUtil.<TrafficHomeDetailPageViewBean>getJdbcSink(
                "insert into dws_traffic_page_view_window values(?,?,?,?,?)"
        );
        reducedStream.<TrafficHomeDetailPageViewBean>addSink(jdbcSink);
        env.execute();
    }
}
边栏推荐
- localhost与127.0.0.1
- 分布式前修课:MySQL实现分布式锁
- 4位资深专家多年大厂经验分享出Flink技术内幕架构设计与实现原理
- 第十一章 api mgmnt API 参考
- 桌面软件开发框架大赏
- 网站添加能换装可互动的live 2d看板娘
- ESP32 Repeated Reboot Issue Arduino Shield Power Outage Detector
- canal抓取数据
- What is defect analysis?An article takes you to understand the necessary skills of test engineers
- Why do software testing have to learn automation?Talk about the value of automated testing in my eyes
猜你喜欢
自动化办公|办公软件和亿图脑图MindMaster快捷键
Lock wait timeout exceeded解决方案
The highest level of wiring in the computer room, the beauty is suffocating
Huawei issues another summoning order for "Genius Boys"!He, who had given up an annual salary of 3.6 million, also made his debut
开始学习C语言了
Understand the Chisel language. 29. Chisel advanced communication state machine (1) - communication state machine: take the flash as an example
Mac 中 MySQL 的安装与卸载
Flask框架——Flask-SQLite数据库
国内数字藏品的乱象与未来
3年软件测试经验面试要求月薪22K,明显感觉他背了很多面试题...
随机推荐
Use of SLF4J
JVM性能调优
canal scrape data
MySql报错:SqlError(Unable to execute query“, “Can‘t create/write to file OS errno 2 - No such file...
Metaverse Post Office AI space-themed series of digital collections will be launched at 10:00 on July 30th "Yuanyou Digital Collection"
ECCV 2022 | Towards Data Efficient Transformer Object Detectors
ECCV 2022 | 通往数据高效的Transformer目标检测器
这个编辑器居然号称快如闪电!
去腾讯面试,直接让人出门左拐 :幂等性都不知道!
MongoDB启动报错 Process: 29784 ExecStart=/usr/bin/mongod $OPTIONS (code=exited, status=14)
元宇宙邮局AI航天主题系列数字藏品 将于7月30日10:00点上线“元邮数藏”
What is the relationship between the construction of smart cities and 5G technology in the new era
Why do software testing have to learn automation?Talk about the value of automated testing in my eyes
超T动力 盈运天下——中国重汽黄河/豪沃WP14T产品首发荣耀上市!
Container sorting case
基于FPGA的DDS任意波形输出
那些破釜沉舟入局Web3.0的互联网精英都怎么样了?
Teach you how to write an eye-catching software testing resume, if you don't receive an interview invitation, I will lose
算力顶天地,存力纳乾坤:国家超级计算济南中心的一体两面
Eight years of testing experience, why was the leader criticized: the test documents you wrote are not as good as those of fresh graduates