Using Flink's lowest-level API (ProcessFunction), with worked cases
2022-07-04 14:32:00 【InfoQ】
Summary

Flink provides the following process functions:
- ProcessFunction
- KeyedProcessFunction
- CoProcessFunction
- ProcessJoinFunction
- BroadcastProcessFunction
- KeyedBroadcastProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction

KeyedProcessFunction<K, I, O>
- processElement(I value, Context ctx, Collector<O> out): Called for every element in the stream. Results are emitted through the Collector. The Context gives access to the element's timestamp, the element's key, and the TimerService. The Context can also emit results to other streams (side outputs).
- onTimer(long timestamp, OnTimerContext ctx, Collector<O> out): Called when a previously registered timer fires. The timestamp parameter is the time the timer was set to fire at. The Collector collects the output. OnTimerContext provides the same context information as the Context in processElement, plus the time domain of the timer that fired (event time or processing time).
Timer
- long currentProcessingTime(): Returns the current processing time.
- long currentWatermark(): Returns the timestamp of the current watermark.
- void registerProcessingTimeTimer(long timestamp): Registers a processing-time timer for the current key. The timer fires when the processing time reaches the given timestamp.
- void registerEventTimeTimer(long timestamp): Registers an event-time timer for the current key. The timer fires and runs the callback when the watermark is greater than or equal to the timer's timestamp.
- void deleteProcessingTimeTimer(long timestamp): Deletes a previously registered processing-time timer. If no timer exists for this timestamp, the call does nothing.
- void deleteEventTimeTimer(long timestamp): Deletes a previously registered event-time timer. If no timer exists for this timestamp, the call does nothing.
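The event-time rules above (a timer fires once the watermark is greater than or equal to its timestamp, and deleting a timer that does not exist does nothing) can be traced with a small plain-Java model. This is only a sketch for a single key: `TimerModel` and `advanceWatermark` are hypothetical names, not Flink classes or methods, and the real TimerService is driven by the runtime rather than by explicit calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Simplified single-key model of event-time timers; not Flink's implementation. */
public class TimerModel {
    // Registered timer timestamps, sorted and de-duplicated.
    private final TreeSet<Long> timers = new TreeSet<>();
    private long watermark = Long.MIN_VALUE;

    public long currentWatermark() {
        return watermark;
    }

    public void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp); // registering the same timestamp twice keeps one timer
    }

    public void deleteEventTimeTimer(long timestamp) {
        timers.remove(timestamp); // no effect if nothing is registered at this timestamp
    }

    /** Hypothetical helper: advances the watermark and returns the timers that fire, in order. */
    public List<Long> advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= watermark) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }

    public static void main(String[] args) {
        TimerModel service = new TimerModel();
        service.registerEventTimeTimer(1_000L);
        service.registerEventTimeTimer(1_000L);
        service.registerEventTimeTimer(2_000L);
        service.deleteEventTimeTimer(5_000L);                 // nothing to delete
        System.out.println(service.advanceWatermark(999L));   // watermark below every timer
        System.out.println(service.advanceWatermark(1_000L)); // watermark reaches the first timer
        service.deleteEventTimeTimer(2_000L);                 // deleted before it can fire
        System.out.println(service.advanceWatermark(3_000L));
    }
}
```

In this model only the timer at 1000 fires: the duplicate registration collapses into one timer, and the timer at 2000 is deleted before the watermark passes it.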
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// SensorReading is assumed to be a POJO with id, timestamp and temperature fields.
public class ProcessTest1_KeyedProcessFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read lines of the form "id,timestamp,temperature" from a socket
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });
        // Test KeyedProcessFunction: key the stream first, then apply custom processing
        dataStream.keyBy("id")
                .process(new MyProcess())
                .print();
        env.execute();
    }

    // Custom process function
    public static class MyProcess extends KeyedProcessFunction<Tuple, SensorReading, Integer> {
        // Timestamp of the registered timer, kept so the timer can be deleted later
        ValueState<Long> tsTimeState;

        @Override
        public void open(Configuration parameters) throws Exception {
            tsTimeState = getRuntimeContext().getState(
                    new ValueStateDescriptor<Long>("tsTimeState", Long.class));
        }

        @Override
        public void processElement(SensorReading value, Context ctx, Collector<Integer> out) throws Exception {
            out.collect(value.getId().length());
            // Context operations: element timestamp and current key
            ctx.timestamp();
            ctx.getCurrentKey();
            // Side output
            //ctx.output();
            // Current processing time
            ctx.timerService().currentProcessingTime();
            // Current watermark (event time)
            ctx.timerService().currentWatermark();
            // Register a processing-time timer one second from now and remember its timestamp
            long timerTs = ctx.timerService().currentProcessingTime() + 1000L;
            ctx.timerService().registerProcessingTimeTimer(timerTs);
            tsTimeState.update(timerTs);
            // Register an event-time timer
            //ctx.timerService().registerEventTimeTimer((value.getTimestamp() + 10) * 1000L);
            // Delete the timer
            //ctx.timerService().deleteProcessingTimeTimer(tsTimeState.value());
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
            System.out.println(timestamp + " timer fired");
            ctx.getCurrentKey();
            //ctx.output();
            // Time domain of the timer that fired (event time or processing time)
            ctx.timeDomain();
        }

        @Override
        public void close() throws Exception {
            // Keyed state is scoped per key, so this clears at most the current key's entry
            tsTimeState.clear();
        }
    }
}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class ProcessTest2_ApplicationCase {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });
        dataStream.keyBy("id")
                .process(new TempConsIncreWarring(10))
                .print();
        env.execute();
    }

    // Custom function: emit an alarm when the temperature rises continuously over a time interval
    private static class TempConsIncreWarring extends KeyedProcessFunction<Tuple, SensorReading, String> {
        // Length of the time interval, in seconds
        private Integer interval;

        public TempConsIncreWarring(Integer interval) {
            this.interval = interval;
        }

        // State: the last temperature value and the timestamp of the registered timer
        private ValueState<Double> lastTempState;
        private ValueState<Long> tsTimeState;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Default is -Double.MAX_VALUE (the lowest finite double), not Double.MIN_VALUE,
            // which is the smallest positive double and would mishandle readings at or below zero
            lastTempState = getRuntimeContext().getState(new ValueStateDescriptor<Double>(
                    "lastTempState", Double.class, -Double.MAX_VALUE));
            tsTimeState = getRuntimeContext().getState(new ValueStateDescriptor<Long>(
                    "tsTimeState", Long.class));
        }

        @Override
        public void processElement(SensorReading value, Context ctx, Collector<String> out) throws Exception {
            // Read the state
            Double lastTemp = lastTempState.value();
            Long tsTime = tsTimeState.value();
            // If the temperature rose and no timer is registered yet, register one `interval` seconds from now
            if (value.getTemperature() > lastTemp && tsTime == null) {
                // Compute the timer timestamp
                Long ts = ctx.timerService().currentProcessingTime() + interval * 1000L;
                // Register the timer
                ctx.timerService().registerProcessingTimeTimer(ts);
                // Save the timer timestamp
                tsTimeState.update(ts);
            }
            // If the temperature dropped, delete the pending timer
            if (value.getTemperature() < lastTemp && tsTime != null) {
                ctx.timerService().deleteProcessingTimeTimer(tsTime);
                // Clear the saved timer timestamp
                tsTimeState.clear();
            }
            // Update the temperature state
            lastTempState.update(value.getTemperature());
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // The timer fired: emit the alarm message
            out.collect("Sensor " + ctx.getCurrentKey().getField(0) + ": temperature has been rising for " + interval + " seconds");
            tsTimeState.clear();
        }

        @Override
        public void close() throws Exception {
            lastTempState.clear();
        }
    }
}
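To see how the timer state in this case evolves, the same decision logic can be traced outside Flink with an explicit clock. The sketch below is a plain-Java model for a single sensor: `RisingTempTrace`, `onReading` and `advanceTo` are hypothetical names, the `now` argument stands in for the processing time, and the sample readings are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java trace of the rising-temperature alarm logic for one sensor; no Flink involved. */
public class RisingTempTrace {
    private final long intervalMs;
    private double lastTemp = -Double.MAX_VALUE; // sentinel: no previous reading yet
    private Long timerTs = null;                 // timestamp of the pending timer, or null
    private final List<String> alarms = new ArrayList<>();

    public RisingTempTrace(int intervalSeconds) {
        this.intervalMs = intervalSeconds * 1000L;
    }

    /** Stands in for onTimer: fires the pending timer once the clock reaches it. */
    public void advanceTo(long now) {
        if (timerTs != null && timerTs <= now) {
            alarms.add("alarm at " + timerTs);
            timerTs = null;
        }
    }

    /** Stands in for processElement; 'now' plays the role of the current processing time. */
    public void onReading(long now, double temperature) {
        advanceTo(now);
        if (temperature > lastTemp && timerTs == null) {
            timerTs = now + intervalMs; // rising and no timer yet: register one
        } else if (temperature < lastTemp && timerTs != null) {
            timerTs = null;             // dropped: delete the pending timer
        }
        lastTemp = temperature;
    }

    public List<String> getAlarms() {
        return alarms;
    }

    public static void main(String[] args) {
        RisingTempTrace trace = new RisingTempTrace(10);
        trace.onReading(0L, 35.0);      // first reading: timer set for 10000
        trace.onReading(3_000L, 36.0);  // still rising: existing timer kept
        trace.onReading(6_000L, 34.0);  // drop: timer deleted
        trace.advanceTo(10_000L);       // nothing pending, so no alarm
        trace.onReading(11_000L, 35.0); // rising again: timer set for 21000
        trace.onReading(15_000L, 36.0);
        trace.advanceTo(21_000L);       // timer fires
        System.out.println(trace.getAlarms()); // prints [alarm at 21000]
    }
}
```

Note that a reading equal to the previous one neither registers nor deletes a timer, which matches the two strict comparisons in processElement.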

Side output
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class ProcessTest3_SideOutputCase {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });
        // Define an OutputTag that identifies the low-temperature stream
        // (the anonymous subclass preserves the generic type information)
        OutputTag<SensorReading> lowTemp = new OutputTag<SensorReading>("lowTemp") {
        };
        // Use a custom ProcessFunction with a side output to split the stream
        SingleOutputStreamOperator<SensorReading> highTempStream = dataStream.process(new ProcessFunction<SensorReading, SensorReading>() {
            @Override
            public void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) throws Exception {
                // Readings above 30 go to the main (high-temperature) stream; all others go to the side output
                if (value.getTemperature() > 30) {
                    out.collect(value);
                } else {
                    ctx.output(lowTemp, value);
                }
            }
        });
        highTempStream.print("high-Temp");
        highTempStream.getSideOutput(lowTemp).print("low");
        env.execute();
    }
}
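When trying this job, it helps to know which stream a given input line will land on. The following plain-Java sketch applies the same parsing and the same threshold to a few sample lines. `SideOutputRouting` and `route` are hypothetical names, and the sensor ids, timestamps and temperatures are made-up test data in the "id,timestamp,temperature" format the job parses.

```java
/** Plain-Java check of which stream a raw input line would be routed to; no Flink involved. */
public class SideOutputRouting {
    static final double HIGH_TEMP_THRESHOLD = 30.0;

    /** Returns the print label of the stream that would receive the parsed reading. */
    static String route(String line) {
        String[] fields = line.split(",");
        double temperature = Double.parseDouble(fields[2]);
        // Strictly greater than the threshold goes to the main stream, everything else to the side output
        return temperature > HIGH_TEMP_THRESHOLD ? "high-Temp" : "low";
    }

    public static void main(String[] args) {
        String[] lines = {
            "sensor_1,1547718199,35.8",
            "sensor_6,1547718201,15.4",
            "sensor_7,1547718202,30.0"
        };
        for (String line : lines) {
            System.out.println(route(line) + " <- " + line);
        }
    }
}
```

A reading of exactly 30.0 is routed to the low-temperature side output, because the comparison is strict.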
