当前位置:网站首页>Flink实时仓库-DWS层(状态编程,windowall的使用,数据保存到clickhouse)模板代码
Flink实时仓库-DWS层(状态编程,windowall的使用,数据保存到clickhouse)模板代码
2022-07-30 14:29:00 【顶尖高手养成计划】
简介
演示键控状态(Keyed State)与 windowAll 全窗口算子的使用,并将聚合结果保存到 ClickHouse。
工具类
public class MyKafkaUtil {

    // Kafka cluster bootstrap servers shared by all consumers/producers built here.
    static final String BOOTSTRAP_SERVERS = "master:9092, node1:9092, node2:9092";
    // Fallback topic passed to the producer constructor; the serializer below
    // overrides it with the real target topic for every record.
    static final String DEFAULT_TOPIC = "default_topic";

    private MyKafkaUtil() {
        // Utility class — no instances.
    }

    /**
     * Builds a Kafka consumer for the given topic and consumer group.
     *
     * @param topic   topic to subscribe to
     * @param groupId consumer group id
     * @return a FlinkKafkaConsumer that maps tombstone (null-value) records to ""
     */
    public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic, String groupId) {
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
        prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // The default string schema fails on null record values (Kafka tombstones),
        // so a custom deserializer returns "" for those instead.
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topic,
                new KafkaDeserializationSchema<String>() {
                    @Override
                    public boolean isEndOfStream(String nextElement) {
                        // Unbounded stream: never signals end of input.
                        return false;
                    }

                    @Override
                    public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
                        if (record == null || record.value() == null) {
                            return "";
                        }
                        // Explicit charset: the no-arg String(byte[]) constructor uses the
                        // platform default charset, which varies between machines.
                        return new String(record.value(), java.nio.charset.StandardCharsets.UTF_8);
                    }

                    @Override
                    public TypeInformation<String> getProducedType() {
                        return BasicTypeInfo.STRING_TYPE_INFO;
                    }
                }, prop);
        return consumer;
    }

    /**
     * Builds an exactly-once (transactional) Kafka producer for the given topic.
     *
     * @param topic target topic every record is routed to
     * @return a FlinkKafkaProducer with EXACTLY_ONCE semantics
     */
    public static FlinkKafkaProducer<String> getKafkaProducer(String topic) {
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
        // Transaction timeout (15 min) must not exceed the broker's
        // transaction.max.timeout.ms, which defaults to 15 minutes.
        prop.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, String.valueOf(60 * 15 * 1000));
        return new FlinkKafkaProducer<String>(DEFAULT_TOPIC, new KafkaSerializationSchema<String>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(String jsonStr, @Nullable Long timestamp) {
                // Route to the requested topic and encode with an explicit charset
                // (getBytes() without one uses the platform default).
                return new ProducerRecord<byte[], byte[]>(topic,
                        jsonStr.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            }
        }, prop,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }
}

public class ClickHouseUtil {

    // ClickHouse JDBC driver class.
    public static final String CLICKHOUSE_DRIVER = "ru.yandex.clickhouse.ClickHouseDriver";
    // Connection URL; the gmall_rebuild database must already exist
    // (create database gmall_rebuild;).
    public static final String CLICKHOUSE_URL = "jdbc:clickhouse://master:8123/gmall_rebuild";

    /**
     * Builds a generic JDBC sink that binds every declared field of the incoming
     * bean, in declaration order, to the positional '?' placeholders of the SQL.
     * Fields annotated with {@code @TransientSink} are skipped and the placeholder
     * index is shifted accordingly.
     *
     * @param sql insert statement with one '?' per persisted field
     * @param <T> bean type whose declared fields map 1:1 onto the placeholders
     * @return a batching SinkFunction writing to ClickHouse
     */
    public static <T> SinkFunction<T> getJdbcSink(String sql) {
        return JdbcSink.<T>sink(
                sql,
                new JdbcStatementBuilder<T>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, T obj) throws SQLException {
                        // NOTE(review): relies on getDeclaredFields() returning fields in
                        // declaration order — common on HotSpot but not guaranteed by the JLS.
                        Field[] declaredFields = obj.getClass().getDeclaredFields();
                        // Count of skipped fields so far; keeps placeholder indices contiguous.
                        int skipNum = 0;
                        for (int i = 0; i < declaredFields.length; i++) {
                            Field declaredField = declaredFields[i];
                            // @TransientSink marks fields that must not be written to ClickHouse.
                            if (declaredField.getAnnotation(TransientSink.class) != null) {
                                skipNum++;
                                continue;
                            }
                            declaredField.setAccessible(true);
                            try {
                                preparedStatement.setObject(i + 1 - skipNum, declaredField.get(obj));
                            } catch (IllegalAccessException e) {
                                // Should be impossible after setAccessible(true); surface it as an
                                // SQLException (cause preserved) instead of silently writing a row
                                // with an unbound placeholder.
                                throw new SQLException(
                                        "ClickHouse sink failed to bind field " + declaredField.getName(), e);
                            }
                        }
                    }
                },
                JdbcExecutionOptions.builder()
                        .withBatchIntervalMs(5000L) // flush at least every 5 s ...
                        .withBatchSize(5)           // ... or every 5 buffered rows
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName(CLICKHOUSE_DRIVER)
                        .withUrl(CLICKHOUSE_URL)
                        .build()
        );
    }
}

/**
 * Aggregate bean for the home / good_detail page unique-visitor window.
 * Getters, setters, equals/hashCode and toString are generated by Lombok
 * {@code @Data}; the all-args constructor by {@code @AllArgsConstructor}.
 * Field declaration order must match the '?' placeholders of the insert SQL
 * used with ClickHouseUtil.getJdbcSink.
 */
@Data
@AllArgsConstructor
public class TrafficHomeDetailPageViewBean {
    // Window start time (formatted string)
    String stt;
    // Window end time (formatted string)
    String edt;
    // Home page unique visitor count
    Long homeUvCt;
    // good_detail page unique visitor count
    Long goodDetailUvCt;
    // Timestamp set when the windowed record is emitted
    Long ts;
}
应用实现
public class DwsTrafficPageViewWindow {
    public static void main(String[] args) throws Exception {
        // TODO 1. Execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // TODO 2. Checkpointing and state backend
        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
        );
        env.setRestartStrategy(
                RestartStrategies.failureRateRestart(
                        3, Time.days(1), Time.minutes(1)
                )
        );
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage(
                "hdfs://hadoop102:8020/ck"
        );
        System.setProperty("HADOOP_USER_NAME", "atguigu");

        // TODO 3. Read Kafka dwd_traffic_page_log into a stream
        String topic = "dwd_traffic_page_log";
        String groupId = "dws_traffic_page_view_window";
        FlinkKafkaConsumer<String> kafkaConsumer = MyKafkaUtil.getKafkaConsumer(topic, groupId);
        DataStreamSource<String> source = env.addSource(kafkaConsumer);

        // TODO 4. Convert String -> JSONObject
        SingleOutputStreamOperator<JSONObject> mappedStream = source.map(JSON::parseObject);

        // TODO 5. Keep only home and good_detail page views
        SingleOutputStreamOperator<JSONObject> filteredStream = mappedStream.filter(
                jsonObj -> {
                    JSONObject page = jsonObj.getJSONObject("page");
                    // Null-safe: records without a "page" block or "page_id" are
                    // dropped instead of throwing an NPE that fails the whole job.
                    if (page == null) {
                        return false;
                    }
                    String pageId = page.getString("page_id");
                    return "home".equals(pageId) || "good_detail".equals(pageId);
                });

        // TODO 6. Assign event-time watermarks
        SingleOutputStreamOperator<JSONObject> withWatermarkDS = filteredStream.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        // NOTE(review): assumes ts is monotonically increasing per partition;
                        // if the log can be out of order, use forBoundedOutOfOrderness instead.
                        .<JSONObject>forMonotonousTimestamps()
                        .withTimestampAssigner(
                                new SerializableTimestampAssigner<JSONObject>() {
                                    @Override
                                    public long extractTimestamp(JSONObject element, long recordTimestamp) {
                                        return element.getLong("ts");
                                    }
                                }
                        )
        );

        // TODO 7. Key by device id (mid)
        KeyedStream<JSONObject, String> keyedStream = withWatermarkDS.keyBy(r -> r.getJSONObject("common").getString("mid"));

        // TODO 8. Detect unique visitors per day via keyed state, map to bean
        SingleOutputStreamOperator<TrafficHomeDetailPageViewBean> uvStream = keyedStream.process(
                new KeyedProcessFunction<String, JSONObject, TrafficHomeDetailPageViewBean>() {
                    // Last visit date per mid, one state per tracked page.
                    private ValueState<String> homeLastVisitDt;
                    private ValueState<String> detailLastVisitDt;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        homeLastVisitDt = getRuntimeContext().getState(
                                new ValueStateDescriptor<String>("home_last_visit_dt", String.class)
                        );
                        detailLastVisitDt = getRuntimeContext().getState(
                                new ValueStateDescriptor<String>("detail_last_visit_dt", String.class)
                        );
                    }

                    @Override
                    public void processElement(JSONObject jsonObj, Context ctx, Collector<TrafficHomeDetailPageViewBean> out) throws Exception {
                        String homeLastDt = homeLastVisitDt.value();
                        String detailLastDt = detailLastVisitDt.value();
                        JSONObject page = jsonObj.getJSONObject("page");
                        String pageId = page.getString("page_id");
                        Long ts = jsonObj.getLong("ts");
                        String visitDt = DateFormatUtil.toDate(ts);
                        Long homeUvCt = 0L;
                        Long detailUvCt = 0L;
                        // A visitor counts once per page per calendar day: emit 1 only
                        // when the stored last-visit date differs from today's date.
                        if ("home".equals(pageId)) {
                            if (homeLastDt == null || !homeLastDt.equals(visitDt)) {
                                homeUvCt = 1L;
                                homeLastVisitDt.update(visitDt);
                            }
                        }
                        if ("good_detail".equals(pageId)) {
                            if (detailLastDt == null || !detailLastDt.equals(visitDt)) {
                                detailUvCt = 1L;
                                detailLastVisitDt.update(visitDt);
                            }
                        }
                        // Only forward records that contribute to at least one count;
                        // stt/edt/ts are filled in after windowing.
                        if (homeUvCt != 0 || detailUvCt != 0) {
                            out.collect(new TrafficHomeDetailPageViewBean(
                                    "",
                                    "",
                                    homeUvCt,
                                    detailUvCt,
                                    0L
                            ));
                        }
                    }
                }
        );

        // TODO 9. Global tumbling event-time window (10 s)
        AllWindowedStream<TrafficHomeDetailPageViewBean, TimeWindow> windowStream = uvStream.windowAll(TumblingEventTimeWindows.of(
                org.apache.flink.streaming.api.windowing.time.Time.seconds(10L)));

        // TODO 10. Incremental reduce + window function to stamp window bounds
        SingleOutputStreamOperator<TrafficHomeDetailPageViewBean> reducedStream = windowStream.reduce(
                new ReduceFunction<TrafficHomeDetailPageViewBean>() {
                    @Override
                    public TrafficHomeDetailPageViewBean reduce(TrafficHomeDetailPageViewBean value1, TrafficHomeDetailPageViewBean value2) throws Exception {
                        // Sum both UV counters into value1 and reuse it as the accumulator.
                        value1.setGoodDetailUvCt(
                                value1.getGoodDetailUvCt() + value2.getGoodDetailUvCt()
                        );
                        value1.setHomeUvCt(
                                value1.getHomeUvCt() + value2.getHomeUvCt()
                        );
                        return value1;
                    }
                },
                new AllWindowFunction<TrafficHomeDetailPageViewBean, TrafficHomeDetailPageViewBean, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow window, Iterable<TrafficHomeDetailPageViewBean> values, Collector<TrafficHomeDetailPageViewBean> out) throws Exception {
                        // After the incremental reduce the iterable holds a single
                        // pre-aggregated bean; stamp window bounds and emit time.
                        String stt = DateFormatUtil.toYmdHms(window.getStart());
                        String edt = DateFormatUtil.toYmdHms(window.getEnd());
                        for (TrafficHomeDetailPageViewBean value : values) {
                            value.setStt(stt);
                            value.setEdt(edt);
                            value.setTs(System.currentTimeMillis());
                            out.collect(value);
                        }
                    }
                }
        );

        // TODO 11. Sink to ClickHouse (column order must match the bean's fields)
        SinkFunction<TrafficHomeDetailPageViewBean> jdbcSink = ClickHouseUtil.<TrafficHomeDetailPageViewBean>getJdbcSink(
                "insert into dws_traffic_page_view_window values(?,?,?,?,?)"
        );
        reducedStream.<TrafficHomeDetailPageViewBean>addSink(jdbcSink);

        env.execute();
    }
}边栏推荐
- 元宇宙邮局AI航天主题系列数字藏品 将于7月30日10:00点上线“元邮数藏”
- Desktop Software Development Framework Awards
- SQL 优化这么做就对了!
- MongoDB启动报错 Process: 29784 ExecStart=/usr/bin/mongod $OPTIONS (code=exited, status=14)
- JSON common annotations
- Container sorting case
- The evolution of content products has three axes: traffic, technology, and product form
- EasyV数字孪生流域|宁波智慧水利整体智治综合应用
- 浅析显卡市场的未来走向:现在可以抄底了吗?
- 1700. 无法吃午餐的学生数量
猜你喜欢

71-page comprehensive overall solution for global tourism 2021 ppt

还在说软件测试没有中年危机?9年测试工程师惨遭淘汰

Conversion between pytorch and keras (the code takes LeNet-5 as an example)

71页全域旅游综合整体解决方案2021 ppt

吃透Chisel语言.28.Chisel进阶之有限状态机(二)——Mealy状态机及与Moore状态机的对比

算力顶天地,存力纳乾坤:国家超级计算济南中心的一体两面

惊艳!京东T8纯手码的Redis核心原理手册,基础与源码齐下

新时代背景下智慧城市的建设与5G技术有何关联

Still saying software testing doesn't have a midlife crisis?9 years of test engineers were eliminated

Flink优化
随机推荐
Web消息推送之SSE
吃透Chisel语言.28.Chisel进阶之有限状态机(二)——Mealy状态机及与Moore状态机的对比
网站添加能换装可互动的live 2d看板娘
惊艳!京东T8纯手码的Redis核心原理手册,基础与源码齐下
Conversion between pytorch and keras (the code takes LeNet-5 as an example)
A simple change for problem, knapsack problem sets of shell
71页全域旅游综合整体解决方案2021 ppt
Flask Framework - Sijax
3年软件测试经验面试要求月薪22K,明显感觉他背了很多面试题...
容器排序案例
Flask框架——Flask-SQLite数据库
华为再发「天才少年」召集令!曾放弃360万年薪的他也来首秀
三电系统集成技术杂谈
Get the Google Advertising ID as a unique identifier
Flink real-time data warehouse completed
吃透Chisel语言.29.Chisel进阶之通信状态机(一)——通信状态机:以闪光灯为例
MongoDB启动报错 Process: 29784 ExecStart=/usr/bin/mongod $OPTIONS (code=exited, status=14)
What should I do if the sql server installation fails (what should I do if the sql server cannot be installed)
桌面软件开发框架大赏
我们公司用了 6 年的网关服务,动态路由、鉴权、限流等都有,稳的一批!