当前位置:网站首页>结合案例:Flink框架中的最底层API(ProcessFunction)用法
结合案例:Flink框架中的最底层API(ProcessFunction)用法
2022-07-04 12:46:00 【InfoQ】
概述

ProcessFunction
KeyedProcessFunction
CoProcessFunction
ProcessJoinFunction
BroadcastProcessFunction
KeyedBroadcastProcessFunction
ProcessWindowFunction
ProcessAllWindowFunction
KeyedProcessFunction<K, I, O>
- 数据流中的每一个元素都会调用这个方法,调用结果将会放在 Collector 数据类型中输出。Context 可以访问元素的时间戳,元素的key,以及 TimerService 时间服务。Context 还可以将结果输出到别的流(side outputs)。
processElement(I value, Context ctx, Collector<O> out)
- 当之前注册的定时器触发时调用。参数 timestamp 为定时器所设定的触发的时间戳。Collector为输出结果的集合。OnTimerContext 和processElement 的 Context 参数一样,提供了上下文的一些信息。例如定时器触发的时间信息(事件时间或者处理时间)。
onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)
定时器
- 返回当前处理时间
long currentProcessingTime()
- 返回当前 watermark 的时间戳
long currentWatermark()
- 注册当前 key 的processing time 的定时器,当 processing time 到达定时时间时,触发 timer。
void registerProcessingTimeTimer(long timestamp)
- 注册当前 key 的 event time 定时器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
void registerEventTimeTimer(long timestamp)
- 删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行
void deleteProcessingTimeTimer(long timestamp)
- 删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。
void deleteEventTimeTimer(long timestamp)
public class ProcessTest1_KeyedProcessFunction {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
//测试keyedprocessFunction 先分组,自定义处理
dataStream.keyBy("id")
.process(new MyProcess())
.print();
env.execute();
}
// 实现自定义处理函数
public static class MyProcess extends KeyedProcessFunction<Tuple,SensorReading,Integer> {
ValueState<Long> tsTimeState;
@Override
public void open(Configuration parameters) throws Exception {
tsTimeState = getRuntimeContext().getState(new ValueStateDescriptor<Long>(
"tsTimeState",Long.class
));
}
@Override
public void processElement(SensorReading value, Context ctx, Collector<Integer> out) throws Exception {
out.collect(value.getId().length());
// Context操作
ctx.timestamp();
ctx.getCurrentKey();
// 侧流
//ctx.output();
// 获取当前系统处理时间
ctx.timerService().currentProcessingTime();
// 获取当前事件时间
ctx.timerService().currentWatermark();
// 注册系统处理时间定时器
ctx.timerService().registerProcessingTimeTimer( ctx.timerService().currentProcessingTime() + 1000L);
tsTimeState.update( ctx.timerService().currentProcessingTime() + 1000L);
// 注册事件时间定时器
//ctx.timerService().registerEventTimeTimer((value.getTimestamp() + 10) * 1000L);
// 删除时间
//ctx.timerService().deleteProcessingTimeTimer(tsTimeState.value());
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
System.out.println(timestamp+"定时器触发");
ctx.getCurrentKey();
//ctx.output();
ctx.timeDomain();
}
@Override
public void close() throws Exception {
tsTimeState.clear();
}
}
}
public class ProcessTest2_ApplicationCase {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
dataStream.keyBy("id")
.process(new TempConsIncreWarring(10))
.print();
env.execute();
}
// 自定义函数 检测一段时间(时间域)内温度连续上升,输出报警
private static class TempConsIncreWarring extends KeyedProcessFunction<Tuple, SensorReading, String> {
// 定义时间域
private Integer interval;
public TempConsIncreWarring(Integer interval) {
this.interval = interval;
}
// 定义状态,保存上一个温度值,定时器时间戳
private ValueState<Double> lastTempState;
private ValueState<Long> tsTimeState;
@Override
public void open(Configuration parameters) throws Exception {
lastTempState = getRuntimeContext().getState(new ValueStateDescriptor<Double>(
"lastTempState", Double.class, Double.MIN_VALUE));
tsTimeState = getRuntimeContext().getState(new ValueStateDescriptor<Long>(
"tsTimeState", Long.class));
}
@Override
public void processElement(SensorReading value, Context ctx, Collector<String> out) throws Exception {
// 取出状态
Double lastTemp = lastTempState.value();
Long tsTime = tsTimeState.value();
// 如果温度上升就要注册10秒后的定时器且在无定时器时,等待
if (value.getTemperature() > lastTemp && tsTime == null) {
// 计算出定时器时间戳
Long ts = ctx.timerService().currentProcessingTime() + interval * 1000L;
// 注册定时器
ctx.timerService().registerProcessingTimeTimer(ts);
// 更新时间状态
tsTimeState.update(ts);
}
// 如果温度下降 需要删除定时器
if (value.getTemperature() < lastTemp && tsTime != null) {
ctx.timerService().deleteProcessingTimeTimer(tsTime);
// 清空时间状态定时器
tsTimeState.clear();
}
// 更新温度状态
lastTempState.update(value.getTemperature());
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
// 定时器触发 输出报局信息
out.collect("传感器"+ ctx.getCurrentKey().getField(0) + "温度值连续"+ interval +"秒一直处于上升");
tsTimeState.clear();
}
@Override
public void close() throws Exception {
lastTempState.clear();
}
}
}

侧流输出
public class ProcessTest3_SideOutputCase {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
// 定义outputTag 表示低温流
OutputTag lowTemp = new OutputTag<SensorReading>("lowTemp") {
};
// 自定义测输出流实现分流操作
SingleOutputStreamOperator<SensorReading> highTempStream = dataStream.process(new ProcessFunction<SensorReading, SensorReading>() {
@Override
public void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) throws Exception {
// 判断温度大于30 为高温流 输出到主流 低温流输出在侧流
if (value.getTemperature() > 30) {
out.collect(value);
} else {
ctx.output(lowTemp, value);
}
}
});
highTempStream.print("high-Temp");
highTempStream.getSideOutput(lowTemp).print("low");
env.execute();
}
}

边栏推荐
- Using nsproxy to forward messages
- Rsyslog配置及使用教程
- Configure WebDAV server on Apache
- 实时云交互如何助力教育行业发展
- SQL language
- Excuse me, have you encountered this situation? CDC 1.4 cannot use timestamp when connecting to MySQL 5.7
- HAProxy高可用解决方案
- 8 expansion sub packages! Recbole launches 2.0!
- Efficient! Build FTP working environment with virtual users
- C array supplement
猜你喜欢
动画与过渡效果
洞见科技解决方案总监薛婧:联邦学习助力数据要素安全流通
CANN算子:利用迭代器高效实现Tensor数据切割分块处理
Xue Jing, director of insight technology solutions: Federal learning helps secure the flow of data elements
字节面试算法题
聊聊支付流程的设计与实现逻辑
7 月数据库排行榜:MongoDB 和 Oracle 分数下降最多
Oracle 被 Ventana Research 评为数字创新奖总冠军
【AI系统前沿动态第40期】Hinton:我的深度学习生涯与研究心法;Google辟谣放弃TensorFlow;封神框架正式开源
Practice: fabric user certificate revocation operation process
随机推荐
CVPR 2022 | transfusion: Lidar camera fusion for 3D target detection with transformer
8个扩展子包!RecBole推出2.0!
爬虫练习题(一)
动画与过渡效果
Personalized online cloud database hybrid optimization system | SIGMOD 2022 selected papers interpretation
聊聊支付流程的设计与实现逻辑
[cloud native | kubernetes] in depth understanding of ingress (12)
Personalized online cloud database hybrid optimization system | SIGMOD 2022 selected papers interpretation
Oracle 被 Ventana Research 评为数字创新奖总冠军
AI painting minimalist tutorial
诸神黄昏时代的对比学习
After the game starts, you will be prompted to install HMS core. Click Cancel, and you will not be prompted to install HMS core again (initialization failure returns 907135003)
MySQL three-level distribution agent relationship storage
Optional values and functions of the itemized contenttype parameter in the request header
Web knowledge supplement
WPF double slider control and forced capture of mouse event focus
Node mongodb installation
C语言集合运算
Web知识补充
Alibaba cloud award winning experience: build a highly available system with polardb-x