Using Flink's lowest-level API (ProcessFunction), with worked examples
2022-07-04 14:32:00 【InfoQ】
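Overview

ProcessFunction is the lowest-level API in the Flink framework. Flink provides the following process functions: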
ProcessFunction
KeyedProcessFunction
CoProcessFunction
ProcessJoinFunction
BroadcastProcessFunction
KeyedBroadcastProcessFunction
ProcessWindowFunction
ProcessAllWindowFunction
KeyedProcessFunction<K, I, O>
KeyedProcessFunction processes the elements of a keyed stream and provides two methods:
processElement(I value, Context ctx, Collector<O> out)
- Called once for every element in the stream. Results are emitted through the Collector. The Context gives access to the element's timestamp, the element's key, and the TimerService. The Context can also emit records to other streams (side outputs).
onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)
- A callback invoked when a previously registered timer fires. The timestamp parameter is the time the timer was set to fire at. The Collector emits the results. Like the Context of processElement, OnTimerContext provides context information, for example the time domain of the timer that fired (event time or processing time).
TimerService and timers
The TimerService available through Context and OnTimerContext provides the following methods:
long currentProcessingTime()
- Returns the current processing time.
long currentWatermark()
- Returns the timestamp of the current watermark.
void registerProcessingTimeTimer(long timestamp)
- Registers a processing-time timer for the current key. The timer fires when processing time reaches the given timestamp.
void registerEventTimeTimer(long timestamp)
- Registers an event-time timer for the current key. The timer fires, and the callback is invoked, when the watermark is greater than or equal to the timer's timestamp.
void deleteProcessingTimeTimer(long timestamp)
- Deletes a previously registered processing-time timer. If no timer exists for this timestamp, nothing happens.
void deleteEventTimeTimer(long timestamp)
- Deletes a previously registered event-time timer. If no timer exists for this timestamp, nothing happens.
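The firing rule for event-time timers can be traced with a small plain-Java model. This is only a sketch for a single key, not Flink code: the class `EventTimeTimerModel` and its `advanceWatermark` method are hypothetical names introduced for illustration. It assumes the documented Flink behaviour that timers are deduplicated per key and timestamp, and it mirrors the rules listed above: a timer fires once the watermark is greater than or equal to its timestamp, and deleting a timer that does not exist does nothing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Plain-Java model of event-time timers for a single key (illustration only, not Flink code). */
class EventTimeTimerModel {
    // A sorted set models deduplication: at most one timer per timestamp.
    private final TreeSet<Long> timers = new TreeSet<>();
    private long currentWatermark = Long.MIN_VALUE;

    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    void deleteEventTimeTimer(long timestamp) {
        timers.remove(timestamp); // no-op when no such timer exists
    }

    long currentWatermark() {
        return currentWatermark;
    }

    /** Advances the watermark and returns the timers that fire, in timestamp order. */
    List<Long> advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= currentWatermark) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }

    public static void main(String[] args) {
        EventTimeTimerModel model = new EventTimeTimerModel();
        model.registerEventTimeTimer(2000L);
        model.registerEventTimeTimer(2000L); // duplicate registration, kept once
        model.registerEventTimeTimer(5000L);
        model.deleteEventTimeTimer(9999L);   // no timer at 9999, nothing happens

        System.out.println(model.advanceWatermark(1999L)); // []
        System.out.println(model.advanceWatermark(2000L)); // [2000]
        System.out.println(model.advanceWatermark(6000L)); // [5000]
    }
}
```

In a real job the watermark is advanced by the runtime, and each fired timer results in one call to onTimer for the key that registered it.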
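The following job exercises these methods in a KeyedProcessFunction:
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// SensorReading is the author's own class; its definition is not shown in this article.
public class ProcessTest1_KeyedProcessFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Parse each comma-separated line into a SensorReading (String, Long, Double)
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
        });

        // Test KeyedProcessFunction: key the stream first, then apply the custom processing.
        // A key selector replaces the deprecated field-name form keyBy("id"), so the key type is String.
        dataStream.keyBy(SensorReading::getId)
                .process(new MyProcess())
                .print();

        env.execute();
    }

    // Custom process function
    public static class MyProcess extends KeyedProcessFunction<String, SensorReading, Integer> {
        // Timestamp of the most recently registered timer for the current key
        private ValueState<Long> tsTimeState;

        @Override
        public void open(Configuration parameters) throws Exception {
            tsTimeState = getRuntimeContext().getState(new ValueStateDescriptor<Long>(
                    "tsTimeState", Long.class));
        }

        @Override
        public void processElement(SensorReading value, Context ctx, Collector<Integer> out) throws Exception {
            out.collect(value.getId().length());

            // Context accessors (return values are ignored; the calls only show what is available)
            ctx.timestamp();
            ctx.getCurrentKey();
            // Side output
            // ctx.output();
            // Current processing time
            ctx.timerService().currentProcessingTime();
            // Current watermark (event time)
            ctx.timerService().currentWatermark();

            // Register a processing-time timer one second from now.
            // The timestamp is computed once so that the value stored in state matches the registered timer.
            long timerTs = ctx.timerService().currentProcessingTime() + 1000L;
            ctx.timerService().registerProcessingTimeTimer(timerTs);
            tsTimeState.update(timerTs);

            // Register an event-time timer
            // ctx.timerService().registerEventTimeTimer((value.getTimestamp() + 10) * 1000L);
            // Delete the timer whose timestamp is stored in state
            // ctx.timerService().deleteProcessingTimeTimer(tsTimeState.value());
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
            System.out.println(timestamp + " timer fired");
            ctx.getCurrentKey();
            // ctx.output();
            ctx.timeDomain();
        }

        @Override
        public void close() throws Exception {
            // Keyed state is scoped to the current key and is only accessible while an element
            // or a timer is being processed, so it is not cleared here.
        }
    }
}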
Application example: emit an alert when a sensor's temperature rises continuously for 10 seconds.
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// SensorReading is the author's own class; its definition is not shown in this article.
public class ProcessTest2_ApplicationCase {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
        });

        // A key selector replaces the deprecated field-name form keyBy("id"), so the key type is String
        dataStream.keyBy(SensorReading::getId)
                .process(new TempConsIncreWarring(10))
                .print();

        env.execute();
    }

    // Custom function: detects a temperature that rises continuously over a time interval and emits an alert
    private static class TempConsIncreWarring extends KeyedProcessFunction<String, SensorReading, String> {
        // Length of the interval, in seconds
        private final int interval;

        public TempConsIncreWarring(int interval) {
            this.interval = interval;
        }

        // State: the previous temperature and the timestamp of the pending timer
        private ValueState<Double> lastTempState;
        private ValueState<Long> tsTimeState;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Double.MIN_VALUE is the smallest positive double, not the most negative one,
            // so -Double.MAX_VALUE is used as the "lower than any reading" default.
            lastTempState = getRuntimeContext().getState(new ValueStateDescriptor<Double>(
                    "lastTempState", Double.class, -Double.MAX_VALUE));
            tsTimeState = getRuntimeContext().getState(new ValueStateDescriptor<Long>(
                    "tsTimeState", Long.class));
        }

        @Override
        public void processElement(SensorReading value, Context ctx, Collector<String> out) throws Exception {
            // Read the state
            Double lastTemp = lastTempState.value();
            Long tsTime = tsTimeState.value();

            // Temperature rose and no timer is pending: register a timer `interval` seconds from now
            if (value.getTemperature() > lastTemp && tsTime == null) {
                // Compute the timer timestamp
                long ts = ctx.timerService().currentProcessingTime() + interval * 1000L;
                // Register the timer
                ctx.timerService().registerProcessingTimeTimer(ts);
                // Remember the timer timestamp
                tsTimeState.update(ts);
            }
            // Temperature dropped: the pending timer must be deleted
            if (value.getTemperature() < lastTemp && tsTime != null) {
                ctx.timerService().deleteProcessingTimeTimer(tsTime);
                // Clear the stored timer timestamp
                tsTimeState.clear();
            }
            // Update the temperature state
            lastTempState.update(value.getTemperature());
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // Timer fired: emit the alert
            out.collect("Sensor " + ctx.getCurrentKey() + ": temperature has risen continuously for "
                    + interval + " seconds");
            tsTimeState.clear();
        }

        @Override
        public void close() throws Exception {
            // Keyed state is scoped to the current key and is only accessible while an element
            // or a timer is being processed, so it is not cleared here.
        }
    }
}
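To see how the timer and the two pieces of state interact in the example above, the per-key logic can be traced with a plain-Java model. This is a sketch only, not Flink code: `RiseAlertModel`, `onReading`, and `onTime` are hypothetical names, the `now` arguments stand in for processing time, and the sample readings are made up for the trace.

```java
/** Plain-Java model of the rising-temperature alert for one sensor (illustration only, not Flink code). */
class RiseAlertModel {
    private final long intervalMs;
    private double lastTemp = -Double.MAX_VALUE; // lower than any reading
    private Long timerTs = null;                 // timestamp of the pending timer, or null

    RiseAlertModel(int intervalSeconds) {
        this.intervalMs = intervalSeconds * 1000L;
    }

    /** Mirrors processElement; `now` stands in for the current processing time. */
    void onReading(long now, double temp) {
        if (temp > lastTemp && timerTs == null) {
            timerTs = now + intervalMs;          // temperature rose, no timer pending: register one
        }
        if (temp < lastTemp && timerTs != null) {
            timerTs = null;                      // temperature dropped: cancel the pending timer
        }
        lastTemp = temp;
    }

    /** Mirrors onTimer; returns true if the pending timer fires at `now`. */
    boolean onTime(long now) {
        if (timerTs != null && now >= timerTs) {
            timerTs = null;
            return true;
        }
        return false;
    }

    Long pendingTimer() {
        return timerTs;
    }

    public static void main(String[] args) {
        RiseAlertModel model = new RiseAlertModel(10);
        model.onReading(0L, 35.0);      // first reading: timer set for 10000
        model.onReading(3_000L, 36.0);  // still rising: timer unchanged
        model.onReading(5_000L, 34.0);  // drop: timer cancelled
        model.onReading(6_000L, 35.5);  // rising again: timer set for 16000
        System.out.println(model.onTime(10_000L)); // false
        System.out.println(model.onTime(16_000L)); // true
    }
}
```

The trace shows why the timer timestamp has to be kept in state: the drop at 5000 can only cancel the timer registered at 0 because its timestamp was remembered.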

Side output
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// SensorReading is the author's own class; its definition is not shown in this article.
public class ProcessTest3_SideOutputCase {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
        });

        // OutputTag identifying the low-temperature stream.
        // It is created as an anonymous subclass ({}) so that the generic type is retained.
        final OutputTag<SensorReading> lowTemp = new OutputTag<SensorReading>("lowTemp") {
        };

        // Split the stream with a custom ProcessFunction and a side output
        SingleOutputStreamOperator<SensorReading> highTempStream = dataStream.process(new ProcessFunction<SensorReading, SensorReading>() {
            @Override
            public void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) throws Exception {
                // Readings above 30 are high temperature and go to the main stream;
                // all other readings go to the low-temperature side output
                if (value.getTemperature() > 30) {
                    out.collect(value);
                } else {
                    ctx.output(lowTemp, value);
                }
            }
        });

        highTempStream.print("high-Temp");
        highTempStream.getSideOutput(lowTemp).print("low");

        env.execute();
    }
}
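The routing rule in the job above can be checked in isolation with a plain-Java sketch. This is not Flink code: `SideOutputModel` and its two lists are hypothetical stand-ins for the main stream and the lowTemp side output, and the sample temperatures are made up. It makes the boundary case explicit: a reading of exactly 30 is not greater than 30, so it goes to the low-temperature output.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of splitting readings into a main output and a side output (illustration only). */
class SideOutputModel {
    final List<Double> mainOutput = new ArrayList<>();    // stands in for the high-temperature main stream
    final List<Double> lowTempOutput = new ArrayList<>(); // stands in for the lowTemp side output

    /** Same condition as processElement in the job above. */
    void process(double temperature) {
        if (temperature > 30) {
            mainOutput.add(temperature);
        } else {
            lowTempOutput.add(temperature);
        }
    }

    public static void main(String[] args) {
        SideOutputModel model = new SideOutputModel();
        for (double t : new double[] {35.8, 15.4, 30.0, 31.2}) {
            model.process(t);
        }
        System.out.println(model.mainOutput);    // [35.8, 31.2]
        System.out.println(model.lowTempOutput); // [15.4, 30.0]
    }
}
```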
