当前位置:网站首页>Flink从入门到真香(7、Sink数据输出-文件)
Flink从入门到真香(7、Sink数据输出-文件)
2020-11-08 12:06:00 【osc_u9mt0sus】
Source 是 Flink 程序的输入,Sink 就是 Flink 程序处理完Source后数据的输出,比如将输出写到文件、sockets、外部系统、或者仅仅是显示(在大数据生态中,很多类似的,比如Flume里也是对应的Source/Channel/Sink),Flink 提供了多种数据输出方式
跟在代码中直接写不同(比如可以在RickMap中open、close、map中直接写)他可以保存一些状态,容错重试机制等等
package com.mafei.sinktest
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
case class SensorReadingTest3(id: String,timestamp: Long, temperature: Double)
object FileSink {
def main(args: Array[String]): Unit = {
//创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
val inputStream= env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
env.setParallelism(1)
//先转换成样例类类型
val dataStream = inputStream
.map(data =>{
val arr = data.split(",") //按照,分割数据,获取结果
SensorReadingTest3(arr(0), arr(1).toLong,arr(2).toDouble) //生成一个传感器类的数据,参数中传toLong和toDouble是因为默认分割后是字符串类别
})
dataStream.print()
//简单的输出到txt中的方法,已被flink弃用
// dataStream.writeAsText("/opt/java2020_study/maven/flink1/src/main/resources/sink.txt")
//新的输出方式-推荐
dataStream.addSink(
StreamingFileSink.forRowFormat(
new Path("/opt/java2020_study/maven/flink1/src/main/resources/sink2.txt"),
new SimpleStringEncoder[SensorReadingTest3]() //可以在括号中传入编码,默认是udf-8
).build()
)
env.execute("udf test")
}
}
代码结构及最终输出效果:

版权声明
本文为[osc_u9mt0sus]所创,转载请带上原文链接,感谢
https://my.oschina.net/u/4365833/blog/4708100
边栏推荐
- We interviewed the product manager of SQL server of Alibaba cloud database, and he said that it is enough to understand these four problems
- 值得一看!EMR弹性低成本离线大数据分析最佳实践(附网盘链接)
- 2天,利用下班后的4小时开发一个测试工具
- 笔试面试题目:盛水最多的容器
- 攻防世界之web新手题
- 吐血整理!阿里巴巴 Android 开发手册!(附网盘链接)
- 来自朋友最近阿里、腾讯、美团等P7级Python开发岗位面试题
- 这次,快手终于比抖音'快'了!
- 浅谈单调栈
- C language I blog assignment 03
猜你喜欢

分布式文档存储数据库之MongoDB基础入门

2018中国云厂商TOP5:阿里云、腾讯云、AWS、电信、联通 ...

11 server monitoring tools commonly used by operation and maintenance personnel

吐血整理!阿里巴巴 Android 开发手册!(附网盘链接)

虚拟机中安装 macOS 11 big sur

Recommend an economic science video, very valuable!

第二次作业

AQS解析

Tidb performance competition 11.02-11.06

笔试面试题目:求丢失的猪
随机推荐
原创 | 数据资产确权浅议
Introduction to mongodb foundation of distributed document storage database
Python基础语法
VC++指定目录下文件按时间排序输出
最全!阿里巴巴经济体云原生实践!(附网盘链接)
【计算机网络】学习笔记,第三篇:数据链路层(谢希仁版)
2 days, using 4 hours after work to develop a test tool
Top 5 Chinese cloud manufacturers in 2018: Alibaba cloud, Tencent cloud, AWS, telecom, Unicom
Can you do it with only six characters?
C语言I博客作业03
Recommend an economic science video, very valuable!
这次,快手终于比抖音'快'了!
我们采访了阿里云云数据库SQL Server的产品经理,他说了解这四个问题就可以了...
How to write a resume and project
next.js实现服务端缓存
Personal current technology stack
在51CTO学院Get到PMP证书
PCIe enumeration process
laravel8更新之速率限制改进
ArrayList源码分析