RDD basic knowledge points
2022-06-24 07:24:00 【Andy86666】
Chapter 3: RDD

Table of Contents

- Chapter 3: RDD
- I. Creating an RDD from existing data in memory
- (1) Creating an RDD from a collection in a basic data structure
- (2) Transforming an existing RDD into a new RDD
- 1. map: transform data (transformation)
- 2. flatMap: transform data (transformation)
- 3. sortBy: sort (transformation)
- 4. collect(): query (action; returns an Array)
- 5. take(): query the first N values (action)
- 6. union(): merge multiple RDDs (transformation)
- 7. filter(): filter (transformation; the predicate returns a Boolean)
- 8. distinct(): remove duplicates
- 9. Set operations
- Key-value pair RDDs
- 10. Creating key-value pair RDDs
- 11. keys and values (transformations)
- 12. reduceByKey() (transformation)
- 13. groupByKey() (transformation)
- 14. join(): join two RDDs
- 15. zip: combine two RDDs
- 16. combineByKey: merge values of the same key
- 17. lookup(key): find the values of a specified key
- II. Creating an RDD from external storage
An RDD is a ==fault-tolerant, read-only data structure that supports parallel operations==; it is a collection of elements stored in a distributed way across the nodes of a cluster.
There are three ways to create an RDD:
- parallelize a collection held in a basic data structure of the program
- transform an existing RDD into a new RDD
- read a dataset directly from external storage
I. Creating an RDD from existing data in memory
(1) Creating an RDD from a collection in a basic data structure
1. parallelize
parallelize takes two parameters:
- a Seq collection (i.e., object data that can be iterated over)
- the number of partitions (default: the number of CPU cores allocated to the Application)
val data = Array(1,2,3,4,5)
// create an RDD with the default number of partitions
val disData = sc.parallelize(data)
disData.partitions.size
// create an RDD with 3 partitions
val disData = sc.parallelize(data,3)
// check the number of partitions
disData.partitions.size

2. makeRDD
The parameters of makeRDD are the same as those of parallelize; a minimal sketch is shown below.
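A small example, assuming the same spark-shell session and SparkContext sc as above (the data values are arbitrary):

val nums = List(1, 2, 3, 4, 5)
// makeRDD behaves like parallelize: the second argument is the number of partitions
val rdd = sc.makeRDD(nums, 2)
rdd.partitions.size   // 2
rdd.collect           // Array(1, 2, 3, 4, 5)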
(2) Transforming an existing RDD into a new RDD

1. map: transforming data (transformation)
Type: transformation
map is a basic RDD transformation: it passes every element of the RDD through some function and produces a new RDD, but it is not executed immediately (it is lazy).
Because an RDD is read-only and immutable, any modification necessarily produces a new RDD.
e.g., square every element
val data=sc.parallelize(List(1,2,4,6))
val now_data=data.map(x=>x*x)
now_data.collect

2. flatMap: transforming data (transformation)
Type: transformation
flatMap first performs a map and then flattens the result.
e.g., split sentences into words
val data=sc.parallelize(List("hello i am jacky","I like spark"))
data.collect
// note the difference between map and flatMap
data.map(x=>x.split(" ")).collect
data.flatMap(x=>x.split(" ")).collect

3. sortBy: sorting (transformation)
Type: transformation
sortBy() takes 3 parameters:
- the first is a function f: (T) => K; its input is each element of the object being sorted, and its return value is the field of the element to sort by
- the second is ascending; the default is true (ascending order), while false means descending order
- the third is numPartitions, the number of partitions; it determines how many partitions the sorted RDD has
- note: by default the number of partitions after sorting is the same as before sorting, i.e. this.partitions.size
e.g., sort by the first element of each tuple in ascending order
val data=sc.parallelize(List((1,3),(5,4),(2,6)))
val sortData=data.sortBy(x=>x._1,true,2).collect

4. collect(): querying (action; returns an Array)
Type: action
Returns: an Array
collect() converts all elements of the RDD into an Array and returns it to the Driver; it is suitable for small datasets.
Note: if the dataset being inspected is large, the Driver will load all of the data into memory, which can cause an out-of-memory error on the Driver side.
Method 1:
Directly return all elements of the RDD; the return type is an Array.
Code:
val data=sc.parallelize(List(1,23,4))
data.collect
Result:
scala> val data=sc.parallelize(List(1,23,4))
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[28] at parallelize at <console>:24
scala> data.collect
res25: Array[Int] = Array(1, 23, 4)
Method 2: collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U]
This usage is less common; f is a standard partial function.
e.g., replace the genders 'F' and 'M' with 1 and 0
Code:
val one:PartialFunction[Char,Int]={case 'F'=>1;case 'M'=>0}
val data=sc.parallelize(List('F','M','F'))
data.collect(one)
Note: in one: PartialFunction[Char, Int], the input type is Char and the output type is Int.
Result:
scala> val one:PartialFunction[Char,Int]={case 'F'=>1;case 'M'=>0}
one: PartialFunction[Char,Int] = <function1>
scala> val data=sc.parallelize(List('F','M','F'))
data: org.apache.spark.rdd.RDD[Char] = ParallelCollectionRDD[31] at parallelize at <console>:24
scala> data.collect(one)
res27: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[33] at collect at <console>:28
scala> data.collect(one).collect
res28: Array[Int] = Array(1, 0, 1)

5. take(): querying the first N values (action)
Type: action
Returns: an Array
take(N) retrieves the first N elements of the RDD; the return type is an Array.
Note the difference between collect and take(N): collect outputs the entire dataset, while take(N) outputs only the first N elements.
e.g.:
val data=sc.makeRDD(1 to 10)
data.take(5)
Result:
scala> val data=sc.makeRDD(1 to 10)
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at makeRDD at <console>:24
scala> data.take(5)
res29: Array[Int] = Array(1, 2, 3, 4, 5)

6. union(): merging multiple RDDs (transformation)
Type: transformation
union combines the elements of two RDDs into one without removing duplicates; the elements of the two RDDs must have the same type (and, for tuples, the same number of fields).
Note: union does not deduplicate; the data are simply concatenated.
e.g.:
val data=sc.parallelize(List(('a',1),('b',2),('c',3)))
val data1=sc.makeRDD(List(('d',1),('e',2),('a',3),('a',3)))
data.union(data1)
data.union(data1).collect
Result:
scala> val data=sc.parallelize(List(('a',1),('b',2),('c',3)))
data: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[42] at parallelize at <console>:24
scala> val data1=sc.makeRDD(List(('d',1),('e',2),('a',3),('a',3)))
data1: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[43] at makeRDD at <console>:24
scala> data.union(data1)
res33: org.apache.spark.rdd.RDD[(Char, Int)] = UnionRDD[44] at union at <console>:28
scala> data.union(data1).collect
res34: Array[(Char, Int)] = Array((a,1), (b,2), (c,3), (d,1), (e,2), (a,3), (a,3))

7. filter(): filtering (transformation)
Type: transformation
Predicate return value: Boolean
filter takes one argument, the function used for filtering; this function must return a Boolean, and only the elements for which it returns true are kept.
e.g., keep the pairs whose value is greater than 2
val data=sc.parallelize(List(('a',1),('c',3),('d',2)))
data.filter(x=>x._2>2).collect
data.filter(_._2>2).collect
Result:
scala> val data=sc.parallelize(List(('a',1),('c',3),('d',2)))
data: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[51] at parallelize at <console>:24
scala> data.filter(x=>x._2>2).collect
res38: Array[(Char, Int)] = Array((c,3))
scala> data.filter(_._2>2).collect
res39: Array[(Char, Int)] = Array((c,3))

8. distinct(): removing duplicates
Type: transformation
Effect: removes identical elements
e.g., remove duplicate elements
val data=sc.parallelize(List(('a',1),('c',3),('d',2),('a',1),('c',3),('d',2)))
data.distinct()
data.distinct().collect
Result:
scala> val data=sc.parallelize(List(('a',1),('c',3),('d',2),('a',1),('c',3),('d',2)))
data: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[58] at parallelize at <console>:24
scala> data.distinct()
res43: org.apache.spark.rdd.RDD[(Char, Int)] = MapPartitionsRDD[61] at distinct at <console>:26
scala> data.distinct().collect
res44: Array[(Char, Int)] = Array((d,2), (c,3), (a,1))

9. Set operations
Type: transformations
- intersection(): finds the elements that two RDDs have in common (intersection)
- union(): merges two RDDs (union)
- subtract(): removes the elements that also appear in the other RDD (set difference)
- cartesian(): computes the Cartesian product of two RDDs (a sketch is shown after the subtract example below)
(1) intersection()
Find the intersection
val data=sc.parallelize(List(('a',1),('w',3),('a',3)))
val data1=sc.parallelize(List(('a',1),('b',1)))
data.intersection(data1)
data.intersection(data1).collect
Result:
scala> val data=sc.parallelize(List(('a',1),('w',3),('a',3)))
data: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val data1=sc.parallelize(List(('a',1),('b',1)))
data1: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> data.intersection(data1)
res0: org.apache.spark.rdd.RDD[(Char, Int)] = MapPartitionsRDD[7] at intersection at <console>:28
scala> data.intersection(data1).collect
res1: Array[(Char, Int)] = Array((a,1))

(2) subtract()
Find the difference
val data=sc.parallelize(List(('a',1),('b',2),('d',2)))
val data1=sc.parallelize(List(('b',2),('c',2)))
data.subtract(data1)
data.subtract(data1).collect
Result:
scala> val data=sc.parallelize(List(('a',1),('b',2),('d',2)))
data: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[14] at parallelize at <console>:24
scala> val data1=sc.parallelize(List(('b',2),('c',2)))
data1: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[15] at parallelize at <console>:24
scala> data.subtract(data1)
res2: org.apache.spark.rdd.RDD[(Char, Int)] = MapPartitionsRDD[19] at subtract at <console>:28
scala> data.subtract(data1).collect
res3: Array[(Char, Int)] = Array((a,1))
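(3) cartesian()
cartesian() is listed above but not demonstrated; a minimal sketch with hypothetical data (the element order in the output may vary):

val rdd1=sc.parallelize(List(1,2))
val rdd2=sc.parallelize(List('a','b'))
// every element of rdd1 is paired with every element of rdd2
rdd1.cartesian(rdd2).collect
// e.g. Array((1,a), (1,b), (2,a), (2,b))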

Key-value pair RDDs
10. Creating key-value pair RDDs
Method 1: when the data being read is already of key-value type, a pair RDD (PairRDD) is returned directly
Method 2: use the map() function to build the pairs
Method 3: use the zip() function (the two RDDs must have the same number of partitions and the same number of elements); a sketch is shown after the example below
e.g., use the first word of each sentence as the key and the whole sentence as the value
val data=sc.parallelize(List("this is test","play an import in","parallelize makeRDD map flatMap filter collect take(N) union sortBy(data,true,numPartitions) distinct()"))
val words=data.map(x=>(x.split(" ")(0),x))
words.collect
Result:
scala> val data=sc.parallelize(List("this is test","play an import in","parallelize makeRDD map flatMap filter collect take(N) union sortBy(data,true,numPartitions) distinct()"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val words=data.map(x=>(x.split(" ")(0),x))
words: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[1] at map at <console>:25
scala> words.collect
res0: Array[(String, String)] = Array((this,this is test), (play,play an import in), (parallelize,parallelize makeRDD map flatMap filter collect take(N) union sortBy(data,true,numPartitions) distinct()))
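A minimal sketch of Method 3 (zip), assuming two small hypothetical RDDs with matching partition and element counts:

val keys=sc.parallelize(List("a","b","c"),2)
val values=sc.parallelize(List(1,2,3),2)
// zip pairs the elements positionally, producing a pair RDD
val pairs=keys.zip(values)
pairs.collect   // Array((a,1), (b,2), (c,3))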

11. keys and values (transformations)
Type: transformations
- keys returns an RDD containing only the keys
- values returns an RDD containing only the values
val data=sc.parallelize(List("this is test","play an import in","parallelize makeRDD map flatMap filter collect take(N) union sortBy(data,true,numPartitions) distinct()"))
val words=data.map(x=>(x.split(" ")(0),x))
val key=words.keys
key.collect
val value=words.values
value.collect
Result:
scala> val data=sc.parallelize(List("this is test","play an import in","parallelize makeRDD map flatMap filter collect take(N) union sortBy(data,true,numPartitions) distinct()"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at parallelize at <console>:24
scala> val words=data.map(x=>(x.split(" ")(0),x))
words: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[3] at map at <console>:25
scala> val key=words.keys
key: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at keys at <console>:25
scala> key.collect
res1: Array[String] = Array(this, play, parallelize)
scala> val value=words.values
value: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at values at <console>:25
scala> value.collect
res2: Array[String] = Array(this is test, play an import in, parallelize makeRDD map flatMap filter collect take(N) union sortBy(data,true,numPartitions) distinct())
12. reduceByKey() (transformation)
reduceByKey() ==merges the values that have the same key==.
Note: only the values are processed.
How reduceByKey works: the first two values of a key are passed into the function; the result becomes a new value, which is then passed into the function together with the next value of the same key, and so on until only one value remains.
e.g., merge the values that share the same key
Method 1:
val data=sc.parallelize(List(('a',1),('c',2),('e',2),('d',1),('a',2),('c',1)))
data.map(x=>(x._1,x._2)) // identity map: it has no effect here and can be omitted
val reduce=data.reduceByKey((a,b)=>a+b)
reduce.collect
Result:
scala> val data=sc.parallelize(List(('a',1),('c',2),('e',2),('d',1),('a',2),('c',1)))
data: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:24
scala> data.map(x=>(x._1,x._2))
res3: org.apache.spark.rdd.RDD[(Char, Int)] = MapPartitionsRDD[7] at map at <console>:26
scala> val reduce=data.reduceByKey((a,b)=>a+b)
reduce: org.apache.spark.rdd.RDD[(Char, Int)] = ShuffledRDD[8] at reduceByKey at <console>:25
scala> reduce.collect
res4: Array[(Char, Int)] = Array((d,1), (e,2), (a,3), (c,3))
Method 2:
val data=sc.parallelize(List(('a',1),('c',2),('e',2),('d',1),('a',2),('c',1)))
val reduce=data.reduceByKey((a,b)=>a+b)
reduce.collect
Result:
scala> val data=sc.parallelize(List(('a',1),('c',2),('e',2),('d',1),('a',2),('c',1)))
data: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[9] at parallelize at <console>:24
scala> val reduce=data.reduceByKey((a,b)=>a+b)
reduce: org.apache.spark.rdd.RDD[(Char, Int)] = ShuffledRDD[10] at reduceByKey at <console>:25
scala> reduce.collect
res5: Array[(Char, Int)] = Array((d,1), (e,2), (a,3), (c,3))
13. groupByKey() (transformation)
Type: transformation
groupByKey() groups together the values that have the same key.
The resulting RDD has the type [Key, Iterable[Value]].
Grouping is often used to count or otherwise aggregate the data within each group.
e.g., count the number of values in each group
val data=sc.parallelize(List(('a',1),('c',2),('e',2),('d',1),('a',2),('c',1)))
val data1=data.groupByKey()
data1.collect
data1.map(x=>(x._1,x._2.size)).collect
Result:
scala> val data=sc.parallelize(List(('a',1),('c',2),('e',2),('d',1),('a',2),('c',1)))
data: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[3] at parallelize at <console>:24
scala> val data1=data.groupByKey()
data1: org.apache.spark.rdd.RDD[(Char, Iterable[Int])] = ShuffledRDD[4] at groupByKey at <console>:25
scala> data1.collect
res2: Array[(Char, Iterable[Int])] = Array((d,CompactBuffer(1)), (e,CompactBuffer(2)), (a,CompactBuffer(1, 2)), (c,CompactBuffer(2, 1)))
scala> data1.map(x=>(x._1,x._2.size)).collect
res3: Array[(Char, Int)] = Array((d,1), (e,1), (a,2), (c,2))
14. join(): connecting two RDDs
Joins come in four flavours: inner join, left outer join, right outer join, and full outer join.
- join: inner join (combines the values of matching keys from the two RDDs into a tuple; only keys that exist in ==both== RDDs appear in the result)
- rightOuterJoin: right outer join (returns the join result for every key of the right RDD; a sketch is shown after the fullOuterJoin example below)
- leftOuterJoin: left outer join (returns the join result for every key of the left RDD)
- fullOuterJoin: full outer join (keeps ==all keys== of both RDDs in the result)
(1) join: matching keys
val rdd1=sc.parallelize(List(('a',1),('b',2)))
val rdd2=sc.parallelize(List(('d',1),('a',1)))
rdd1.join(rdd2)
Result:
scala> val rdd1=sc.parallelize(List(('a',1),('b',2)))
rdd1: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:24
scala> val rdd2=sc.parallelize(List(('d',1),('a',1)))
rdd2: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:24
scala> rdd1.join(rdd2)
res4: org.apache.spark.rdd.RDD[(Char, (Int, Int))] = MapPartitionsRDD[10] at join at <console>:28
scala> rdd1.join(rdd2).collect
res5: Array[(Char, (Int, Int))] = Array((a,(1,1)))

(2) leftOuterJoin
val rdd1=sc.parallelize(List(('a',1),('b',2)))
val rdd2=sc.parallelize(List(('d',1),('a',1)))
rdd1.leftOuterJoin(rdd2).collect
If the key also exists in the right RDD, the right-hand value is wrapped in Some; otherwise it is None.
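The screenshot of the result failed to load; based on the leftOuterJoin semantics described above, the output should look roughly like this (the element order may differ):

rdd1.leftOuterJoin(rdd2).collect
// Array[(Char, (Int, Option[Int]))] = Array((a,(1,Some(1))), (b,(2,None)))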
(3) fullOuterJoin
val rdd1=sc.parallelize(List(('a',1),('b',2)))
val rdd2=sc.parallelize(List(('d',1),('a',1)))
rdd1.fullOuterJoin(rdd2).collect
Result:
scala> val rdd1=sc.parallelize(List(('a',1),('b',2)))
rdd1: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:24
scala> val rdd2=sc.parallelize(List(('d',1),('a',1)))
rdd2: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[11] at parallelize at <console>:24
scala> rdd1.fullOuterJoin(rdd2).collect
res2: Array[(Char, (Option[Int], Option[Int]))] = Array((d,(None,Some(1))), (a,(Some(1),Some(1))), (b,(Some(2),None)))
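rightOuterJoin is listed above but has no example; a minimal sketch reusing the same rdd1 and rdd2 (here the left-hand value becomes the Option; element order may differ):

// keeps every key of rdd2
rdd1.rightOuterJoin(rdd2).collect
// Array[(Char, (Option[Int], Int))] = Array((d,(None,1)), (a,(Some(1),1)))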
15. zip: combining two RDDs
Requirement: the two RDDs must have the same number of partitions and the same number of elements; the expected result is sketched after the code below.
val rdd1=sc.parallelize(1 to 4,2)
val rdd2=sc.parallelize(List("a","b","c","d"),2)
rdd1.zip(rdd2)
rdd1.zip(rdd2).collect
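No output is shown above; given that both RDDs have 2 partitions and 4 elements, the result should look roughly like:

// Array[(Int, String)] = Array((1,a), (2,b), (3,c), (4,d))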

16. combineByKey: merging values of the same key
combineByKey is the basis of the other higher-level key-value operations; operations such as reduceByKey and groupByKey are implemented on top of it.
combineByKey aggregates the data that share the same key, and it allows the return type to differ from the type of the input values.
Definition of combineByKey:
combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions)
- createCombiner: V => C. V is the value of a key-value pair; it is converted into another type C, which is used as the initial value of the accumulator for that key.
- mergeValue: (C, V) => C. This function merges an element V into the previous accumulator C (this happens within each partition).
- mergeCombiners: (C, C) => C. This function merges two accumulators C (this happens across different partitions).
Supplementary notes:
Because the aggregation traverses every element of a partition, each element's key is in one of two states: it has not appeared before, or it has already appeared.
(1) If the key has not appeared before, createCombiner is executed; otherwise mergeValue is executed.
(2) For a key that has already appeared, mergeValue merges the existing accumulator of type C with the new value of type V.
(3) Note: mergeValue runs independently inside each partition, so the same key may end up with one accumulator per partition; mergeCombiners then merges these per-partition accumulators. A minimal sketch follows.
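A small sketch (the data values are hypothetical) that uses combineByKey to compute the average value per key; note that the accumulator type (sum, count) differs from the input value type Int:

val scores=sc.parallelize(List(("a",1),("c",2),("a",3),("c",4),("a",5)))
// accumulator type C = (sum, count); input value type V = Int
val avg=scores.combineByKey(
  (v: Int) => (v, 1),                                          // createCombiner
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),       // mergeValue (within a partition)
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2) // mergeCombiners (across partitions)
).map { case (k, (sum, cnt)) => (k, sum.toDouble / cnt) }
avg.collect   // e.g. Array((a,3.0), (c,3.0))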
17. lookup(key): finding the values of a specified key
lookup(key) works on an RDD of Key/Value type; it returns all the values of the specified key.
e.g.:
val data=sc.parallelize(Array(("a",1),("c",1),("a",2)))
data.lookup("a")
Result:
scala> val data=sc.parallelize(Array(("a",1),("c",1),("a",2)))
data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[25] at parallelize at <console>:24
scala> data.lookup("a")
res8: Seq[Int] = WrappedArray(1, 2)
II. Creating an RDD from external storage
Creating an RDD from external storage means reading a data file stored in a file system directly and building the RDD from it.
Data is read through the textFile method of the SparkContext object, which can also set the number of partitions; a sketch with the partition argument follows the examples below.
(1) Reading a file from HDFS
val data =sc.textFile("hdfs://spark1:9000/word.txt")
// count the number of lines
data.count
// show all of the contents
data.collect

(2) Reading a file from the local Linux file system
Note: when reading a local Linux file, the path must be prefixed with **file://**
val data=sc.textFile("file:///home/bigdata/test.txt")
data.count
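A minimal sketch of the second argument of textFile, which sets the minimum number of partitions (reusing the same hypothetical file path as above):

// read a local file and ask for at least 4 partitions
val data=sc.textFile("file:///home/bigdata/test.txt",4)
data.partitions.size   // at least 4, depending on the file size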
