当前位置:网站首页>Flink从入门到真香(3、从集合和文件中读取数据)
Flink从入门到真香(3、从集合和文件中读取数据)
2020-11-08 12:06:00 【osc_neocf7df】
关于环境准备可以参考: https://blog.51cto.com/mapengfei/2546985
从集合中读取数据
新建包,com.mafei.apitest,新建一个scala Object类,
package com.mafei.apitest
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
//获取传感器数据
case class SensorReading(id: String,timestamp: Long, temperature: Double)
object SourceTest {
def main(args: Array[String]): Unit = {
//创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 1、从集合中读取数据
val dataList = List(
SensorReading("sensor1",1603766281,41),
SensorReading("sensor2",1603766282,42),
SensorReading("sensor3",1603766283,43),
SensorReading("sensor4",1603766284,44)
)
val stream1 = env.fromCollection(dataList)
stream1.print()
//执行
env.execute(" source test")
}
}
代码目录图:
运行效果
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
2> SensorReading(sensor3,1603766283,43.0)
4> SensorReading(sensor1,1603766281,41.0)
3> SensorReading(sensor4,1603766284,44.0)
1> SensorReading(sensor2,1603766282,42.0)
从文件读取数据
和第一步一样,新建包,com.mafei.apitest,新建一个scala Object类,
package com.mafei.apitest
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
//获取传感器数据
case class SensorReading(id: String,timestamp: Long, temperature: Double)
object SourceTest {
def main(args: Array[String]): Unit = {
//创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//从文件中读取数据
val stream2= env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
stream2.print()
//执行
env.execute(" source test")
}
}
在resources目录下新建sensor.txt,写入以下内容
sensor1,1603766281,41
sensor2,1603766282,42
sensor3,1603766283,43
sensor4,1603766284,44
代码结构图:
代码运行效果:
1> sensor1,1603766281,41
1> sensor2,1603766282,42
2> sensor3,1603766283,43
3> sensor4,1603766284,44
版权声明
本文为[osc_neocf7df]所创,转载请带上原文链接,感谢
https://my.oschina.net/u/4416758/blog/4708099
边栏推荐
- 2018中国云厂商TOP5:阿里云、腾讯云、AWS、电信、联通 ...
- What can your cloud server do? What is the purpose of cloud server?
- Win10 Terminal + WSL 2 安装配置指南,精致开发体验
- 2018中国云厂商TOP5:阿里云、腾讯云、AWS、电信、联通 ...
- Installing MacOS 11 Big Sur in virtual machine
- 当Kubernetes遇到机密计算,看阿里巴巴如何保护容器内数据的安全!(附网盘链接)
- Deeplight Technology Bluetooth protocol SRRC certification services
- “1024”征文活动结果新鲜出炉!快来看看是否榜上有名?~~
- python基础教程python opencv pytesseract 验证码识别的实现
- Top 5 Chinese cloud manufacturers in 2018: Alibaba cloud, Tencent cloud, AWS, telecom, Unicom
猜你喜欢

The container with the most water

笔试面试题目:盛水最多的容器

Top 5 Chinese cloud manufacturers in 2018: Alibaba cloud, Tencent cloud, AWS, telecom, Unicom

Written interview questions: find the smallest positive integer missing

Japan PSE certification

阿里撕下电商标签

原创 | 数据资产确权浅议

仅用六种字符来完成Hello World,你能做到吗?
![[computer network] learning notes, Part 3: data link layer (Xie Xiren version)](/img/b0/b236a52e38f1cd3eff25a398dac7aa.jpg)
[computer network] learning notes, Part 3: data link layer (Xie Xiren version)

Istio traffic management -- progress gateway
随机推荐
Rust : 性能测试criterion库
Introduction to mongodb foundation of distributed document storage database
Don't look! Full interpretation of Alibaba cloud's original data lake system! (Internet disk link attached)
Improvement of rate limit for laravel8 update
蘑菇街电商交易平台服务架构及改造优化历程(含PPT)
This time Kwai tiktok is faster than shaking.
应届生年薪35w+ !倒挂老员工,互联网大厂薪资为何越来越高?
为什么 Schnorr 签名被誉为比特币 Segwit 后的最大技术更新
笔试面试题目:盛水最多的容器
漫画|讲解一下如何写简历&项目
临近双11,恶补了两个月成功拿下大厂offer,跳槽到阿里巴巴
Python基础语法
“1024”征文活动结果新鲜出炉!快来看看是否榜上有名?~~
攻防世界之web新手题
个人目前技术栈
Ali tear off the e-commerce label
C语言I博客作业03
Analysis of ArrayList source code
python小工具:编码转换
Top 5 Chinese cloud manufacturers in 2018: Alibaba cloud, Tencent cloud, AWS, telecom, Unicom