Case study: using ProcessFunction, the lowest-level API in the Flink framework
2022-07-04 14:32:00 【InfoQ】
Summary

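Flink provides the following process functions:

- ProcessFunction
- KeyedProcessFunction
- CoProcessFunction
- ProcessJoinFunction
- BroadcastProcessFunction
- KeyedBroadcastProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction

KeyedProcessFunction<K, I, O> (K: key type, I: input type, O: output type) is used in the examples below. Its two main methods are: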
- processElement(I value, Context ctx, Collector<O> out): called once for every element in the stream. Results are emitted through the Collector as values of the output type O. The Context gives access to the element's timestamp, its key and the TimerService, and it can also send results to other streams (side outputs).
- onTimer(long timestamp, OnTimerContext ctx, Collector<O> out): called when a previously registered timer fires. The timestamp parameter is the time the timer was set for, and the Collector receives the output. Like the Context parameter of processElement, OnTimerContext provides context information, such as the time domain (event time or processing time) of the timer that fired.
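To make the processElement contract concrete without a Flink cluster, here is a plain-Java sketch. It is a simplified model, not Flink's API: MiniKeyedProcessFunction, MiniContext and MiniRuntime are hypothetical names, the Collector is modeled as a java.util.function.Consumer, and timers are left out. It only shows that processElement runs once per element and that the element's key is available from the context.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical, simplified model of the KeyedProcessFunction calling contract; not Flink's API.
interface MiniKeyedProcessFunction<K, I, O> {
    // Called once per input element; 'out' plays the role of Flink's Collector.
    void processElement(I value, MiniContext<K> ctx, Consumer<O> out);
}

// Hypothetical context exposing the key of the element being processed.
final class MiniContext<K> {
    private final K currentKey;

    MiniContext(K currentKey) {
        this.currentKey = currentKey;
    }

    K getCurrentKey() {
        return currentKey;
    }
}

final class MiniRuntime {
    // Sets the current key for each element, calls processElement, and gathers everything emitted.
    static <K, I, O> List<O> run(List<I> input, Function<I, K> keySelector,
                                 MiniKeyedProcessFunction<K, I, O> fn) {
        List<O> results = new ArrayList<>();
        for (I element : input) {
            MiniContext<K> ctx = new MiniContext<>(keySelector.apply(element));
            fn.processElement(element, ctx, results::add);
        }
        return results;
    }

    public static void main(String[] args) {
        // Emits the key length; since the key is the id, this matches out.collect(value.getId().length()).
        MiniKeyedProcessFunction<String, String, Integer> idLength =
                (value, ctx, out) -> out.accept(ctx.getCurrentKey().length());
        List<Integer> lengths = run(List.of("sensor_1", "sensor_10", "sensor_1"), id -> id, idLength);
        System.out.println(lengths); // [8, 9, 8]
    }
}
```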
Timers: the TimerService, obtained through ctx.timerService(), provides the following methods:
- long currentProcessingTime(): returns the current processing time.
- long currentWatermark(): returns the timestamp of the current watermark.
- void registerProcessingTimeTimer(long timestamp): registers a processing-time timer for the current key; the timer fires when the processing time reaches the given timestamp.
- void registerEventTimeTimer(long timestamp): registers an event-time timer for the current key; when the watermark is greater than or equal to the registered timestamp, the timer fires and onTimer is called.
- void deleteProcessingTimeTimer(long timestamp): deletes a previously registered processing-time timer. If no timer exists for this timestamp, nothing happens.
- void deleteEventTimeTimer(long timestamp): deletes a previously registered event-time timer. If no timer exists for this timestamp, nothing happens.
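The event-time rules listed above (a timer fires once the watermark reaches its timestamp, deleting a missing timer does nothing) can be modeled in plain Java. This is a hypothetical sketch, not Flink's implementation: the class MiniEventTimers only borrows the TimerService method names and takes the key as an explicit argument, whereas in Flink the key is implicit. It also reflects the fact that Flink keeps at most one timer per key and timestamp, so registering the same timestamp twice yields a single timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical model of per-key event-time timers; not Flink's implementation.
final class MiniEventTimers {
    // Registered timer timestamps per key; a TreeSet keeps them sorted and drops duplicates.
    private final Map<String, TreeSet<Long>> timersByKey = new TreeMap<>();
    private long currentWatermark = Long.MIN_VALUE;

    long currentWatermark() {
        return currentWatermark;
    }

    void registerEventTimeTimer(String key, long timestamp) {
        timersByKey.computeIfAbsent(key, k -> new TreeSet<>()).add(timestamp);
    }

    // Deleting a timestamp that was never registered does nothing.
    void deleteEventTimeTimer(String key, long timestamp) {
        TreeSet<Long> timers = timersByKey.get(key);
        if (timers != null) {
            timers.remove(timestamp);
        }
    }

    // Advances the watermark and fires every timer with timestamp <= watermark,
    // returning "key@timestamp" entries ordered by timestamp. Fired timers are removed.
    List<String> advanceWatermark(long watermark) {
        currentWatermark = watermark;
        TreeMap<Long, List<String>> due = new TreeMap<>();
        for (Map.Entry<String, TreeSet<Long>> entry : timersByKey.entrySet()) {
            NavigableSet<Long> fired = entry.getValue().headSet(watermark, true); // live view
            for (long ts : fired) {
                due.computeIfAbsent(ts, t -> new ArrayList<>()).add(entry.getKey());
            }
            fired.clear(); // clearing the view removes these timers from the underlying set
        }
        List<String> result = new ArrayList<>();
        due.forEach((ts, keys) -> keys.forEach(key -> result.add(key + "@" + ts)));
        return result;
    }

    public static void main(String[] args) {
        MiniEventTimers timers = new MiniEventTimers();
        timers.registerEventTimeTimer("sensor_1", 1000L);
        timers.registerEventTimeTimer("sensor_1", 1000L); // duplicate: still a single timer
        timers.registerEventTimeTimer("sensor_2", 2000L);
        timers.registerEventTimeTimer("sensor_2", 5000L);
        timers.deleteEventTimeTimer("sensor_2", 5000L);
        timers.deleteEventTimeTimer("sensor_3", 42L);    // nothing registered: no effect
        System.out.println(timers.advanceWatermark(999L));  // []
        System.out.println(timers.advanceWatermark(2000L)); // [sensor_1@1000, sensor_2@2000]
        System.out.println(timers.advanceWatermark(9000L)); // []
    }
}
```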
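import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// SensorReading (id, timestamp, temperature) is a POJO defined elsewhere in the project; it is not shown here.
public class ProcessTest1_KeyedProcessFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read comma-separated lines of the form id,timestamp,temperature from a local socket
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });

        // Test KeyedProcessFunction: key by sensor id first, then apply custom processing
        dataStream.keyBy(SensorReading::getId)
                .process(new MyProcess())
                .print();

        env.execute();
    }

    // Custom processing function
    public static class MyProcess extends KeyedProcessFunction<String, SensorReading, Integer> {
        // Remembers the timestamp of the timer registered for the current key
        private ValueState<Long> tsTimeState;

        @Override
        public void open(Configuration parameters) throws Exception {
            tsTimeState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("tsTimeState", Long.class));
        }

        @Override
        public void processElement(SensorReading value, Context ctx, Collector<Integer> out) throws Exception {
            out.collect(value.getId().length());

            // Context operations
            ctx.timestamp();      // timestamp of the current element (null if none is assigned)
            ctx.getCurrentKey();  // key of the current element
            // Side output (requires an OutputTag, see the side output example)
            // ctx.output(outputTag, value);

            // Current processing time
            ctx.timerService().currentProcessingTime();
            // Current event time (watermark)
            ctx.timerService().currentWatermark();

            // Register a processing-time timer 1 s from now. The timestamp is computed once so that
            // the value stored in state matches the registered timer exactly.
            long timerTs = ctx.timerService().currentProcessingTime() + 1000L;
            ctx.timerService().registerProcessingTimeTimer(timerTs);
            tsTimeState.update(timerTs);

            // Register an event-time timer
            // ctx.timerService().registerEventTimeTimer((value.getTimestamp() + 10) * 1000L);
            // Delete a timer
            // ctx.timerService().deleteProcessingTimeTimer(tsTimeState.value());
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
            System.out.println(timestamp + " Timer triggered");
            ctx.getCurrentKey();
            // ctx.output(outputTag, ...);
            ctx.timeDomain();     // PROCESSING_TIME or EVENT_TIME

            // Keyed state is scoped to the current key here (unlike in close()), so clean it up here,
            // but only if it still refers to the timer that just fired
            Long registered = tsTimeState.value();
            if (registered != null && registered == timestamp) {
                tsTimeState.clear();
            }
        }
    }
}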
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// SensorReading (id, timestamp, temperature) is a POJO defined elsewhere in the project; it is not shown here.
public class ProcessTest2_ApplicationCase {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });

        dataStream.keyBy(SensorReading::getId)
                .process(new TempConsIncreWarring(10))
                .print();

        env.execute();
    }

    // Custom function: raise an alarm if a sensor's temperature keeps rising for a given interval (processing time)
    private static class TempConsIncreWarring extends KeyedProcessFunction<String, SensorReading, String> {
        // Length of the monitored interval, in seconds
        private final int interval;

        public TempConsIncreWarring(int interval) {
            this.interval = interval;
        }

        // State: the last temperature value and the timestamp of the pending timer
        private ValueState<Double> lastTempState;
        private ValueState<Long> tsTimeState;

        @Override
        public void open(Configuration parameters) throws Exception {
            lastTempState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("lastTempState", Double.class));
            tsTimeState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("tsTimeState", Long.class));
        }

        @Override
        public void processElement(SensorReading value, Context ctx, Collector<String> out) throws Exception {
            // Read the state
            Double lastTemp = lastTempState.value();
            Long tsTime = tsTimeState.value();
            double temperature = value.getTemperature();

            // The first reading for a key only initializes the state
            // (Double.MIN_VALUE is the smallest positive double, so it is not a safe "no value yet" default)
            if (lastTemp != null) {
                // Temperature rose and no timer is pending: register one 'interval' seconds from now
                if (temperature > lastTemp && tsTime == null) {
                    long ts = ctx.timerService().currentProcessingTime() + interval * 1000L;
                    ctx.timerService().registerProcessingTimeTimer(ts);
                    tsTimeState.update(ts);
                }
                // Temperature dropped: delete the pending timer and clear its state
                if (temperature < lastTemp && tsTime != null) {
                    ctx.timerService().deleteProcessingTimeTimer(tsTime);
                    tsTimeState.clear();
                }
            }

            // Update the temperature state
            lastTempState.update(temperature);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // Timer fired: emit the alarm message
            out.collect("Sensor " + ctx.getCurrentKey() + ": temperature has been rising continuously for "
                    + interval + " seconds");
            tsTimeState.clear();
        }
    }
}
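The alarm rule above can be checked without a running job by replaying readings against a manual clock. The following is a hypothetical plain-Java simulation, not Flink code: TempRiseSimulation, onReading and fireTimersUpTo are illustrative names, two maps stand in for lastTempState and tsTimeState, and fireTimersUpTo plays the role of processing-time timers firing. It assumes that the first reading of a sensor only initializes the state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java simulation of the "temperature keeps rising" alarm; this is not Flink code.
final class TempRiseSimulation {
    private final long intervalMillis;
    private final Map<String, Double> lastTemp = new HashMap<>(); // stands in for lastTempState
    private final Map<String, Long> timerAt = new TreeMap<>();    // stands in for tsTimeState
    private final List<String> alarms = new ArrayList<>();        // stands in for the Collector

    TempRiseSimulation(int intervalSeconds) {
        this.intervalMillis = intervalSeconds * 1000L;
    }

    // Handles one reading arriving at processing time nowMillis.
    void onReading(String id, double temperature, long nowMillis) {
        fireTimersUpTo(nowMillis); // timers that are already due fire before the new reading
        Double previous = lastTemp.get(id);
        Long pending = timerAt.get(id);
        if (previous != null) {    // assumption: the first reading only initializes the state
            if (temperature > previous && pending == null) {
                timerAt.put(id, nowMillis + intervalMillis); // like registerProcessingTimeTimer
            } else if (temperature < previous && pending != null) {
                timerAt.remove(id);                          // like deleteProcessingTimeTimer + clear()
            }
        }
        lastTemp.put(id, temperature);
    }

    // Fires (and removes) every pending timer whose time has been reached.
    void fireTimersUpTo(long nowMillis) {
        Iterator<Map.Entry<String, Long>> it = timerAt.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= nowMillis) {
                alarms.add("Sensor " + entry.getKey() + ": temperature has been rising continuously for "
                        + intervalMillis / 1000 + " seconds");
                it.remove();
            }
        }
    }

    List<String> alarms() {
        return alarms;
    }

    public static void main(String[] args) {
        TempRiseSimulation sim = new TempRiseSimulation(10);
        sim.onReading("sensor_1", 30.0, 0L);
        sim.onReading("sensor_2", 20.0, 0L);
        sim.onReading("sensor_1", 31.0, 1_000L); // rise: timer at 11 000 ms
        sim.onReading("sensor_2", 21.0, 1_000L); // rise: timer at 11 000 ms
        sim.onReading("sensor_2", 19.0, 3_000L); // drop: sensor_2's timer is deleted
        sim.onReading("sensor_1", 32.0, 5_000L); // still rising, timer already pending
        sim.fireTimersUpTo(11_000L);
        System.out.println(sim.alarms());
        // [Sensor sensor_1: temperature has been rising continuously for 10 seconds]
    }
}
```

A drop in temperature cancels the pending alarm, while further rises do not postpone it: the timer keeps the time of the first rise.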

Side outputs
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// SensorReading (id, timestamp, temperature) is a POJO defined elsewhere in the project; it is not shown here.
public class ProcessTest3_SideOutputCase {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });

        // Define an OutputTag for the low-temperature stream. The anonymous subclass ({}) keeps the
        // generic type information available to Flink.
        final OutputTag<SensorReading> lowTemp = new OutputTag<SensorReading>("lowTemp") {
        };

        // Use a ProcessFunction with a side output to split the stream
        SingleOutputStreamOperator<SensorReading> highTempStream = dataStream.process(new ProcessFunction<SensorReading, SensorReading>() {
            @Override
            public void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) throws Exception {
                // Above 30: high-temperature main stream; otherwise: low-temperature side output
                if (value.getTemperature() > 30) {
                    out.collect(value);
                } else {
                    ctx.output(lowTemp, value);
                }
            }
        });

        highTempStream.print("high-Temp");
        highTempStream.getSideOutput(lowTemp).print("low");

        env.execute();
    }
}
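In the side output example, every reading ends up in exactly one of the two outputs during a single pass, rather than being evaluated by two separate filters. The sketch below models that routing in plain Java; SideOutputSplit and Split are hypothetical names, and the two lists only stand in for out.collect(...) and ctx.output(lowTemp, ...).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of splitting readings into a main output and one side output.
final class SideOutputSplit {
    // 'main' corresponds to out.collect(...), 'side' to ctx.output(lowTemp, ...).
    static final class Split {
        final List<Double> main = new ArrayList<>();
        final List<Double> side = new ArrayList<>();
    }

    // Same rule as the example: strictly above the threshold goes to the main output.
    static Split split(List<Double> temperatures, double threshold) {
        Split result = new Split();
        for (double t : temperatures) {
            if (t > threshold) {
                result.main.add(t);
            } else {
                result.side.add(t);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Split s = split(List.of(35.8, 15.4, 30.0, 37.1), 30.0);
        System.out.println("high-Temp " + s.main); // high-Temp [35.8, 37.1]
        System.out.println("low " + s.side);       // low [15.4, 30.0]
    }
}
```

Note the boundary: a reading of exactly 30 is not greater than 30, so it goes to the side output.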
