Spark Day 03: Core Programming - RDD Operators
2022-06-26 12:09:00 【There will always be daylight】
One: RDD Operators
RDD operators are also called RDD methods. They fall into two main categories: transformation operators and action operators.

Two: RDD Transformation Operators
Based on how they process data, transformation operators are divided into single-value, double-value, and key-value types.
2.1: map: value transformation
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Spark01_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// Create RDD
val rdd = sc.makeRDD(List(1,2,3,4))
def mapFunction(num:Int):Int = {
num * 2
}
val mapRDD:RDD[Int] = rdd.map(mapFunction)
mapRDD.collect().foreach(println) //2 4 6 8
// Shut down the environment
sc.stop()
}
}
2.2: map: demonstrating the parallel execution order
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_RDD_Operator_Transform_Par {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// Create RDD
// 1. rdd computes the data in a partition one element at a time:
//    the next element is processed only after all logic for the previous element has finished,
//    so execution of the data within a partition is ordered
// 2. The computation of data in different partitions has no ordering guarantee
val rdd = sc.makeRDD(List(1,2,3,4))
val mapRDD = rdd.map(
num => {
println(">>>>>>" + num)
num
}
)
val mapRDD1 = mapRDD.map(
num => {
println("#####" + num)
num
}
)
mapRDD1.collect()
// Shut down the environment
sc.stop()
}
}
2.3: mapPartitions: performs the data transformation one partition at a time
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Spark02_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// Create RDD
val rdd = sc.makeRDD(List(1,2,3,4),2)
val mapRDD:RDD[Int] = rdd.mapPartitions(
// The function below is executed once per partition
iter => {
println(">>>>>>>>")
iter.map(_*2)
}
)
mapRDD.collect().foreach(println) //2 4 6 8
// Shut down the environment
sc.stop()
}
}
2.4: The difference between map and mapPartitions
Data processing perspective:
map processes the elements of a partition one at a time, similar to a serial operation.
mapPartitions processes data in batches, one partition at a time.
Functional perspective:
map transforms the elements of the data source but neither adds nor removes elements.
mapPartitions receives an iterator and returns an iterator; the number of elements is not required to stay the same, so data can be added or removed.
Performance perspective:
map works element by element, so its performance is relatively low.
mapPartitions works like a batch operation and performs better, but it holds a partition's data in memory for a long time, which can lead to out-of-memory errors. With limited memory, map is recommended instead.
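A minimal sketch of this trade-off, using the same local setup as the examples in this section. The object name Spark02_RDD_Operator_Transform_Compare and the println standing in for per-partition setup work are assumptions for illustration, not part of the original course code:
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
object Spark02_RDD_Operator_Transform_Compare {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
    // map: the function runs once per element, so any setup placed inside it would run 4 times
    val viaMap = rdd.map(num => num * 2)
    // mapPartitions: the function runs once per partition and receives the whole iterator,
    // so setup work (the println stands in for e.g. opening a connection) runs only twice
    val viaMapPartitions = rdd.mapPartitions(iter => {
      println(">>> partition setup")
      iter.map(_ * 2)
    })
    println(viaMap.collect().mkString(","))           // 2,4,6,8
    println(viaMapPartitions.collect().mkString(",")) // 2,4,6,8
    sc.stop()
  }
}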
2.5: mapPartitionsWithIndex: operate on a partition's data together with its partition index (here: keep only a specified partition)
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Spark03_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// Create RDD
// Keep only the data of the second partition (partition index 1)
val rdd = sc.makeRDD(List(1,2,3,4),2)
val mapRDD:RDD[Int] = rdd.mapPartitionsWithIndex(
(index,iter) => {
if (index == 1){
iter
}else{
Nil.iterator
}
}
)
mapRDD.collect().foreach(println) // 3 4
// Shut down the environment
sc.stop()
}
}
2.6: flatMap: flattening operation
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Spark04_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// Create RDD
val rdd:RDD[List[Int]] = sc.makeRDD(List(List(1,2),List(3,4)))
val flatRDD:RDD[Int] = rdd.flatMap(
list => {
list
}
)
flatRDD.collect().foreach(println)
// Shut down the environment
sc.stop()
}
}
2.6: Flattening exercise: output 1,2,3,4,5 from List(List(1,2), 3, List(4,5))
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
object Spark04_RDD_Operator_Transform1 {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// Create RDD
val rdd = sc.makeRDD(List(List(1,2),3,List(4,5)))
val flatRDD = rdd.flatMap(
data => {
data match {
case list:List[_] => list
case dat => List(dat)
}
}
)
flatRDD.collect().foreach(println)
// Shut down the environment
sc.stop()
}
}
2.7: glom: converts the data of each partition into an array
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark05_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// Create RDD
val rdd:RDD[Int] = sc.makeRDD(List(1,2,3,4),2)
//List => Int
//Int => List
val glomRDD:RDD[Array[Int]] = rdd.glom()
glomRDD.collect().foreach(data=>println(data.mkString(",")))
// Shut down the environment
sc.stop()
}
}
2.8: glom case study: compute the maximum value within each partition, then sum these maxima across partitions
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Spark06_RDD_Operator_Transform_Test {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// Create RDD
val rdd:RDD[Int] = sc.makeRDD(List(1,2,3,4),2)
val glomRDD:RDD[Array[Int]] = rdd.glom()
val maxRDD:RDD[Int] = glomRDD.map(
array => {
array.max
}
)
println(maxRDD.collect().sum)
// Shut down the environment
sc.stop()
}
}
2.9: Partition invariance: after a map, the partition layout and the assignment of data to partitions stay the same. In the output directory, part-00000 and part-00001 contain 1,2 and 3,4; in output1, part-00000 and part-00001 contain 2,4 and 6,8.
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_RDD_Operator_Transform_Part {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// Create RDD
val rdd = sc.makeRDD(List(1,2,3,4),2)
rdd.saveAsTextFile("output")
//
val mapRDD = rdd.map(_*2)
mapRDD.saveAsTextFile("output1")
// Shut down the environment
sc.stop()
}
}
2.10: groupBy: grouping and partitioning are not necessarily related; groupBy shuffles the data and regroups it (shuffle).
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Spark06_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// Create RDD
val rdd:RDD[Int] = sc.makeRDD(List(1,2,3,4),2)
// groupBy applies the grouping function to every element of the data source and groups by the returned key
// Elements with the same key are placed in the same group
def groupFunction(num:Int):Int = {
num % 2
}
val groupRDD:RDD[(Int,Iterable[Int])] = rdd.groupBy(groupFunction)
groupRDD.collect().foreach(println)
// Shut down the environment
sc.stop()
}
}
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Spark06_RDD_Operator_Transform1 {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// Create RDD
// Group by first letter; grouping has no necessary relationship with partitioning
val rdd = sc.makeRDD(List("hello","spark","scala","hadoop"),2)
val groupRDD = rdd.groupBy(_.charAt(0))
groupRDD.collect().foreach(println)
// Shut down the environment
sc.stop()
}
}
2.11: filter: data filtering. Filtering may cause data skew: for example, if two partitions each held 1000 records before filtering, the amounts of data left in the two partitions afterwards may differ greatly (a sketch that makes this visible follows the example below).
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Spark07_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// Create RDD
val rdd:RDD[Int] = sc.makeRDD(List(1,2,3,4))
val filter:RDD[Int] = rdd.filter(num=>num%2!=0)
filter.collect().foreach(println)
// Shut down the environment
sc.stop()
}
}
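As mentioned above, one quick way to see the skew that filter can introduce is to count the surviving elements per partition. A minimal sketch, assuming 1 to 10 as test data and a hypothetical object name (neither is from the original course code):
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
object Spark07_RDD_Operator_Transform_Skew {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    // With 2 partitions, partition 0 holds 1..5 and partition 1 holds 6..10
    val rdd = sc.makeRDD(1 to 10, 2)
    // Keep only values greater than 5: everything in partition 0 is filtered out
    val filtered = rdd.filter(_ > 5)
    // Count the remaining elements per partition to make the imbalance visible
    filtered.mapPartitionsWithIndex(
      (index, iter) => Iterator((index, iter.size))
    ).collect().foreach(println) // (0,0) and (1,5)
    // coalesce (section 2.14) can then shrink the number of partitions
    sc.stop()
  }
}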
2.12: sample: randomly samples the data; random sampling can help find out which keys cause data skew.
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Spark08_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// Create RDD
val rdd:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10))
// The sample operator takes three parameters
// 1. The first parameter: whether sampling is done with replacement - true (put back), false (not put back)
// 2. The second parameter:
//    without replacement: the probability that each element of the data source is selected
//    with replacement: the expected number of times each element may be selected
// 3. The third parameter: the seed of the random number generator used for sampling
//    if it is not passed, the current system time is used
println(rdd.sample(
false,
fraction = 0.4, // selection probability per element; with 10 elements this does not guarantee exactly 4 results
seed = 1
).collect().mkString(","))
// Shut down the environment
sc.stop()
}
}
2.13:distinct: duplicate removal
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Spark09_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// Create RDD
val rdd:RDD[Int] = sc.makeRDD(List(1,2,3,4,1,2,3,4))
val rdd1:RDD[Int] = rdd.distinct()
rdd1.collect().foreach(println)
// Shut down the environment
sc.stop()
}
}
2.14: coalesce: after filtering, each partition may hold far less data. To avoid wasting resources, the number of partitions can be reduced according to the data volume; this is typically used after filtering a large data set to improve the efficiency of processing the smaller result.
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Spark10_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// Create RDD
val rdd:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6),3)
// coalesce: by default the data is not shuffled and recombined; data that was in the same partition is not split across different partitions
// Reducing partitions this way may lead to data imbalance (data skew)
// To rebalance the data, use shuffle: the second parameter controls whether shuffle is used
val newRDD:RDD[Int] = rdd.coalesce(2,true)
newRDD.saveAsTextFile("output")
// Shut down the environment
sc.stop()
}
}
2.15: repartition: increasing the number of partitions
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Spark11_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// Create RDD
val rdd:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6),2)
// coalesce can also increase the number of partitions, but doing so requires the data to be shuffled again,
// so the second parameter must be set to true; repartition is simply coalesce with shuffle = true
val newRDD:RDD[Int] = rdd.repartition(3)
newRDD.saveAsTextFile("output")
// Shut down the environment
sc.stop()
}
}
2.16: sortBy: sorts in ascending order by default
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Spark12_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// Create RDD
val rdd:RDD[Int] = sc.makeRDD(List(6,2,4,5,3,1),2)
// sortBy sorts in ascending order by default; the second parameter can change the ordering
// sortBy does not change the number of partitions by default, but it does perform a shuffle in the middle
val newRDD:RDD[Int] = rdd.sortBy(num=>num)
newRDD.saveAsTextFile("output")
// Shut down the environment
sc.stop()
}
}
2.17: double-value type: intersection, union, subtract (difference), and zip
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Spark13_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// Create two RDDs for the double-value operations
val rdd1 = sc.makeRDD(List(1,2,3,4))
val rdd2 = sc.makeRDD(List(3,4,5,6))
// intersection, union, and subtract require the two data sources to have the same element type
// zip allows the two data sources to have different element types, but requires the same number of partitions and the same number of elements per partition
// intersection [3,4]
val rdd3:RDD[Int] = rdd1.intersection(rdd2)
println(rdd3.collect().mkString(","))
// Combine [1,2,3,4,3,4,5,6]
val rdd4:RDD[Int] = rdd1.union(rdd2)
println(rdd4.collect().mkString(","))
// Difference set rdd1 The angle of ===>[1,2]
val rdd5:RDD[Int] = rdd1.subtract(rdd2)
println(rdd5.collect().mkString(","))
// zipper [1-3,2-4,3-5,4-6]
val rdd6:RDD[(Int, Int)] = rdd1.zip(rdd2)
println(rdd6.collect().mkString(","))
// Shut down the environment
sc.stop()
}
}
2.18: Key-value type: partitionBy: repartitions the data according to the specified partitioner, changing where the data is stored.
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Spark14_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// Create RDD
val rdd = sc.makeRDD(List(1,2,3,4),2)
val mapRDD = rdd.map((_,1))
mapRDD.partitionBy(new HashPartitioner(2))
.saveAsTextFile("output")
// Shut down the environment
sc.stop()
}
}
2.19: reduceByKey: aggregates the values of entries that share the same key
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
object Spark15_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// Create RDD
val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3),("a",4)))
// Values of entries that share the same key are aggregated
// reduceByKey: if a key has only one entry, that entry does not take part in the two-argument calculation
val reduceRDD = rdd.reduceByKey((x:Int,y:Int) => {x+y})
reduceRDD.collect().foreach(println)
// Shut down the environment
sc.stop()
}
}
2.20: groupByKey: groups the entries of the data source that share the same key, forming a two-element tuple per key
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark16_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// Create RDD
val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3),("b",4)))
// groupByKey: entries of the data source with the same key are grouped together, forming a two-element tuple
// The first element of the tuple is the key; the second is the collection of values sharing that key
val groupRDD:RDD[(String,Iterable[Int])] = rdd.groupByKey()
groupRDD.collect().foreach(println)
// Shut down the environment
sc.stop()
}
}
2.21: The difference between reduceByKey and groupByKey
Performance: reduceByKey performs better than groupByKey.
reduceByKey supports pre-aggregation within each partition, which effectively reduces the amount of shuffle data written to disk.
Functionality: groupByKey is preferable to reduceByKey
when we only need to group the data and do not need to aggregate it.
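A minimal side-by-side sketch on the same data, assuming a hypothetical object name (not from the original course code). Both pipelines produce the sums (a,6) and (b,4), but only reduceByKey combines values inside each partition before the shuffle:
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
object Spark16_RDD_Operator_Transform_Compare {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4)), 2)
    // reduceByKey: values with the same key are pre-aggregated inside each partition,
    // so less data is written to disk during the shuffle
    rdd.reduceByKey(_ + _).collect().foreach(println)            // (a,6), (b,4); order may vary
    // groupByKey: every value crosses the shuffle; the aggregation happens afterwards
    rdd.groupByKey().mapValues(_.sum).collect().foreach(println) // (a,6), (b,4); order may vary
    sc.stop()
  }
}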

2.22: aggregateByKey: Different operations can be performed within and between partitions
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Spark17_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// Create RDD
val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3),("b",4)),2)
// aggregateByKey is curried and has two parameter lists
// The first parameter list takes one parameter: the initial value,
//   used for the intra-partition calculation when the first value of a key is encountered
// The second parameter list takes two parameters:
//   the first is the calculation rule within a partition
//   the second is the calculation rule between partitions
rdd.aggregateByKey(0)(
(x,y) => math.max(x,y),
(x,y) => x + y
).collect.foreach(println)
// Shut down the environment
sc.stop()
}
}


2.23:foldByKey: The calculation rules are the same within and between partitions
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
object Spark17_RDD_Operator_Transform2 {
def main(args: Array[String]): Unit = {
// Prepare the environment
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// Create RDD
val rdd = sc.makeRDD(List(("a",1),("a",2),("b",3),("b",4),("b",5),("a",6)),2)
rdd.foldByKey(0)(_+_).collect.foreach(println)
// Shut down the environment
sc.stop()
}
}
2.24: combineByKey: when the structure of the data does not fit the calculation, the first value of each key can first be transformed into a new structure (a create-combiner step). The classic example computes the average score per key by building a (sum, count) pair:
val list: List[(String, Int)] = List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))
val input: RDD[(String, Int)] = sc.makeRDD(list, 2)
val combineRdd: RDD[(String, (Int, Int))] = input.combineByKey(
(_, 1),
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)
2.25: The difference between reduceByKey, foldByKey, aggregateByKey, and combineByKey
reduceByKey: the first value of a key takes part in no extra calculation; the intra-partition and inter-partition rules are the same.
foldByKey: the first value of a key is combined with the initial value; the intra-partition and inter-partition rules are the same.
aggregateByKey: the first value of a key is combined with the initial value; the intra-partition and inter-partition rules can differ.
combineByKey: when the data structure does not fit the calculation, the first value of a key can be transformed into a new structure; the intra-partition and inter-partition rules can differ.
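A minimal sketch showing that all four operators can compute the same per-key sum, and that the differences listed above lie only in how the first value of a key is treated and whether the intra-/inter-partition rules may differ. The object name is an assumption, not from the original course code:
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
object Spark18_RDD_Operator_Transform_SameSum {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("b", 4)), 2)
    // Each of the four lines below prints (a,3) and (b,7); the output order may vary
    rdd.reduceByKey(_ + _).collect().foreach(println)
    rdd.foldByKey(0)(_ + _).collect().foreach(println)
    rdd.aggregateByKey(0)(_ + _, _ + _).collect().foreach(println)
    rdd.combineByKey(
      (v: Int) => v,              // createCombiner: the first value of a key, kept as-is
      (x: Int, y: Int) => x + y,  // mergeValue: combine within a partition
      (x: Int, y: Int) => x + y   // mergeCombiners: combine across partitions
    ).collect().foreach(println)
    sc.stop()
  }
}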
2.26: join: for two data sources, values that share the same key are connected together, forming a tuple
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark19_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// Create two RDDs
val rdd1 = sc.makeRDD(List(("a",1),("b",2),("c",3)))
val rdd2 = sc.makeRDD(List(("b",4),("c",5),("a",6)))
// join: values from the two data sources that share the same key are connected together, forming tuples
// A key with no match in the other data source does not appear in the result; if a key appears m times in one source and n times in the other, it produces m * n result entries
val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
joinRDD.collect().foreach(println)
// Shut down the environment
sc.stop()
}
}
2.27: leftOuterJoin: similar to a left outer join in SQL (rightOuterJoin works analogously)
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Spark20_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// Create two RDDs
val rdd1 = sc.makeRDD(List(("a",1),("b",2),("c",3)))
val rdd2 = sc.makeRDD(List(("a",4),("b",5)))
val joinRDD = rdd1.leftOuterJoin(rdd2)
joinRDD.collect().foreach(println)
// Shut down the environment
sc.stop()
}
}
2.28: cogroup (connect + group): within each data source, entries with the same key are first grouped, and the groups are then connected across the data sources
package com.atguigu.bigdata.spark.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
object Spark21_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// Create two RDDs
val rdd1 = sc.makeRDD(List(("a",1),("b",2)))
val rdd2 = sc.makeRDD(List(("a",4),("b",5),("c",6),("c",7)))
// cogroup = connect + group: within one data source, entries with the same key are put into a group, which is then connected with the groups of the other data source
val cgRDD = rdd1.cogroup(rdd2)
cgRDD.collect().foreach(println)
// Shut down the environment
sc.stop()
}
}
2.29:collect
package com.atguigu.bigdata.spark.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4))
// Action operators trigger job execution
rdd.collect()
// Under the hood, collect calls the runJob method of the SparkContext environment object,
// which creates an ActiveJob and submits it for execution
sc.stop()
}
}
2.30: Action operators: reduce, collect, count, first, take, takeOrdered
package com.atguigu.bigdata.spark.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark02_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4))
/* // reduce: aggregates all elements of the rdd, first within each partition, then across partitions
val i = rdd.reduce(_+_)
println(i) // 10*/
// collect: gathers the data of all partitions in partition order and returns all elements of the dataset as an array
/*rdd.collect().foreach(println)*/
// count: the number of elements in the data source
val cnt = rdd.count()
println(cnt)
// first: get the first element of the data source
val first = rdd.first()
println(first)
// take: get the first three elements of the data source
val ints = rdd.take(3)
println(ints.mkString(","))
// takeOrdered: sort ascending, then take the first three elements
val ints1 = rdd.takeOrdered(3)
println(ints1.mkString(","))
sc.stop()
}
}
2.31: Action operator -aggregate
package com.atguigu.bigdata.spark.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark03_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4),2)
// aggregateByKey: the initial value only takes part in the intra-partition calculation
// aggregate: the initial value takes part in the intra-partition calculation and also in the inter-partition calculation
// Here the partitions hold [1,2] and [3,4]: intra-partition sums are 10+1+2=13 and 10+3+4=17, and the inter-partition step adds the initial value again: 10+13+17=40
val i = rdd.aggregate(10)(_+_,_+_)
println(i)
sc.stop()
}
}
2.32: Action operator -countByKey and countByValue
package com.atguigu.bigdata.spark.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark05_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// val rdd = sc.makeRDD(List(1,2,3,4),2)
val rdd = sc.makeRDD(List(("a",1),("a",1),("a",1)))
// countByValue: counts how many times each value occurs
// val intToLong = rdd.countByValue()
// countByKey: counts how many times each key occurs
val intToLong = rdd.countByKey()
println(intToLong)
sc.stop()
}
}
2.33: Action operator - save
package com.atguigu.bigdata.spark.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark06_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3)))
rdd.saveAsTextFile("output")
rdd.saveAsObjectFile("output1")
// saveAsSequenceFile requires the data to be of key-value type
rdd.saveAsSequenceFile("output2")
sc.stop()
}
}
2.34: Action operator - foreach
package com.atguigu.bigdata.spark.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark07_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4))
rdd.collect().foreach(println) // foreach on the collected array: loops over the in-memory collection on the Driver side
println("-------")
rdd.foreach(println) // executed on the Executor side, printing the data there; no ordering guarantee
sc.stop()
}
}
