当前位置:网站首页>Flink从入门到真香(7、Sink数据输出-文件)
Flink从入门到真香(7、Sink数据输出-文件)
2020-11-08 12:06:00 【osc_u9mt0sus】
Source 是 Flink 程序的输入,Sink 就是 Flink 程序处理完Source后数据的输出,比如将输出写到文件、sockets、外部系统、或者仅仅是显示(在大数据生态中,很多类似的,比如Flume里也是对应的Source/Channel/Sink),Flink 提供了多种数据输出方式
跟在代码中直接写不同(比如可以在RickMap中open、close、map中直接写)他可以保存一些状态,容错重试机制等等
package com.mafei.sinktest
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
case class SensorReadingTest3(id: String,timestamp: Long, temperature: Double)
object FileSink {
def main(args: Array[String]): Unit = {
//创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
val inputStream= env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
env.setParallelism(1)
//先转换成样例类类型
val dataStream = inputStream
.map(data =>{
val arr = data.split(",") //按照,分割数据,获取结果
SensorReadingTest3(arr(0), arr(1).toLong,arr(2).toDouble) //生成一个传感器类的数据,参数中传toLong和toDouble是因为默认分割后是字符串类别
})
dataStream.print()
//简单的输出到txt中的方法,已被flink弃用
// dataStream.writeAsText("/opt/java2020_study/maven/flink1/src/main/resources/sink.txt")
//新的输出方式-推荐
dataStream.addSink(
StreamingFileSink.forRowFormat(
new Path("/opt/java2020_study/maven/flink1/src/main/resources/sink2.txt"),
new SimpleStringEncoder[SensorReadingTest3]() //可以在括号中传入编码,默认是udf-8
).build()
)
env.execute("udf test")
}
}
代码结构及最终输出效果:

版权声明
本文为[osc_u9mt0sus]所创,转载请带上原文链接,感谢
https://my.oschina.net/u/4365833/blog/4708100
边栏推荐
- “1024”征文活动结果新鲜出炉!快来看看是否榜上有名?~~
- 2 days, using 4 hours after work to develop a test tool
- Adobe media encoder / me 2021 software installation package (with installation tutorial)
- python基本语法 变量
- 吐血整理!阿里巴巴 Android 开发手册!(附网盘链接)
- When kubernetes encounters confidential computing, see how Alibaba protects the data in the container! (Internet disk link attached)
- 新的目标市场在哪里?锚定的产品是什么?| 十问2021中国企业服务
- ArrayList源码分析
- 一文剖析2020年最火十大物联网应用|IoT Analytics 年度重磅报告出炉!
- Or talk No.19 | Facebook Dr. Tian Yuandong: black box optimization of hidden action set based on Monte Carlo tree search
猜你喜欢

Flink的sink实战之一:初探

Tight supply! Apple's iPhone 12 power chip capacity exposed

Q & A and book giving activities of harbor project experts

OR Talk NO.19 | Facebook田渊栋博士:基于蒙特卡洛树搜索的隐动作集黑盒优化 - 知乎

吐血整理!阿里巴巴 Android 开发手册!(附网盘链接)

原创 | 数据资产确权浅议

How to deploy pytorch lightning model to production

Win10 Terminal + WSL 2 安装配置指南,精致开发体验

Win10 terminal + WSL 2 installation and configuration guide, exquisite development experience

阿里撕下电商标签
随机推荐
BCCOIN告诉您:年底最靠谱的投资项目是什么!
Top 5 Chinese cloud manufacturers in 2018: Alibaba cloud, Tencent cloud, AWS, telecom, Unicom
Research on WLAN direct connection (peer-to-peer connection or P2P) and cross platform research of IOS
渤海银行百万级罚单不断:李伏安却称治理完善,增速呈下滑趋势
Dogs can also operate drones! You're right, but it's actually an autonomous drone - you know
Iqkeyboardmanager source code to see
你搞不懂与别人的差距,永远成不了架构师!月薪15K和月薪65K,你差在那了?
Ali! Visual computing developer's series of manuals (with internet disk link)
分布式文档存储数据库之MongoDB基础入门
Rust: performance test criteria Library
AQS解析
The most complete! Alibaba economy cloud original practice! (Internet disk link attached)
How TCP protocol ensures reliable transmission
来自朋友最近阿里、腾讯、美团等P7级Python开发岗位面试题
Ubuntu20.04下访问FTP服务器乱码问题+上传文件
Oops, the system is under attack again
在51CTO学院Get到PMP证书
C language I blog assignment 03
YGC问题排查,又让我涨姿势了!
A scheme to improve the memory utilization of flutter