当前位置:网站首页>Flink实时仓库-DWS层(状态编程,windowall的使用,数据保存到clickhouse)模板代码
Flink实时仓库-DWS层(状态编程,windowall的使用,数据保存到clickhouse)模板代码
2022-07-30 14:29:00 【顶尖高手养成计划】
简介
本文演示键控状态(Keyed State)与 windowAll 的用法,并将聚合结果保存到 ClickHouse。
工具类
public class MyKafkaUtil {
static String BOOTSTRAP_SERVERS = "master:9092, node1:9092, node2:9092";
static String DEFAULT_TOPIC = "default_topic";
/**
 * Builds a Kafka consumer for the given topic and consumer group.
 *
 * <p>A custom deserialization schema is used instead of a stock String schema so that
 * records with a null value do not reach the decoder: a null record or null value is
 * emitted as the empty string. Callers therefore have to be prepared to receive
 * {@code ""} and skip it.
 *
 * @param topic   Kafka topic to read from
 * @param groupId consumer group id
 * @return a consumer that emits every record value decoded as a UTF-8 string
 */
public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic, String groupId) {
    Properties prop = new Properties();
    prop.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
    prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);

    return new FlinkKafkaConsumer<>(
            topic,
            new KafkaDeserializationSchema<String>() {
                @Override
                public boolean isEndOfStream(String nextElement) {
                    // The topic is treated as an unbounded stream.
                    return false;
                }

                @Override
                public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
                    if (record == null || record.value() == null) {
                        return "";
                    }
                    // Decode explicitly as UTF-8 rather than with the JVM's platform default
                    // charset, which can differ between machines and corrupt non-ASCII text.
                    return new String(record.value(), java.nio.charset.StandardCharsets.UTF_8);
                }

                @Override
                public TypeInformation<String> getProducedType() {
                    return BasicTypeInfo.STRING_TYPE_INFO;
                }
            },
            prop);
}
/**
 * Builds an exactly-once Kafka producer that writes every element to the given topic.
 *
 * <p>{@code DEFAULT_TOPIC} is only passed as the constructor's default topic; the
 * serialization schema below always addresses {@code topic} explicitly.
 *
 * @param topic Kafka topic every record is written to
 * @return a producer using {@code FlinkKafkaProducer.Semantic.EXACTLY_ONCE}
 */
public static FlinkKafkaProducer<String> getKafkaProducer(String topic) {
    Properties prop = new Properties();
    prop.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
    // Transaction timeout of 15 minutes, expressed in milliseconds.
    prop.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, String.valueOf(15 * 60 * 1000));

    return new FlinkKafkaProducer<String>(
            DEFAULT_TOPIC,
            new KafkaSerializationSchema<String>() {
                @Override
                public ProducerRecord<byte[], byte[]> serialize(String jsonStr, @Nullable Long timestamp) {
                    // Encode explicitly as UTF-8 rather than with the JVM's platform default
                    // charset, so the bytes are identical on every machine.
                    return new ProducerRecord<byte[], byte[]>(
                            topic, jsonStr.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                }
            },
            prop,
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
}
}public class ClickHouseUtil {
// ClickHouse 驱动
public static final String CLICKHOUSE_DRIVER = "ru.yandex.clickhouse.ClickHouseDriver";
// ClickHouse 连接 URL,gmall_rebuild是数据库create database gmall_rebuild;
public static final String CLICKHOUSE_URL = "jdbc:clickhouse://master:8123/gmall_rebuild";
/**
 * Builds a JDBC sink that writes beans of type {@code T} to ClickHouse.
 *
 * <p>The bean's declared fields are read reflectively and bound to the statement's
 * {@code ?} placeholders in the order returned by {@code getDeclaredFields()}, starting
 * at parameter index 1. Fields annotated with {@code TransientSink} are skipped, as are
 * static and synthetic fields, which do not represent a column of the row.
 *
 * <p>Rows are flushed every 5000 ms or once 5 rows are buffered.
 *
 * @param sql insert statement with one placeholder per persisted field
 * @param <T> bean type written by the sink
 * @return the configured sink function
 */
public static <T> SinkFunction<T> getJdbcSink(String sql) {
    return JdbcSink.<T>sink(
            sql,
            new JdbcStatementBuilder<T>() {
                @Override
                public void accept(PreparedStatement preparedStatement, T obj) throws SQLException {
                    // Running 1-based placeholder index; only advanced for bound fields.
                    int paramIndex = 1;
                    for (Field field : obj.getClass().getDeclaredFields()) {
                        // Static / compiler-generated fields are not part of the row.
                        if (field.isSynthetic()
                                || java.lang.reflect.Modifier.isStatic(field.getModifiers())) {
                            continue;
                        }
                        // The custom annotation marks fields that must not be persisted.
                        if (field.getAnnotation(TransientSink.class) != null) {
                            continue;
                        }
                        field.setAccessible(true);
                        Object value;
                        try {
                            value = field.get(obj);
                        } catch (IllegalAccessException e) {
                            // Fail loudly: an unbound placeholder would otherwise surface later
                            // as an unrelated-looking JDBC error.
                            throw new SQLException(
                                    "ClickHouse 数据插入 SQL 占位符传参异常 ~ field=" + field.getName(), e);
                        }
                        preparedStatement.setObject(paramIndex, value);
                        paramIndex++;
                    }
                }
            },
            JdbcExecutionOptions.builder()
                    .withBatchIntervalMs(5000L)
                    .withBatchSize(5)
                    .build(),
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withDriverName(CLICKHOUSE_DRIVER)
                    .withUrl(CLICKHOUSE_URL)
                    .build()
    );
}
}@Data
@AllArgsConstructor
// Result row of the page-view window job: unique-visitor counts for the home page and
// the product detail page within one window.
// NOTE(review): the field declaration order appears to matter - the all-args constructor
// call and the reflective ClickHouse sink both use it positionally; confirm before reordering.
public class TrafficHomeDetailPageViewBean {
// Window start time
String stt;
// Window end time
String edt;
// Number of unique visitors of the home page
Long homeUvCt;
// Number of unique visitors of the product detail page
Long goodDetailUvCt;
// Timestamp
Long ts;
}
应用实现
public class DwsTrafficPageViewWindow {
/**
 * Job entry point: counts daily unique visitors of the home page and the product detail
 * page from the {@code dwd_traffic_page_log} topic in 10-second tumbling event-time
 * windows and writes one row per window to ClickHouse.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the job cannot be built or fails during execution
 */
public static void main(String[] args) throws Exception {
    // 1. Execution environment.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4);

    // 2. Checkpointing, restart strategy and state backend.
    env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
    env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
    );
    env.setRestartStrategy(
            RestartStrategies.failureRateRestart(
                    3, Time.days(1), Time.minutes(1)
            )
    );
    env.setStateBackend(new HashMapStateBackend());
    // NOTE(review): this host differs from the master/node* hosts used by the Kafka and
    // ClickHouse utilities - confirm it matches the target cluster.
    env.getCheckpointConfig().setCheckpointStorage(
            "hdfs://hadoop102:8020/ck"
    );
    System.setProperty("HADOOP_USER_NAME", "atguigu");

    // 3. Read the Kafka topic dwd_traffic_page_log as a stream.
    String topic = "dwd_traffic_page_log";
    String groupId = "dws_traffic_page_view_window";
    FlinkKafkaConsumer<String> kafkaConsumer = MyKafkaUtil.getKafkaConsumer(topic, groupId);
    DataStreamSource<String> source = env.addSource(kafkaConsumer);

    // 4. Convert String -> JSONObject. The Kafka deserializer emits "" for records without
    //    a value; such blank records carry no JSON object and are dropped before parsing.
    SingleOutputStreamOperator<JSONObject> mappedStream = source
            .filter(line -> line != null && !line.trim().isEmpty())
            .map(JSON::parseObject);

    // 5. Keep only records whose page_id is "home" or "good_detail". Written null-safely so
    //    records without a "page" object or a "page_id" are dropped instead of throwing.
    SingleOutputStreamOperator<JSONObject> filteredStream = mappedStream.filter(
            jsonObj -> {
                if (jsonObj == null) {
                    return false;
                }
                JSONObject page = jsonObj.getJSONObject("page");
                if (page == null) {
                    return false;
                }
                String pageId = page.getString("page_id");
                return "home".equals(pageId) || "good_detail".equals(pageId);
            });

    // 6. Assign event-time timestamps (field "ts") and watermarks.
    SingleOutputStreamOperator<JSONObject> withWatermarkDS = filteredStream.assignTimestampsAndWatermarks(
            WatermarkStrategy
                    .<JSONObject>forMonotonousTimestamps()
                    .withTimestampAssigner(
                            new SerializableTimestampAssigner<JSONObject>() {
                                @Override
                                public long extractTimestamp(JSONObject element, long recordTimestamp) {
                                    return element.getLong("ts");
                                }
                            }
                    )
    );

    // 7. Key by device id (common.mid).
    KeyedStream<JSONObject, String> keyedStream =
            withWatermarkDS.keyBy(r -> r.getJSONObject("common").getString("mid"));

    // 8. Identify unique visitors per device and day, and convert to the result bean.
    SingleOutputStreamOperator<TrafficHomeDetailPageViewBean> uvStream = keyedStream.process(
            new KeyedProcessFunction<String, JSONObject, TrafficHomeDetailPageViewBean>() {
                // Date of this device's last counted home-page visit.
                private ValueState<String> homeLastVisitDt;
                // Date of this device's last counted detail-page visit.
                private ValueState<String> detailLastVisitDt;

                @Override
                public void open(Configuration parameters) throws Exception {
                    super.open(parameters);
                    // The stored date is only compared against the date of the current
                    // visit, so an entry is useless once it is a day old. Expiring it keeps
                    // the state from growing with every device ever seen.
                    org.apache.flink.api.common.state.StateTtlConfig ttlConfig =
                            org.apache.flink.api.common.state.StateTtlConfig
                                    .newBuilder(Time.days(1))
                                    .setUpdateType(
                                            org.apache.flink.api.common.state.StateTtlConfig
                                                    .UpdateType.OnCreateAndWrite)
                                    .build();

                    ValueStateDescriptor<String> homeDescriptor =
                            new ValueStateDescriptor<String>("home_last_visit_dt", String.class);
                    homeDescriptor.enableTimeToLive(ttlConfig);
                    homeLastVisitDt = getRuntimeContext().getState(homeDescriptor);

                    ValueStateDescriptor<String> detailDescriptor =
                            new ValueStateDescriptor<String>("detail_last_visit_dt", String.class);
                    detailDescriptor.enableTimeToLive(ttlConfig);
                    detailLastVisitDt = getRuntimeContext().getState(detailDescriptor);
                }

                @Override
                public void processElement(JSONObject jsonObj, Context ctx,
                                           Collector<TrafficHomeDetailPageViewBean> out) throws Exception {
                    String homeLastDt = homeLastVisitDt.value();
                    String detailLastDt = detailLastVisitDt.value();

                    String pageId = jsonObj.getJSONObject("page").getString("page_id");
                    Long ts = jsonObj.getLong("ts");
                    String visitDt = DateFormatUtil.toDate(ts);

                    long homeUvCt = 0L;
                    long detailUvCt = 0L;

                    // A visit counts once per device and calendar day for each page type.
                    if ("home".equals(pageId)
                            && (homeLastDt == null || !homeLastDt.equals(visitDt))) {
                        homeUvCt = 1L;
                        homeLastVisitDt.update(visitDt);
                    }
                    if ("good_detail".equals(pageId)
                            && (detailLastDt == null || !detailLastDt.equals(visitDt))) {
                        detailUvCt = 1L;
                        detailLastVisitDt.update(visitDt);
                    }

                    // Emit only when this record contributed a new unique visitor; window
                    // bounds and timestamp are filled in after aggregation.
                    if (homeUvCt != 0L || detailUvCt != 0L) {
                        out.collect(new TrafficHomeDetailPageViewBean(
                                "",
                                "",
                                homeUvCt,
                                detailUvCt,
                                0L
                        ));
                    }
                }
            }
    );

    // 9. 10-second tumbling event-time window over the whole (non-keyed) stream.
    AllWindowedStream<TrafficHomeDetailPageViewBean, TimeWindow> windowStream = uvStream.windowAll(
            TumblingEventTimeWindows.of(
                    org.apache.flink.streaming.api.windowing.time.Time.seconds(10L)));

    // 10. Sum the counters incrementally, then stamp the window bounds on the result.
    SingleOutputStreamOperator<TrafficHomeDetailPageViewBean> reducedStream = windowStream.reduce(
            new ReduceFunction<TrafficHomeDetailPageViewBean>() {
                @Override
                public TrafficHomeDetailPageViewBean reduce(TrafficHomeDetailPageViewBean value1,
                                                            TrafficHomeDetailPageViewBean value2) throws Exception {
                    value1.setGoodDetailUvCt(
                            value1.getGoodDetailUvCt() + value2.getGoodDetailUvCt()
                    );
                    value1.setHomeUvCt(
                            value1.getHomeUvCt() + value2.getHomeUvCt()
                    );
                    return value1;
                }
            },
            new AllWindowFunction<TrafficHomeDetailPageViewBean, TrafficHomeDetailPageViewBean, TimeWindow>() {
                @Override
                public void apply(TimeWindow window, Iterable<TrafficHomeDetailPageViewBean> values,
                                  Collector<TrafficHomeDetailPageViewBean> out) throws Exception {
                    String stt = DateFormatUtil.toYmdHms(window.getStart());
                    String edt = DateFormatUtil.toYmdHms(window.getEnd());
                    for (TrafficHomeDetailPageViewBean value : values) {
                        value.setStt(stt);
                        value.setEdt(edt);
                        // Wall-clock time at which the window result was produced.
                        value.setTs(System.currentTimeMillis());
                        out.collect(value);
                    }
                }
            }
    );

    // 11. Write the window results to the OLAP database (ClickHouse).
    SinkFunction<TrafficHomeDetailPageViewBean> jdbcSink = ClickHouseUtil.<TrafficHomeDetailPageViewBean>getJdbcSink(
            "insert into dws_traffic_page_view_window values(?,?,?,?,?)"
    );
    reducedStream.<TrafficHomeDetailPageViewBean>addSink(jdbcSink);

    env.execute();
}
}
边栏推荐
- The use of ccs software (app software that makes money reliably)
- How to use Databricks for data analysis on TiDB Cloud | TiDB Cloud User Guide
- 算力顶天地,存力纳乾坤:国家超级计算济南中心的一体两面
- MongoDB启动报错 Process: 29784 ExecStart=/usr/bin/mongod $OPTIONS (code=exited, status=14)
- Ts是什么?
- 学习 MySQL 需要知道的 28 个小技巧
- JUC常见的线程池源码学习 02 ( ThreadPoolExecutor 线程池 )
- Flask框架——Flask-Mail邮件
- Flink实时数仓完结
- MongoDB starts an error Process: 29784 ExecStart=/usr/bin/mongod $OPTIONS (code=exited, status=14)
猜你喜欢

ToDesk版本更新,引入RTC传输技术,是否早以替代向日葵远程控制?

泡沫褪去,DeFi还剩下什么

Flink实时数仓完结

Baijiahao cancels the function of posting documents on the interface: the weight of the plug-in chain is blocked

深入浅出零钱兑换问题——背包问题的套壳

元宇宙邮局AI航天主题系列数字藏品 将于7月30日10:00点上线“元邮数藏”

The highest level of wiring in the computer room, the beauty is suffocating

pytorch与keras的相互转换(代码以LeNet-5为例)

【元胞自动机】基于元胞自动机模拟生命演化、病毒感染等实例附matlab代码

Smart Contract Security - Private Data Access
随机推荐
MPSK抗噪声性能对比(即MPSK标准误码率曲线)
Application of time series database in the field of ship risk management
JSON常用注解
获取Google Advertising ID作为唯一识别码
71页全域旅游综合整体解决方案2021 ppt
机房布线的至高境界,美到窒息
Cookie simulation login "recommended collection"
BI-SQL丨WHILE
Digital signal processing course lab report (what foundation is needed for digital signal processing)
【回归预测-CNN预测】基于卷积神经网络CNN实现数据回归预测附matlab代码
手把手教你写让人眼前一亮的软件测试简历,收不到面试邀请算我输
一文读懂网络效应对Web3的重要意义
SSE for Web Message Push
Chapter6 : Has Artificial Intelligence Impacted Drug Discovery?
异常情况处置方案
Six-faced ant financial clothing, resisting the bombardment of the interviewer, came to interview for review
查阅所连接过的WiFi所有信息(含密码)(访问历史所有WiFi连接)
元宇宙邮局AI航天主题系列数字藏品 将于7月30日10:00点上线“元邮数藏”
Machine learning difference in the competition and industry application
剑指 Offer II 037. 小行星碰撞