Spark Learning Notes (IV) -- Spark Core Programming: RDD
2022-07-27 03:24:00 【One's cow】
Contents
RDD Definition and Characteristics
RDD Parallelism and Partitioning
RDD Definition and Characteristics
RDD (Resilient Distributed Dataset) is Spark's most basic data-processing model. It represents an immutable, partitioned collection of elements that can be computed in parallel.
RDD characteristics:
(1) Resilient:
1) Storage resilience: data is switched automatically between memory and disk;
2) Fault-tolerance resilience: lost data can be recovered automatically;
3) Computation resilience: failed computations are retried;
4) Partitioning resilience: data can be re-partitioned as needed (see the sketch below).
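Two of these points map directly onto the API. The following is a minimal sketch, not from the original article, assuming a SparkContext sc like the ones created later in this note: persist with MEMORY_AND_DISK illustrates storage resilience, and repartition illustrates partitioning resilience.
import org.apache.spark.storage.StorageLevel

val nums = sc.makeRDD(1 to 100, 2)
// storage resilience: keep data in memory and spill to disk when memory is insufficient
val cached = nums.persist(StorageLevel.MEMORY_AND_DISK)
// partitioning resilience: re-partition the data as needed
val repartitioned = cached.repartition(4)
println(repartitioned.getNumPartitions) // 4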
RDD Processing Flow
Execution principle

Simple RDD processing flow

WordCount processing flow
WordCount code
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  // create the Spark run configuration object
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
  // create the connection (context) object
  val sc: SparkContext = new SparkContext(sparkConf)
  // read the files under the "datas" directory
  val fileRDD: RDD[String] = sc.textFile("datas")
  // split each line into words
  val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" "))
  // convert each word into a (word, 1) pair
  val word2RDD: RDD[(String, Int)] = wordRDD.map((_, 1))
  // group by word and aggregate the counts
  val word2count: RDD[(String, Int)] = word2RDD.reduceByKey(_ + _)
  // collect the results into driver memory
  val wordend: Array[(String, Int)] = word2count.collect()
  // print the collected results on the driver
  wordend.foreach(println)
  // stop Spark
  sc.stop()
}

RDD Core Attributes
(1) Partition list
The RDD data structure contains a list of partitions, which is used for parallel computation when tasks are executed; it is a key attribute for implementing distributed computing.
(2) Partition compute function
Spark uses the partition compute function to process each partition when the data is calculated (both attributes are illustrated in the sketch below).
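A minimal illustration, not from the original note: the partition list can be inspected directly on any RDD (assuming a SparkContext sc such as the ones created in the sections below).
val rdd = sc.makeRDD(Seq(1, 2, 3, 4, 5, 6), 3)
// the partition list is exposed as an array of Partition objects
rdd.partitions.foreach(p => println(s"partition index: ${p.index}"))
println(rdd.getNumPartitions) // 3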
RDD Basic Programming
Creating an RDD
1) Create an RDD from a collection (in memory)
To create an RDD from a collection, Spark mainly provides two methods: parallelize and makeRDD;
def main(args: Array[String]): Unit = {
  // prepare the environment
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
  val sc = new SparkContext(sparkConf)
  // create RDDs from memory: the data source is an in-memory collection
  // Spark mainly provides two methods: parallelize and makeRDD
  val seq1 = Seq[Int](1, 2, 3, 4)
  val seq2 = Seq[Int](2, 4, 6, 8)
  // parallelize
  val rdd1: RDD[Int] = sc.parallelize(seq1)
  // makeRDD
  val rdd2: RDD[Int] = sc.makeRDD(seq2)
  rdd1.collect().foreach(println)
  rdd2.collect().foreach(println)
  // shut down the environment
  sc.stop()
}
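The two methods are interchangeable; in Spark's SparkContext, makeRDD is essentially a thin wrapper that calls parallelize. A quick check (a minimal sketch, assuming the sc from the block above, before sc.stop() is called):
val a = sc.parallelize(Seq(1, 2, 3), 2)
val b = sc.makeRDD(Seq(1, 2, 3), 2)
// both methods yield the same data and the same number of partitions
println(a.collect().toList == b.collect().toList) // true
println(a.getNumPartitions == b.getNumPartitions) // true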
2) Create an RDD from external storage (files)
Create an RDD from a dataset in an external storage system; this includes the local file system and any dataset supported by Hadoop, such as HDFS and HBase;
def main(args: Array[String]): Unit = {
  // prepare the environment
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
  val sc = new SparkContext(sparkConf)
  // create an RDD from a file: the data source is a file
  // the path is resolved against the root of the current environment by default; absolute and relative paths both work
  val rdd: RDD[String] = sc.textFile("datas/word1.txt")
  //sc.textFile("D:\\spark\\ec\\word.txt")                local file path
  //sc.textFile("hdfs://hadoop01:8020/datas/word.txt")    distributed storage (HDFS) path
  //sc.textFile("datas")                                  directory path
  //sc.textFile("datas/*.txt")                            wildcard path
  //sc.wholeTextFiles("datas")
  // wholeTextFiles reads data file by file; each result is a tuple whose first element is the file path and whose second element is the file content
  rdd.collect().foreach(println)
  // shut down the environment
  sc.stop()
}
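To make the wholeTextFiles comment above concrete, here is a minimal sketch, assuming the same "datas" directory and an active sc as in the block above:
val fileRDD = sc.wholeTextFiles("datas")
// each element is a (file path, whole file content) tuple
fileRDD.collect().foreach { case (path, content) =>
  println(s"$path -> ${content.take(20)}")
}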
3) Create an RDD from another RDD
Applying a transformation to an existing RDD produces a new RDD;
Example: the WordCount program above (see the minimal sketch below).
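A minimal sketch, not from the original note, assuming an active SparkContext sc and the RDD import used earlier: each transformation below derives a new RDD from the previous one.
val source: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
// map derives a new RDD from source
val doubled: RDD[Int] = source.map(_ * 2)
// filter derives yet another RDD from doubled
val big: RDD[Int] = doubled.filter(_ > 4)
println(big.collect().mkString(", ")) // 6, 8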
4) Create an RDD directly
Construct an RDD directly with new; this is generally only done inside the Spark framework itself.
RDD Parallelism and Partitioning
Spark splits a job into multiple tasks and sends them to Executor nodes for parallel computation; the number of tasks that can be processed in parallel is called the parallelism.
This number can be specified when the RDD is constructed. Note that the parallelism is the number of tasks executed at the same time, not the total number of tasks the job is split into.
Example: if a job is split into 100 tasks and the parallelism is 20, the total task count is 100 and 20 of them run in parallel at a time (see the quick check below).
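As a quick check (a minimal sketch, not from the original note, assuming a local SparkContext sc): with setMaster("local[*]") the default parallelism equals the number of local CPU cores, and it can be overridden through spark.default.parallelism.
println(sc.defaultParallelism)
// to override it, set the value on the configuration before creating the SparkContext:
// sparkConf.set("spark.default.parallelism", "4")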
Memory
When reading in-memory data, the data can be partitioned according to the parallelism setting.
def main(args: Array[String]): Unit = {
  // prepare the environment
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
  //sparkConf.set("spark.default.parallelism", "4")      // configure 4 partitions (option 1)
  val sc = new SparkContext(sparkConf)
  // create an RDD: parallelism & partitioning
  //sc.makeRDD(List(1,2,3,4,5,6,7,8), 4)                 // configure 4 partitions (option 2)
  val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8), 2)
  rdd.saveAsTextFile("out")
  // shut down the environment
  sc.stop()
}
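Besides inspecting the files written to "out", the partition assignment can be checked programmatically. A minimal sketch, not from the original note, reusing the rdd from the block above before sc.stop() is called:
// tag every element with the index of the partition it landed in
val withIndex = rdd.mapPartitionsWithIndex((idx, iter) => iter.map(x => (idx, x)))
withIndex.collect().foreach(println)
// expected with 2 partitions: (0,1) (0,2) (0,3) (0,4) (1,5) (1,6) (1,7) (1,8)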

Files
When reading file data, the data is split into partitions according to Hadoop's file-splitting rules.

def main(args: Array[String]): Unit = {
  // prepare the environment
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
  val sc = new SparkContext(sparkConf)
  // create an RDD with textFile, using files as the data source; it is partitioned by default
  // minPartitions: the minimum number of partitions
  // default: minPartitions = math.min(defaultParallelism, 2), i.e. 2 in most cases
  //val rdd = sc.textFile("datas/word1.txt")      // default partitioning
  //val rdd = sc.textFile("datas/word1.txt", 3)   // set the minimum number of partitions to 3
  val rdd = sc.textFile("datas/word1.txt", 3)
  // partition calculation (note: 7 is the file size in bytes):
  //   totalSize = 7, goalSize = 7 / 3 = 2 remainder 1  =>  3 partitions
  //val rdd = sc.textFile("datas/word1.txt", 3)
  //   totalSize = 9, goalSize = 7 / 4 = 1 remainder 3  =>  2 + 1 = 3 partitions
  rdd.saveAsTextFile("output1")
  // shut down the environment
  sc.stop()
}
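One practical note (a minimal sketch, not from the original article): the second argument of textFile is only a minimum, so the actual partition count may be higher; it can be checked with getNumPartitions, assuming the same sc and file as above.
val fileRdd = sc.textFile("datas/word1.txt", 3)
// minPartitions is a lower bound; Hadoop's split rules decide the actual number of splits
println(fileRdd.getNumPartitions)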
RDD Transformation Operators
RDD Action Operators