Flink from Introduction to Zhenxiang (7. Sink: Writing Output Data to Files)
2020-11-08 12:06:00 【osc_u9mt0sus】
A Source is the input of a Flink program, and a Sink is where Flink sends the data once it has finished processing what came from the Source: for example, writing the output to a file, a socket, or an external system, or simply printing it. (Many tools in the big data ecosystem use similar concepts; Flume, for instance, also has Source/Channel/Sink.) Flink provides a variety of data output methods.
Unlike writing the output by hand in your own code (for example, directly inside the open, close, and map methods of a RichMapFunction), a Sink can keep some state and supports mechanisms such as fault-tolerant retries.
package com.mafei.sinktest

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}

case class SensorReadingTest3(id: String, timestamp: Long, temperature: Double)

object FileSink {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the parallelism before building the pipeline so it applies to every operator
    env.setParallelism(1)

    val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")

    // First convert each line into the case class type
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",") // Split the line on commas
        // Build a sensor reading; split returns strings, hence toLong and toDouble
        SensorReadingTest3(arr(0), arr(1).toLong, arr(2).toDouble)
      })

    dataStream.print()

    // The simple way to write to a text file; deprecated by Flink
    // dataStream.writeAsText("/opt/java2020_study/maven/flink1/src/main/resources/sink.txt")

    // The newer output method (recommended)
    // Note: StreamingFileSink treats this path as a base directory and writes part files beneath it
    dataStream.addSink(
      StreamingFileSink.forRowFormat(
        new Path("/opt/java2020_study/maven/flink1/src/main/resources/sink2.txt"),
        new SimpleStringEncoder[SensorReadingTest3]() // A charset name can be passed in the parentheses; the default is UTF-8
      ).build()
    )

    env.execute("udf test")
  }
}
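The map step above assumes every line of sensor.txt has exactly three well-formed, comma-separated fields; a malformed line would throw and fail the job. The following is a minimal sketch of a more defensive parser in plain Scala, not part of the original article. The names SensorReading and SensorParser are hypothetical and only used for illustration.

```scala
import scala.util.Try

// Hypothetical stand-in for the article's SensorReadingTest3 case class
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SensorParser {
  // Returns Some(reading) for a well-formed "id,timestamp,temperature" line,
  // and None when the field count is wrong or a number fails to parse
  def parse(line: String): Option[SensorReading] =
    line.split(",").map(_.trim) match {
      case Array(id, ts, temp) =>
        Try(SensorReading(id, ts.toLong, temp.toDouble)).toOption
      case _ => None
    }
}
```

In the job, one way to use such a parser would be something like inputStream.flatMap(line => SensorParser.parse(line).toList), so that bad lines are dropped instead of crashing the pipeline. Whether dropping or logging them is the right choice depends on the data.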
Code structure and final output:
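As a rough illustration of what ends up in the output: SimpleStringEncoder writes each element's toString followed by a newline, so every row in a part file is the case class's default string form. The sketch below imitates that behavior in plain Scala without Flink. EncoderSketch is a hypothetical helper, and the sample values are made up, since the contents of sensor.txt are not shown in the article.

```scala
import java.nio.charset.{Charset, StandardCharsets}

case class SensorReadingTest3(id: String, timestamp: Long, temperature: Double)

object EncoderSketch {
  // Imitates a toString-based row encoder: the element's string form plus a newline,
  // encoded with the given charset (UTF-8 unless specified otherwise)
  def encode(element: Any, charset: Charset = StandardCharsets.UTF_8): Array[Byte] =
    (element.toString + "\n").getBytes(charset)
}

object EncoderSketchDemo {
  def main(args: Array[String]): Unit = {
    // Made-up sample reading, used only for illustration
    val reading = SensorReadingTest3("sensor_1", 1547718199L, 35.8)
    // Prints: SensorReadingTest3(sensor_1,1547718199,35.8)
    print(new String(EncoderSketch.encode(reading), StandardCharsets.UTF_8))
  }
}
```

If a different row layout is wanted (for example plain CSV), one option is to map the stream to strings before the sink rather than relying on the case class's toString.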
Copyright notice
This article was written by [osc_u9mt0sus]. Please include a link to the original when reposting. Thank you.