当前位置:网站首页>Flink从入门到真香(7、Sink数据输出-文件)
Flink从入门到真香(7、Sink数据输出-文件)
2020-11-08 12:06:00 【osc_u9mt0sus】
Source 是 Flink 程序的输入,Sink 就是 Flink 程序处理完Source后数据的输出,比如将输出写到文件、sockets、外部系统、或者仅仅是显示(在大数据生态中,很多类似的,比如Flume里也是对应的Source/Channel/Sink),Flink 提供了多种数据输出方式
跟在代码中直接写不同(比如可以在RickMap中open、close、map中直接写)他可以保存一些状态,容错重试机制等等
package com.mafei.sinktest
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
case class SensorReadingTest3(id: String,timestamp: Long, temperature: Double)
object FileSink {
def main(args: Array[String]): Unit = {
//创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
val inputStream= env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
env.setParallelism(1)
//先转换成样例类类型
val dataStream = inputStream
.map(data =>{
val arr = data.split(",") //按照,分割数据,获取结果
SensorReadingTest3(arr(0), arr(1).toLong,arr(2).toDouble) //生成一个传感器类的数据,参数中传toLong和toDouble是因为默认分割后是字符串类别
})
dataStream.print()
//简单的输出到txt中的方法,已被flink弃用
// dataStream.writeAsText("/opt/java2020_study/maven/flink1/src/main/resources/sink.txt")
//新的输出方式-推荐
dataStream.addSink(
StreamingFileSink.forRowFormat(
new Path("/opt/java2020_study/maven/flink1/src/main/resources/sink2.txt"),
new SimpleStringEncoder[SensorReadingTest3]() //可以在括号中传入编码,默认是udf-8
).build()
)
env.execute("udf test")
}
}
代码结构及最终输出效果:
版权声明
本文为[osc_u9mt0sus]所创,转载请带上原文链接,感谢
https://my.oschina.net/u/4365833/blog/4708100
边栏推荐
- Top 5 Chinese cloud manufacturers in 2018: Alibaba cloud, Tencent cloud, AWS, telecom, Unicom
- 当Kubernetes遇到机密计算,看阿里巴巴如何保护容器内数据的安全!(附网盘链接)
- It's worth seeing! EMR elastic low cost offline big data analysis best practice (with network disk link)
- 比Python快20%,就问你兴不兴奋?
- Research on WLAN direct connection (peer-to-peer connection or P2P) and cross platform research of IOS
- 用 Python 写出来的进度条,竟如此美妙~
- Top 5 Chinese cloud manufacturers in 2018: Alibaba cloud, Tencent cloud, AWS, telecom, Unicom
- Don't look! Full interpretation of Alibaba cloud's original data lake system! (Internet disk link attached)
- YGC问题排查,又让我涨姿势了!
- Improvement of rate limit for laravel8 update
猜你喜欢
Istio traffic management -- progress gateway
Learning summary (about deep learning, vision and learning experience)
Introduction to mongodb foundation of distributed document storage database
What can your cloud server do? What is the purpose of cloud server?
Ali teaches you how to use the Internet of things platform! (Internet disk link attached)
Is software testing training class easy to find a job
分布式文档存储数据库之MongoDB基础入门
Entry level! Teach you how to develop small programs without asking for help (with internet disk link)
Adobe media encoder /Me 2021软件安装包(附安装教程)
Top 5 Chinese cloud manufacturers in 2018: Alibaba cloud, Tencent cloud, AWS, telecom, Unicom
随机推荐
解析Istio访问控制
Flink的sink实战之一:初探
A scheme to improve the memory utilization of flutter
Is software testing training class easy to find a job
11 server monitoring tools commonly used by operation and maintenance personnel
Top 5 Chinese cloud manufacturers in 2018: Alibaba cloud, Tencent cloud, AWS, telecom, Unicom
Written interview topic: looking for the lost pig
Rust: performance test criteria Library
PMP考试通过心得分享
虚拟机中安装 macOS 11 big sur
Ali tear off the e-commerce label
2018中国云厂商TOP5:阿里云、腾讯云、AWS、电信、联通 ...
Major changes in Huawei's cloud: Cloud & AI rises to Huawei's fourth largest BG with full fire
漫画|讲解一下如何写简历&项目
Xamarin deploys IOS from scratch Walterlv.CloudKeyboard application
维图PDMS切图软件
华为云重大变革:Cloud&AI 升至华为第四大 BG ,火力全开
学习小结(关于深度学习、视觉和学习体会)
IQKeyboardManager 源代码看看
你搞不懂与别人的差距,永远成不了架构师!月薪15K和月薪65K,你差在那了?