当前位置:网站首页>[Flink] aggregation operator
[Flink] aggregation operator
2022-07-25 03:27:00 【No bug is the biggest bug】
One 、 Entity class
@Data
public class Event {
public String user;
public String url;
public Long timestamp;
public Event(String user, String url, Long timestamp) {
this.user = user;
this.url = url;
this.timestamp = timestamp;
}
}Two 、 Custom data sources simulate streaming data
public class ClickSource implements SourceFunction<Event> {
// Declare a flag class
private Boolean running = true;
@Override
public void run(SourceContext<Event> ctx) throws Exception {
// Randomly generated data
Random random = new Random();
// Define the data set selected by the field
String[] users = {"Mary","Alice","Bob","Cary"};
String[] urls = {"./home","./cart","./fav","./prod?id=100","./prod:id=10"};
// Loop data
while (running){
String user = users[random.nextInt(users.length)];
String url = urls[random.nextInt(user.length())];
Long time = System.currentTimeMillis();
ctx.collect(new Event(user,url,time));
Thread.sleep(2000);
}
}
@Override
public void cancel() {
running = false;
}
}3、 ... and 、 Task code
public static void main(String[] args) throws Exception {
// Create an execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Set parallelism
env.setParallelism(1);
// Read data source
DataStreamSource<Event> dataStream = env.addSource(new ClickSource());
// Press key groups to perform aggregate query Extract the latest access data of the current user max Only take the maximum value of the current specified field maxBy Round the maximum value of a piece of data
SingleOutputStreamOperator<Event> max = dataStream.keyBy(new KeySelector<Event, String>() {
// Group here by user name
@Override
public String getKey(Event event) throws Exception {
return event.user;
}
}).max("timestamp");
SingleOutputStreamOperator<Event> maxBy = dataStream.keyBy(new KeySelector<Event, String>() {
// Group here by user name
@Override
public String getKey(Event event) throws Exception {
return event.user;
}
}).maxBy("timestamp");
// Print
max.print("max");
maxBy.print("maxBy");
// Start
env.execute();
}边栏推荐
- C language writes a circular advertising lantern or changes it to a confession system
- Hal library serial port for note taking
- Unity: test rotation function
- Leetcode programming practice -- Tencent selected 50 questions (I)
- The relationship between private domain traffic and fission marketing. What is super app? Can our enterprise own it?
- Sword finger offer II 041. Average value of sliding window_____ Using queue / loop array implementation
- 292. Nim game
- C language introduction practice (9): completion judgment
- 04 -- two ways of writing el and data
- List type to string type
猜你喜欢

NC | progress has been made in the study of the ecological network relationship between dissolved organic carbon and microorganisms in the context of global change

Stm32cubemx quadrature encoder

Imeta | ggclusternet microbial network analysis and visualization nanny level tutorial

Advantages and disadvantages of zero trust security

A code takes you to draw multi format sangjimei pictures such as interactive +pdf+png

Force deduction brush question 26. Delete duplicates in the ordered array

Flowlayout in compose

144. Preorder traversal of binary tree

Use reflection to convert RDD to dataframe

VMware installation
随机推荐
Algorithmic interview questions
Use of stm32cubemonitor part I - data plotting and instrument display
How does Jupiter notebook change themes and font sizes?
Network construction and application in 2020 -- the answer of samba in Guosai
Use of CCleaner
Electronic bidding procurement mall system: optimize traditional procurement business and speed up enterprise digital upgrading
B. All Distinct
Visio use
Secondary vocational network security skills competition P100 dcore (light CMS system) SQL injection
Time complexity and space complexity
Function method encapsulation -- mutual conversion of image types qpixmap, qimage and mat
Detailed explanation of three factory modes
Time formatting
Take a note: Oracle conditional statement
Force deduction brush question 14. Longest common prefix
Experiment 4 CTF practice
[kaggle] how to effectively avoid oom and the long process of alchemy
Imeta | ggclusternet microbial network analysis and visualization nanny level tutorial
Openlayers draw deletes the last point when drawing
Recursive and non recursive methods are used to realize the first order, middle order and second order traversal of binary tree respectively