当前位置:网站首页>Flink从入门到真香(3、从集合和文件中读取数据)
Flink从入门到真香(3、从集合和文件中读取数据)
2020-11-08 12:06:00 【osc_neocf7df】
关于环境准备可以参考: https://blog.51cto.com/mapengfei/2546985
从集合中读取数据
新建包,com.mafei.apitest,新建一个scala Object类,
package com.mafei.apitest
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
//获取传感器数据
case class SensorReading(id: String,timestamp: Long, temperature: Double)
object SourceTest {
def main(args: Array[String]): Unit = {
//创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 1、从集合中读取数据
val dataList = List(
SensorReading("sensor1",1603766281,41),
SensorReading("sensor2",1603766282,42),
SensorReading("sensor3",1603766283,43),
SensorReading("sensor4",1603766284,44)
)
val stream1 = env.fromCollection(dataList)
stream1.print()
//执行
env.execute(" source test")
}
}
代码目录图:
运行效果
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
2> SensorReading(sensor3,1603766283,43.0)
4> SensorReading(sensor1,1603766281,41.0)
3> SensorReading(sensor4,1603766284,44.0)
1> SensorReading(sensor2,1603766282,42.0)
从文件读取数据
和第一步一样,新建包,com.mafei.apitest,新建一个scala Object类,
package com.mafei.apitest
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
//获取传感器数据
case class SensorReading(id: String,timestamp: Long, temperature: Double)
object SourceTest {
def main(args: Array[String]): Unit = {
//创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//从文件中读取数据
val stream2= env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
stream2.print()
//执行
env.execute(" source test")
}
}
在resources目录下新建sensor.txt,写入以下内容
sensor1,1603766281,41
sensor2,1603766282,42
sensor3,1603766283,43
sensor4,1603766284,44
代码结构图:
代码运行效果:
1> sensor1,1603766281,41
1> sensor2,1603766282,42
2> sensor3,1603766283,43
3> sensor4,1603766284,44
版权声明
本文为[osc_neocf7df]所创,转载请带上原文链接,感谢
https://my.oschina.net/u/4416758/blog/4708099
边栏推荐
- Why is Schnorr Signature known as the biggest technology update after bitcoin segwit
- Iqkeyboardmanager source code to see
- 分布式文档存储数据库之MongoDB基础入门
- 年轻一代 winner 的程序人生,改变世界的起点藏在身边
- Where is the new target market? What is the anchored product? |Ten questions 2021 Chinese enterprise service
- 你搞不懂与别人的差距,永远成不了架构师!月薪15K和月薪65K,你差在那了?
- How TCP protocol ensures reliable transmission
- Top 5 Chinese cloud manufacturers in 2018: Alibaba cloud, Tencent cloud, AWS, telecom, Unicom
- python基础教程python opencv pytesseract 验证码识别的实现
- PMP心得分享
猜你喜欢
python基础教程python opencv pytesseract 验证码识别的实现
Analysis of ArrayList source code
OR Talk NO.19 | Facebook田渊栋博士:基于蒙特卡洛树搜索的隐动作集黑盒优化 - 知乎
Adobe Lightroom / LR 2021 software installation package (with installation tutorial)
攻防世界之web新手题
Flink的sink实战之一:初探
C language I blog assignment 03
211考研失败后,熬夜了两个月拿下字节offer!【面经分享】
[computer network] learning notes, Part 3: data link layer (Xie Xiren version)
YGC问题排查,又让我涨姿势了!
随机推荐
ArrayList源码分析
分布式文档存储数据库之MongoDB基础入门
C语言I博客作业03
渤海银行百万级罚单不断:李伏安却称治理完善,增速呈下滑趋势
OR Talk NO.19 | Facebook田渊栋博士:基于蒙特卡洛树搜索的隐动作集黑盒优化 - 知乎
我们采访了阿里云云数据库SQL Server的产品经理,他说了解这四个问题就可以了...
用科技赋能教育创新与重构 华为将教育信息化落到实处
Entry level! Teach you how to develop small programs without asking for help (with internet disk link)
python小工具:编码转换
2018中国云厂商TOP5:阿里云、腾讯云、AWS、电信、联通 ...
Iqkeyboardmanager source code to see
What can your cloud server do? What is the purpose of cloud server?
一文读懂机器学习“数据中毒”
个人目前技术栈
Q & A and book giving activities of harbor project experts
Powershell 使用.Net对象发送邮件
11 server monitoring tools commonly used by operation and maintenance personnel
这次,快手终于比抖音'快'了!
年轻一代 winner 的程序人生,改变世界的起点藏在身边
华为云重大变革:Cloud&AI 升至华为第四大 BG ,火力全开