Flink's Lowest-Level API (ProcessFunction), Explained with Examples
2022-07-04 12:46:00 [InfoQ]
Overview

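Flink provides the following process functions:

- ProcessFunction
- KeyedProcessFunction
- CoProcessFunction
- ProcessJoinFunction
- BroadcastProcessFunction
- KeyedBroadcastProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction

KeyedProcessFunction<K, I, O>

KeyedProcessFunction operates on a KeyedStream: K is the key type, I the input type and O the output type. For each input element it can emit zero, one or more output elements. Like the other process functions it is a rich function, so it has open(), close() and getRuntimeContext(); in addition it provides the following two methods: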
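- `processElement(I value, Context ctx, Collector<O> out)`: called for every element in the stream; results are emitted through the `Collector`. The `Context` gives access to the element's timestamp, the element's key and the `TimerService`, and it can also emit results to other streams (side outputs).
- `onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)`: called when a previously registered timer fires. `timestamp` is the time the timer was set to fire at, and the `Collector` collects the output. Like the `Context` of `processElement`, `OnTimerContext` provides context information, for example the time domain of the firing timer (event time or processing time).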
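To make the callback contract concrete without a Flink cluster, here is a minimal plain-Java sketch. `MiniKeyedProcessFunction`, `MiniKeyedDriver` and `CountPerKey` are hypothetical names, not Flink classes: the driver calls `processElement` once per element, exposes the element's key through a context, and gathers whatever the function emits through the collector (modeled as a `Consumer`). A `HashMap` keyed by the current key stands in for Flink's keyed state; timers and `onTimer` are left out of this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical plain-Java stand-in for the KeyedProcessFunction callback contract (not Flink classes).
abstract class MiniKeyedProcessFunction<K, I, O> {
    // Minimal context: only exposes the key of the element being processed.
    interface Context<T> {
        T getCurrentKey();
    }

    // Invoked once per input element; results are emitted through `out` (the "collector").
    abstract void processElement(I value, Context<K> ctx, Consumer<O> out);
}

class MiniKeyedDriver {
    // Calls processElement for every element and returns everything the function emitted.
    static <K, I, O> List<O> run(List<I> input, Function<I, K> keySelector,
                                 MiniKeyedProcessFunction<K, I, O> fn) {
        List<O> output = new ArrayList<>();
        for (I element : input) {
            K key = keySelector.apply(element);
            // The context reports the key of the element currently being processed
            fn.processElement(element, () -> key, output::add);
        }
        return output;
    }
}

// Example function: counts elements per key and emits "key:count" for every element.
class CountPerKey extends MiniKeyedProcessFunction<String, String, String> {
    // Stands in for keyed ValueState: one counter per key.
    private final Map<String, Integer> counts = new HashMap<>();

    @Override
    void processElement(String value, Context<String> ctx, Consumer<String> out) {
        int n = counts.merge(ctx.getCurrentKey(), 1, Integer::sum);
        out.accept(ctx.getCurrentKey() + ":" + n);
    }
}
```

Running `MiniKeyedDriver.run(Arrays.asList("a", "b", "a"), s -> s, new CountPerKey())` yields `[a:1, b:1, a:2]`: the count is kept separately for each key, which is what keyed state provides to a real `KeyedProcessFunction`.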
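TimerService and timers

Both `Context` and `OnTimerContext` expose a `TimerService` through `ctx.timerService()`. Timers can only be registered on keyed streams. The `TimerService` provides the following methods: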
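- `long currentProcessingTime()`: returns the current processing time.
- `long currentWatermark()`: returns the timestamp of the current watermark.
- `void registerProcessingTimeTimer(long timestamp)`: registers a processing-time timer for the current key; the timer fires when the processing time reaches `timestamp`.
- `void registerEventTimeTimer(long timestamp)`: registers an event-time timer for the current key; the timer fires, invoking the `onTimer` callback, once the watermark is greater than or equal to `timestamp`.
- `void deleteProcessingTimeTimer(long timestamp)`: deletes a previously registered processing-time timer; if no timer with that timestamp exists, nothing happens.
- `void deleteEventTimeTimer(long timestamp)`: deletes a previously registered event-time timer; if no timer with that timestamp exists, nothing happens.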
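The event-time rules listed above (a timer fires once the watermark reaches its timestamp; deleting an unknown timestamp is a no-op) can be modeled in a few lines of plain Java. This is a hypothetical single-key sketch, not Flink's `TimerService`: `MiniEventTimerService` and `advanceWatermark` are illustrative names. A `TreeSet` keeps pending timers sorted, and because it is a set, registering the same timestamp twice yields a single timer, matching Flink's documented de-duplication of timers per key and timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical single-key model of event-time timer semantics (not Flink's TimerService).
class MiniEventTimerService {
    // Pending timers sorted by timestamp; being a set, duplicate registrations collapse into one timer.
    private final TreeSet<Long> timers = new TreeSet<>();
    // No watermark seen yet
    private long currentWatermark = Long.MIN_VALUE;

    long currentWatermark() {
        return currentWatermark;
    }

    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    // Deleting a timestamp that was never registered is a no-op.
    void deleteEventTimeTimer(long timestamp) {
        timers.remove(timestamp);
    }

    // Advances the watermark (it never moves backwards) and returns, in order,
    // every timer whose timestamp is <= the new watermark; those timers are removed.
    List<Long> advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= currentWatermark) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }
}
```

For example, after registering 2000, 2000 and 5000, `advanceWatermark(1999)` fires nothing, `advanceWatermark(2000)` fires the single 2000 timer, and a later watermark of 10000 fires 5000. In Flink each fired timer would result in one `onTimer` call for its key.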
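Basic usage of KeyedProcessFunction (`SensorReading` is assumed to be a POJO with `id`, `timestamp` and `temperature` fields, defined elsewhere in the project):

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class ProcessTest1_KeyedProcessFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Each input line: id,timestamp,temperature
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
        });
        // Test KeyedProcessFunction: key the stream first, then apply custom processing
        dataStream.keyBy(SensorReading::getId)
                .process(new MyProcess())
                .print();
        env.execute();
    }

    // Custom process function
    public static class MyProcess extends KeyedProcessFunction<String, SensorReading, Integer> {
        private ValueState<Long> tsTimeState;

        @Override
        public void open(Configuration parameters) throws Exception {
            tsTimeState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("tsTimeState", Long.class));
        }

        @Override
        public void processElement(SensorReading value, Context ctx, Collector<Integer> out) throws Exception {
            out.collect(value.getId().length());
            // Context: element timestamp (null if none was assigned) and current key
            ctx.timestamp();
            ctx.getCurrentKey();
            // Side output: ctx.output(outputTag, value);
            // Current processing time
            ctx.timerService().currentProcessingTime();
            // Current event time (watermark)
            ctx.timerService().currentWatermark();
            // Register a processing-time timer 1 s from now. Compute the timestamp once so the
            // value stored in state matches the registered timer and can be used to delete it.
            long timerTs = ctx.timerService().currentProcessingTime() + 1000L;
            ctx.timerService().registerProcessingTimeTimer(timerTs);
            tsTimeState.update(timerTs);
            // Register an event-time timer
            //ctx.timerService().registerEventTimeTimer((value.getTimestamp() + 10) * 1000L);
            // Delete a timer
            //ctx.timerService().deleteProcessingTimeTimer(tsTimeState.value());
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
            System.out.println(timestamp + " timer fired");
            ctx.getCurrentKey();
            // Side output: ctx.output(outputTag, value);
            ctx.timeDomain();
        }
        // No close() override: keyed state is scoped to the current key, so clear it
        // per key (e.g. in onTimer) rather than in close().
    }
}
```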
Application case: raise an alarm when a sensor's temperature keeps rising for 10 seconds (processing time):

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class ProcessTest2_ApplicationCase {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
        });
        dataStream.keyBy(SensorReading::getId)
                .process(new TempConsIncreWarring(10))
                .print();
        env.execute();
    }

    // Custom function: raise an alarm if the temperature keeps rising throughout an interval
    private static class TempConsIncreWarring extends KeyedProcessFunction<String, SensorReading, String> {
        // Length of the interval, in seconds
        private final int interval;

        public TempConsIncreWarring(int interval) {
            this.interval = interval;
        }

        // State: the previous temperature and the timestamp of the pending timer
        private ValueState<Double> lastTempState;
        private ValueState<Long> tsTimeState;

        @Override
        public void open(Configuration parameters) throws Exception {
            lastTempState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("lastTempState", Double.class));
            tsTimeState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("tsTimeState", Long.class));
        }

        @Override
        public void processElement(SensorReading value, Context ctx, Collector<String> out) throws Exception {
            // Read the state. With no previous reading, treat the first one as a rise
            // (Double.MIN_VALUE is the smallest positive double, not a safe "no value" sentinel).
            Double lastTemp = lastTempState.value();
            if (lastTemp == null) {
                lastTemp = Double.NEGATIVE_INFINITY;
            }
            Long tsTime = tsTimeState.value();
            // Temperature rose and no timer is pending: register a timer `interval` seconds from now
            if (value.getTemperature() > lastTemp && tsTime == null) {
                long ts = ctx.timerService().currentProcessingTime() + interval * 1000L;
                ctx.timerService().registerProcessingTimeTimer(ts);
                tsTimeState.update(ts);
            }
            // Temperature dropped: delete the pending timer and clear its state
            if (value.getTemperature() < lastTemp && tsTime != null) {
                ctx.timerService().deleteProcessingTimeTimer(tsTime);
                tsTimeState.clear();
            }
            // Remember the latest temperature
            lastTempState.update(value.getTemperature());
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // Timer fired: emit the alarm message
            out.collect("Sensor " + ctx.getCurrentKey() + ": temperature kept rising for " + interval + " seconds");
            tsTimeState.clear();
        }
        // No close() override: keyed state is cleared per key above, not in close().
    }
}
```
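The alarm logic can be checked without a socket source by replaying it against a manual clock. The following is a hypothetical plain-Java simulation (`TempRiseSimulator` is not part of the example or of Flink): maps stand in for the two `ValueState`s, and due timers fire only when `advanceTo` moves the simulated processing time forward. Here a sensor's first reading is treated as a rise and starts a timer, a drop cancels the pending timer, and a firing timer is cleared so the next rise can start a new one.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation of the "temperature keeps rising" alarm; not Flink code.
// Maps stand in for the keyed ValueStates, and time only moves when advanceTo() is called.
class TempRiseSimulator {
    private final int intervalSeconds;
    private final Map<String, Double> lastTemp = new TreeMap<>(); // like lastTempState
    private final Map<String, Long> timerTs = new TreeMap<>();    // like tsTimeState
    private final List<String> alarms = new ArrayList<>();
    private long now = 0L;                                        // simulated processing time (ms)

    TempRiseSimulator(int intervalSeconds) {
        this.intervalSeconds = intervalSeconds;
    }

    // Mirrors processElement: start a timer on a rise, cancel it on a drop.
    void process(String id, double temperature) {
        double last = lastTemp.getOrDefault(id, Double.NEGATIVE_INFINITY);
        Long pending = timerTs.get(id);
        if (temperature > last && pending == null) {
            timerTs.put(id, now + intervalSeconds * 1000L);
        } else if (temperature < last && pending != null) {
            timerTs.remove(id);
        }
        lastTemp.put(id, temperature);
    }

    // Moves the clock forward and fires every due timer, like onTimer.
    void advanceTo(long timeMs) {
        now = timeMs;
        Iterator<Map.Entry<String, Long>> it = timerTs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= now) {
                alarms.add("Sensor " + entry.getKey() + ": temperature kept rising for "
                        + intervalSeconds + " seconds");
                it.remove(); // like tsTimeState.clear() in onTimer
            }
        }
    }

    List<String> alarms() {
        return alarms;
    }
}
```

With a 10-second interval, `sensor_1` reading 35.0 at t=0 and 36.0 at t=3000 ms raises one alarm once the clock reaches 10000 ms, while `sensor_2` reading 20.0 and then 19.0 never does, because the drop cancels its timer.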

Side outputs
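```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class ProcessTest3_SideOutputCase {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
        });
        // OutputTag for the low-temperature stream. The anonymous subclass `{}` preserves the
        // generic type so that Flink can extract the side output's type information.
        final OutputTag<SensorReading> lowTemp = new OutputTag<SensorReading>("lowTemp") {
        };
        // Split the stream with a custom ProcessFunction and a side output
        SingleOutputStreamOperator<SensorReading> highTempStream = dataStream.process(new ProcessFunction<SensorReading, SensorReading>() {
            @Override
            public void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) throws Exception {
                // Above 30 degrees: high-temperature main stream; otherwise: low-temperature side output
                if (value.getTemperature() > 30) {
                    out.collect(value);
                } else {
                    ctx.output(lowTemp, value);
                }
            }
        });
        highTempStream.print("high-Temp");
        highTempStream.getSideOutput(lowTemp).print("low");
        env.execute();
    }
}
```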
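Conceptually, a side output is just a second, typed destination chosen per element inside `processElement`. The plain-Java sketch below models that routing; `Tag`, `SplitResult` and `HighLowSplitter` are hypothetical names loosely analogous to `OutputTag`, `ctx.output(...)` and `getSideOutput(...)`, not Flink classes. The tag's type parameter is what lets `getSideOutput` hand back a correctly typed list, which mirrors why Flink's `OutputTag` carries the element type.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical typed tag, loosely analogous to Flink's OutputTag<T> (not the Flink class).
final class Tag<T> {
    final String id;

    Tag(String id) {
        this.id = id;
    }
}

// Holds a main output plus any number of tagged side outputs.
class SplitResult<T> {
    final List<T> main = new ArrayList<>();
    private final Map<String, List<Object>> sides = new HashMap<>();

    // Like ctx.output(tag, value): route a value to the side output named by the tag.
    <S> void output(Tag<S> tag, S value) {
        sides.computeIfAbsent(tag.id, k -> new ArrayList<>()).add(value);
    }

    // Like getSideOutput(tag): the tag's type parameter gives the element type back.
    @SuppressWarnings("unchecked")
    <S> List<S> getSideOutput(Tag<S> tag) {
        List<Object> values = sides.getOrDefault(tag.id, new ArrayList<>());
        return (List<S>) (List<?>) values;
    }
}

class HighLowSplitter {
    // Readings above the threshold go to the main output, all others to the side output.
    static SplitResult<Double> split(List<Double> temperatures, double threshold, Tag<Double> lowTag) {
        SplitResult<Double> result = new SplitResult<>();
        for (double t : temperatures) {
            if (t > threshold) {
                result.main.add(t);
            } else {
                result.output(lowTag, t);
            }
        }
        return result;
    }
}
```

With a threshold of 30, the readings 35.0, 12.5, 30.0 and 31.2 end up as `[35.0, 31.2]` in the main output and `[12.5, 30.0]` in the side output; as in the Flink example, a reading of exactly 30 counts as low.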
