Spark accumulator
2022-07-06 02:04:00 【Diligent ls】
1. System accumulators
Accumulator: a distributed, shared, write-only variable. (Executors cannot read each other's accumulator data.)
An accumulator aggregates variable information from the Executors back to the Driver. The variable is defined on the Driver; each task on the Executor side gets its own fresh copy of it, updates that copy, and sends it back to the Driver, where all the copies are merged.
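The copy-and-merge flow can be modelled in plain Scala without Spark. This is only a sketch for intuition: `AccumulatorFlowModel`, `TaskCopy` and `simulate` are hypothetical names, each inner `Seq` stands in for one partition handled by one task, and real Spark additionally serializes the copies and calls the accumulator's `merge` on the Driver.

```scala
// Plain-Scala model (no Spark) of how an accumulator flows between Driver and tasks.
object AccumulatorFlowModel {
  // Hypothetical stand-in for one task's private copy of a long accumulator.
  final class TaskCopy {
    private var partial: Long = 0L
    def add(v: Long): Unit = partial += v
    def value: Long = partial
  }

  // Each inner Seq plays the role of one partition processed by one task.
  def simulate(partitions: Seq[Seq[Long]]): Long = {
    var driverValue = 0L // the value held on the Driver
    for (partition <- partitions) {
      val taskCopy = new TaskCopy()           // the task starts from a zero-valued copy
      partition.foreach(v => taskCopy.add(v)) // the task only writes to its own copy
      driverValue += taskCopy.value           // the Driver merges the returned copy
    }
    driverValue
  }

  def main(args: Array[String]): Unit =
    println(simulate(Seq(Seq(1L, 2L), Seq(3L, 4L)))) // 10
}
```

No task ever sees another task's partial value; only the Driver sees the merged total.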

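Definition: since Spark 2.0, create a built-in accumulator with SparkContext.longAccumulator(name) (or doubleAccumulator / collectionAccumulator). The older SparkContext.accumulator(initialValue) method is deprecated and was removed in Spark 3.0.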
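```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator

object accumulator01_system {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setAppName("WC")
      .setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    val dataRDD: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)))
    // Create and register a LongAccumulator named "sum" on the Driver
    val sum: LongAccumulator = sc.longAccumulator("sum")
    // foreach is an action: each task adds its counts to its own copy of sum
    dataRDD.foreach {
      case (word, count) =>
        sum.add(count)
    }
    // Read the merged value on the Driver: prints (a,10)
    println(("a", sum.value))
    sc.stop()
  }
}
```
Be careful: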
Tasks on the Executor side cannot read the accumulator's total: if a task calls sum.value on an Executor, it gets only that task's local partial value, not the accumulator's final value. That is why an accumulator is called a distributed, shared, write-only variable.
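A sketch of what a task actually sees, assuming a local SparkContext and spark-core on the classpath; `ExecutorSideValue` and `run` are hypothetical names. For demonstration only, the accumulator is updated in `mapPartitions` (a transformation) so each task can return the value it reads; this is safe here only because exactly one action (`collect`) runs it.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.LongAccumulator

object ExecutorSideValue {
  // Returns (value each task read from its own copy, value the Driver reads afterwards).
  def run(sc: SparkContext): (Seq[Long], Long) = {
    val acc: LongAccumulator = sc.longAccumulator("sum")
    val seenByTasks: Array[Long] = sc.makeRDD(1 to 4, 2) // partitions [1, 2] and [3, 4]
      .mapPartitions { it =>
        it.foreach(x => acc.add(x))
        Iterator(acc.sum) // the task's local partial sum, not the global total
      }
      .collect()
    (seenByTasks.toSeq, acc.sum) // acc.sum on the Driver is the merged total
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ExecutorSideValue").setMaster("local[*]"))
    try println(run(sc)) // partial sums 3 and 7 inside the tasks, total 10 on the Driver
    finally sc.stop()
  }
}
```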
Update accumulators inside action operators. How many times a transformation runs depends on how many jobs recompute it, so if a Spark application runs several actions over the same lineage, an accumulator updated inside a transformation may be incremented more than once and give a wrong result. If you need an accumulator that stays correct under both task failures and recomputation, update it inside an action such as foreach().
For accumulator updates performed inside actions, Spark guarantees that each task's update to each accumulator is applied only once, even if the task is restarted.
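A minimal sketch of the pitfall, assuming a local SparkContext and spark-core on the classpath; `AccumulatorInTransformation` and `run` are hypothetical names. The same counting is done once in `map()` (a transformation) followed by two actions, and once in `foreach()` (an action).

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator

object AccumulatorInTransformation {
  // Returns (total from the map() accumulator, total from the foreach() accumulator).
  def run(sc: SparkContext): (Long, Long) = {
    val data: RDD[Int] = sc.makeRDD(1 to 4, 2)

    // Updated inside a transformation: re-applied every time map() is recomputed.
    val inMap: LongAccumulator = sc.longAccumulator("inMap")
    val mapped: RDD[Int] = data.map { x => inMap.add(1); x }
    mapped.count() // job 1 runs map() over the 4 elements
    mapped.count() // job 2 recomputes map() (nothing is cached) and adds 4 more

    // Updated inside an action: each task's update is applied once.
    val inForeach: LongAccumulator = sc.longAccumulator("inForeach")
    data.foreach(_ => inForeach.add(1))

    (inMap.sum, inForeach.sum) // LongAccumulator.sum returns a Scala Long
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("AccumulatorInTransformation").setMaster("local[*]"))
    try println(run(sc)) // prints (8,4) when no task is retried
    finally sc.stop()
  }
}
```

Calling mapped.cache() before the first count() would usually stop the second job from re-running map(), but a cached partition can still be evicted and recomputed, so caching is not a correctness guarantee.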
2. Custom accumulators
Steps to create a custom accumulator:
(1) Extend AccumulatorV2 and set its input and output type parameters.
(2) Override its 6 abstract methods: isZero, copy, reset, add, merge and value.
(3) Register the custom accumulator before using it: sc.register(accumulator, "accumulator name")
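To see the six methods in isolation, here is a smaller sketch with a hypothetical `MaxAccumulator` that keeps the largest Long it has seen. It assumes spark-core is on the classpath. The hypothetical `MaxAccumulatorLocalCheck` calls `copyAndReset()` and `merge()` by hand to mimic what Spark does across tasks, so it needs no SparkContext; registration (step 3) is only required once the accumulator is used in a real Spark job.

```scala
import org.apache.spark.util.AccumulatorV2

// Hypothetical example: IN = Long (what add() accepts), OUT = Long (what value returns).
class MaxAccumulator extends AccumulatorV2[Long, Long] {
  private var max: Long = Long.MinValue

  // isZero: true while the accumulator still holds its initial value
  override def isZero: Boolean = max == Long.MinValue

  // copy: a new accumulator carrying the same current value
  override def copy(): MaxAccumulator = {
    val acc = new MaxAccumulator
    acc.max = this.max
    acc
  }

  // reset: go back to the zero value
  override def reset(): Unit = max = Long.MinValue

  // add: fold one input value in (this runs inside tasks)
  override def add(v: Long): Unit = max = math.max(max, v)

  // merge: combine another copy's result into this one (this runs on the Driver)
  override def merge(other: AccumulatorV2[Long, Long]): Unit =
    max = math.max(max, other.value)

  // value: the current result
  override def value: Long = max
}

object MaxAccumulatorLocalCheck {
  // Two zero-valued "task" copies are updated, then merged into the "Driver" copy.
  def run(): Long = {
    val driverAcc = new MaxAccumulator
    val task1 = driverAcc.copyAndReset()
    val task2 = driverAcc.copyAndReset()
    Seq(3L, 9L).foreach(v => task1.add(v))
    Seq(7L).foreach(v => task2.add(v))
    driverAcc.merge(task1)
    driverAcc.merge(task2)
    driverAcc.value
  }

  def main(args: Array[String]): Unit = println(run()) // 9
}
```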
```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2

import scala.collection.mutable

object accumulator03_define {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setAppName("WC")
      .setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    val rdd: RDD[String] = sc.makeRDD(List("Hello", "Hello", "Hello", "Hello", "Spark", "Spark", "Hive", "Hive"), 2)
    // Create the accumulator
    val acc: myaccumulator = new myaccumulator()
    // Register it with the SparkContext before using it in a job
    sc.register(acc, "wordcount")
    // Use the accumulator inside an action
    rdd.foreach { word =>
      acc.add(word)
    }
    // Read the merged result on the Driver: it holds Hello -> 4 and Hive -> 2
    println(acc.value)
    sc.stop()
  }
}

// Counts occurrences of words that start with "H"
class myaccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
  // Output data: a mutable Map from word to count
  var map: mutable.Map[String, Long] = mutable.Map[String, Long]()

  // The accumulator is in its initial (zero) state when the map is empty
  override def isZero: Boolean = map.isEmpty

  // Copy the accumulator, including its current contents
  override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
    val newAcc = new myaccumulator()
    newAcc.map = map.clone()
    newAcc
  }

  // Reset the accumulator to its zero state
  override def reset(): Unit = map.clear()

  // Add one input value: only words starting with "H" are counted
  override def add(v: String): Unit = {
    if (v.startsWith("H")) {
      map(v) = map.getOrElse(v, 0L) + 1
    }
  }

  // Merge another accumulator's map into this one (runs on the Driver)
  override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
    other.value.foreach { case (word, count) =>
      map(word) = map.getOrElse(word, 0L) + count
    }
  }

  // The accumulator's current value
  override def value: mutable.Map[String, Long] = map
}
```