当前位置:网站首页>Flink Learning 12: DataStreaming API
Flink Learning 12: DataStreaming API
2022-08-05 07:11:00 【hzp666】

1. Concept



2.DatStream programming
There are 3 parts that need to be implemented by code, 1. Data source 2. Conversion 3. Sink specified output format

2.1 Create environment
val env = ExecutionEnvironment.getExecutionEnvironment
The statement is relatively fixed

2.2 Data Source
There are three main built-in data sources: 
2.2.1 Text Data Source

import org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentobject readText {def main(args: Array[String]): Unit = {//create envval env = StreamExecutionEnvironment.getExecutionEnvironment//create datasourceval dataStream = env.readTextFile(filePath = "C:\\doc\\temp\\1. Introduction to the main line item.txt")//printdataStream.print()//executeenv.execute()}}2.2.2 Socket Streams

Use the NC that comes with the Ubantu system to generate a socket data source
Then write code to monitor socket data
import org.apache.flink.api.scala._import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentobject socketSourceTest {def main(args: Array[String]): Unit = {//create envval env = StreamExecutionEnvironment.getExecutionEnvironment//create datasourceval socketDataStream = env.socketTextStream("localhost", 9999, '\n')//printsocketDataStream.print()//executeenv.execute()}}Output results

2.2.3 Collection Data Sources

import org.apache.flink.api.scala._object CollectionSourceTest {def main(args: Array[String]): Unit = {//create envval env = ExecutionEnvironment.getExecutionEnvironment//collection data sourceval collectionDataStream = env.fromElements(Tuple1(1L, 2L), Tuple1(3L, 4L))//printcollectionDataStream.print()}}Output:
边栏推荐
- UDP广播
- After working for 3 years, I recalled the comparison between the past and the present when I first started, and joked about my testing career
- Promise (3) async/await
- protobuf根据有关联的.proto文件进行编译
- 任务流调度工具AirFlow,,220804,,
- RK3568环境安装
- 2022熔化焊接与热切割操作证考试题及模拟考试
- 在STM32中使用printf函数
- Redis
- Nacos cluster construction
猜你喜欢

Mysql为什么 建立数据库失败

typescript60-泛型工具类型(readonly)

2022杭电多校六 1007-Shinobu loves trip(同余方程)

八大排序之快速排序

typescript66-分析partial的实现

After working for 3 years, I recalled the comparison between the past and the present when I first started, and joked about my testing career

算法拾遗十五补链表相关面试题

利用将网页项目部署到阿里云上(ngnix)

(JLK105D)中山爆款LED恒流电源芯片方案

UDP组(多)播
随机推荐
typescript67-索引查询类型
Flink学习10:使用idea编写WordCount,并打包运行
一天学会从抓包到接口测试,通过智慧物业项目深度解析
《PyTorch深度学习实践》第十课(卷积神经网络CNN)
MySQL: basic part
不能比较或排序 text、ntext 和 image 数据类型
DevExpress中针对指定列进行百分比转换
Week 8 Document Clustering
在小程序中关于js数字精度丢失的解决办法
Falsely bamboo brother today and found a localization of API to use tools
Hong Kong International Jewellery Show and Hong Kong International Diamond, Gem and Pearl Show kick off
re正则表达式
typescript68-索引查询类型(查询多个)
【JVM调优】Xms和Xmx为什么要保持一致
1、Citrix XenDesktop 2203之AD域系统安装(一)
铠侠携手Aerospike提升数据库应用性能
2022 Fusion Welding and Thermal Cutting Operation Certificate Exam Questions and Mock Exams
Shared memory + inotify mechanism to achieve multi-process low-latency data sharing
IO process thread -> communication between processes -> day7
Technical Analysis Mode (7) Playing the Gap
