Flink Deduplication (2): Solving Hotspot Problems in Flink and Flink SQL Deduplication
2022-07-27 06:42:00 【undo_try】
1. Solving the hotspot problem with flink-sql
To implement deduplication in SQL, one would usually write

SELECT day, COUNT(DISTINCT user_id) FROM T GROUP BY day --sql1

or

select day, count(*) from ( select distinct user_id, day from T ) a group by day --sql2

Neither approach addresses the computation hotspot: when a single day has an extremely large number of distinct user_id values, all of the computation for that day lands on the one task that owns that key, and that task becomes the performance bottleneck of the whole job.
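To see why bucket-splitting (the fix enabled in the code below) removes the hotspot, consider a made-up example — the bucket assignment here is hypothetical, purely for illustration: on day d, visitors u1 and u2 hash into bucket 0 while u3 and u4 hash into bucket 1. The inner aggregation then counts (d, bucket 0) -> 2 and (d, bucket 1) -> 2 on separate tasks, and the outer aggregation merely sums two small rows per day: 2 + 2 = 4. Because MOD(HASH_CODE(user_id), N) always maps a given user_id to the same bucket, no visitor can be counted in two buckets.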
package com.yyds.flink_distinct;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
 * Hotspot problem during deduplication, solved with Flink SQL.
 */
public class _06_DistinctHotpotFlinkSql {
public static void main(String[] args) {
// Create the stream and table execution environments
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
// Configuration that fixes the hotspot: enable split distinct aggregation
tenv.getConfig().getConfiguration().setString("table.optimizer.distinct-agg.split.enabled", "true");
SingleOutputStreamOperator<_06_User> ss1 = env.socketTextStream("hadoop01", 9999)
.map(new MapFunction<String, _06_User>() {
@Override
public _06_User map(String line) throws Exception {
String[] arr = line.split(",");
return new _06_User(arr[0], arr[1]);
}
});
tenv.createTemporaryView("T",ss1);
String executeSql = "SELECT `day`, COUNT(DISTINCT user_id) as cnt FROM T GROUP BY `day`";
/**
 * With the split optimization enabled, the planner rewrites the query into:
 *
 *   SELECT day, SUM(cnt) FROM (
 *     SELECT day, COUNT(DISTINCT user_id) AS cnt
 *     FROM T
 *     GROUP BY day, MOD(HASH_CODE(user_id), 1024)
 *   ) GROUP BY day
 *
 * MOD(HASH_CODE(user_id), 1024) takes user_id's hash code modulo 1024, i.e.
 * it spreads the user_ids over 1024 buckets. The inner layer deduplicates per
 * (day, bucket) to get cnt, and the outer layer only needs to SUM the cnt
 * values, because the bucketing guarantees that identical user_ids always
 * fall into the same bucket.
 */
String explainSql = tenv.explainSql(executeSql, ExplainDetail.CHANGELOG_MODE);
System.out.println(explainSql);
tenv.executeSql(executeSql).print();
}
}
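The _06_User bean referenced above is not shown in the post; a minimal sketch of what it presumably looks like — the field names day and user_id are inferred from the SQL and from the constructor call new _06_User(arr[0], arr[1]), and the Lombok annotations mirror the beans shown later:

package com.yyds.flink_distinct;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * Input record for the SQL example: one (day, user) visit.
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
public class _06_User {
private String day;
private String user_id;
}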
2. Solving the hotspot problem with the Flink DataStream API
Hotspot problem during deduplication (hand-coded implementation)
Compute ad-slot visitor counts in real time. Each traffic record carries id (ad slot ID), devId (device ID), and time (visit timestamp).
Implementation approach:
• First key by id, a bucket number derived from the device ID, and the hour-level time, and use one ProcessFunction to compute the per-bucket distinct count (the same MapState technique as in the previous post).
• Then key by id and the hour-level time, and use a second ProcessFunction to sum the per-bucket counts.
One subtlety to watch: for the same id and time, data may arrive from different upstream tasks, and every upstream task keeps re-emitting its full running count downstream. Accumulating these directly would double-count, so the first ProcessFunction must implement something like SQL's retract mechanism: before each new count it emits, it first re-emits the previous value flagged as a retraction. A worked trace is shown after this list.
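A concrete trace of the retract protocol (the tuple layout matches the code below: (flag, (id, time, count)), with flag = false meaning retract): when a bucket for ad slot 1 sees its first device, it emits (true, (1, T, 1)); on the second device it emits (false, (1, T, 1)) and then (true, (1, T, 2)). The downstream sum therefore computes 1 - 1 + 2 = 2 rather than the incorrect 1 + 2 = 3.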
Main program:
package com.yyds.flink_distinct;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
/**
 * Hotspot problem during deduplication (hand-coded implementation).
 *
 * Computes ad-slot visitor counts in real time; each record carries
 * id (ad slot ID), devId (device ID) and time (visit timestamp).
 *
 * Approach:
 * • First key by (id, devId bucket, hour-level time) and use one ProcessFunction
 *   to compute the per-bucket distinct count (MapState technique).
 * • Then key by (id, hour-level time) and use a second ProcessFunction to sum them.
 *
 * Caveat: for the same id and time, data may come from different upstream tasks,
 * each of which keeps re-emitting its full running count. Direct accumulation
 * would double-count, so the first ProcessFunction implements a retract
 * mechanism like SQL's: before each update it re-emits the previous value
 * flagged as a retraction.
 */
public class _07_DistinctHotpot {
public static void main(String[] args) throws Exception {
// Create the stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
/*
 * Sample input:
 *   1,001,1000
 *   1,002,1000
 *   1,003,1000
 *   1,004,1000
 */
// Read the raw data and convert each line into a JavaBean
SingleOutputStreamOperator<_07_AdData> ss1 = env.socketTextStream("hadoop01", 9999)
.map(new MapFunction<String, _07_AdData>() {
@Override
public _07_AdData map(String line) throws Exception {
String[] arr = line.split(",");
return new _07_AdData(Integer.parseInt(arr[0]), arr[1],Long.parseLong( arr[2]));
}
});
// First key by (id, devId bucket, hour-aligned end time); one ProcessFunction computes per-bucket distinct counts (MapState technique)
KeyedStream<_07_AdData, _07_AdKey1> keyedStream1 = ss1.keyBy(new KeySelector<_07_AdData, _07_AdKey1>() {
@Override
public _07_AdKey1 getKey(_07_AdData data) throws Exception {
// Align the event time to the end of its 1-hour window
long endTime = TimeWindow.getWindowStartWithOffset(data.getTime(), 0, Time.hours(1).toMilliseconds()) + Time.hours(1).toMilliseconds();
// floorMod keeps the bucket in 0..2 even for negative hash codes
// (the original used hashCode() % 3, which can go negative and silently yields
// five buckets; dedup still works then, but the bucketing is uneven)
return new _07_AdKey1(data.getId(), endTime, Math.floorMod(data.getDevId().hashCode(), 3));
}
});
SingleOutputStreamOperator<Tuple2<Boolean, Tuple3<Integer, Long, Long>>> processStream1 = keyedStream1.process(new _07_DistinctProcessFunction01());
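// Then key by (id, hour-aligned end time) and sum the per-bucket counts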
KeyedStream<Tuple2<Boolean, Tuple3<Integer, Long, Long>>, _07_AdKey2> keyedStream2 = processStream1.keyBy(new KeySelector<Tuple2<Boolean, Tuple3<Integer, Long, Long>>, _07_AdKey2>() {
@Override
public _07_AdKey2 getKey(Tuple2<Boolean, Tuple3<Integer, Long, Long>> tp2) throws Exception {
return new _07_AdKey2(tp2.f1.f0, tp2.f1.f1);
}
});
keyedStream2.process(new _07_DistinctProcessFunction02());
env.execute("_07_DistinctHotpot");
}
}
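For the sample input the key derivation is easy to check by hand: with time = 1000 ms and 1-hour (3,600,000 ms) windows, TimeWindow.getWindowStartWithOffset(1000, 0, 3600000) = 1000 - ((1000 + 3600000) % 3600000) = 0, so endTime = 3,600,000. All four sample records therefore share the same (id, endTime) and differ only in their devId-derived bucket code.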
Custom functions:
package com.yyds.flink_distinct;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
public class _07_DistinctProcessFunction01 extends KeyedProcessFunction<_07_AdKey1,_07_AdData, Tuple2<Boolean, Tuple3<Integer,Long,Long>>> {
// First piece of state: the set of device IDs already seen for this key (bucket)
MapState<String,Integer> deviceIdState ;
// Second piece of state: the running distinct count for this key
ValueState<Long> countState ;
@Override
public void open(Configuration parameters) throws Exception {
MapStateDescriptor<String, Integer> deviceIdStateDescriptor = new MapStateDescriptor<>("deviceIdState", String.class, Integer.class);
deviceIdState = getRuntimeContext().getMapState(deviceIdStateDescriptor);
ValueStateDescriptor<Long> countStateDescriptor = new ValueStateDescriptor<>("countState", Long.class);
countState = getRuntimeContext().getState(countStateDescriptor);
}
@Override
public void processElement(_07_AdData adData, Context context, Collector<Tuple2<Boolean, Tuple3<Integer, Long, Long>>> collector) throws Exception {
String devId = adData.getDevId();
Integer i = deviceIdState.get(devId);
if(i == null){
i = 0;
}
int id = context.getCurrentKey().getId();
long time = context.getCurrentKey().getTime();
long code = context.getCurrentKey().getBucketCode();
Long c = countState.value();
if(c == null){
c = 0L;
}
// System.out.println("id = " + id + ",time = " + time + ",c = " + c + ",code = " + code);
if( i == 1 ){
// Device already seen in this bucket: nothing to do
}else {
// New device: record it in state
deviceIdState.put(devId,1);
// and increment the distinct count
Long count = c + 1;
countState.update(count);
System.out.println("id = " + id + ",time = " + time + ",count = " + count + ",code = " + code);
if(count > 1){
// Any count beyond the first must first retract the previously emitted value
System.out.println("======== retract ======");
collector.collect(Tuple2.of(false,Tuple3.of(id,time,c)));
collector.collect(Tuple2.of(true,Tuple3.of(id,time,count)));
}else {
collector.collect(Tuple2.of(true,Tuple3.of(id,time,count)));
}
}
}
}
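One caveat the post does not address: deviceIdState grows without bound, because the keys are (id, hour, bucket) and every new hour creates fresh state that is never cleaned up. A minimal sketch of one mitigation using Flink's standard state-TTL API — the 25-hour retention is an assumed value, not from the source:

// Additional imports needed in _07_DistinctProcessFunction01:
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

// In open(), replace the MapState setup with a TTL-enabled version:
StateTtlConfig ttl = StateTtlConfig
        .newBuilder(Time.hours(25))   // retention period is an assumption
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();
MapStateDescriptor<String, Integer> deviceIdStateDescriptor =
        new MapStateDescriptor<>("deviceIdState", String.class, Integer.class);
deviceIdStateDescriptor.enableTimeToLive(ttl);  // expired devIds are dropped automatically
deviceIdState = getRuntimeContext().getMapState(deviceIdStateDescriptor);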
package com.yyds.flink_distinct;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
/**
 * The key point: when an element arrives flagged false (a retraction),
 * the retracted value must be subtracted from the current count.
 */
public class _07_DistinctProcessFunction02 extends KeyedProcessFunction<_07_AdKey2, Tuple2<Boolean, Tuple3<Integer,Long,Long>>,Void> {
// State: the summed distinct count for this (id, time) key
ValueState<Long> countState ;
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Long> countStateDescriptor = new ValueStateDescriptor<>("countState", Long.class);
countState = getRuntimeContext().getState(countStateDescriptor);
}
@Override
public void processElement(Tuple2<Boolean, Tuple3<Integer, Long, Long>> tp2, Context context, Collector<Void> collector) throws Exception {
Long count = countState.value();
if(count == null) count = 0L;
Boolean bool = tp2.f0;
System.out.println(bool);
if(bool){
countState.update(count + tp2.f1.f2);
System.out.println(context.getCurrentKey() + ":" + countState.value());
}else {
// Retraction: subtract the retracted value from the current count
countState.update(count - tp2.f1.f2);
System.out.println(context.getCurrentKey() + ":" + countState.value());
}
}
}
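As written, this second function only prints; nothing flows downstream because its output type is Void. If a sink needs the running total, one sketch of a variant that emits (ad slot id, window end time, current count) per update — the class name and output layout here are assumptions for illustration, not from the source:

public class _07_DistinctProcessFunction02Emitting
        extends KeyedProcessFunction<_07_AdKey2, Tuple2<Boolean, Tuple3<Integer, Long, Long>>, Tuple3<Integer, Long, Long>> {

    ValueState<Long> countState;

    @Override
    public void open(Configuration parameters) throws Exception {
        countState = getRuntimeContext().getState(new ValueStateDescriptor<>("countState", Long.class));
    }

    @Override
    public void processElement(Tuple2<Boolean, Tuple3<Integer, Long, Long>> tp2,
                               Context context,
                               Collector<Tuple3<Integer, Long, Long>> out) throws Exception {
        Long count = countState.value();
        if (count == null) count = 0L;
        // flag true = accumulate, flag false = retract
        count = tp2.f0 ? count + tp2.f1.f2 : count - tp2.f1.f2;
        countState.update(count);
        out.collect(Tuple3.of(tp2.f1.f0, tp2.f1.f1, count));
    }
}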
JavaBeans:
package com.yyds.flink_distinct;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
/**
 * Raw input record.
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
@ToString
public class _07_AdData {
private int id;
private String devId;
private Long time;
}
package com.yyds.flink_distinct;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
/**
 * Key for the first keyBy: (ad slot id, window end time, devId bucket).
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
@ToString
public class _07_AdKey1 {
private int id;
private Long time;
private int bucketCode; // bucket number derived from devId
}
package com.yyds.flink_distinct;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
/**
 * Key for the second keyBy: (ad slot id, window end time).
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
@ToString
public class _07_AdKey2 {
private int id;
private Long time;
}
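A note on these key classes: Flink's keyBy requires key types with deterministic, valid hashCode() and equals() implementations. Lombok's @Data generates both from the fields, which is what makes _07_AdKey1 and _07_AdKey2 usable as keys here; with plain Object identity hashing, the keyed partitioning would break.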