Spark accumulator
2022-07-27 01:14:00 【User 1483438】
What is an accumulator
Accumulator: a distributed shared variable that is write-only (Executors cannot read each other's data). An accumulator is used to aggregate variable information from the Executor side back to the Driver side. A variable defined in the Driver program is sent to every task on the Executor side as a fresh copy; after each task updates its copy, the copies are sent back to the Driver and merged.
Case presentation
Count the sum of elements in the list
```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.junit.Test // assuming JUnit 4 for the @Test annotation

@Test
def demo: Unit = {
  val conf = new SparkConf().setMaster("local[2]").setAppName("test")
  val sc = new SparkContext(conf)
  // Define a collection with 2 partitions, to keep the calculation easy to follow
  val rdd1: RDD[Int] = sc.parallelize(List(3, 2, 5, 4, 8, 6), 2)
  // Sum of the elements
  var sum = 0
  // Accumulate in a loop
  rdd1.foreach(e => {
    sum = sum + e
  })
  // Print the result
  println(s"sum=$sum")
}
```

What is the value of sum here? The answer is 0.
```
sum=0
```

Why 0? Shouldn't it be 3+2+5+4+8+6=28? The reason is simple: foreach is an action operator. The body of an operator runs on the Executors, while everything outside the operator runs in the Driver. When the code inside an operator references an external variable, that variable has to be serialized and shipped to the Executors. The process is illustrated in the diagram.

Diagram

Although sum is incremented, the increment only happens inside each partition; as far as the Driver is concerned, sum never changes. We can print it and see for ourselves. Each task is just a thread, and Thread.currentThread().getName returns the thread name.
```scala
// Accumulate in a loop
rdd1.foreach(e => {
  sum = sum + e
  println(s"${Thread.currentThread().getName};sum=$sum, e=$e ")
})
```

Partition 0:

```
Executor task launch worker for task 0;sum=3, e=3
Executor task launch worker for task 0;sum=5, e=2
Executor task launch worker for task 0;sum=10, e=5
```

Partition 1:

```
Executor task launch worker for task 1;sum=4, e=4
Executor task launch worker for task 1;sum=12, e=8
Executor task launch worker for task 1;sum=18, e=6
```

Of course you may ask: do I have to use foreach? Can't another operator do it? Certainly; for example, reduce.
```scala
@Test
def demo: Unit = {
  val conf = new SparkConf().setMaster("local[2]").setAppName("test")
  val sc = new SparkContext(conf)
  // Define a collection with 2 partitions
  val rdd1: RDD[Int] = sc.parallelize(List(3, 2, 5, 4, 8, 6), 2)
  // Aggregate the data
  val sum = rdd1.reduce(_ + _)
  // Print the result
  println(s"sum=$sum")
}
```

The output is 28:

```
sum=28
```

All roads lead to Rome; there are many ways to get the same result.
In Spark, if you want to count certain events while tasks are computing, filter/reduce will also work, but an accumulator is a more convenient way. A classic application scenario for accumulators is counting the number of events recorded in a Spark Streaming application.
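To make that comparison concrete, here is a minimal sketch (the RDD logRdd and the "error" marker are invented for this example, not taken from the article): counting with filter needs a dedicated pass over the data, while an accumulator counts as a side effect of a pass you are already making.

```scala
// Hypothetical sketch: count "error" lines; logRdd and the "error" marker are made up.
val logRdd = sc.parallelize(List("ok", "error", "ok", "error", "ok"), 2)

// Option 1: filter + count needs an extra pass over the data
val errorCountByFilter = logRdd.filter(_ == "error").count()

// Option 2: an accumulator counts while the main processing pass runs
val errorAcc = sc.longAccumulator("errorCount")
val upper = logRdd.map { line =>
  if (line == "error") errorAcc.add(1)
  line.toUpperCase
}
upper.count() // an action must run before errorAcc.value is populated
// Note: updates made inside a transformation can be re-applied if a task is retried
println(s"filter/count=$errorCountByFilter, accumulator=${errorAcc.value}")
```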
The use of accumulators
An accumulator is created through the SparkContext, as shown below; the string "sumAccumulator" is the accumulator's name:

```scala
val sumAccumulator = sc.longAccumulator("sumAccumulator")
```

Built-in accumulators

There are three built-in accumulators: LongAccumulator, DoubleAccumulator and CollectionAccumulator.

LongAccumulator: numeric (integer) accumulation

```java
LongAccumulator longAccumulator = sc.longAccumulator("long-account");
```

DoubleAccumulator: floating-point accumulation

```java
DoubleAccumulator doubleAccumulator = sc.doubleAccumulator("double-account");
```

CollectionAccumulator: collection accumulation

```java
CollectionAccumulator<Integer> collectionAccumulator = sc.collectionAccumulator("double-account");
```

Case presentation:
```scala
@Test
def demo2(): Unit = {
  val conf = new SparkConf().setMaster("local[2]").setAppName("test")
  val sc = new SparkContext(conf)
  // Define the accumulator
  val sumAccumulator = sc.longAccumulator("sumAccumulator")
  // Define a collection with 2 partitions
  val rdd1: RDD[Int] = sc.parallelize(List(3, 2, 5, 4, 8, 6), 2)
  // Accumulate in a loop
  rdd1.foreach(e => {
    sumAccumulator.add(e)
  })
  // Print the result
  println(s"sum=${sumAccumulator.value}")
}
```

Result:

```
sum=28
```

The other two are not demonstrated here; they are used in exactly the same way: add stores data, value retrieves the result.
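For completeness, a minimal sketch of how the other two could be used under the same pattern, reusing sc and rdd1 from the example above (the variable names doubleAcc and listAcc are invented for this sketch):

```scala
// Sketch only: the other two built-in accumulators, used the same way (add to store, value to read)
val doubleAcc = sc.doubleAccumulator("doubleAcc")      // accumulates Double values
val listAcc = sc.collectionAccumulator[Int]("listAcc") // collects elements into a java.util.List

rdd1.foreach(e => {
  doubleAcc.add(e * 0.5)
  listAcc.add(e)
})

println(doubleAcc.value) // sum of the added doubles
println(listAcc.value)   // a java.util.List with all added elements (order not guaranteed)
```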
The role of accumulators
Accumulator: a distributed, write-only variable (tasks on the Executor side cannot read each other's accumulator values). An accumulator aggregates information. A function passed to Spark can normally use variables defined on the Driver side, but every task on the Executor side works on its own copy of such a variable; if a copy's value changes, the Driver-side value does not change with it. With an accumulator we can process the data partition by partition while still updating a single shared value. Original article: https://blog.csdn.net/FlatTiger/article/details/115133641. You may not always need one, but you cannot do without it.
Custom accumulator
Custom accumulator steps
- Definition:
  1. Define a class that extends AccumulatorV2.
  2. Override the abstract methods.
- Use:
  1. Create an accumulator instance.
  2. Register the accumulator.
  3. Accumulate data inside the partitions.
  4. Get the final result.
Case study: use an accumulator to implement the WordCount function.
- Define a class that extends AccumulatorV2.

AccumulatorV2 takes two type parameters: IN, the type of the data passed in, and OUT, the type of the returned result.

```scala
abstract class AccumulatorV2[IN, OUT]
```

It does not matter if this is not clear yet. Look at what LongAccumulator specifies for IN and OUT: what is passed in is a Long, and what comes back is also a Long.

```scala
class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long] {
```

Where is the IN value passed in? It is the parameter of add (an Int is automatically converted to Long):
```scala
// Accumulate in a loop
rdd1.foreach(e => {
  sumAccumulator.add(e)
})
```

So the way to think about it is: whatever type of data we pass to add, that is the IN type. Since we are counting words, could we simply use String? If IN were just String, the counting would be hard to express.

```scala
List("python","java","python","java","spark")
```

We can pair each word with the count 1:

```scala
List(("python",1),("java",1),("python",1),("java",1),("spark",1))
```

Now the IN type is clear: it is a tuple of type (String, Int). What about the OUT type? Look at the code snippet below and think about it.
```scala
// Print the result
println(s"sum=${sumAccumulator.value}")
```

Doesn't value return the final result? And what does the result of a WordCount program look like? Something like this:

```scala
List(("python",2),("java",2),("spark",1))
```

So for OUT we can use a List whose element type is again a tuple (String, Int).

We also need to override the abstract methods.
```scala
class CustomAccumulator extends AccumulatorV2[(String, Int), List[(String, Int)]] {

  /**
   * Whether the accumulator is empty
   */
  override def isZero: Boolean = ???

  /**
   * Copy the accumulator
   */
  override def copy(): AccumulatorV2[(String, Int), List[(String, Int)]] = ???

  /**
   * Reset the accumulator
   */
  override def reset(): Unit = ???

  /**
   * Accumulate an element [runs inside every task]
   */
  override def add(v: (String, Int)): Unit = ???

  /**
   * Merge the results accumulated by each task [runs in the Driver]
   */
  override def merge(other: AccumulatorV2[(String, Int), List[(String, Int)]]): Unit = ???

  /**
   * Get the summarized result in the Driver
   */
  override def value: List[(String, Int)] = ???
}
```

Don't rush to write the implementations; let's look at the calling code first, which makes the methods easier to understand.
```scala
@Test
def demo3(): Unit = {
  val conf = new SparkConf().setMaster("local[2]").setAppName("test")
  val sc = new SparkContext(conf)
  // Create the accumulator
  val acc = new CustomAccumulator
  // Register the accumulator
  sc.register(acc, "CustomAccumulator")
  // Read the file
  val lines = sc.textFile("file:///C:/Users/123456/Desktop/worldCount.txt", 2)
  // Split each line on spaces and flatten the data
  val value: RDD[String] = lines.flatMap(_.split(" "))
  // Transform into the structure we need: (word, 1)
  val mapList: RDD[(String, Int)] = value.map(e => (e, 1))
  // Accumulate in a loop
  mapList.foreach(e => {
    acc.add(e)
  })
  // Print the result
  println(s"sum=${acc.value}")
}
```

Content of worldCount.txt:

```
hello java shell
python java java
wahaha java shell
hello java shell shell
```

Every element is handed to add, so let's implement the add function first.
```scala
import scala.collection.mutable

// Define a mutable Map to store the elements passed to add
val result = mutable.Map[String, Int]()

/**
 * Accumulate an element [runs inside every task]
 */
override def add(v: (String, Int)): Unit = {
  // Where do the incoming elements go? Into the mutable Map defined above.
  // Look up the current count for this key and add the new count to it
  val sum = this.result.getOrElse(v._1, 0) + v._2
  // Overwrite the entry for this key
  this.result.put(v._1, sum)
}
```

It does not matter if this is not fully clear yet; the complete code is given below.
Doesn't value simply return what is in result? So we convert the Map straight to a List.
```scala
/**
 * Get the summarized result in the Driver
 */
override def value: List[(String, Int)] = this.result.toList
```

The code so far:
```scala
class CustomAccumulator extends AccumulatorV2[(String, Int), List[(String, Int)]] {

  import scala.collection.mutable

  // A mutable Map that stores the elements passed to add
  val result = mutable.Map[String, Int]()

  /**
   * Whether the accumulator is empty
   */
  override def isZero: Boolean = ???

  /**
   * Copy the accumulator
   */
  override def copy(): AccumulatorV2[(String, Int), List[(String, Int)]] = ???

  /**
   * Reset the accumulator
   */
  override def reset(): Unit = ???

  /**
   * Accumulate an element [runs inside every task]
   */
  override def add(v: (String, Int)): Unit = {
    // Look up the current count for this key and add the new count to it
    val sum = this.result.getOrElse(v._1, 0) + v._2
    // Overwrite the entry for this key
    this.result.put(v._1, sum)
  }

  /**
   * Merge the results accumulated by each task [runs in the Driver]
   */
  override def merge(other: AccumulatorV2[(String, Int), List[(String, Int)]]): Unit = ???

  /**
   * Get the summarized result in the Driver
   */
  override def value: List[(String, Int)] = this.result.toList
}
```

The accumulator's data lives in result, so isZero can simply check whether result is empty:
```scala
/**
 * Whether the accumulator is empty
 */
override def isZero: Boolean = result.isEmpty
```

Copying the accumulator is a little abstract to understand: new CustomAccumulator is created in the Driver, but the actual computation happens in each partition, so a fresh accumulator has to be created for each of them (a diagram later on makes this easier to picture).
```scala
/**
 * Copy the accumulator
 */
override def copy(): AccumulatorV2[(String, Int), List[(String, Int)]] = new CustomAccumulator()
```

Resetting the accumulator simply means clearing the data:
```scala
/**
 * Reset the accumulator
 */
override def reset(): Unit = this.result.clear()
```

As mentioned above, the calculation is done per partition, so the data accumulated by each partition needs to be merged:
```scala
/**
 * Merge the results accumulated by each task [runs in the Driver]
 */
override def merge(other: AccumulatorV2[(String, Int), List[(String, Int)]]): Unit = {
  // Get the accumulated data of the other partition's accumulator
  val value: List[(String, Int)] = other.value
  // Merge it with this accumulator's data
  val list: List[(String, Int)] = result.toList
  // newList may now contain duplicate keys
  val newList: List[(String, Int)] = list ++ value
  // Group by word, then aggregate
  val groupList: Map[String, List[(String, Int)]] = newList.groupBy(e => e._1)
  println(groupList)
  // e._1 is the word
  // e._2 is still a list of (word, count) pairs
  // e._2.map(_._2).sum adds up the counts for that word
  val newResult: Map[String, Int] = groupList.map(e => {
    val sum = e._2.map(_._2).sum
    (e._1, sum)
  })
  // Merge back into the result map (overwriting the old counts)
  result.++=(newResult)
}
```
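As a side note on the design, merge does not have to rebuild lists and group them; a more compact version could fold the other accumulator's entries into result with the same logic as add. A minimal alternative sketch (not the version used in this article):

```scala
// Alternative sketch of merge, reusing the add logic; not the version used below
override def merge(other: AccumulatorV2[(String, Int), List[(String, Int)]]): Unit = {
  other.value.foreach { case (word, count) =>
    result.put(word, result.getOrElse(word, 0) + count)
  }
}
```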
The complete code:

```scala
class CustomAccumulator extends AccumulatorV2[(String, Int), List[(String, Int)]] {

  import scala.collection.mutable

  // A mutable Map that stores the elements passed to add
  val result = mutable.Map[String, Int]()

  /**
   * Whether the accumulator is empty
   */
  override def isZero: Boolean = result.isEmpty

  /**
   * Copy the accumulator
   */
  override def copy(): AccumulatorV2[(String, Int), List[(String, Int)]] = new CustomAccumulator()

  /**
   * Reset the accumulator
   */
  override def reset(): Unit = this.result.clear()

  /**
   * Accumulate an element [runs inside every task]
   */
  override def add(v: (String, Int)): Unit = {
    // Look up the current count for this key and add the new count to it
    val sum = this.result.getOrElse(v._1, 0) + v._2
    // Overwrite the entry for this key
    this.result.put(v._1, sum)
  }

  /**
   * Merge the results accumulated by each task [runs in the Driver]
   */
  override def merge(other: AccumulatorV2[(String, Int), List[(String, Int)]]): Unit = {
    // Get the accumulated data of the other partition's accumulator
    val value: List[(String, Int)] = other.value
    // Merge it with this accumulator's data
    val list: List[(String, Int)] = result.toList
    // newList may now contain duplicate keys
    val newList: List[(String, Int)] = list ++ value
    // Group by word, then aggregate
    val groupList: Map[String, List[(String, Int)]] = newList.groupBy(e => e._1)
    println(groupList)
    // e._1 is the word; e._2 is a list of (word, count) pairs; summing the counts gives the total
    val newResult: Map[String, Int] = groupList.map(e => {
      val sum = e._2.map(_._2).sum
      (e._1, sum)
    })
    // Merge back into the result map (overwriting the old counts)
    result.++=(newResult)
  }

  /**
   * Get the summarized result in the Driver
   */
  override def value: List[(String, Int)] = this.result.toList
}
```

The result:
```
sum=List((wahaha,1), (java,5), (shell,4), (hello,2), (python,1))
```

The data of the two partitions as they are merged (the println inside merge):
```
Map(shell -> List((shell,2), (shell,2)), wahaha -> List((wahaha,1)), java -> List((java,1), (java,4)), python -> List((python,1)), hello -> List((hello,1), (hello,1)))
```
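As a quick sanity check (not part of the original article), the same counts could also be produced with reduceByKey on mapList from demo3, which is a convenient way to verify a custom accumulator:

```scala
// Sanity-check sketch: recompute the word counts with reduceByKey and compare with acc.value
val expected = mapList.reduceByKey(_ + _).collect().toMap
println(expected) // should hold the same (word, count) pairs as the accumulator's result
```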