Flink requirements: side output (using a side output stream to send temperatures above 30 ℃ to the main stream and temperatures of 30 ℃ or below to the side stream)
2022-07-27 00:59:00 【A photographer who can't play is not a good programmer】
1. First, a brief introduction to side outputs (SideOutput)
Most DataStream API operators produce a single output: one stream of a single data type. The split operator is an exception in that it divides one stream into several, but those streams all share the same data type. The side output feature of a process function can produce multiple streams, and the data types of those streams may differ. A side output is identified by an OutputTag[X] object, where X is the data type of that output stream. A process function emits an event to one or more side outputs through its Context object.
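The description above can be sketched as a small standalone job. This is only an illustration, assuming the same Flink Scala DataStream API used in the rest of this post; the object name `SideOutputSketch`, the tag id `"odd"`, and the sample numbers are made up for the example. The main stream carries `Int` values while the side output carries `String` values, which shows that the two types can differ.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object SideOutputSketch {
  // Hypothetical tag: the type parameter is the element type of the side stream.
  val oddTag: OutputTag[String] = OutputTag[String]("odd")

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Made-up input so the sketch runs without an external source.
    val evens: DataStream[Int] = env
      .fromElements(1, 2, 3, 4)
      .process(new ProcessFunction[Int, Int] {
        override def processElement(
            value: Int,
            ctx: ProcessFunction[Int, Int]#Context,
            out: Collector[Int]): Unit = {
          if (value % 2 == 0) {
            // Even numbers stay on the main stream as Int.
            out.collect(value)
          } else {
            // Odd numbers go to the side output as String, via the Context object.
            ctx.output(oddTag, s"odd: $value")
          }
        }
      })

    evens.print("main")
    // The side stream is read back from the operator's result using the same tag.
    evens.getSideOutput(oddTag).print("side")

    env.execute("side output sketch")
  }
}
```

The tag is declared once and reused for both emitting and reading, which avoids mismatched ids or types between the two places.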
2. Implementing the requirement
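import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector
import java.util.Properties

// Sensor record used below. The fields are inferred from how the code uses them;
// skip this definition if SensorReading already exists in your project.
case class SensorReading(id: String, timestamp: Long, temperature: Double)
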
object SideOutPutTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.enableCheckpointing(1000L)

    // Use Kafka as the data source
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "hadoop101:9092")
    val data = env.addSource(new FlinkKafkaConsumer[String]("sensor", new SimpleStringSchema(), properties))

    // Parse each comma-separated line into a SensorReading
    val dataStream = data.map(x => {
      val arr = x.split(",")
      SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
    })

    val highTempStream = dataStream
      .process(new SplitTempProcessor(30.0))

    // Main stream: readings above the threshold
    highTempStream.print("high")
    // Side output: the tag id and type must match the ones used in SplitTempProcessor
    highTempStream.getSideOutput(new OutputTag[(String, Long, Double)]("low")).print("low")

    env.execute()
  }
}

class SplitTempProcessor(threshold: Double) extends ProcessFunction[SensorReading, SensorReading] {
  override def processElement(value: SensorReading, ctx: ProcessFunction[SensorReading, SensorReading]#Context, out: Collector[SensorReading]): Unit = {
    if (value.temperature > threshold) {
      // If the current temperature is above the threshold (30 ℃ here), output to the main stream
      out.collect(value)
    } else {
      // Otherwise (at or below the threshold), output to the side output stream as a tuple
      ctx.output(new OutputTag[(String, Long, Double)]("low"), (value.id, value.timestamp, value.temperature))
    }
  }
}
3. Results

A note on why high and low print different types: the custom SplitTempProcessor emits SensorReading to the main stream, but its ProcessFunction implementation defines its own output type, a (String, Long, Double) tuple, for the side output stream. You could just as well keep using the case class instead of defining a separate type. The custom data type is used here to show that ProcessFunction can support more complex business logic.
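The alternative mentioned above, keeping the case class for the side output, might look like the sketch below. It is a standalone illustration and not the author's code: the names `SideOutputCaseClassSketch` and `SplitByThreshold`, the shape of `SensorReading` (inferred from the fields used in the listing), and the two sample readings are all assumptions. An in-memory source replaces Kafka so the sketch can run on its own.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Assumed record shape, inferred from the fields used in the listing above.
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SideOutputCaseClassSketch {
  // One shared tag, typed with the case class instead of a tuple.
  val lowTag: OutputTag[SensorReading] = OutputTag[SensorReading]("low")

  // Hypothetical variant of SplitTempProcessor that forwards the record unchanged.
  class SplitByThreshold(threshold: Double)
      extends ProcessFunction[SensorReading, SensorReading] {
    override def processElement(
        value: SensorReading,
        ctx: ProcessFunction[SensorReading, SensorReading]#Context,
        out: Collector[SensorReading]): Unit = {
      if (value.temperature > threshold) out.collect(value) // main stream
      else ctx.output(lowTag, value)                        // side output, same type
    }
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Made-up readings standing in for the Kafka topic.
    val readings = env.fromElements(
      SensorReading("sensor_1", 1L, 35.8),
      SensorReading("sensor_2", 2L, 15.4))

    val high = readings.process(new SplitByThreshold(30.0))
    high.print("high")
    high.getSideOutput(lowTag).print("low")

    env.execute("side output with case class")
  }
}
```

With this variant both streams print SensorReading values, so downstream operators can treat them uniformly.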