当前位置:网站首页>Flink学习12:DataStreaming API
Flink学习12:DataStreaming API
2022-08-05 06:35:00 【hzp666】
1.概念
2.DatStream编程
需要代码实现的就3部分,1.数据源 2.转换 3.sink指定输出格式
2.1 创建环境
val env = ExecutionEnvironment.getExecutionEnvironment
语句比较固定
2.2 数据源
内置数据源主要有3种:
2.2.1 文本数据源
import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment object readText { def main(args: Array[String]): Unit = { //create env val env = StreamExecutionEnvironment.getExecutionEnvironment //create datasource val dataStream = env.readTextFile(filePath = "C:\\doc\\temp\\1.正行项目介绍.txt") //print dataStream.print() //execute env.execute() } }
2.2.2 套接字流
使用Ubantu系统自带的NC生成一个socket数据源
然后编写代码,监听socket数据
import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment object socketSourceTest { def main(args: Array[String]): Unit = { //create env val env = StreamExecutionEnvironment.getExecutionEnvironment //create datasource val socketDataStream = env.socketTextStream("localhost", 9999, '\n') //print socketDataStream.print() //execute env.execute() } }
输出结果
2.2.3 集合数据源
import org.apache.flink.api.scala._ object CollectionSourceTest { def main(args: Array[String]): Unit = { //create env val env = ExecutionEnvironment.getExecutionEnvironment //collection data source val collectionDataStream = env.fromElements(Tuple1(1L, 2L), Tuple1(3L, 4L)) //print collectionDataStream.print() } } 输出结果:
边栏推荐
- 盒子模型大详解
- 边缘盒子+时序数据库,美的数字化平台 iBUILDING 背后的技术选型
- lingo入门——河北省第三届研究生建模竞赛B题
- FPGA parsing B code----serial 4
- 2022熔化焊接与热切割操作证考试题及模拟考试
- 共享内存+inotify机制实现多进程低延迟数据共享
- JS控制只能输入数字并且最多允许小数点两位
- IO process thread -> communication between processes -> day7
- LaTeX uses frame to make PPT pictures without labels
- 关于Antd的Affix突然不好用了,或者Window的scroll监听不好用了
猜你喜欢
真实字节跳动测试开发面试题,拿下年薪50万offer。
淘宝宝贝页面制作
在STM32中使用printf函数
【网友真实投稿】为女友放弃国企舒适圈,转行软件测试12k*13薪
《PyTorch深度学习实践》第十一课(卷积神经网络CNN高级版)
MySQL:连接查询 | 内连接,外连接
Linux中安装Redis教程
AI + video technology helps to ensure campus security, how to build a campus intelligent security platform?
typescript67-索引查询类型
TCP sticky packet unpacking problem + solution
随机推荐
MyCat安装
腾讯业务安全岗 IDP 谈话总结
Pytorch distributed parallel processing
(JLK105D)中山爆款LED恒流电源芯片方案
【网友真实投稿】为女友放弃国企舒适圈,转行软件测试12k*13薪
After working for 3 years, I recalled the comparison between the past and the present when I first started, and joked about my testing career
Redis
protobuf根据有关联的.proto文件进行编译
UDP广播
MySQL:基础部分
【Go】IM系统Centrifugo
Mysql主从延迟的原因和解决方案
TCP的粘包拆包问题+解决方案
DNSlog外带数据注入
【instancetype类型 Objective-C】
长度以及颜色单位基本概念
document.querySelector() method
Mysql master-slave delay reasons and solutions
typescript62-泛型工具类型(record)
Redis