当前位置:网站首页>Summary of spark common operators

A summary of common Spark RDD operators

2022-06-22 16:47:00 ZH519080

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

class RDDSuanzi {

  /**
   * Walk-through of the most common RDD operators: creation, caching,
   * actions, transformations, and the key-value (pair RDD) API.
   *
   * Results are bound to typed vals purely to document each operator's
   * return type; most of them are intentionally unused.
   */
  private[this] def rddBasics: Unit = {

    val sparkConf: SparkConf = new SparkConf().setAppName("rdd basics implement")
    val sparkContext: SparkContext = SparkContext.getOrCreate(sparkConf)

    try {
      val rdd: RDD[Int] = sparkContext.parallelize(Array(1, 2, 3, 4, 5, 6))

      val rDD: RDD[Int] = sparkContext.makeRDD(Array(4, 5, 6, 7, 8))

      val pairRDD: RDD[(String, String)] =
        sparkContext.makeRDD(Array(("a", "1"), ("b", "2"), ("c", "3"), ("b", "5")))

      // ---------- Actions ----------

      // Materialize all elements on the driver as an Array.
      val collect: Array[Int] = rdd.collect()

      // Mark the RDD for in-memory caching (equivalent to persist(MEMORY_ONLY)).
      val cacheRDD: RDD[Int] = rdd.cache()

      // Mark the RDD for checkpointing. NOTE(review): this fails when a job
      // actually runs unless sparkContext.setCheckpointDir(...) was called
      // first — confirm a checkpoint dir is configured. Cache before
      // checkpointing to avoid recomputing the full lineage.
      rdd.checkpoint()

      // First `num` elements (returns fewer if the RDD is smaller).
      val take: Array[Int] = rdd.take(num = 10)

      // Number of occurrences of each distinct element.
      val countByValue: collection.Map[Int, Long] = rdd.countByValue()

      // Lineage: this RDD's dependencies on its parent RDDs.
      rdd.dependencies

      // Largest `num` elements, in descending order.
      val top: Array[Int] = rdd.top(num = 10)

      // Smallest `num` elements, in ascending order.
      val takeOrdered: Array[Int] = rdd.takeOrdered(num = 10)

      // For a (K, V) RDD: all values associated with the given key.
      val lookup: Seq[String] = pairRDD.lookup("a")

      // ---------- Transformations ----------

      // One-to-one mapping over every element.
      rdd.map(row => row.toChar)

      // Like map, but each element may expand to zero or more output
      // elements, which are flattened into a single RDD.
      rdd.flatMap(row => 1 to row)

      // Binary reduction: combined within each partition first, then merged
      // across partitions.
      rdd.reduce((a, b) => a + b)

      // Change the partition count. shuffle = true is required to INCREASE
      // partitions; when decreasing, shuffle = false merges partitions in the
      // same stage (cheaper, but can reduce parallelism for large drops).
      rdd.coalesce(numPartitions = 2, true)

      // Shorthand for coalesce(numPartitions, shuffle = true).
      rdd.repartition(numPartitions = 2)

      // Randomly split one RDD into several, by the given weights.
      rdd.randomSplit(weights = Array(0.8, 0.2), seed = 11L)

      // Collapse each partition into a single Array element.
      val glom: RDD[Array[Int]] = rdd.glom()

      // Union of the two RDDs; duplicates are NOT removed.
      rdd.union(rDD)

      // Intersection, deduplicated; typically involves a shuffle.
      rdd.intersection(rDD)

      // Elements in `rdd` but not in `rDD`; typically involves a shuffle.
      rdd.subtract(rDD)

      // Pair up elements positionally into a (K, V) RDD. Both RDDs must have
      // the same number of partitions AND elements per partition, or the job
      // fails at runtime.
      rdd.zip(rDD)

      // rdd.zipPartitions(rDD)
      // zipPartitions only requires matching partition counts; the element
      // counts per partition may differ.

      // Pair each element with its global index. Partition start offsets must
      // be computed first, which triggers a job for multi-partition RDDs.
      val zipWithIndex: RDD[(Int, Long)] = rdd.zipWithIndex()

      // Pair each element with a unique Long id; no extra job is needed, but
      // the ids are not contiguous.
      val zipWithUniqueId: RDD[(Int, Long)] = rDD.zipWithUniqueId()

      // ---------- Pair-RDD (K, V) operators ----------

      // Repartition by key; similar intent to repartition, produces a ShuffledRDD.
      val value: RDD[(String, String)] = pairRDD.partitionBy(new HashPartitioner(2))

      // Map over values only; keys (and any partitioning) are preserved.
      pairRDD.mapValues(x => x + 1)

      // Collect to the driver as a Map; for duplicate keys, later entries win.
      val collectAsMap: collection.Map[String, String] = pairRDD.collectAsMap()

      // flatMap over values only.
      pairRDD.flatMapValues(x => Seq(x, "a"))

      // The four key-based aggregations: combineByKey, foldByKey, reduceByKey, groupByKey.

      // Merge values sharing a key with the given binary function.
      val reduceByKey: RDD[(String, String)] = pairRDD.reduceByKey((x, y) => x + y)

      // Group all values sharing a key.
      val groupByKey: RDD[(String, Iterable[String])] = pairRDD.groupByKey()

      // Sort by key; ascending = true is the default.
      val sortByKey: RDD[(String, String)] = pairRDD.sortByKey()

      // The most general per-key aggregation; reduceByKey/groupByKey are
      // special cases of it.
      pairRDD.combineByKey(
        // createCombiner: seed the per-key accumulator from the first value.
        (x: String) => (List(x), 1),
        // mergeValue: fold another value into a partition-local accumulator.
        (acc: (List[String], Int), x: String) => (x :: acc._1, acc._2 + 1),
        // mergeCombiners: merge accumulators produced on different partitions.
        (a: (List[String], Int), b: (List[String], Int)) => (a._1 ::: b._1, a._2 + b._2)
      ).foreach(println)
    } finally {
      // Fix: the original never stopped the context, leaking the scheduler,
      // UI, and executor resources for the lifetime of the JVM.
      sparkContext.stop()
    }
  }
}

 

原网站

版权声明
本文为[ZH519080]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/173/202206221523255208.html