With worked cases: using the lowest-level API (ProcessFunction) in the Flink framework
2022-07-04 14:32:00 【InfoQ】
Summary

Flink provides the following process functions:
- ProcessFunction
- KeyedProcessFunction
- CoProcessFunction
- ProcessJoinFunction
- BroadcastProcessFunction
- KeyedBroadcastProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction

KeyedProcessFunction<K, I, O>
- Called once for every element in the stream. Results are emitted through the Collector, whose type parameter O is the output type. The Context gives access to the element's timestamp, the element's key, and the TimerService. The Context can also emit records to other streams (side outputs).
processElement(I value, Context ctx, Collector<O> out)
- Called when a previously registered timer fires. The timestamp parameter is the time the timer was registered for, and the Collector receives the output. OnTimerContext provides the same context information as the Context of processElement, plus the time domain of the firing timer (event time or processing time).
onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)
TimerService and timers
- Returns the current processing time.
long currentProcessingTime()
- Returns the timestamp of the current watermark.
long currentWatermark()
- Registers a processing-time timer for the current key. The timer fires when processing time reaches the given timestamp.
void registerProcessingTimeTimer(long timestamp)
- Registers an event-time timer for the current key. When the watermark is greater than or equal to the timer's timestamp, the timer fires and the callback is invoked.
void registerEventTimeTimer(long timestamp)
- Deletes a previously registered processing-time timer. If no timer exists for this timestamp, nothing happens.
void deleteProcessingTimeTimer(long timestamp)
- Deletes a previously registered event-time timer. If no timer exists for this timestamp, nothing happens.
void deleteEventTimeTimer(long timestamp)
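The event-time rules above can be modelled in a few lines of plain Java. This is only a sketch, not Flink code: the class EventTimeTimerModel and its advanceWatermark method are hypothetical names invented for illustration. It assumes that timers are deduplicated per key and timestamp, that a timer fires once the watermark is greater than or equal to its timestamp, and that deleting a timer that does not exist has no effect.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.TreeSet;

/** Plain-Java model of event-time timers (hypothetical, not part of Flink). */
final class EventTimeTimerModel {
    // timestamp -> keys that have a timer at that timestamp.
    // Using a set means a (key, timestamp) pair is stored at most once.
    private final TreeMap<Long, TreeSet<String>> timers = new TreeMap<>();
    private long watermark = Long.MIN_VALUE;

    long currentWatermark() {
        return watermark;
    }

    void registerEventTimeTimer(String key, long timestamp) {
        timers.computeIfAbsent(timestamp, t -> new TreeSet<>()).add(key);
    }

    void deleteEventTimeTimer(String key, long timestamp) {
        TreeSet<String> keys = timers.get(timestamp);
        if (keys == null) {
            return; // no timer for this timestamp: nothing to do
        }
        keys.remove(key);
        if (keys.isEmpty()) {
            timers.remove(timestamp);
        }
    }

    /** Advances the watermark and returns the fired timers as "key@timestamp", in timestamp order. */
    List<String> advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark); // a watermark never moves backwards
        List<String> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            long ts = timers.firstKey();
            for (String key : timers.pollFirstEntry().getValue()) {
                fired.add(key + "@" + ts);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        EventTimeTimerModel model = new EventTimeTimerModel();
        model.registerEventTimeTimer("sensor_1", 1000L);
        model.registerEventTimeTimer("sensor_1", 1000L); // duplicate, stored once
        model.registerEventTimeTimer("sensor_2", 2000L);
        model.deleteEventTimeTimer("sensor_2", 3000L);   // no such timer, no effect
        System.out.println(model.advanceWatermark(999L));  // nothing is due yet
        System.out.println(model.advanceWatermark(2000L)); // both timers are due
    }
}
```

In the model, as in the description above, a timer stays pending until the watermark reaches its timestamp, so the first call fires nothing and the second call fires both timers in timestamp order.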
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// SensorReading is the article's own class (getId, getTimestamp, getTemperature); import it from your package.
public class ProcessTest1_KeyedProcessFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        // Parse each "id,timestamp,temperature" line into a SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
        });
        // Test KeyedProcessFunction: key the stream first, then apply custom processing.
        // A key selector replaces the deprecated keyBy("id"), so the key type is String rather than Tuple.
        dataStream.keyBy(SensorReading::getId)
                .process(new MyProcess())
                .print();
        env.execute();
    }

    // Custom process function
    public static class MyProcess extends KeyedProcessFunction<String, SensorReading, Integer> {
        // Timestamp of the most recently registered timer, kept per key
        ValueState<Long> tsTimeState;

        @Override
        public void open(Configuration parameters) throws Exception {
            tsTimeState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("tsTimeState", Long.class));
        }

        @Override
        public void processElement(SensorReading value, Context ctx, Collector<Integer> out) throws Exception {
            out.collect(value.getId().length());
            // Context operations
            ctx.timestamp();
            ctx.getCurrentKey();
            // Side output
            //ctx.output();
            // Get the current processing time
            ctx.timerService().currentProcessingTime();
            // Get the current watermark (event time)
            ctx.timerService().currentWatermark();
            // Register a processing-time timer one second from now and remember its timestamp
            long timerTs = ctx.timerService().currentProcessingTime() + 1000L;
            ctx.timerService().registerProcessingTimeTimer(timerTs);
            tsTimeState.update(timerTs);
            // Register an event-time timer
            //ctx.timerService().registerEventTimeTimer((value.getTimestamp() + 10) * 1000L);
            // Delete a timer
            //ctx.timerService().deleteProcessingTimeTimer(tsTimeState.value());
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
            System.out.println(timestamp + " timer fired");
            ctx.getCurrentKey();
            //ctx.output();
            ctx.timeDomain();
        }

        @Override
        public void close() throws Exception {
            // Keyed state is scoped to the current key, and close() runs outside a keyed context,
            // so tsTimeState is not cleared here.
        }
    }
}
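The listings in this article use a SensorReading class that is never shown. The sketch below is a guess at its shape, inferred only from the calls that appear in the code (a three-argument constructor and getId, getTimestamp, getTemperature). The fromCsv helper and the sample line are hypothetical and not from the original.

```java
/**
 * Hypothetical sensor record, reconstructed from how the listings use it.
 * For Flink to treat it as a POJO, declare it public in its own file;
 * it is package-private here only so the snippet compiles in any file.
 */
class SensorReading {
    private String id;
    private Long timestamp;      // assumed to be in seconds
    private Double temperature;

    // Flink POJOs need a public no-argument constructor
    public SensorReading() {
    }

    public SensorReading(String id, Long timestamp, Double temperature) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperature = temperature;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public Long getTimestamp() { return timestamp; }
    public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }

    public Double getTemperature() { return temperature; }
    public void setTemperature(Double temperature) { this.temperature = temperature; }

    /** Hypothetical helper: parses one "id,timestamp,temperature" line, as the map step in the listings does. */
    static SensorReading fromCsv(String line) {
        String[] fields = line.split(",");
        return new SensorReading(
                fields[0].trim(),
                Long.parseLong(fields[1].trim()),
                Double.parseDouble(fields[2].trim()));
    }

    @Override
    public String toString() {
        return "SensorReading{id=" + id + ", timestamp=" + timestamp + ", temperature=" + temperature + "}";
    }

    public static void main(String[] args) {
        // Made-up sample line in the format the socket source is expected to deliver
        System.out.println(fromCsv("sensor_1,1547718199,35.8"));
    }
}
```

With a class of this shape, lines typed into the socket on port 7777 in the form id,timestamp,temperature are enough to drive the examples.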
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// SensorReading is the article's own class (getId, getTimestamp, getTemperature); import it from your package.
public class ProcessTest2_ApplicationCase {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        // Parse each "id,timestamp,temperature" line into a SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
        });
        // A key selector replaces the deprecated keyBy("id"), so the key type is String rather than Tuple
        dataStream.keyBy(SensorReading::getId)
                .process(new TempConsIncreWarring(10))
                .print();
        env.execute();
    }

    // Custom function: emit an alarm when the temperature keeps rising for a whole interval
    private static class TempConsIncreWarring extends KeyedProcessFunction<String, SensorReading, String> {
        // Length of the interval, in seconds
        private Integer interval;

        public TempConsIncreWarring(Integer interval) {
            this.interval = interval;
        }

        // State: the previous temperature and the timestamp of the pending timer
        private ValueState<Double> lastTempState;
        private ValueState<Long> tsTimeState;

        @Override
        public void open(Configuration parameters) throws Exception {
            // -Double.MAX_VALUE is the lowest finite double. Double.MIN_VALUE is the smallest
            // positive double, so it would misclassify a first reading at or below zero.
            lastTempState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("lastTempState", Double.class, -Double.MAX_VALUE));
            tsTimeState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("tsTimeState", Long.class));
        }

        @Override
        public void processElement(SensorReading value, Context ctx, Collector<String> out) throws Exception {
            // Read the state
            Double lastTemp = lastTempState.value();
            Long tsTime = tsTimeState.value();
            // If the temperature rose and no timer is pending, register one that fires after the interval
            if (value.getTemperature() > lastTemp && tsTime == null) {
                // Compute the timer timestamp
                Long ts = ctx.timerService().currentProcessingTime() + interval * 1000L;
                // Register the timer
                ctx.timerService().registerProcessingTimeTimer(ts);
                // Remember the timer timestamp
                tsTimeState.update(ts);
            }
            // If the temperature dropped, the pending timer must be deleted
            if (value.getTemperature() < lastTemp && tsTime != null) {
                ctx.timerService().deleteProcessingTimeTimer(tsTime);
                // Clear the stored timer timestamp
                tsTimeState.clear();
            }
            // Update the temperature state
            lastTempState.update(value.getTemperature());
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // The timer fired: emit the alarm
            out.collect("Sensor " + ctx.getCurrentKey() + ": temperature has been rising for "
                    + interval + " consecutive seconds");
            tsTimeState.clear();
        }

        @Override
        public void close() throws Exception {
            // Keyed state is scoped to the current key, and close() runs outside a keyed context,
            // so lastTempState is not cleared here.
        }
    }
}
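The alarm logic of TempConsIncreWarring can be traced without a Flink cluster. The following is a plain-Java sketch for a single sensor, not Flink code: RisingTempModel, onReading and fireDueTimer are hypothetical names, the two fields stand in for the two ValueState objects, and the caller passes the clock explicitly in place of currentProcessingTime().

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of the per-key logic in TempConsIncreWarring (hypothetical, not Flink). */
final class RisingTempModel {
    private final long intervalMs;
    private double lastTemp = -Double.MAX_VALUE; // stands in for lastTempState
    private Long timerTs = null;                 // stands in for tsTimeState
    final List<String> alarms = new ArrayList<>();

    RisingTempModel(int intervalSeconds) {
        this.intervalMs = intervalSeconds * 1000L;
    }

    /** Mirrors processElement; 'now' plays the role of currentProcessingTime(). */
    void onReading(double temp, long now) {
        fireDueTimer(now); // a due timer fires before a later element is processed
        if (temp > lastTemp && timerTs == null) {
            timerTs = now + intervalMs; // register a timer
        } else if (temp < lastTemp && timerTs != null) {
            timerTs = null;             // temperature dropped: delete the timer
        }
        lastTemp = temp;
    }

    /** Mirrors onTimer: emits an alarm if the pending timer is due, then clears it. */
    void fireDueTimer(long now) {
        if (timerTs != null && timerTs <= now) {
            alarms.add("alarm@" + timerTs);
            timerTs = null;
        }
    }

    public static void main(String[] args) {
        RisingTempModel model = new RisingTempModel(10);
        model.onReading(35.0, 0L);      // first reading: timer set for 10000
        model.onReading(36.0, 3_000L);  // still rising: timer unchanged
        model.onReading(34.0, 5_000L);  // drop: timer deleted
        model.onReading(35.0, 6_000L);  // rising again: timer set for 16000
        model.onReading(36.0, 12_000L); // still rising, timer not yet due
        model.fireDueTimer(16_000L);    // timer due: alarm
        System.out.println(model.alarms);
    }
}
```

The trace highlights two properties of the original logic: the very first reading always starts a timer, because the initial "last temperature" is the lowest possible value, and a reading equal to the previous one neither starts nor cancels a timer.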

Side output
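import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// SensorReading is the article's own class (getId, getTimestamp, getTemperature); import it from your package.
public class ProcessTest3_SideOutputCase {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        // Parse each "id,timestamp,temperature" line into a SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
        });
        // Define an OutputTag for the low-temperature stream.
        // The anonymous subclass ({}) lets Flink capture the element type.
        final OutputTag<SensorReading> lowTemp = new OutputTag<SensorReading>("lowTemp") {
        };
        // Use a custom ProcessFunction with a side output to split the stream
        SingleOutputStreamOperator<SensorReading> highTempStream = dataStream.process(new ProcessFunction<SensorReading, SensorReading>() {
            @Override
            public void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) throws Exception {
                // Readings above 30 go to the main (high-temperature) stream;
                // all others go to the low-temperature side output
                if (value.getTemperature() > 30) {
                    out.collect(value);
                } else {
                    ctx.output(lowTemp, value);
                }
            }
        });
        highTempStream.print("high-Temp");
        highTempStream.getSideOutput(lowTemp).print("low");
        env.execute();
    }
}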
