当前位置:网站首页>Flink从入门到真香(3、从集合和文件中读取数据)
Flink从入门到真香(3、从集合和文件中读取数据)
2020-11-08 12:06:00 【osc_neocf7df】
关于环境准备可以参考: https://blog.51cto.com/mapengfei/2546985
从集合中读取数据
新建包,com.mafei.apitest,新建一个scala Object类,
package com.mafei.apitest
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
//获取传感器数据
case class SensorReading(id: String,timestamp: Long, temperature: Double)
object SourceTest {
def main(args: Array[String]): Unit = {
//创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 1、从集合中读取数据
val dataList = List(
SensorReading("sensor1",1603766281,41),
SensorReading("sensor2",1603766282,42),
SensorReading("sensor3",1603766283,43),
SensorReading("sensor4",1603766284,44)
)
val stream1 = env.fromCollection(dataList)
stream1.print()
//执行
env.execute(" source test")
}
}
代码目录图:
运行效果
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
2> SensorReading(sensor3,1603766283,43.0)
4> SensorReading(sensor1,1603766281,41.0)
3> SensorReading(sensor4,1603766284,44.0)
1> SensorReading(sensor2,1603766282,42.0)
从文件读取数据
和第一步一样,新建包,com.mafei.apitest,新建一个scala Object类,
package com.mafei.apitest
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
//获取传感器数据
case class SensorReading(id: String,timestamp: Long, temperature: Double)
object SourceTest {
def main(args: Array[String]): Unit = {
//创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//从文件中读取数据
val stream2= env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
stream2.print()
//执行
env.execute(" source test")
}
}
在resources目录下新建sensor.txt,写入以下内容
sensor1,1603766281,41
sensor2,1603766282,42
sensor3,1603766283,43
sensor4,1603766284,44
代码结构图:
代码运行效果:
1> sensor1,1603766281,41
1> sensor2,1603766282,42
2> sensor3,1603766283,43
3> sensor4,1603766284,44
版权声明
本文为[osc_neocf7df]所创,转载请带上原文链接,感谢
https://my.oschina.net/u/4416758/blog/4708099
边栏推荐
- Tight supply! Apple's iPhone 12 power chip capacity exposed
- Harbor项目高手问答及赠书活动
- 2018中国云厂商TOP5:阿里云、腾讯云、AWS、电信、联通 ...
- 【计算机网络】学习笔记,第三篇:数据链路层(谢希仁版)
- 一个方案提升Flutter内存利用率
- Top 5 Chinese cloud manufacturers in 2018: Alibaba cloud, Tencent cloud, AWS, telecom, Unicom
- 分布式文档存储数据库之MongoDB基础入门
- Flink的sink实战之一:初探
- Top 5 Chinese cloud manufacturers in 2018: Alibaba cloud, Tencent cloud, AWS, telecom, Unicom
- Adobe media encoder /Me 2021软件安装包(附安装教程)
猜你喜欢

Shell uses. Net objects to send mail

阿里教你深入浅出玩转物联网平台!(附网盘链接)

Top 5 Chinese cloud manufacturers in 2018: Alibaba cloud, Tencent cloud, AWS, telecom, Unicom

Top 5 Chinese cloud manufacturers in 2018: Alibaba cloud, Tencent cloud, AWS, telecom, Unicom

阿里出品!视觉计算开发者系列手册(附网盘链接)

Personal current technology stack

Dogs can also operate drones! You're right, but it's actually an autonomous drone - you know

值得一看!EMR弹性低成本离线大数据分析最佳实践(附网盘链接)

还不快看!对于阿里云云原生数据湖体系全解读!(附网盘链接)

Ali tear off the e-commerce label
随机推荐
PMP考试通过心得分享
11 server monitoring tools commonly used by operation and maintenance personnel
This time Kwai tiktok is faster than shaking.
Can you do it with only six characters?
Xamarin deploys IOS from scratch Walterlv.CloudKeyboard application
We interviewed the product manager of SQL server of Alibaba cloud database, and he said that it is enough to understand these four problems
华为云重大变革:Cloud&AI 升至华为第四大 BG ,火力全开
Top 5 Chinese cloud manufacturers in 2018: Alibaba cloud, Tencent cloud, AWS, telecom, Unicom
YGC问题排查,又让我涨姿势了!
Web novice problem of attacking and defending the world
Python Gadgets: code conversion
阿里撕下电商标签
供货紧张!苹果被曝 iPhone 12 电源芯片产能不足
Ali tear off the e-commerce label
这次,快手终于比抖音'快'了!
Personal current technology stack
Python基础语法
墨者学院SQL注入解题
From a friend recently Ali, Tencent, meituan and other P7 Python development post interview questions
不多不少,大学里必做的五件事(从我的大一说起)