当前位置:网站首页>Combined with case: the usage of the lowest API (processfunction) in Flink framework
Combined with case: the usage of the lowest API (processfunction) in Flink framework
2022-07-04 14:32:00 【InfoQ】
summary
ProcessFunction
KeyedProcessFunction
CoProcessFunction
ProcessJoinFunction
BroadcastProcessFunction
KeyedBroadcastProcessFunction
ProcessWindowFunction
ProcessAllWindowFunction
KeyedProcessFunction<K, I, O>
- This method is called by every element in the data flow , The result of the call will be placed in Collector The output type in the data .Context The timestamp that can access the element , Elemental key, as well as TimerService Time service .Context You can also output the results to other streams (side outputs).
processElement(I value, Context ctx, Collector<O> out)
- Called when a previously registered timer triggers . Parameters timestamp The trigger time stamp set for the timer .Collector Is the set of output results .OnTimerContext and processElement Of Context Same parameter , Provides some information about the context . For example, timer triggered time information ( Event time or processing time ).
onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)
Timer
- Returns the current processing time
long currentProcessingTime()
- Returns the current watermark The timestamp
long currentWatermark()
- Register current key Of processing time Timer for , When processing time When the time comes , Trigger timer.
void registerProcessingTimeTimer(long timestamp)
- Register current key Of event time Timer . When the water level is greater than or equal to the timer registration time , Trigger timer to execute callback function .
void registerEventTimeTimer(long timestamp)
- Before deleting the register processing time timer . If there is no timer for this timestamp , Do not perform
void deleteProcessingTimeTimer(long timestamp)
- Delete previously registered event time timers , If there is no timer for this timestamp , Do not perform .
void deleteEventTimeTimer(long timestamp)
public class ProcessTest1_KeyedProcessFunction {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
// test keyedprocessFunction Group first , Custom processing
dataStream.keyBy("id")
.process(new MyProcess())
.print();
env.execute();
}
// Implement custom processing functions
public static class MyProcess extends KeyedProcessFunction<Tuple,SensorReading,Integer> {
ValueState<Long> tsTimeState;
@Override
public void open(Configuration parameters) throws Exception {
tsTimeState = getRuntimeContext().getState(new ValueStateDescriptor<Long>(
"tsTimeState",Long.class
));
}
@Override
public void processElement(SensorReading value, Context ctx, Collector<Integer> out) throws Exception {
out.collect(value.getId().length());
// Context operation
ctx.timestamp();
ctx.getCurrentKey();
// Sidestream
//ctx.output();
// Get the current system processing time
ctx.timerService().currentProcessingTime();
// Get the current event time
ctx.timerService().currentWatermark();
// Register the system processing time timer
ctx.timerService().registerProcessingTimeTimer( ctx.timerService().currentProcessingTime() + 1000L);
tsTimeState.update( ctx.timerService().currentProcessingTime() + 1000L);
// Register the event time timer
//ctx.timerService().registerEventTimeTimer((value.getTimestamp() + 10) * 1000L);
// Delete time
//ctx.timerService().deleteProcessingTimeTimer(tsTimeState.value());
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
System.out.println(timestamp+" Timer triggered ");
ctx.getCurrentKey();
//ctx.output();
ctx.timeDomain();
}
@Override
public void close() throws Exception {
tsTimeState.clear();
}
}
}
public class ProcessTest2_ApplicationCase {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
dataStream.keyBy("id")
.process(new TempConsIncreWarring(10))
.print();
env.execute();
}
// Custom function Test for a period of time ( Time domain ) The internal temperature rises continuously , Output alarm
private static class TempConsIncreWarring extends KeyedProcessFunction<Tuple, SensorReading, String> {
// Define time fields
private Integer interval;
public TempConsIncreWarring(Integer interval) {
this.interval = interval;
}
// Define the State , Save the last temperature value , Timer timestamp
private ValueState<Double> lastTempState;
private ValueState<Long> tsTimeState;
@Override
public void open(Configuration parameters) throws Exception {
lastTempState = getRuntimeContext().getState(new ValueStateDescriptor<Double>(
"lastTempState", Double.class, Double.MIN_VALUE));
tsTimeState = getRuntimeContext().getState(new ValueStateDescriptor<Long>(
"tsTimeState", Long.class));
}
@Override
public void processElement(SensorReading value, Context ctx, Collector<String> out) throws Exception {
// Removal status
Double lastTemp = lastTempState.value();
Long tsTime = tsTimeState.value();
// Register if the temperature rises 10 Seconds later and when there is no timer , wait for
if (value.getTemperature() > lastTemp && tsTime == null) {
// Calculate the timer timestamp
Long ts = ctx.timerService().currentProcessingTime() + interval * 1000L;
// Register timer
ctx.timerService().registerProcessingTimeTimer(ts);
// Update time status
tsTimeState.update(ts);
}
// If the temperature drops The timer needs to be deleted
if (value.getTemperature() < lastTemp && tsTime != null) {
ctx.timerService().deleteProcessingTimeTimer(tsTime);
// Clear the time status timer
tsTimeState.clear();
}
// Update temperature status
lastTempState.update(value.getTemperature());
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
// Timer triggered Output office reporting information
out.collect(" sensor "+ ctx.getCurrentKey().getField(0) + " The temperature value is continuous "+ interval +" Seconds have been rising ");
tsTimeState.clear();
}
@Override
public void close() throws Exception {
lastTempState.clear();
}
}
}
Side flow output
public class ProcessTest3_SideOutputCase {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
// Definition outputTag Indicates low temperature flow
OutputTag lowTemp = new OutputTag<SensorReading>("lowTemp") {
};
// Customize the measurement output flow to realize shunting operation
SingleOutputStreamOperator<SensorReading> highTempStream = dataStream.process(new ProcessFunction<SensorReading, SensorReading>() {
@Override
public void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) throws Exception {
// Judge that the temperature is greater than 30 High temperature flow Output to the mainstream The low temperature flow is output in the side flow
if (value.getTemperature() > 30) {
out.collect(value);
} else {
ctx.output(lowTemp, value);
}
}
});
highTempStream.print("high-Temp");
highTempStream.getSideOutput(lowTemp).print("low");
env.execute();
}
}
边栏推荐
- docker-compose公网部署redis哨兵模式
- Digi XBee 3 RF: 4个协议,3种封装,10个大功能
- The implementation of OSD on rk1126 platform supports color translucency and multi-channel support for Chinese
- 第十七章 进程内存
- Opencv3.2 and opencv2.4 installation
- Xcode abnormal pictures cause IPA packet size problems
- R language ggplot2 visualization: gganimate package creates dynamic line graph animation (GIF) and uses transition_ The reveal function displays data step by step along a given dimension in the animat
- (1) The standard of performance tuning and the correct posture for tuning - if you have performance problems, go to the heapdump performance community!
- Docker compose public network deployment redis sentinel mode
- Supprimer les lettres dupliquées [avidité + pile monotone (maintenir la séquence monotone avec un tableau + Len)]
猜你喜欢
Leetcode T48: rotating images
Leetcode 61: rotating linked list
92.(cesium篇)cesium楼栋分层
Nowcoder reverse linked list
Vscode common plug-ins summary
Use of tiledlayout function in MATLAB
Wt588f02b-8s (c006_03) single chip voice IC scheme enables smart doorbell design to reduce cost and increase efficiency
基于51单片机的超声波测距仪
Data center concept
What is the difference between Bi financial analysis in a narrow sense and financial analysis in a broad sense?
随机推荐
An overview of 2D human posture estimation
LVGL 8.2 keyboard
How to package QT and share exe
Learn kernel 3: use GDB to track the kernel call chain
Digi XBee 3 RF: 4个协议,3种封装,10个大功能
Gin integrated Alipay payment
Why should Base64 encoding be used for image transmission
Rich text editing: wangeditor tutorial
Leetcode 61: 旋转链表
Use of tiledlayout function in MATLAB
失败率高达80%,企业数字化转型路上有哪些挑战?
潘多拉 IOT 开发板学习(RT-Thread)—— 实验3 按键实验(学习笔记)
Detailed explanation of visual studio debugging methods
Docker compose public network deployment redis sentinel mode
scratch古堡历险记 电子学会图形化编程scratch等级考试三级真题和答案解析2022年6月
R language ggplot2 visualization: gganimate package creates dynamic line graph animation (GIF) and uses transition_ The reveal function displays data step by step along a given dimension in the animat
No servers available for service: xxxx
Supprimer les lettres dupliquées [avidité + pile monotone (maintenir la séquence monotone avec un tableau + Len)]
Data warehouse interview question preparation
LVGL 8.2 List