Spark-day03-core programming RDD operator
2022-06-26 12:09:00 【There will always be daylight】
One: RDD operators
RDD operators are also called RDD methods. They fall into two main categories: transformation operators and action operators.

Two: RDD transformation operators
Based on how they process data, transformation operators are divided into three types: single-value, double-value, and key-value.
2.1: map: value transformation, applied to each element
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Spark01_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// establish RDD
val rdd = sc.makeRDD(List(1,2,3,4))
def mapFunction(num:Int):Int = {
num * 2
}
val mapRDD:RDD[Int] = rdd.map(mapFunction)
mapRDD.collect().foreach(println) //2 4 6 8
// Shut down the environment
sc.stop()
}
}
2.2: map: demonstrating the execution order across partitions
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_RDD_Operator_Transform_Par {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// establish RDD
//1. Within a partition, an RDD processes the data one element at a time:
// the next element is processed only after all the logic for the previous element has finished,
// so execution within a partition is ordered
//2. Across different partitions, the order of computation is not guaranteed
val rdd = sc.makeRDD(List(1,2,3,4))
val mapRDD = rdd.map(
num => {
println(">>>>>>" + num)
num
}
)
val mapRDD1 = mapRDD.map(
num => {
println("#####" + num)
num
}
)
mapRDD1.collect()
// Shut down the environment
sc.stop()
}
}
2.3: mapPartitions: transforms the data one partition at a time
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Spark02_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// establish RDD
val rdd = sc.makeRDD(List(1,2,3,4),2)
val mapRDD:RDD[Int] = rdd.mapPartitions(
// The number of executions is the number of partitions
iter => {
println(">>>>>>>>")
iter.map(_*2)
}
)
mapRDD.collect().foreach(println) //2 4 6 8
// Shut down the environment
sc.stop()
}
}
2.4: The difference between map and mapPartitions
Data-processing perspective:
map processes one element of a partition at a time, similar to serial processing.
mapPartitions processes a whole partition at a time, as a batch operation.
Functional perspective:
map transforms each element of the data source; it can neither add nor remove elements.
mapPartitions receives an iterator and must return an iterator; the number of elements is not required to stay the same, so it can add or remove data.
Performance perspective:
map behaves like serial processing, so its performance is relatively low.
mapPartitions behaves like batch processing, so its performance is higher; however, it holds a whole partition in memory for a long time, which can cause insufficient memory and out-of-memory errors. When memory is limited, map is therefore recommended instead, as the sketch below illustrates.
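A minimal sketch of this trade-off (an added illustration, assuming a SparkContext named sc as in the other examples; the println stands in for an expensive per-partition setup such as opening a database connection):
val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
// map: the function is invoked once per element (4 times here)
val byElement = rdd.map(_ * 2)
// mapPartitions: the function is invoked once per partition (2 times here),
// so any per-partition setup cost is paid only once per partition
val byPartition = rdd.mapPartitions(iter => {
println("per-partition setup") // hypothetical expensive setup, done once per partition
iter.map(_ * 2)
})
println(byElement.collect().mkString(",")) // 2,4,6,8
println(byPartition.collect().mkString(",")) // 2,4,6,8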
2.5: mapPartitionsWithIndex: access the data of a specified partition
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Spark03_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// establish RDD
// Get the data in the second partition
val rdd = sc.makeRDD(List(1,2,3,4),2)
val mapRDD:RDD[Int] = rdd.mapPartitionsWithIndex(
(index,iter) => {
if (index == 1){
iter
}else{
Nil.iterator
}
}
)
mapRDD.collect().foreach(println) //3 4
// Shut down the environment
sc.stop()
}
}
2.6: flatMap: flattening operation
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Spark04_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// establish RDD
val rdd:RDD[List[Int]] = sc.makeRDD(List(List(1,2),List(3,4)))
val flatRDD:RDD[Int] = rdd.flatMap(
list => {
list
}
)
flatRDD.collect().foreach(println)
// Shut down the environment
sc.stop()
}
}
2.6.1: Flattening exercise: output 1, 2, 3, 4, 5
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
object Spark04_RDD_Operator_Transform1 {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// establish RDD
val rdd = sc.makeRDD(List(List(1,2),3,List(4,5)))
val flatRDD = rdd.flatMap(
date => {
date match {
case list:List[_] => list
case dat => List(dat)
}
}
)
flatRDD.collect().foreach(println)
// Shut down the environment
sc.stop()
}
}
2.7: glom: turns the data of each partition into an array
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark05_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// establish RDD
val rdd:RDD[Int] = sc.makeRDD(List(1,2,3,4),2)
//List => Int
//Int => List
val glomRDD:RDD[Array[Int]] = rdd.glom()
glomRDD.collect().foreach(data=>println(data.mkString(",")))
// Shut down the environment
sc.stop()
}
}
2.8: glom case study: take the maximum value within each partition, then sum the maxima across partitions
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Spark06_RDD_Operator_Transform_Test {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// establish RDD
val rdd:RDD[Int] = sc.makeRDD(List(1,2,3,4),2)
val glomRDD:RDD[Array[Int]] = rdd.glom()
val maxRDD:RDD[Int] = glomRDD.map(
array => {
array.max
}
)
println(maxRDD.collect().sum)
// Shut down the environment
sc.stop()
}
}
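With the data above, the two partitions hold (1,2) and (3,4), so the per-partition maxima are 2 and 4 and the printed sum is 6.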
2.9: Partition preservation: map does not change which partition an element belongs to, and the partition files keep the same names. Both output and output1 contain part-00000 and part-00001: in output they hold 1,2 and 3,4, and in output1 they hold 2,4 and 6,8.
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_RDD_Operator_Transform_Part {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// establish RDD
val rdd = sc.makeRDD(List(1,2,3,4),2)
rdd.saveAsTextFile("output")
//
val mapRDD = rdd.map(_*2)
mapRDD.saveAsTextFile("output1")
// Shut down the environment
sc.stop()
}
}
2.10: groupBy: grouping and partitioning are not necessarily related; groupBy scatters the data and regroups it, which is a shuffle.
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Spark06_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// establish RDD
val rdd:RDD[Int] = sc.makeRDD(List(1,2,3,4),2)
//groupBy evaluates the grouping function for every element of the data source and groups by the returned key
// elements that return the same key are placed in the same group
def groupFunction(num:Int):Int = {
num % 2
}
val groupRDD:RDD[(Int,Iterable[Int])] = rdd.groupBy(groupFunction)
groupRDD.collect().foreach(println)
// Shut down the environment
sc.stop()
}
}
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Spark06_RDD_Operator_Transform1 {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// establish RDD
// Group by first letter; grouping and partitioning are not necessarily related
val rdd = sc.makeRDD(List("hello","spark","scala","hadoop"),2)
val groupRDD = rdd.groupBy(_.charAt(0))
groupRDD.collect().foreach(println)
// Shut down the environment
sc.stop()
}
}
2.11: filter: data filtering. Filtering may cause data skew: for example, if two partitions each held 1000 records before filtering, the amounts of data left in the two partitions afterwards may differ greatly.
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Spark07_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// establish RDD
val rdd:RDD[Int] = sc.makeRDD(List(1,2,3,4))
val filter:RDD[Int] = rdd.filter(num=>num%2!=0)
filter.collect().foreach(println)
// Shut down the environment
sc.stop()
}
}
2.12: sample: random sampling of the data; it can be used to find out which keys are causing data skew.
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Spark08_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// establish RDD
val rdd:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10))
//sample takes three parameters
//1. The first parameter: whether sampling is done with replacement: true (with replacement), false (without replacement)
//2. The second parameter:
// without replacement: the probability that each element of the data source is selected (a fraction between 0 and 1)
// with replacement: the expected number of times each element of the data source is selected
//3. The third parameter: the seed of the random number generator used for sampling
// if the third parameter is not passed, the current system time is used
println(rdd.sample(
false,
fraction = 0.4, // selection probability per element; it does NOT guarantee that exactly 4 of the 10 elements are returned
seed = 1
).collect().mkString(","))
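// For contrast (an added illustration, not part of the original example): with replacement,
// the second parameter is the expected number of times each element is drawn, not a probability
println(rdd.sample(
true,
2
).collect().mkString(","))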
// Shut down the environment
sc.stop()
}
}
2.13:distinct: duplicate removal
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Spark09_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// establish RDD
val rdd:RDD[Int] = sc.makeRDD(List(1,2,3,4,1,2,3,4))
val rdd1:RDD[Int] = rdd.distinct()
rdd1.collect().foreach(println)
// Shut down the environment
sc.stop()
}
}
2.14: coalesce: after filtering, partitions may contain far less data. To avoid wasting resources, the number of partitions can be reduced according to the data volume; this is typically used after filtering a large dataset to improve execution efficiency on the resulting small dataset.
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Spark10_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// establish RDD
val rdd:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6),3)
//coalesce: by default the data is NOT shuffled and recombined; data that was in the same partition stays together and is not split across different partitions
// reducing the number of partitions this way may lead to unbalanced (skewed) data
// to rebalance the data, enable shuffle: the second parameter controls whether a shuffle is used
val newRDD:RDD[Int] = rdd.coalesce(2,true)
newRDD.saveAsTextFile("output")
// Shut down the environment
sc.stop()
}
}
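With the data above, coalesce(2) without shuffle keeps whole parent partitions together, so one output file would plausibly hold (1,2) and the other (3,4,5,6), which is skewed; with the shuffle flag set to true, as in the code, the six elements are redistributed more evenly across the two partitions.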
2.15: repartition: increase the number of partitions
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Spark11_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// establish RDD
val rdd:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6),2)
//coalesce can also increase the number of partitions, but note that increasing partitions requires the data to be reshuffled; in that case the second parameter must be set to true (use shuffle). repartition(n) is simply coalesce(n, shuffle = true)
val newRDD:RDD[Int] = rdd.repartition(3)
newRDD.saveAsTextFile("output")
// Shut down the environment
sc.stop()
}
}
2.16: sortBy: sorts in ascending order by default
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Spark12_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// establish RDD
val rdd:RDD[Int] = sc.makeRDD(List(6,2,4,5,3,1),2)
//sortBy sorts in ascending order by default; the second parameter switches the sort order (false = descending)
//sortBy does not change the number of partitions by default, but it does perform a shuffle
val newRDD:RDD[Int] = rdd.sortBy(num=>num)
newRDD.saveAsTextFile("output")
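// Descending variant (an added illustration): setting the second parameter to false reverses the sort order
val descRDD: RDD[Int] = rdd.sortBy(num => num, false)
println(descRDD.collect().mkString(",")) // 6,5,4,3,2,1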
// Shut down the environment
sc.stop()
}
}
2.17: Double-value type: intersection, union, subtract (difference), and zip
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Spark13_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// Create two RDDs for the double-value operations
val rdd1 = sc.makeRDD(List(1,2,3,5))
val rdd2 = sc.makeRDD(List(3,4,5,6))
// intersection, union and subtract require the two data sources to have the same element type
// zip allows the two data sources to have different element types, but requires them to have the same number of partitions and the same number of elements per partition
// intersection [3,5]
val rdd3:RDD[Int] = rdd1.intersection(rdd2)
println(rdd3.collect().mkString(","))
// union [1,2,3,5,3,4,5,6]
val rdd4:RDD[Int] = rdd1.union(rdd2)
println(rdd4.collect().mkString(","))
// subtract, from rdd1's perspective ===> [1,2]
val rdd5:RDD[Int] = rdd1.subtract(rdd2)
println(rdd5.collect().mkString(","))
// zip [(1,3),(2,4),(3,5),(5,6)]
val rdd6:RDD[(Int, Int)] = rdd1.zip(rdd2)
println(rdd6.collect().mkString(","))
// Shut down the environment
sc.stop()
}
}
2.18: Key-value type: partitionBy: repartitions the data according to the given Partitioner, changing where each record is stored.
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Spark14_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// Create an RDD of key-value pairs
val rdd = sc.makeRDD(List(1,2,3,4),2)
val mapRDD = rdd.map((_,1))
mapRDD.partitionBy(new HashPartitioner(2))
.saveAsTextFile("output")
// Shut down the environment
sc.stop()
}
}
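A HashPartitioner places each record by key.hashCode modulo the number of partitions (taken non-negatively), so with HashPartitioner(2) the records with keys 2 and 4 should end up in part-00000 and those with keys 1 and 3 in part-00001 here.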
2.19: reduceByKey: aggregates the values of records that share the same key
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
object Spark15_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// Create an RDD of key-value pairs
val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3),("a",4)))
// aggregate the values of records that share the same key
//reduceByKey: if a key has only one value, that value does not go through the reduce function
val reduceRDD = rdd.reduceByKey((x:Int,y:Int) => {x+y})
reduceRDD.collect().foreach(println)
// Shut down the environment
sc.stop()
}
}
2.20: groupByKey: groups the records of the data source that share the same key into a two-element tuple
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark16_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// Create an RDD of key-value pairs
val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3),("b",4)))
//groupByKey: groups the records of the data source that share the same key into a two-element tuple
// the first element of the tuple is the key, the second element is the collection of all values for that key
val groupRDD:RDD[(String,Iterable[Int])] = rdd.groupByKey()
groupRDD.collect().foreach(println)
// Shut down the environment
sc.stop()
}
}
2.21: The difference between reduceByKey and groupByKey
Performance: reduceByKey performs better than groupByKey.
reduceByKey supports pre-aggregation (a combine step) within each partition, which effectively reduces the amount of shuffle data written to disk.
Functionality: groupByKey is more flexible than reduceByKey.
If we only need to group the data and no aggregation is required, groupByKey is the one to use; the sketch below contrasts the two.
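A minimal sketch of the same per-key sum written both ways (an added illustration, assuming a SparkContext named sc as in the other examples): reduceByKey pre-aggregates inside each partition before the shuffle, while groupByKey ships every (key, value) pair across the network and only then aggregates.
val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4)), 2)
// reduceByKey: map-side combine happens within each partition before the shuffle
val sum1 = rdd.reduceByKey(_ + _)
// groupByKey + mapValues: all values are shuffled first, then summed
val sum2 = rdd.groupByKey().mapValues(_.sum)
println(sum1.collect().mkString(",")) // (a,6),(b,4) -- ordering may vary
println(sum2.collect().mkString(",")) // (a,6),(b,4) -- ordering may vary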

2.22: aggregateByKey: Different operations can be performed within and between partitions
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Spark17_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// Create an RDD of key-value pairs
val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3),("b",4)),2)
//aggregateByKey is curried and has two parameter lists
// the first parameter list takes one parameter: the initial value
// it is used for the intra-partition calculation when the first value of a key is encountered
// the second parameter list takes 2 parameters
// the first parameter is the intra-partition calculation rule
// the second parameter is the inter-partition calculation rule
rdd.aggregateByKey(0)(
(x,y) => math.max(x,y),
(x,y) => x + y
).collect.foreach(println)
// Shut down the environment
sc.stop()
}
}


2.23:foldByKey: The calculation rules are the same within and between partitions
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
object Spark17_RDD_Operator_Transform2 {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// Create an RDD of key-value pairs
val rdd = sc.makeRDD(List(("a",1),("a",2),("b",3),("b",4),("b",5),("a",6)),2)
rdd.foldByKey(0)(_+_).collect.foreach(println)
// Shut down the environment
sc.stop()
}
}
2.24: combineByKey: the most general key-value aggregation operator; the first value of each key can first be transformed into a new structure

// Compute per-key (sum, count), from which an average per key could be derived
val list: List[(String, Int)] = List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))
val input: RDD[(String, Int)] = sc.makeRDD(list, 2)
val combineRdd: RDD[(String, (Int, Int))] = input.combineByKey(
(_, 1),
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)
2.25: The difference between reduceByKey, foldByKey, aggregateByKey and combineByKey
reduceByKey: the first value of each key is not transformed in any way; the same calculation rule is used within and between partitions.
foldByKey: the first value of each key is combined with an initial value; the same calculation rule is used within and between partitions.
aggregateByKey: the first value of each key is combined with an initial value; the calculation rules within and between partitions can differ.
combineByKey: when the data structure does not meet the requirements, the first value of each key can be transformed into a new structure first; the calculation rules within and between partitions can differ.
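A minimal sketch (an added illustration, assuming sc as in the other examples) that computes the same per-key sum with all four operators, to make the parameter differences visible:
val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("b", 4)), 2)
// reduceByKey: only a combine function; the first value of each key is used as-is
rdd.reduceByKey(_ + _)
// foldByKey: an initial value plus one function for both intra- and inter-partition steps
rdd.foldByKey(0)(_ + _)
// aggregateByKey: an initial value plus separate intra- and inter-partition functions
rdd.aggregateByKey(0)(_ + _, _ + _)
// combineByKey: the first value of each key goes through the first function;
// separate intra- and inter-partition functions follow
rdd.combineByKey((v: Int) => v, (acc: Int, v: Int) => acc + v, (a1: Int, a2: Int) => a1 + a2)
All four produce (a,3) and (b,7) for this data; they differ only in how the first value of each key is treated and in whether the intra- and inter-partition rules may differ.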
2.26: join: for two different data sources, values that share the same key are connected together to form a tuple
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark19_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// Create two key-value RDDs
val rdd1 = sc.makeRDD(List(("a",1),("b",2),("c",3)))
val rdd2 = sc.makeRDD(List(("b",4),("c",5),("a",6)))
//join: for two different data sources, the values of records with the same key are connected together to form a tuple
// if a key in one data source has no match in the other, that record does not appear in the result;
// if a key appears m times in one data source and n times in the other, the join produces m * n records for that key
val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
joinRDD.collect().foreach(println)
// Shut down the environment
sc.stop()
}
}
2.27: leftOuterJoin: similar to a SQL left outer join (rightOuterJoin is the right-sided counterpart)
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Spark20_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// Create two key-value RDDs
val rdd1 = sc.makeRDD(List(("a",1),("b",2),("c",3)))
val rdd2 = sc.makeRDD(List(("a",4),("b",5)))
val joinRDD = rdd1.leftOuterJoin(rdd2)
joinRDD.collect().foreach(println)
// Shut down the environment
sc.stop()
}
}
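With the data above, the result type is RDD[(String, (Int, Option[Int]))] and the printed records are (a,(1,Some(4))), (b,(2,Some(5))) and (c,(3,None)), possibly in a different order; the unmatched key c keeps its left value and gets None on the right.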
2.28: cogroup: within each data source, values with the same key are put into one group; the groups are then connected across the data sources
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
object Spark21_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// Create two key-value RDDs
val rdd1 = sc.makeRDD(List(("a",1),("b",2)))
val rdd2 = sc.makeRDD(List(("a",4),("b",5),("c",6),("c",7)))
// cogroup = connect + group: within each data source the values with the same key are put into one group, and the groups are then connected across the data sources
val cgRDD = rdd1.cogroup(rdd2)
cgRDD.collect().foreach(println)
// Shut down the environment
sc.stop()
}
}
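With the data above, the printed output is roughly (a,(CompactBuffer(1),CompactBuffer(4))), (b,(CompactBuffer(2),CompactBuffer(5))) and (c,(CompactBuffer(),CompactBuffer(6, 7))), possibly in a different order: the key c exists only in rdd2, so its group from rdd1 is empty rather than being dropped, which is the main difference from join.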
2.29:collect
package com.atguigu.bigdata.spark.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4))
// Action operators trigger the execution of a job
rdd.collect()
// under the hood, collect calls the runJob method of the environment object (SparkContext)
// the underlying code creates an ActiveJob and submits it for execution
sc.stop()
}
}
2.30: More action operators: reduce, collect, count, first, take, takeOrdered
package com.atguigu.bigdata.spark.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark02_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4))
/* // reduce: aggregates all elements of the RDD, first within each partition, then across partitions
val i = rdd.reduce(_+_)
println(i) // 10*/
//collect: returns all elements of the dataset to the Driver as an array, keeping the partition order
/*rdd.collect().foreach(println)*/
//count: the number of elements in the data source
val cnt = rdd.count()
println(cnt)
//first: the first element of the data source
val first = rdd.first()
println(first)
//take: the first three elements of the data source
val ints = rdd.take(3)
println(ints.mkString(","))
//takeOrdered: sorts the data (ascending by default) and then takes the first three
val ints1 = rdd.takeOrdered(3)
println(ints1.mkString(","))
sc.stop()
}
}
2.31: Action operator: aggregate
package com.atguigu.bigdata.spark.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark03_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4),2)
//aggregateByKey: the initial value only participates in the intra-partition calculation
//aggregate: the initial value participates in the intra-partition calculation and also in the inter-partition calculation
val i = rdd.aggregate(10)(_+_,_+_)
println(i)
sc.stop()
}
}
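With the data above, the two partitions hold (1,2) and (3,4), so the intra-partition results are 10 + 1 + 2 = 13 and 10 + 3 + 4 = 17, and the inter-partition step applies the initial value again: 10 + 13 + 17 = 40, which is what the program prints. aggregateByKey would have applied the initial value only inside each partition.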
2.32: Action operators: countByKey and countByValue
package com.atguigu.bigdata.spark.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark05_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// val rdd = sc.makeRDD(List(1,2,3,4),2)
val rdd = sc.makeRDD(List(("a",1),("a",1),("a",1)))
//countByValue: counts how many times each value occurs
// val intToLong = rdd.countByValue()
//countByKey: counts how many times each key occurs
val intToLong = rdd.countByKey()
println(intToLong)
sc.stop()
}
}
2.33: Action operators: saveAsTextFile, saveAsObjectFile, saveAsSequenceFile
package com.atguigu.bigdata.spark.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark06_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3)))
rdd.saveAsTextFile("output")
rdd.saveAsObjectFile("output1")
// saveAsSequenceFile requires the data to be of key-value type
rdd.saveAsSequenceFile("output2")
sc.stop()
}
}
2.34: Action operator: foreach
package com.atguigu.bigdata.spark.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark07_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4))
rdd.collect().foreach(println) // collect() first pulls the data to the Driver; this foreach then loops over the in-memory collection on the Driver side
println("-------")
rdd.foreach(println) // this foreach runs on the Executors: the data is printed where it lives, so there is no ordering guarantee
sc.stop()
}
}
