当前位置:网站首页>Flink Learning 12: DataStreaming API
Flink Learning 12: DataStreaming API
2022-08-05 07:11:00 【hzp666】
1. Concept
2.DatStream programming
There are 3 parts that need to be implemented by code, 1. Data source 2. Conversion 3. Sink specified output format
2.1 Create environment
val env = ExecutionEnvironment.getExecutionEnvironment
The statement is relatively fixed
2.2 Data Source
There are three main built-in data sources: 
2.2.1 Text Data Source
import org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentobject readText {def main(args: Array[String]): Unit = {//create envval env = StreamExecutionEnvironment.getExecutionEnvironment//create datasourceval dataStream = env.readTextFile(filePath = "C:\\doc\\temp\\1. Introduction to the main line item.txt")//printdataStream.print()//executeenv.execute()}}
2.2.2 Socket Streams
Use the NC that comes with the Ubantu system to generate a socket data source
Then write code to monitor socket data
import org.apache.flink.api.scala._import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentobject socketSourceTest {def main(args: Array[String]): Unit = {//create envval env = StreamExecutionEnvironment.getExecutionEnvironment//create datasourceval socketDataStream = env.socketTextStream("localhost", 9999, '\n')//printsocketDataStream.print()//executeenv.execute()}}
Output results
2.2.3 Collection Data Sources
import org.apache.flink.api.scala._object CollectionSourceTest {def main(args: Array[String]): Unit = {//create envval env = ExecutionEnvironment.getExecutionEnvironment//collection data sourceval collectionDataStream = env.fromElements(Tuple1(1L, 2L), Tuple1(3L, 4L))//printcollectionDataStream.print()}}Output:
边栏推荐
- Flink学习11:flink程序并行度
- 基于KECA-IGWO-KELM的间歇过程故障诊断方法
- Technical Analysis Mode (7) Playing the Gap
- 更改小程序原生radio的颜色及大小
- #Sealos#使用工具部署kubernetesV1.24.0
- 蓝牙gap协议
- Promise (三) async/await
- Rapid Medical's Ultra-Small and Only Adjustable Thromb Retriever Receives FDA Clearance
- 对数据类型而言运算符无效。运算符为 add,类型为 text。
- DevExpress中针对指定列进行百分比转换
猜你喜欢
随机推荐
Using printf function in STM32
typescript64-映射类型
技术分析模式(十一)如何交易头肩形态
(4) Rotating object detection data roLabelImg to DOTA format
mysql使用in函数的一个小问题
360度反馈调查表中的问题示范
400 times performance improvement 丨 swap valuation optimization case calculation
typescript67-索引查询类型
ndk编译so库
Hash 这些知识你也应该知道
Hong Kong International Jewellery Show and Hong Kong International Diamond, Gem and Pearl Show kick off
TCP sticky packet unpacking problem + solution
Mysql主从延迟的原因和解决方案
基于KECA-IGWO-KELM的间歇过程故障诊断方法
(2022杭电多校六)1012-Loop(单调栈+思维)
腾讯实习总结
Mysql master-slave delay reasons and solutions
Technical Analysis Patterns (11) How to Trade Head and Shoulders Patterns
Day9 of Hegong Daqiong team vision team training - camera calibration
在STM32中使用printf函数