
Flink from Introduction to Zhenxiang (7. Sink Data Output: Files)

2020-11-08 12:06:00 osc_u9mt0sus

A Source is the input of a Flink program, and a Sink is where the program sends its data after processing what came from the Source: for example, writing the output to files, sockets, or external systems, or simply printing it. (Many tools in the big data ecosystem follow a similar model; Flume, for instance, also has a corresponding Source/Channel/Sink.) Flink provides a variety of data output methods.

Compared with writing the output logic directly in your own code (for example, inside the open, close, and map methods of a RichMapFunction), a Sink can save state and provides fault-tolerance and retry mechanisms, among other things.
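
For contrast, here is a minimal sketch of what the hand-written approach might look like. It assumes a Flink 1.x version in which `RichMapFunction.open(Configuration)` is available; the class name `FileWritingMap` and its `path` parameter are hypothetical and not from the original post. It writes every record to a local file as a side effect of `map`, with no checkpoint integration, so records can be lost or duplicated when a task restarts.

```scala
import java.io.{BufferedWriter, FileWriter}

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// Hypothetical hand-rolled writer (illustration only, not the post's code):
// appends each record to a local file and passes the record through unchanged.
class FileWritingMap(path: String) extends RichMapFunction[String, String] {

  // Created in open() on the task side, so it is never serialized with the function
  @transient private var writer: BufferedWriter = _

  override def open(parameters: Configuration): Unit = {
    // Called once per parallel instance before any record is processed
    writer = new BufferedWriter(new FileWriter(path, true)) // true = append mode
  }

  override def map(value: String): String = {
    writer.write(value)
    writer.newLine()
    value
  }

  override def close(): Unit = {
    // Called once when the task shuts down; flushes and closes the file
    if (writer != null) writer.close()
  }
}
```

With a parallelism greater than 1, every parallel instance would open the same path, which is one more thing this approach leaves to the author and a Sink handles for you.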


package com.mafei.sinktest

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}

case class SensorReadingTest3(id: String,timestamp: Long, temperature: Double)

object FileSink {
  def main(args: Array[String]): Unit = {
    // Create an execution environment 
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Set the default parallelism before creating the source so that it applies to every operator
    env.setParallelism(1)

    val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")

    // First convert each line to the case class type
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",")   // Split the line on commas to get the individual fields
        SensorReadingTest3(arr(0), arr(1).toLong, arr(2).toDouble)  // Build a sensor reading; toLong and toDouble are needed because split returns strings
      })

    dataStream.print()

    // Simple way to write the output to a text file; this method has been deprecated by Flink
//    dataStream.writeAsText("/opt/java2020_study/maven/flink1/src/main/resources/sink.txt")

    // Newer output approach (recommended)
    dataStream.addSink(
      StreamingFileSink.forRowFormat(
        new Path("/opt/java2020_study/maven/flink1/src/main/resources/sink2.txt"),   // Used as the base directory; part files are written beneath it
        new SimpleStringEncoder[SensorReadingTest3]()   // A charset name can be passed in the parentheses; the default is UTF-8
      ).build()
    )

    env.execute("udf test")
  }

}
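
One point the listing above does not show: according to the Flink documentation, StreamingFileSink finalizes part files only on successful checkpoints, so without checkpointing the files stay in the in-progress or pending state. With a short bounded input such as a small text file, the job may also finish before any checkpoint completes. The following is a minimal sketch of enabling checkpointing alongside the sink. The object name `CheckpointedFileSinkSketch`, the helper `buildSink`, the 10-second interval, the `/tmp/flink-sink-sketch` directory, and the socket source on `localhost:9999` are all assumptions chosen for illustration, not part of the original post.

```scala
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object CheckpointedFileSinkSketch {

  // Hypothetical helper: builds a row-format sink that writes each record as a UTF-8 line
  def buildSink(baseDir: String): StreamingFileSink[String] =
    StreamingFileSink
      .forRowFormat(new Path(baseDir), new SimpleStringEncoder[String]("UTF-8"))
      .build()

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Assumption: checkpoint every 10 seconds; part files are finalized on checkpoints
    env.enableCheckpointing(10000L)

    // Assumption: an unbounded source, so the job runs long enough for checkpoints to complete
    // (for a local test, something like `nc -lk 9999` can feed it lines)
    val lines = env.socketTextStream("localhost", 9999)

    lines.addSink(buildSink("/tmp/flink-sink-sketch"))

    env.execute("checkpointed file sink sketch")
  }
}
```

The path given to the sink is a base directory. By default the sink creates time-based bucket subdirectories beneath it and writes part files inside them.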

[Figure: code structure and final output]

Copyright notice
This article was written by [osc_u9mt0sus]. Please include a link to the original when reposting. Thanks.