Spark Learning Notes (VII) -- Spark Core Programming: RDD Serialization / Dependencies / Persistence / Partitioning / Accumulators / Broadcast Variables
2022-07-29 07:10:00 【One's cow】
Catalog
RDD Serialization
RDD Dependencies
RDD Persistence
RDD Partitioner
Accumulator: distributed shared write-only variables
Broadcast variables: distributed shared read-only variables
RDD Serialization
(1) Closure checking
From the point of view of where code runs: code outside an operator executes on the Driver side, while code inside an operator executes on the Executor side.
In Scala functional programming, the code inside an operator often uses data defined outside the operator, which forms a closure. If that outside data cannot be serialized, it cannot be sent to the Executor side, and the job fails with an error. So before the tasks are computed, Spark checks whether the objects captured by the closure can be serialized; this check is called closure checking (a minimal sketch of a failing case follows).
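A minimal sketch of the failing case, with a made-up class name Query (not from the original code): the class does not extend Serializable, so the closure check rejects the job as soon as the operator is built.

import org.apache.spark.rdd.RDD

// Query is a hypothetical class used only for this illustration
class Query(word: String) {                  // note: no "extends Serializable"
  def find(rdd: RDD[String]): RDD[String] = {
    // `word` is accessed as this.word, so the closure captures the whole Query object
    rdd.filter(line => line.contains(word))
  }
}
// new Query("hello").find(rdd).collect()
// => org.apache.spark.SparkException: Task not serializable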
(2) Serializing methods and properties
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object RDD_Serialiazble {
  def main(args: Array[String]): Unit = {
    //TODO Create the environment
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Serializable")
    val sc = new SparkContext(sparkConf)

    //TODO Create the RDD
    val rdd = sc.makeRDD(Array("Data", "hello world", "hello scala", "hello spark", "world", "Big data"))
    // Create the Search object
    val search = new Search("hello")
    // Pass a method of the object
    search.getMatch1(rdd).collect().foreach(println)
    println(">>>>>>>>>>>")
    // Pass a property of the object
    search.getMatch2(rdd).collect().foreach(println)

    //TODO Close the environment
    sc.stop()
  }
}

// Query class
class Search(query: String) extends Serializable {
  def isMatch(s: String): Boolean = {
    s.contains(this.query)
  }

  // Method passing: the closure captures `this`, so Search must be serializable
  def getMatch1(rdd: RDD[String]): RDD[String] = {
    rdd.filter(isMatch)
  }

  // Property passing: the closure still references `this.query`
  def getMatch2(rdd: RDD[String]): RDD[String] = {
    rdd.filter(x => x.contains(query))
  }
}
(3) Kryo serialization
1) Java can serialize any class, but Java serialization is heavy: the serialized output contains many bytes, so the objects that get shipped are relatively large. For performance, Spark has supported another serialization mechanism, Kryo, since version 2.0.
2) Kryo is roughly 10 times faster than Java serialization. When an RDD shuffles data, simple value types, arrays and strings are already serialized with Kryo inside Spark.
3) Even when Kryo serialization is used, the class still has to extend the Serializable interface.
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object RDD_Serializable_Kryo {
  def main(args: Array[String]): Unit = {
    //TODO Create the environment
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Serializable")
      // Switch the serialization mechanism to Kryo
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Register the custom classes that Kryo should serialize
      .registerKryoClasses(Array(classOf[Search2]))
    val sc = new SparkContext(sparkConf)

    //TODO Create the RDD
    val rdd = sc.makeRDD(Array("Data", "hello world", "hello scala", "world", "Big data"))
    // Create the Search2 object
    val search2 = new Search2("hello")
    // Pass a method of the object
    val result = search2.getMatch3(rdd)
    result.collect().foreach(println)
    println("<<<<<<<<")
    // Pass a property of the object
    search2.getMatch4(rdd).collect().foreach(println)

    //TODO Close the environment
    sc.stop()
  }
}

// Query class
class Search2(query: String) extends Serializable {
  def isMatch(s: String): Boolean = {
    s.contains(query)
  }

  // Method passing
  def getMatch3(rdd: RDD[String]): RDD[String] = {
    rdd.filter(isMatch)
  }

  // Property passing
  def getMatch4(rdd: RDD[String]): RDD[String] = {
    rdd.filter(_.contains(query))
  }
}
RDD Dependencies
(1) RDD lineage
1) RDDs only support coarse-grained transformations.
2) Each RDD records the Lineage (the series of transformations) that created it, so that lost partitions can be recovered. An RDD's Lineage records its metadata and transformation behavior; when part of the RDD's partition data is lost, this information can be used to recompute and recover the lost partitions.
Note:
coarse-grained: the transformation is applied to the whole dataset, not to a single element of it;
fine-grained: the transformation is applied to a single element of the dataset.
def main(args: Array[String]): Unit = {
  //TODO Create the environment
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Depend")
  val sc = new SparkContext(sparkConf)

  //TODO Lineage: print toDebugString after each transformation
  val rdd = sc.textFile("datas/word2.txt")
  println(rdd.toDebugString)
  println(">>>>>>>>>>>")
  val flatRDD = rdd.flatMap(_.split(" "))
  println(flatRDD.toDebugString)
  println(">>>>>>>>>>>")
  val mapRDD = flatRDD.map((_, 1))
  println(mapRDD.toDebugString)
  println(">>>>>>>>>>>>>>")
  val resultRDD = mapRDD.reduceByKey(_ + _)
  println(resultRDD.toDebugString)
  println(">>>>>>>>>>>>>>")
  resultRDD.collect().foreach(println)

  //TODO Close the environment
  sc.stop()
}
(2) RDD dependencies
A dependency is simply the relationship between two adjacent RDDs.
def main(args: Array[String]): Unit = {
  //TODO Create the environment
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Depend")
  val sc = new SparkContext(sparkConf)

  //TODO Dependencies: print dependencies after each transformation
  val rdd = sc.textFile("datas/word2.txt")
  println(rdd.dependencies)
  println(">>>>>>>>>>>")
  val flatRDD = rdd.flatMap(_.split(" "))
  println(flatRDD.dependencies)
  println(">>>>>>>>>>>")
  val mapRDD = flatRDD.map((_, 1))
  println(mapRDD.dependencies)
  println(">>>>>>>>>>>>>>")
  val resultRDD = mapRDD.reduceByKey(_ + _)
  println(resultRDD.dependencies)
  println(">>>>>>>>>>>>>>")
  resultRDD.collect().foreach(println)

  //TODO Close the environment
  sc.stop()
}
(3) RDD narrow dependencies
A narrow dependency means that each Partition of the parent RDD is used by at most one Partition of the child RDD; the image we use for a narrow dependency is an only child.
(4) RDD wide dependencies
A wide dependency means that the same Partition of the parent RDD is depended on by multiple child Partitions, which causes a Shuffle; the image we use for a wide dependency is multiple births. A minimal sketch contrasting the two kinds of dependency follows.
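A minimal sketch, assuming `sc` is a SparkContext created as in the earlier examples: printing dependencies shows that map produces a narrow OneToOneDependency, while reduceByKey produces a wide ShuffleDependency.

// Minimal sketch, assuming an existing SparkContext `sc`
val wordsRDD = sc.makeRDD(List("a", "b", "a"), 2)
val pairRDD  = wordsRDD.map((_, 1))
val sumRDD   = pairRDD.reduceByKey(_ + _)

// Narrow: each parent partition feeds exactly one child partition
println(pairRDD.dependencies)   // List(org.apache.spark.OneToOneDependency@...)
// Wide: a parent partition is used by several child partitions (Shuffle)
println(sumRDD.dependencies)    // List(org.apache.spark.ShuffleDependency@...)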
(5) RDD stage division
A DAG (Directed Acyclic Graph) is a topology made of vertices and directed edges: the graph has a direction and contains no cycles.
The DAG records the RDD transformation process and the stages of the job. Stages come in two kinds, ShuffleMapStage and ResultStage, and a new stage starts at every shuffle boundary (a minimal sketch follows).
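A minimal sketch of the stage split, assuming an existing SparkContext `sc`: the word-count job below contains one shuffle dependency, so it is split into a ShuffleMapStage and a ResultStage; the boundary is visible in toDebugString and on the Spark UI.

// Minimal sketch, assuming an existing SparkContext `sc`
val lineRDD  = sc.makeRDD(List("hello world", "hello spark"), 2)
val countRDD = lineRDD.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

// The indented "+-" line in the debug string marks the shuffle boundary between
// the ShuffleMapStage (flatMap/map side) and the ResultStage (reduce side)
println(countRDD.toDebugString)
countRDD.collect()   // one Action => one Job made of these two Stages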
(6) RDD task division
RDD tasks are divided into Application, Job, Stage and Task:
Application: initializing a SparkContext creates one Application;
Job: each Action operator generates one Job;
Stage: the number of Stages equals the number of wide (ShuffleDependency) dependencies plus 1;
Task: within a Stage, the number of partitions of the last RDD is the number of Tasks.
Note: Application -> Job -> Stage -> Task, each level is a 1-to-n relationship; a minimal sketch of the counting rule follows.
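A minimal sketch of the counting rule, assuming an existing SparkContext `sc`: one action gives one job, one shuffle dependency gives 1 + 1 = 2 stages, and each stage has as many tasks as the partition count of its last RDD.

// Minimal sketch, assuming an existing SparkContext `sc`
val dataRDD  = sc.makeRDD(1 to 100, 4)          // 4 partitions
val pairsRDD = dataRDD.map(n => (n % 2, n))
val reduced  = pairsRDD.reduceByKey(_ + _, 2)   // shuffle into 2 partitions

reduced.collect()                    // 1 Action => 1 Job
// Stages: 1 shuffle dependency + 1 => 2 Stages
// Tasks : 4 in the ShuffleMapStage (pairsRDD has 4 partitions)
//         2 in the ResultStage     (reduced has 2 partitions)
println(pairsRDD.getNumPartitions)   // 4
println(reduced.getNumPartitions)    // 2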
RDD Persistence
(1) RDD cache
An RDD can cache its previous computation result through the cache or persist method; by default the data is cached in the JVM heap memory. Neither method caches immediately when it is called: only when a later action operator is triggered is the RDD cached in the memory of the compute nodes, where it can be reused afterwards.
Note: the cache may be lost.
1) If data stored in memory is evicted because memory is insufficient, the RDD cache fault-tolerance mechanism still guarantees that the computation executes correctly even though the cache is lost.
2) Lost data is recomputed from the RDD's series of transformations; because each Partition of an RDD is relatively independent, only the missing part needs to be recomputed.
3) Spark automatically persists the intermediate data of some Shuffle operations (for example reduceByKey), so that when a node fails during a Shuffle the whole input does not have to be recomputed.
4) In practice, if you want to reuse data, it is recommended to call persist or cache.
def main(args: Array[String]): Unit = {
  //TODO Create the environment
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Persist")
  val sc = new SparkContext(sparkConf)

  //TODO Cache
  val rdd = sc.makeRDD(List("hello world", "hello scala"))
  val flatRDD = rdd.flatMap(_.split(" "))
  val mapRDD = flatRDD.map((_, 1))

  // cache(): keep the data in memory (equivalent to persist(StorageLevel.MEMORY_ONLY))
  mapRDD.cache()
  // persist(): choose a storage level, e.g. disk only
  // (requires import org.apache.spark.storage.StorageLevel)
  //mapRDD.persist(StorageLevel.DISK_ONLY)

  val reduceRDD = mapRDD.reduceByKey(_ + _)
  reduceRDD.collect().foreach(println)
  println("<<<<<<<<<<<<<")
  // mapRDD is reused here; without the cache it would be recomputed
  val groupRDD = mapRDD.groupByKey()
  groupRDD.collect().foreach(println)

  //TODO Close the environment
  sc.stop()
}
(2) RDD checkpoint
A checkpoint writes the intermediate result of an RDD to disk. When the lineage gets too long, the cost of fault tolerance through recomputation becomes too high, so a checkpoint is taken at an intermediate stage; if a node fails after the checkpoint, the computation can be redone from the checkpoint instead of from the beginning, which reduces the overhead.
Calling checkpoint on an RDD does not execute immediately; an Action operation must run to trigger it.
def main(args: Array[String]): Unit = {
  //TODO Create the environment
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("CheckPoint")
  val sc = new SparkContext(sparkConf)

  //TODO Checkpoint
  // A checkpoint directory must be set before checkpoint() is called;
  // in production this is usually a fault-tolerant path such as HDFS
  sc.setCheckpointDir("checkpoint")

  val rdd = sc.makeRDD(List("hello world", "hello scala"))
  val flatRDD = rdd.flatMap(_.split(" "))
  val mapRDD = flatRDD.map((_, 1))
  mapRDD.checkpoint()

  val reduceRDD = mapRDD.reduceByKey(_ + _)
  reduceRDD.collect().foreach(println)
  println("<<<<<<<<<<<<<")
  val groupRDD = mapRDD.groupByKey()
  groupRDD.collect().foreach(println)

  //TODO Close the environment
  sc.stop()
}
(3) The difference between caching and checkpointing
1) Cache only saves the data and does not cut the lineage; Checkpoint cuts the lineage.
2) Cached data is usually stored on local disk or in memory, so its reliability is low; Checkpoint data is usually stored in a fault-tolerant, highly available file system such as HDFS, so its reliability is high.
3) It is recommended to Cache an RDD before calling checkpoint() on it, so that the checkpoint job only has to read the data from the Cache instead of computing the RDD from scratch (a minimal sketch follows).
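A minimal sketch of recommendation 3), assuming an existing SparkContext `sc`: cache the RDD before checkpointing it, so the extra job launched for the checkpoint reads the cached data instead of recomputing the whole lineage.

// Minimal sketch, assuming an existing SparkContext `sc`
sc.setCheckpointDir("checkpoint")

val wordRDD = sc.makeRDD(List("hello world", "hello scala"))
  .flatMap(_.split(" "))
  .map((_, 1))

wordRDD.cache()        // keep the data in memory first
wordRDD.checkpoint()   // the checkpoint job can then read from the cache

wordRDD.reduceByKey(_ + _).collect()   // the action triggers both cache and checkpoint
println(wordRDD.toDebugString)         // the lineage is now cut at the checkpoint RDD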
RDD Partitioner
Spark currently supports Hash partitioning, Range partitioning and user-defined partitioners.
Hash partitioning is the current default. The partitioner determines the number of partitions of the RDD, which partition each record goes to after a Shuffle, and therefore the number of reduce tasks.
Only Key-Value RDDs have a partitioner; for non Key-Value RDDs the partitioner is None.
The partition IDs of an RDD range from 0 to (numPartitions - 1); the partitioner decides which partition each value belongs to.
(1) Hash partitioning: for a given key, compute its hashCode and take the remainder of dividing it by the number of partitions (a sketch of this rule follows the note below).
(2) Range partitioning: map a range of keys into one partition, trying to keep the data in each partition roughly uniform while keeping the partitions ordered relative to each other.
Note: a previous post already covered partitioning, so only a custom partitioner is demonstrated here; the built-in partitioners are not demonstrated.
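As a minimal sketch of rule (1), not Spark's own source code but the same computation: the partition index is the non-negative remainder of the key's hashCode divided by the number of partitions.

// Hypothetical helper, for illustration only
def hashPartition(key: Any, numPartitions: Int): Int = {
  val raw = key.hashCode % numPartitions
  if (raw < 0) raw + numPartitions else raw   // keep the index in 0 ~ (numPartitions - 1)
}

println(hashPartition("NBA", 3))   // the same key always lands in the same partition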
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

object RDD_OnesPart {
  def main(args: Array[String]): Unit = {
    //TODO Create the environment
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Part")
    val sc = new SparkContext(sparkConf)

    //TODO Custom partitioner
    val rdd = sc.makeRDD(List(
      ("CBA", "....."), ("NBA", "....."), ("WWW", "....."), ("COM", ".....")
    ), 3)
    val partRDD = rdd.partitionBy(new MyPartitioner)
    partRDD.saveAsTextFile("output")

    //TODO Close the environment
    sc.stop()
  }

  /**
   * Custom partitioner:
   * 1. extend Partitioner
   * 2. override its methods
   */
  class MyPartitioner extends Partitioner {
    // Number of partitions
    override def numPartitions: Int = 3

    // Return the partition index according to the key of the record
    override def getPartition(key: Any): Int = {
      key match {
        case "CBA" => 0
        case "NBA" => 1
        case _     => 2
      }
    }
  }
}
Accumulator: distributed shared write-only variables
An accumulator is used to aggregate variable information from the Executor side back to the Driver side. For a variable defined in the Driver program, every Task on the Executor side gets a new copy; after each Task updates its copy, the copies are sent back to the Driver side and merged.
(1) System accumulator
def main(args: Array[String]): Unit = {
  //TODO Create the environment
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Acc")
  val sc = new SparkContext(sparkConf)

  //TODO Built-in accumulator
  val rdd = sc.makeRDD(List(1, 2, 3, 4))

  // Way 1 to sum the elements: reduce(_ + _)
  // val reduceRDD = rdd.reduce(_ + _)

  // Way 2: use a built-in long accumulator
  val sum = sc.longAccumulator("sum")
  rdd.foreach(
    num => {
      sum.add(num)
    }
  )
  // 1 + 2 + 3 + 4 = 10
  println(sum.value)

  //TODO Close the environment
  sc.stop()
}
(2) Custom accumulator
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object RDD_Acc_Ones {
  def main(args: Array[String]): Unit = {
    //TODO Create the environment
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Acc")
    val sc = new SparkContext(sparkConf)

    //TODO Custom accumulator
    val rdd = sc.makeRDD(List(
      "hello", "spark", "hello", "world"
    ))
    // rdd.map((_,1)).reduceByKey(_+_)

    // Create the accumulator object
    val accWc = new MyAccumulator()
    // Register it with Spark
    sc.register(accWc, "wc")
    rdd.foreach(
      word => {
        accWc.add(word)
      }
    )
    println(accWc.value)

    //TODO Close the environment
    sc.stop()
  }

  /**
   * Custom accumulator (word count):
   * 1. extend AccumulatorV2 and set the input/output type parameters
   * 2. override the abstract methods of the accumulator
   */
  class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {

    var map: mutable.Map[String, Long] = mutable.Map()

    // Is the accumulator in its initial state?
    override def isZero: Boolean = {
      map.isEmpty
    }

    // Copy the accumulator
    override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
      new MyAccumulator
    }

    // Reset the accumulator
    override def reset(): Unit = {
      map.clear()
    }

    // Add data to the accumulator (In)
    override def add(word: String): Unit = {
      // If the word already exists in the map, add 1 to its count;
      // otherwise put the word into the map with count 1
      map(word) = map.getOrElse(word, 0L) + 1L
    }

    // Merge two accumulators
    override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
      val map1 = map
      val map2 = other.value
      // Merge the two maps
      map = map1.foldLeft(map2)(
        (innerMap, kv) => {
          innerMap(kv._1) = innerMap.getOrElse(kv._1, 0L) + kv._2
          innerMap
        }
      )
    }

    // Return the result of the accumulator (Out)
    override def value: mutable.Map[String, Long] = map
  }
}
Broadcast variables: distributed shared read-only variables
Principle: broadcast variables are used to distribute large objects efficiently. A read-only value is sent once to every worker node, to be used in one or more Spark operations. For example, if your application needs to send a large read-only lookup table to all nodes, a broadcast variable is appropriate. Without it, when the same variable is used in multiple parallel operations, Spark sends a separate copy with each task.
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object RDD_Broadcast {
  def main(args: Array[String]): Unit = {
    //TODO Create the environment
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Acc")
    val sc = new SparkContext(sparkConf)

    //TODO Broadcast variable
    val rdd1 = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3), ("d", 4)), 4)
    val list = List(("a", 4), ("b", 5), ("c", 6), ("d", 7))

    // Declare the broadcast variable
    val broadcast: Broadcast[List[(String, Int)]] = sc.broadcast(list)

    val resultRDD: RDD[(String, (Int, Int))] = rdd1.map {
      case (key, num) => {
        var num2 = 0
        // Use the broadcast variable inside the operator
        for ((k, v) <- broadcast.value) {
          if (k == key) {
            num2 = v
          }
        }
        (key, (num, num2))
      }
    }
    resultRDD.collect().foreach(println)

    //TODO Close the environment
    sc.stop()
  }
}
This article is just a record of my learning notes!