当前位置:网站首页>结合案例:Flink框架中的最底层API(ProcessFunction)用法

结合案例:Flink框架中的最底层API(ProcessFunction)用法

2022-07-04 12:46:00 InfoQ

概述

在之前总结的文章中有提到过,Flink框架提供了三层API完成流处理任务。至此已经学习了DataStream API ,ProcessFunction API 是Flink中最底层的API,可以访问时间戳、watermark 以及注册定时事件。还可以输出特定的一些事件。、

null
Process Function 用来构建事件驱动的应用以及实现自定义的业务逻辑,若窗口函数以及转换算子都无法满足业务的要求时,需要请出ProcessFunction 去完成开发任务。Flink SQL 就是使用 Process Function 实现的。

Flink 提供了 8 个 Process Function如下:
ProcessFunction
KeyedProcessFunction
CoProcessFunction
ProcessJoinFunction
BroadcastProcessFunction
KeyedBroadcastProcessFunction
ProcessWindowFunction
ProcessAllWindowFunction
。接下来我们以KeyedProcessFunction为例来进行学习。

KeyedProcessFunction<K, I, O>

它主要用来操作KeyedStream,会处理流的每一个元素,输出为 0 个、1 个或者多个元素。所有的 Process Function 都继承自RichFunction 接口,所以都有 open()、close()和 getRuntimeContext()等方法。除此之外还提供了两个方法:

  • 数据流中的每一个元素都会调用这个方法,调用结果将会放在 Collector 数据类型中输出。Context 可以访问元素的时间戳,元素的key,以及 TimerService 时间服务。Context 还可以将结果输出到别的流(side outputs)。

processElement(I value, Context ctx, Collector<O> out)

  • 当之前注册的定时器触发时调用。参数 timestamp 为定时器所设定的触发的时间戳。Collector为输出结果的集合。OnTimerContext 和processElement 的 Context 参数一样,提供了上下文的一些信息。例如定时器触发的时间信息(事件时间或者处理时间)。

onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) 

定时器

Context 中 TimerService对象方汇总:

  • 返回当前处理时间

long currentProcessingTime() 

  • 返回当前 watermark 的时间戳

long currentWatermark() 

  • 注册当前 key 的processing time 的定时器,当 processing time 到达定时时间时,触发 timer。

void registerProcessingTimeTimer(long timestamp)

  • 注册当前 key 的 event time 定时器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。

void registerEventTimeTimer(long timestamp) 

  • 删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行

void deleteProcessingTimeTimer(long timestamp)

  • 删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。

void deleteEventTimeTimer(long timestamp)

测试代码:

public class ProcessTest1_KeyedProcessFunction {
 public static void main(String[] args) throws Exception{
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(1);

 DataStreamSource<String> inputStream = env.socketTextStream(&quot;localhost&quot;, 7777);
 DataStream<SensorReading> dataStream = inputStream.map(line -> {
 String[] fields = line.split(&quot;,&quot;);
 return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
 });

 //测试keyedprocessFunction 先分组,自定义处理
 dataStream.keyBy(&quot;id&quot;)
 .process(new MyProcess())
 .print();


 env.execute();
 }

 // 实现自定义处理函数
 public static class MyProcess extends KeyedProcessFunction<Tuple,SensorReading,Integer> {

 ValueState<Long> tsTimeState;

 @Override
 public void open(Configuration parameters) throws Exception {
 tsTimeState = getRuntimeContext().getState(new ValueStateDescriptor<Long>(
 &quot;tsTimeState&quot;,Long.class
 ));
 }

 @Override
 public void processElement(SensorReading value, Context ctx, Collector<Integer> out) throws Exception {
 out.collect(value.getId().length());

 // Context操作
 ctx.timestamp();
 ctx.getCurrentKey();
 // 侧流
 //ctx.output();
 // 获取当前系统处理时间
 ctx.timerService().currentProcessingTime();
 // 获取当前事件时间
 ctx.timerService().currentWatermark();
 // 注册系统处理时间定时器
 ctx.timerService().registerProcessingTimeTimer( ctx.timerService().currentProcessingTime() + 1000L);
 tsTimeState.update( ctx.timerService().currentProcessingTime() + 1000L);

 // 注册事件时间定时器
 //ctx.timerService().registerEventTimeTimer((value.getTimestamp() + 10) * 1000L);
 // 删除时间
 //ctx.timerService().deleteProcessingTimeTimer(tsTimeState.value());
 }

 @Override
 public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
 System.out.println(timestamp+&quot;定时器触发&quot;);

 ctx.getCurrentKey();
 //ctx.output();
 ctx.timeDomain();
 }

 @Override
 public void close() throws Exception {
 tsTimeState.clear();
 }
 }
}

一个案例:

监控温度传感器的温度值,如果温度值在 10 秒钟之内连续上升,则报警。

public class ProcessTest2_ApplicationCase {
 public static void main(String[] args) throws Exception {
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(1);

 DataStreamSource<String> inputStream = env.socketTextStream(&quot;localhost&quot;, 7777);
 DataStream<SensorReading> dataStream = inputStream.map(line -> {
 String[] fields = line.split(&quot;,&quot;);
 return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
 });


 dataStream.keyBy(&quot;id&quot;)
 .process(new TempConsIncreWarring(10))
 .print();

 env.execute();
 }

 // 自定义函数 检测一段时间(时间域)内温度连续上升,输出报警
 private static class TempConsIncreWarring extends KeyedProcessFunction<Tuple, SensorReading, String> {
 // 定义时间域
 private Integer interval;

 public TempConsIncreWarring(Integer interval) {
 this.interval = interval;
 }

 // 定义状态,保存上一个温度值,定时器时间戳
 private ValueState<Double> lastTempState;
 private ValueState<Long> tsTimeState;

 @Override
 public void open(Configuration parameters) throws Exception {
 lastTempState = getRuntimeContext().getState(new ValueStateDescriptor<Double>(
 &quot;lastTempState&quot;, Double.class, Double.MIN_VALUE));
 tsTimeState = getRuntimeContext().getState(new ValueStateDescriptor<Long>(
 &quot;tsTimeState&quot;, Long.class));
 }

 @Override
 public void processElement(SensorReading value, Context ctx, Collector<String> out) throws Exception {
 // 取出状态
 Double lastTemp = lastTempState.value();
 Long tsTime = tsTimeState.value();

 // 如果温度上升就要注册10秒后的定时器且在无定时器时,等待
 if (value.getTemperature() > lastTemp && tsTime == null) {
 // 计算出定时器时间戳
 Long ts = ctx.timerService().currentProcessingTime() + interval * 1000L;
 // 注册定时器
 ctx.timerService().registerProcessingTimeTimer(ts);
 // 更新时间状态
 tsTimeState.update(ts);
 }
 // 如果温度下降 需要删除定时器
 if (value.getTemperature() < lastTemp && tsTime != null) {
 ctx.timerService().deleteProcessingTimeTimer(tsTime);
 // 清空时间状态定时器
 tsTimeState.clear();
 }

 // 更新温度状态
 lastTempState.update(value.getTemperature());
 }

 @Override
 public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
 // 定时器触发 输出报局信息
 out.collect(&quot;传感器&quot;+ ctx.getCurrentKey().getField(0) + &quot;温度值连续&quot;+ interval +&quot;秒一直处于上升&quot;);
 tsTimeState.clear();
 }

 @Override
 public void close() throws Exception {
 lastTempState.clear();
 }
 }
}

运行结果:

null

侧流输出

侧流输出功能可以产生多条流,并且这些流的数据类型可以不一样。一个侧流可以定义为 OutputTag[X]对象,X 是输出流的数据类型。

一个案例:

监控传感器温度值,将温度值低于 30 度的数据输出到 SideOutput

public class ProcessTest3_SideOutputCase {
 public static void main(String[] args) throws Exception {
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(1);

 DataStreamSource<String> inputStream = env.socketTextStream(&quot;localhost&quot;, 7777);

 DataStream<SensorReading> dataStream = inputStream.map(line -> {
 String[] fields = line.split(&quot;,&quot;);
 return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
 });


 // 定义outputTag 表示低温流
 OutputTag lowTemp = new OutputTag<SensorReading>(&quot;lowTemp&quot;) {
 };

 // 自定义测输出流实现分流操作
 SingleOutputStreamOperator<SensorReading> highTempStream = dataStream.process(new ProcessFunction<SensorReading, SensorReading>() {

 @Override
 public void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) throws Exception {
 // 判断温度大于30 为高温流 输出到主流 低温流输出在侧流
 if (value.getTemperature() > 30) {
 out.collect(value);
 } else {
 ctx.output(lowTemp, value);
 }
 }
 });

 highTempStream.print(&quot;high-Temp&quot;);
 highTempStream.getSideOutput(lowTemp).print(&quot;low&quot;);


 env.execute();
 }
}

运行结果:

null


原网站

版权声明
本文为[InfoQ]所创,转载请带上原文链接,感谢
https://xie.infoq.cn/article/713e66fd3c9b00533fbf3d3fa