Flink deduplication (2): solving hot-spot issues in Flink and Flink SQL deduplication
2022-07-27 07:31:00 【undo_ try】
Solving hot-spot issues during deduplication in Flink and Flink SQL
1. Solving the hot-spot issue with Flink SQL
A distinct count in SQL is usually written as SELECT day, COUNT(DISTINCT user_id) FROM T GROUP BY day --sql1
or as select day,count(*) from( select distinct user_id,day from T ) a group by day --sql2
Neither form addresses the hot-spot problem. When a single day has a very large number of distinct user_id values, all of that work lands on the one task that owns that day key, and that task becomes the performance bottleneck of the job.
package com.yyds.flink_distinct;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/** Hot-spot issue during deduplication (solved with Flink SQL). */
public class _06_DistinctHotpotFlinkSql {
public static void main(String[] args) {
// Create the execution environment for the table
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
// Enable split distinct aggregation to mitigate the hot spot
tenv.getConfig().getConfiguration().setString("table.optimizer.distinct-agg.split.enabled", "true");
SingleOutputStreamOperator<_06_User> ss1 = env.socketTextStream("hadoop01", 9999)
.map(new MapFunction<String, _06_User>() {
@Override
public _06_User map(String line) throws Exception {
String[] arr = line.split(",");
return new _06_User(arr[0], arr[1]);
}
});
tenv.createTemporaryView("T",ss1);
String executeSql = "SELECT `day`, COUNT(DISTINCT user_id) as cnt FROM T GROUP BY `day`";
/*
 * With the option above, the query is rewritten into roughly this SQL:
 *
 *   SELECT day, SUM(cnt)
 *   FROM (
 *     SELECT day, COUNT(DISTINCT user_id) AS cnt
 *     FROM T
 *     GROUP BY day, MOD(HASH_CODE(user_id), 1024)
 *   )
 *   GROUP BY day
 *
 * MOD(HASH_CODE(user_id), 1024) takes the hash code of user_id modulo 1024,
 * which spreads the user_id values over 1024 buckets.
 * The inner query deduplicates per (day, bucket number) and produces cnt.
 * The outer query only has to SUM cnt, because bucketing guarantees that
 * the same user_id always lands in the same bucket.
 */
String explainSql = tenv.explainSql(executeSql, ExplainDetail.CHANGELOG_MODE);
System.out.println(explainSql);
tenv.executeSql(executeSql).print();
}
}
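The two-level rewrite described in the comment above can be checked without Flink. The following is a minimal plain-Java sketch, not the planner's actual implementation: the class and method names are hypothetical, and `Math.floorMod` stands in for `MOD(HASH_CODE(user_id), 1024)`. It shows that summing the per-bucket distinct counts gives the same result as one global distinct count, because a given user_id always falls into the same bucket.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration class; it is not part of the original job.
public class TwoLevelDistinctSketch {

    // Direct COUNT(DISTINCT user_id): a single set holds every user of the day.
    public static long directCount(List<String> userIds) {
        return new HashSet<>(userIds).size();
    }

    // Two-level count: distinct per bucket (inner query), then a sum (outer query).
    public static long bucketedCount(List<String> userIds, int buckets) {
        Map<Integer, Set<String>> perBucket = new HashMap<>();
        for (String userId : userIds) {
            // Same user_id -> same hash code -> same bucket.
            int bucket = Math.floorMod(userId.hashCode(), buckets);
            perBucket.computeIfAbsent(bucket, b -> new HashSet<>()).add(userId);
        }
        long total = 0;
        for (Set<String> users : perBucket.values()) {
            total += users.size();
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> users = Arrays.asList("u1", "u2", "u1", "u3", "u2", "u4");
        System.out.println("direct:   " + directCount(users));
        System.out.println("bucketed: " + bucketedCount(users, 1024));
    }
}
```

In a real job each bucket would be handled by a different parallel task, which is what spreads the load of a hot day key.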
2. Solving the hot-spot issue with the Flink DataStream API
Hot-spot issue during deduplication (hand-coded)
Goal: count the distinct visitors of each ad in real time. Each traffic record contains id (ad ID), devId (visitor device ID), and time (access time).
Approach:
• First, key the stream by id, the bucket number of the device ID, and the hour-level time, and use one ProcessFunction to compute the distinct count within each bucket (the same way as the MapState approach).
• Then key the result by id and hour-level time, and use a second ProcessFunction to sum the per-bucket counts.
One point needs attention: for the same id and time, records may arrive from several different upstream tasks,
and each upstream task always sends its full running count downstream. Summing those values directly would count the same devices more than once, so we have to implement something like the retract mechanism in Flink SQL:
before the upstream ProcessFunction emits a new count, it first emits the previous count again, flagged as a retraction, so that the downstream function can subtract it.
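The retraction idea can be tried out in plain Java before reading the Flink code. This is a minimal sketch under stated assumptions: `RetractSketch`, `Msg`, and `BucketStage` are hypothetical names, in-memory sets stand in for Flink keyed state, and a single id and hour are assumed so that only the bucket number varies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration class; in-memory sets replace Flink keyed state.
public class RetractSketch {

    /** One upstream message; add == false marks a retraction of an earlier count. */
    static final class Msg {
        final boolean add;
        final long count;

        Msg(boolean add, long count) {
            this.add = add;
            this.count = count;
        }
    }

    /** First stage: deduplicates one bucket and emits its running distinct count. */
    static final class BucketStage {
        private final Set<String> seen = new HashSet<>();

        List<Msg> onDevice(String devId, boolean withRetract) {
            List<Msg> out = new ArrayList<>();
            if (!seen.add(devId)) {
                return out; // device already counted, nothing changes
            }
            long count = seen.size();
            if (withRetract && count > 1) {
                out.add(new Msg(false, count - 1)); // withdraw the previous count
            }
            out.add(new Msg(true, count)); // then send the new full count
            return out;
        }
    }

    /** Runs both stages and returns the total held by the second (summing) stage. */
    public static long run(List<String> devIds, int buckets, boolean withRetract) {
        BucketStage[] stages = new BucketStage[buckets];
        for (int i = 0; i < buckets; i++) {
            stages[i] = new BucketStage();
        }
        long total = 0;
        for (String devId : devIds) {
            int bucket = Math.floorMod(devId.hashCode(), buckets);
            for (Msg msg : stages[bucket].onDevice(devId, withRetract)) {
                total += msg.add ? msg.count : -msg.count;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> devIds = Arrays.asList("001", "002", "003", "004", "001");
        System.out.println("with retraction:    " + run(devIds, 3, true));
        System.out.println("without retraction: " + run(devIds, 3, false));
    }
}
```

With retraction the total equals the number of distinct devices. Without it, every full count a bucket sends is added on top of the earlier ones, so the total overshoots as soon as a bucket holds more than one device.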
The main program :
package com.yyds.flink_distinct;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
/**
 * Hot-spot issue during deduplication (hand-coded).
 *
 * Goal: count the distinct visitors of each ad in real time. Each traffic record
 * contains id (ad ID), devId (visitor device ID), and time (access time).
 *
 * Approach:
 * • First, key by id, device-ID bucket number, and hour-level time, and use one
 *   ProcessFunction to compute the distinct count within each bucket (same way as the MapState approach).
 * • Then key by id and hour-level time, and use a second ProcessFunction to sum the counts.
 *
 * Note: for the same id and time, records may arrive from different upstream tasks, and each
 * upstream task always sends its full running count. Summing directly would count devices
 * more than once, so a retract mechanism similar to the one in Flink SQL is needed: before the
 * upstream ProcessFunction emits a new count, it first emits the previous count flagged as a retraction.
 */
public class _07_DistinctHotpot {
public static void main(String[] args) throws Exception {
// Create the stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
/*
 * Sample data (id,devId,time):
 * 1,001,1000
 * 1,002,1000
 * 1,003,1000
 * 1,004,1000
 */
// Read the raw data and convert each line to a JavaBean
SingleOutputStreamOperator<_07_AdData> ss1 = env.socketTextStream("hadoop01", 9999)
.map(new MapFunction<String, _07_AdData>() {
@Override
public _07_AdData map(String line) throws Exception {
String[] arr = line.split(",");
return new _07_AdData(Integer.parseInt(arr[0]), arr[1],Long.parseLong( arr[2]));
}
});
// First keyBy: id, device-ID bucket number, and hour-level time. A ProcessFunction then deduplicates within each bucket (same way as the MapState approach)
KeyedStream<_07_AdData, _07_AdKey1> keyedStream1 = ss1.keyBy(new KeySelector<_07_AdData, _07_AdKey1>() {
@Override
public _07_AdKey1 getKey(_07_AdData data) throws Exception {
// End of the one-hour window that contains this record
long endTime = TimeWindow.getWindowStartWithOffset(data.getTime(), 0, Time.hours(1).toMilliseconds()) + Time.hours(1).toMilliseconds();
// floorMod keeps the bucket number in [0, 3) even when hashCode() is negative
return new _07_AdKey1(data.getId(), endTime, Math.floorMod(data.getDevId().hashCode(), 3));
}
});
SingleOutputStreamOperator<Tuple2<Boolean, Tuple3<Integer, Long, Long>>> processStream1 = keyedStream1.process(new _07_DistinctProcessFunction01());
KeyedStream<Tuple2<Boolean, Tuple3<Integer, Long, Long>>, _07_AdKey2> keyedStream2 = processStream1.keyBy(new KeySelector<Tuple2<Boolean, Tuple3<Integer, Long, Long>>, _07_AdKey2>() {
@Override
public _07_AdKey2 getKey(Tuple2<Boolean, Tuple3<Integer, Long, Long>> tp2) throws Exception {
return new _07_AdKey2(tp2.f1.f0, tp2.f1.f1);
}
});
keyedStream2.process(new _07_DistinctProcessFunction02());
env.execute("_07_DistinctHotpot");
}
}
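The key built in getKey combines two pieces of arithmetic: the end of the hour that contains the record, and a bucket number derived from the device ID. The sketch below reproduces both in plain Java. It is an illustration only: `KeySketch` and its methods are hypothetical names, and `hourWindowEnd` assumes a zero offset, which matches the call in the main program. One detail worth noting is that Java's `%` operator can return a negative value when `hashCode()` is negative, whereas `Math.floorMod` always stays in the range [0, buckets).

```java
// Hypothetical illustration class; it mirrors the key arithmetic without Flink.
public class KeySketch {

    static final long HOUR_MS = 60L * 60L * 1000L;

    // End (exclusive) of the one-hour window containing timestampMs, offset 0.
    public static long hourWindowEnd(long timestampMs) {
        long start = timestampMs - Math.floorMod(timestampMs, HOUR_MS);
        return start + HOUR_MS;
    }

    // Bucket number in [0, buckets), also for negative hash codes.
    public static int bucketOf(String devId, int buckets) {
        return Math.floorMod(devId.hashCode(), buckets);
    }

    public static void main(String[] args) {
        // The sample record "1,001,1000" has time = 1000 ms.
        System.out.println("window end: " + hourWindowEnd(1000L));
        System.out.println("bucket:     " + bucketOf("001", 3));
    }
}
```

Every record of the same ad, hour, and bucket produces the same key, so the first ProcessFunction sees all of that bucket's devices in one keyed state.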
Custom functions:
package com.yyds.flink_distinct;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
public class _07_DistinctProcessFunction01 extends KeyedProcessFunction<_07_AdKey1,_07_AdData, Tuple2<Boolean, Tuple3<Integer,Long,Long>>> {
// Define the first state MapState
MapState<String,Integer> deviceIdState ;
// Define the second state ValueState
ValueState<Long> countState ;
@Override
public void open(Configuration parameters) throws Exception {
MapStateDescriptor<String, Integer> deviceIdStateDescriptor = new MapStateDescriptor<>("deviceIdState", String.class, Integer.class);
deviceIdState = getRuntimeContext().getMapState(deviceIdStateDescriptor);
ValueStateDescriptor<Long> countStateDescriptor = new ValueStateDescriptor<>("countState", Long.class);
countState = getRuntimeContext().getState(countStateDescriptor);
}
@Override
public void processElement(_07_AdData adData, Context context, Collector<Tuple2<Boolean, Tuple3<Integer, Long, Long>>> collector) throws Exception {
String devId = adData.getDevId();
Integer i = deviceIdState.get(devId);
if(i == null){
i = 0;
}
int id = context.getCurrentKey().getId();
long time = context.getCurrentKey().getTime();
long code = context.getCurrentKey().getBucketCode();
Long c = countState.value();
if(c == null){
c = 0L;
}
// System.out.println("id = " + id + ",time = " + time + ",c = " + c + ",code = " + code);
if( i == 1 ){
// The device has already been counted; nothing to emit
}else {
// New device: record it in state
deviceIdState.put(devId,1);
// Increment the distinct count
Long count = c + 1;
countState.update(count);
System.out.println("id = " + id + ",time = " + time + ",count = " + count + ",code = " + code);
if(count > 1){
// A count was already emitted for this key: retract it first, then emit the new count
System.out.println("======== retract ======");
collector.collect(Tuple2.of(false,Tuple3.of(id,time,c)));
collector.collect(Tuple2.of(true,Tuple3.of(id,time,count)));
}else {
// First count for this key: there is nothing to retract
collector.collect(Tuple2.of(true,Tuple3.of(id,time,count)));
}
}
}
}
package com.yyds.flink_distinct;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
/** Key point: a record whose flag is false is a retraction, so its count must be subtracted from the current total. */
public class _07_DistinctProcessFunction02 extends KeyedProcessFunction<_07_AdKey2, Tuple2<Boolean, Tuple3<Integer,Long,Long>>,Void> {
// Define the State ValueState
ValueState<Long> countState ;
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Long> countStateDescriptor = new ValueStateDescriptor<>("countState", Long.class);
countState = getRuntimeContext().getState(countStateDescriptor);
}
@Override
public void processElement(Tuple2<Boolean, Tuple3<Integer, Long, Long>> tp2, Context context, Collector<Void> collector) throws Exception {
Long count = countState.value();
if(count == null) count = 0L;
Boolean bool = tp2.f0;
System.out.println(bool);
if(bool){
countState.update(count + tp2.f1.f2);
System.out.println(context.getCurrentKey() + ":" + countState.value());
}else {
// Retraction: subtract the retracted count from the current total
countState.update(count - tp2.f1.f2);
System.out.println(context.getCurrentKey() + ":" + countState.value());
}
}
}
JavaBeans:
package com.yyds.flink_distinct;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
@AllArgsConstructor
@NoArgsConstructor
@Data
@ToString
/** Raw input record */
public class _07_AdData {
private int id;
private String devId;
private Long time;
}
package com.yyds.flink_distinct;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
@AllArgsConstructor
@NoArgsConstructor
@Data
@ToString
/** Key for the first keyBy */
public class _07_AdKey1 {
private int id;
private Long time;
private int bucketCode ; // bucket number
}
package com.yyds.flink_distinct;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
@AllArgsConstructor
@NoArgsConstructor
@Data
@ToString
/** Key for the second keyBy */
public class _07_AdKey2 {
private int id;
private Long time;
}