Flink Learning 12: DataStreaming API
2022-08-05 07:11:00 【hzp666】
1. Concept
2. DataStream programming
A DataStream program has three parts that must be implemented in code: 1. the data source, 2. the transformations, 3. the sink, which specifies where and in what format the results are output.
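The three parts can be sketched as one small job. This is only an illustrative example, not the author's code: it assumes the Flink Scala DataStream API (org.apache.flink.streaming.api.scala) is on the classpath, and the object name PipelineSketch, the helper tokenize, and the sample sentences are made up for the illustration.

```scala
import org.apache.flink.streaming.api.scala._

object PipelineSketch {

  // Hypothetical helper: splits a line into lowercase words, dropping empty tokens.
  def tokenize(line: String): Seq[String] =
    line.toLowerCase.split("\\s+").filter(_.nonEmpty).toSeq

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 1. Data source: a small in-memory collection (sample data, assumed)
    val lines: DataStream[String] = env.fromElements("hello flink", "hello stream")

    // 2. Transformation: split into words and keep a running count per word
    val counts = lines
      .flatMap(line => tokenize(line))
      .map(word => (word, 1))
      .keyBy(_._1)
      .sum(1)

    // 3. Sink: print the results to standard output
    counts.print()

    // Nothing runs until execute() is called
    env.execute("pipeline sketch")
  }
}
```

The source and sink here are the simplest built-in ones; in a real job they would typically be replaced by a file, socket, or message-queue source and a matching sink, while the transformation step stays the same.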
2.1 Create environment
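val env = StreamExecutionEnvironment.getExecutionEnvironment
This statement is essentially boilerplate and rarely changes. Note that a DataStream program uses StreamExecutionEnvironment; ExecutionEnvironment belongs to the batch (DataSet) API.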
2.2 Data Source
There are three main built-in data sources: text files, socket streams, and collections.
2.2.1 Text Data Source
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object readText {
  def main(args: Array[String]): Unit = {
    // create env
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // create data source: each line of the file becomes one record
    val dataStream = env.readTextFile(filePath = "C:\\doc\\temp\\1. Introduction to the main line item.txt")

    // print
    dataStream.print()

    // execute
    env.execute()
  }
}
2.2.2 Socket Streams
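Use the nc (netcat) utility that ships with Ubuntu to generate a socket data source, for example by running nc -lk 9999 so that it listens on the port used in the code below.
Then write code that listens to the socket and reads the data: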
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object socketSourceTest {
  def main(args: Array[String]): Unit = {
    // create env
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // create data source: read from localhost:9999, one record per '\n'-terminated line
    val socketDataStream = env.socketTextStream("localhost", 9999, '\n')

    // print
    socketDataStream.print()

    // execute
    env.execute()
  }
}
Output results
2.2.3 Collection Data Sources
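import org.apache.flink.streaming.api.scala._

object CollectionSourceTest {
  def main(args: Array[String]): Unit = {
    // create env (the streaming environment, so the result is a DataStream)
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // collection data source: two (Long, Long) tuples
    val collectionDataStream = env.fromElements((1L, 2L), (3L, 4L))

    // print
    collectionDataStream.print()

    // execute: a streaming job only runs once execute() is called
    env.execute()
  }
}
Output: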