Session windows in Flink
2022-06-10 05:45:00 【Dragon man who can't fly】
Session windows (sessionWindows): the stream is split into separate windows per key wherever a period of inactivity occurs, and each window is then computed on its own.
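To make the gap rule concrete, here is a minimal sketch in plain Java with no Flink dependency. It is a simplified model, not Flink's implementation: the class name `SessionGapSketch`, the method `split`, and the sample timestamps are all made up for illustration. It assumes the timestamps of a single key arrive in ascending order and starts a new session whenever the distance to the previous element exceeds the gap.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: a plain-Java model of gap-based session splitting (hypothetical names).
class SessionGapSketch {

    // Splits the ascending timestamps (in ms) of ONE key into sessions.
    // A new session starts when the distance to the previous element exceeds gapMs.
    static List<List<Long>> split(List<Long> sortedTimestamps, long gapMs) {
        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > gapMs) {
                // The inactivity gap was exceeded: close the current session.
                sessions.add(current);
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            sessions.add(current);
        }
        return sessions;
    }

    public static void main(String[] args) {
        // Hypothetical arrival times in milliseconds, with a 2-second gap.
        List<Long> timestamps = Arrays.asList(0L, 1000L, 1500L, 6000L, 6500L, 12000L);
        // Prints [[0, 1000, 1500], [6000, 6500], [12000]]
        System.out.println(split(timestamps, 2000L));
    }
}
```

Unlike tumbling or sliding windows, the resulting windows have no fixed length: each one simply lasts as long as elements keep arriving within the gap.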
Sample environment
java.version: 1.8.x
flink.version: 1.11.1
Sample data source (project code download):
Flink example series: building the development environment and data
SessionWindow.java
import com.flink.examples.DataSource;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.List;

/**
 * @Description Session windows: the stream is split into separate windows per key wherever a period of inactivity occurs, and each window is then computed
 */
public class SessionWindow {

    /**
     * Reads the sample records, keys them by gender, splits each key's stream into session windows by inactivity gap,
     * and emits the record with the greatest age in each window
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the time characteristic of the stream; it must be set for the event-time session window used below. There are three types:
        // 1. ProcessingTime: based on the operator's processing time; the system clock of the machine supplies the time of the data stream
        // 2. IngestionTime: based on the time at which a record enters the Flink streaming data flow
        // 3. EventTime: based on a timestamp field in the data itself; the application must specify how to extract the timestamp from each record
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        env.setParallelism(4);
        DataStream<Tuple3<String, String, Integer>> inStream = env.addSource(new MyRichSourceFunction());
        DataStream<Tuple3<String, String, Integer>> dataStream = inStream
                // Key the stream by gender (field f1)
                .keyBy((KeySelector<Tuple3<String, String, Integer>, String>) k -> k.f1)
                // Session window with a 2-second gap: when a key receives no data for 2 seconds, its window is computed once
                // A session window ends once no new data has arrived for the specified gap; the window computation then runs
                .window(EventTimeSessionWindows.withGap(Time.seconds(2)))
                .reduce(new ReduceFunction<Tuple3<String, String, Integer>>() {
                    @Override
                    public Tuple3<String, String, Integer> reduce(Tuple3<String, String, Integer> t1, Tuple3<String, String, Integer> t2) throws Exception {
                        // Keep the record with the greater age
                        return t1.f2 > t2.f2 ? t1 : t2;
                    }
                });
        dataStream.print();
        env.execute("flink EventTimeSessionWindows job");
    }

    /**
     * Simulated source that emits the sample records one by one
     */
    public static class MyRichSourceFunction extends RichParallelSourceFunction<Tuple3<String, String, Integer>> {
        // Cleared by cancel() so that run() can exit early
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception {
            List<Tuple3<String, String, Integer>> tuple3List = DataSource.getTuple3ToList();
            for (Tuple3<String, String, Integer> tuple3 : tuple3List) {
                if (!running) {
                    break;
                }
                ctx.collect(tuple3);
                // Emit one record every 2 seconds
                Thread.sleep(2 * 1000);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
Print the results
2> (Zhang San,man,20)
4> (Li Si,girl,24)
2> (Wang Wu,man,29)
4> (Liu Liu,girl,32)
2> (Wu ba,man,30)
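One plausible reading of this output can be replayed in plain Java, without Flink. The sketch below is an approximation under stated assumptions, not the job itself: the class `SessionMaxAgeSketch`, the `Rec` type, and the timestamps 0, 2000, ... 8000 ms are hypothetical (the timestamps are inferred from the 2-second sleep in the source, and the records are taken from the printed results). It keys records by gender, closes a key's session when the gap is exceeded, and keeps the record with the greater age using the same comparison as the `reduce` function.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: replays the printed records with ASSUMED timestamps; no Flink involved.
class SessionMaxAgeSketch {

    static final class Rec {
        final String name;
        final String gender;
        final int age;
        final long ts; // assumed arrival time in ms

        Rec(String name, String gender, int age, long ts) {
            this.name = name;
            this.gender = gender;
            this.age = age;
            this.ts = ts;
        }

        @Override
        public String toString() {
            return "(" + name + "," + gender + "," + age + ")";
        }
    }

    // Records taken from the printed results; timestamps assume one record every 2 seconds.
    static List<Rec> sample() {
        return Arrays.asList(
                new Rec("Zhang San", "man", 20, 0L),
                new Rec("Li Si", "girl", 24, 2000L),
                new Rec("Wang Wu", "man", 29, 4000L),
                new Rec("Liu Liu", "girl", 32, 6000L),
                new Rec("Wu ba", "man", 30, 8000L));
    }

    // For each key (gender), emits the record with the greatest age per session.
    static List<Rec> maxAgePerSession(List<Rec> records, long gapMs) {
        Map<String, Rec> best = new LinkedHashMap<>();      // best record of each key's open session
        Map<String, Long> lastSeen = new LinkedHashMap<>(); // last timestamp seen per key
        List<Rec> out = new ArrayList<>();
        for (Rec r : records) {
            Long last = lastSeen.get(r.gender);
            if (last != null && r.ts - last > gapMs) {
                // Gap exceeded for this key: its session closes and the result is emitted.
                out.add(best.remove(r.gender));
            }
            // Same comparison as the ReduceFunction in the job: keep the greater age.
            best.merge(r.gender, r, (a, b) -> a.age > b.age ? a : b);
            lastSeen.put(r.gender, r.ts);
        }
        out.addAll(best.values()); // flush sessions that are still open at the end
        return out;
    }

    public static void main(String[] args) {
        maxAgePerSession(sample(), 2000L).forEach(System.out::println);
    }
}
```

Under these assumptions, records of the same gender are 4 seconds apart, which is more than the 2-second gap, so every record ends up in its own session and all five are emitted. Calling `maxAgePerSession(sample(), 5000L)` instead merges each gender into a single session and returns only `(Wu ba,man,30)` and `(Liu Liu,girl,32)`.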