当前位置:网站首页>结合案例:Flink框架中的最底层API(ProcessFunction)用法
结合案例:Flink框架中的最底层API(ProcessFunction)用法
2022-07-04 12:46:00 【InfoQ】
概述

ProcessFunction
KeyedProcessFunction
CoProcessFunction
ProcessJoinFunction
BroadcastProcessFunction
KeyedBroadcastProcessFunction
ProcessWindowFunction
ProcessAllWindowFunction
KeyedProcessFunction<K, I, O>
- 数据流中的每一个元素都会调用这个方法,调用结果将会放在 Collector 数据类型中输出。Context 可以访问元素的时间戳,元素的key,以及 TimerService 时间服务。Context 还可以将结果输出到别的流(side outputs)。
processElement(I value, Context ctx, Collector<O> out)
- 当之前注册的定时器触发时调用。参数 timestamp 为定时器所设定的触发的时间戳。Collector为输出结果的集合。OnTimerContext 和processElement 的 Context 参数一样,提供了上下文的一些信息。例如定时器触发的时间信息(事件时间或者处理时间)。
onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)
定时器
- 返回当前处理时间
long currentProcessingTime()
- 返回当前 watermark 的时间戳
long currentWatermark()
- 注册当前 key 的processing time 的定时器,当 processing time 到达定时时间时,触发 timer。
void registerProcessingTimeTimer(long timestamp)
- 注册当前 key 的 event time 定时器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
void registerEventTimeTimer(long timestamp)
- 删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行
void deleteProcessingTimeTimer(long timestamp)
- 删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。
void deleteEventTimeTimer(long timestamp)
public class ProcessTest1_KeyedProcessFunction {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
//测试keyedprocessFunction 先分组,自定义处理
dataStream.keyBy("id")
.process(new MyProcess())
.print();
env.execute();
}
// 实现自定义处理函数
public static class MyProcess extends KeyedProcessFunction<Tuple,SensorReading,Integer> {
ValueState<Long> tsTimeState;
@Override
public void open(Configuration parameters) throws Exception {
tsTimeState = getRuntimeContext().getState(new ValueStateDescriptor<Long>(
"tsTimeState",Long.class
));
}
@Override
public void processElement(SensorReading value, Context ctx, Collector<Integer> out) throws Exception {
out.collect(value.getId().length());
// Context操作
ctx.timestamp();
ctx.getCurrentKey();
// 侧流
//ctx.output();
// 获取当前系统处理时间
ctx.timerService().currentProcessingTime();
// 获取当前事件时间
ctx.timerService().currentWatermark();
// 注册系统处理时间定时器
ctx.timerService().registerProcessingTimeTimer( ctx.timerService().currentProcessingTime() + 1000L);
tsTimeState.update( ctx.timerService().currentProcessingTime() + 1000L);
// 注册事件时间定时器
//ctx.timerService().registerEventTimeTimer((value.getTimestamp() + 10) * 1000L);
// 删除时间
//ctx.timerService().deleteProcessingTimeTimer(tsTimeState.value());
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
System.out.println(timestamp+"定时器触发");
ctx.getCurrentKey();
//ctx.output();
ctx.timeDomain();
}
@Override
public void close() throws Exception {
tsTimeState.clear();
}
}
}
public class ProcessTest2_ApplicationCase {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
dataStream.keyBy("id")
.process(new TempConsIncreWarring(10))
.print();
env.execute();
}
// 自定义函数 检测一段时间(时间域)内温度连续上升,输出报警
private static class TempConsIncreWarring extends KeyedProcessFunction<Tuple, SensorReading, String> {
// 定义时间域
private Integer interval;
public TempConsIncreWarring(Integer interval) {
this.interval = interval;
}
// 定义状态,保存上一个温度值,定时器时间戳
private ValueState<Double> lastTempState;
private ValueState<Long> tsTimeState;
@Override
public void open(Configuration parameters) throws Exception {
lastTempState = getRuntimeContext().getState(new ValueStateDescriptor<Double>(
"lastTempState", Double.class, Double.MIN_VALUE));
tsTimeState = getRuntimeContext().getState(new ValueStateDescriptor<Long>(
"tsTimeState", Long.class));
}
@Override
public void processElement(SensorReading value, Context ctx, Collector<String> out) throws Exception {
// 取出状态
Double lastTemp = lastTempState.value();
Long tsTime = tsTimeState.value();
// 如果温度上升就要注册10秒后的定时器且在无定时器时,等待
if (value.getTemperature() > lastTemp && tsTime == null) {
// 计算出定时器时间戳
Long ts = ctx.timerService().currentProcessingTime() + interval * 1000L;
// 注册定时器
ctx.timerService().registerProcessingTimeTimer(ts);
// 更新时间状态
tsTimeState.update(ts);
}
// 如果温度下降 需要删除定时器
if (value.getTemperature() < lastTemp && tsTime != null) {
ctx.timerService().deleteProcessingTimeTimer(tsTime);
// 清空时间状态定时器
tsTimeState.clear();
}
// 更新温度状态
lastTempState.update(value.getTemperature());
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
// 定时器触发 输出报局信息
out.collect("传感器"+ ctx.getCurrentKey().getField(0) + "温度值连续"+ interval +"秒一直处于上升");
tsTimeState.clear();
}
@Override
public void close() throws Exception {
lastTempState.clear();
}
}
}

侧流输出
public class ProcessTest3_SideOutputCase {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
// 定义outputTag 表示低温流
OutputTag lowTemp = new OutputTag<SensorReading>("lowTemp") {
};
// 自定义测输出流实现分流操作
SingleOutputStreamOperator<SensorReading> highTempStream = dataStream.process(new ProcessFunction<SensorReading, SensorReading>() {
@Override
public void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) throws Exception {
// 判断温度大于30 为高温流 输出到主流 低温流输出在侧流
if (value.getTemperature() > 30) {
out.collect(value);
} else {
ctx.output(lowTemp, value);
}
}
});
highTempStream.print("high-Temp");
highTempStream.getSideOutput(lowTemp).print("low");
env.execute();
}
}

边栏推荐
- The only core indicator of high-quality software architecture
- WPF double slider control and forced capture of mouse event focus
- MySQL45讲——学习极客时间MySQL实战45讲笔记—— 06 | 全局锁和表锁_给表加个字段怎么有这么多阻碍
- Don't turn down, three sentences to clarify the origin of cross domain resource request errors
- After the game starts, you will be prompted to install HMS core. Click Cancel, and you will not be prompted to install HMS core again (initialization failure returns 907135003)
- JVM series - stack and heap, method area day1-2
- 2022年中国移动阅读市场年度综合分析
- XML入门一
- [AI system frontier dynamics, issue 40] Hinton: my deep learning career and research mind method; Google refutes rumors and gives up tensorflow; The apotheosis framework is officially open source
- 数据库公共字段自动填充
猜你喜欢
字节面试算法题
It is six orders of magnitude faster than the quantum chemical method. An adiabatic artificial neural network method based on adiabatic state can accelerate the simulation of dual nitrogen benzene der
Etcd storage, watch and expiration mechanism
Three schemes to improve the efficiency of MySQL deep paging query
Haproxy high availability solution
Database lock table? Don't panic, this article teaches you how to solve it
分布式BASE理论
基于链表管理的单片机轮询程序框架
Building intelligent gray-scale data system from 0 to 1: Taking vivo game center as an example
Building intelligent gray-scale data system from 0 to 1: Taking vivo game center as an example
随机推荐
【AI系统前沿动态第40期】Hinton:我的深度学习生涯与研究心法;Google辟谣放弃TensorFlow;封神框架正式开源
聊聊支付流程的设计与实现逻辑
源码编译安装MySQL
Zhongang Mining: in order to ensure sufficient supply of fluorite, it is imperative to open source and save flow
7 月数据库排行榜:MongoDB 和 Oracle 分数下降最多
Scripy framework learning
分布式BASE理论
C语言集合运算
CTF competition problem solution STM32 reverse introduction
Using scrcpy projection
[cloud native | kubernetes] in depth understanding of ingress (12)
Building intelligent gray-scale data system from 0 to 1: Taking vivo game center as an example
模块化笔记软件综合评测:Craft、Notion、FlowUs
C语言职工管理系统
Three schemes to improve the efficiency of MySQL deep paging query
Don't turn down, three sentences to clarify the origin of cross domain resource request errors
C语言个人通讯录管理系统
Iptables foundation and Samba configuration examples
XML入门二
Runc hang causes the kubernetes node notready