当前位置:网站首页>11.1.2 overview of Flink_ Wordcount case
11.1.2 overview of Flink_ Wordcount case
2022-06-26 00:26:00 【Loves_ dccBigData】
WordCount Case study
flink The operator in is a stateful operator
— same key Be assigned to the same task in
— Note the error of implicit conversion
import org.apache.flink.streaming.api.scala._
object Demo01WordCount {
def main(args: Array[String]): Unit = {
// establish flink Environment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
/**
* Use nc To simulate streaming data
* yum install nc
* nc -lk 8888
*/
// Set parallelism , If it is not set , Different data are different task in ,12 Nuclear is 12 Before a number
// The data is divided into different cores to calculate the data
env.setParallelism(1)
// Read scoket The data of
val lineDS: DataStream[String] = env.socketTextStream("master", 8888)
// Take the data apart
val wordsDS: DataStream[String] = lineDS.flatMap(_.split(","))
// Set the number after the data to 1
val kvDS: DataStream[(String, Int)] = wordsDS.map((_, 1))
// Use KeyBy Grouping , Put different data in different places reduce task in
val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(_._1)
// Sum up , Sum the second element , Incoming subscript , Divide by subscript data , Subscript from 0 Start
// There are state operators
val countDS: DataStream[(String, Int)] = keyByDS.sum(1)
// Printout
countDS.print()
// start-up , Execution flow ,7*24 Perform the task in hours
env.execute()
}
}
边栏推荐
猜你喜欢

《SQL优化核心思想》

86.(cesium篇)cesium叠加面接收阴影效果(gltf模型)

The development context of Ba Kong Yuan universe industry

CaMKIIa和GCaMP6f是一样的嘛?

Cloud rendering and Intel jointly create the "core" era of cloud rendering

学习识别对话式问答中的后续问题

7. common instructions (Part 2): common operations of v-on, v-bind and V-model

When installing PSU /usr/bin/ld:warning: -z lazload ignore

10.2.3、Kylin_ The dimension is required for kylin

Explain from a process perspective what happens to the browser after entering a URL?
随机推荐
Farsync simple test
CaMKIIa和GCaMP6f是一样的嘛?
Camkiia et gcamp6f sont - ils les mêmes?
oracle RAC 集群无法启动
SSL unresponsive in postman test
安装PSU的时候/usr/bin/ld:warning: -z lazyload ignore
MySQL master-slave replication
Thrift getting started
深圳台电:联合国的“沟通”之道
Servlet response下载文件
2021-04-28
Darkent2ncnn error
删库跑路、“投毒”、改协议,开源有哪几大红线千万不能踩?
Comprehensive introduction to Simulink solver
基于OpenVINOTM开发套件“无缝”部署PaddleNLP模型
14.1.1 promethues monitoring, four data types metrics, pushgateway
Resolve thread concurrency security issues
Ffmpeg version switching
redux工作流程讲解+小例子
在同一台机器上部署OGG并测试