Flink requirements: SideOutPut (an application of side output streams: send temperatures above 30 ℃ to the main stream and the remaining, lower temperatures to a side output stream)
2022-07-27 00:59:00 【A photographer who can't play is not a good programmer】
1. First, an introduction to side output streams (SideOutPut)
Most DataStream API operators produce a single output: one stream of one data type. The exception is the split operator, which can divide a stream into multiple streams, but those streams all share the same data type. The side outputs feature of the process function can produce multiple streams, and the data types of those streams can differ. A side output is identified by an OutputTag[X] object, where X is the data type of that output stream. A process function emits an event to one or more side outputs through its Context object.
2. Implementing the requirement
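To make the point about differing data types concrete, here is a minimal sketch, separate from the requirement implemented in this post. The names SideOutputTags, rejected, and ParseTemperature are hypothetical and chosen only for illustration. The main stream carries Double values, while lines that cannot be parsed are sent unchanged, as String, to a side output.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import scala.util.{Failure, Success, Try}

object SideOutputTags {
  // Hypothetical tag: the id "rejected" and the String payload type are assumptions.
  val rejected = new OutputTag[String]("rejected")
}

// Main output type is Double; the side output type is String.
class ParseTemperature extends ProcessFunction[String, Double] {
  override def processElement(
      line: String,
      ctx: ProcessFunction[String, Double]#Context,
      out: Collector[Double]): Unit = {
    Try(line.trim.toDouble) match {
      // Parsed successfully: emit to the main stream.
      case Success(temperature) => out.collect(temperature)
      // Could not parse: emit the raw line to the side output via the Context.
      case Failure(_) => ctx.output(SideOutputTags.rejected, line)
    }
  }
}
```

Given a DataStream[String] named lines (also an assumption), the two streams would be read with lines.process(new ParseTemperature) for the main output and getSideOutput(SideOutputTags.rejected) on that result for the rejected lines.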
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector
import java.util.Properties

// Case class for a sensor reading. It is not shown in the original listing;
// the fields are inferred from how SensorReading is used below.
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SideOutPutTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.enableCheckpointing(1000L)

    // Use Kafka as the data source
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "hadoop101:9092")
    val data = env.addSource(new FlinkKafkaConsumer[String]("sensor", new SimpleStringSchema(), properties))

    // Parse each "id,timestamp,temperature" line into a SensorReading
    val dataStream = data.map(x => {
      val arr = x.split(",")
      SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
    })

    // Main stream: readings whose temperature is above the threshold
    val highTempStream = dataStream
      .process(new SplitTempProcessor(30.0))
    highTempStream.print("high")

    // Side output: the tag must have the same id and type as the one used in SplitTempProcessor
    highTempStream.getSideOutput(new OutputTag[(String, Long, Double)]("low")).print("low")

    env.execute()
  }
}

class SplitTempProcessor(threshold: Double) extends ProcessFunction[SensorReading, SensorReading] {
  override def processElement(value: SensorReading, ctx: ProcessFunction[SensorReading, SensorReading]#Context, out: Collector[SensorReading]): Unit = {
    if (value.temperature > threshold) {
      // The temperature is above the threshold (30 ℃ here): output to the main stream
      out.collect(value)
    } else {
      // The temperature is not above the threshold: output to the side output stream as a tuple
      ctx.output(new OutputTag[(String, Long, Double)]("low"), (value.id, value.timestamp, value.temperature))
    }
  }
}
3. Results

A note on why the high and low outputs have different types: the custom SplitTempProcessor, in its ProcessFunction implementation, chooses its own type for the side output stream. The main stream carries SensorReading objects, while the side output carries (String, Long, Double) tuples. You could equally keep using the case class for the side output instead of defining a separate type. The custom type is used here to show that ProcessFunction can support more complex business logic.
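As a rough sketch of that alternative, the variant below keeps SensorReading as the side output type. The names SameTypeSideOutputDemo, lowTag, and SameTypeSplitProcessor are hypothetical, the case class is repeated only so the snippet stands alone, and the two sample readings are made-up values that stand in for the Kafka source.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Repeated here so the sketch is self-contained; fields assumed from the usage in this post.
case class SensorReading(id: String, timestamp: Long, temperature: Double)

// Hypothetical variant of SplitTempProcessor: the side output carries SensorReading too.
class SameTypeSplitProcessor(threshold: Double)
    extends ProcessFunction[SensorReading, SensorReading] {
  override def processElement(
      value: SensorReading,
      ctx: ProcessFunction[SensorReading, SensorReading]#Context,
      out: Collector[SensorReading]): Unit = {
    if (value.temperature > threshold) {
      // Above the threshold: main stream
      out.collect(value)
    } else {
      // Otherwise: side output, with no conversion to a tuple
      ctx.output(SameTypeSideOutputDemo.lowTag, value)
    }
  }
}

object SameTypeSideOutputDemo {
  // One tag shared by the processor and the job, so the id and type cannot drift apart.
  val lowTag = new OutputTag[SensorReading]("low")

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Made-up sample readings used in place of the Kafka source
    val readings = env.fromElements(
      SensorReading("sensor_1", 1L, 35.8),
      SensorReading("sensor_1", 2L, 15.4))

    val highTempStream = readings.process(new SameTypeSplitProcessor(30.0))
    highTempStream.print("high")
    highTempStream.getSideOutput(lowTag).print("low")

    env.execute()
  }
}
```

Sharing a single tag value is a design choice rather than a requirement: the original listing creates the tag twice, which also works as long as the id and the type match.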