Spark accumulator
2022-07-06 02:04:00 【Diligent ls】
1. System Accumulators
An accumulator is a distributed, write-only shared variable (one Executor cannot read another Executor's data).
Accumulators aggregate variable information from the Executor side back to the Driver side: for a variable defined on the Driver, every task on the Executor side receives a fresh copy; after each task updates its copy, the updated values are sent back to the Driver and merged there.
Definition: built-in accumulators are created with SparkContext.longAccumulator(name) or SparkContext.doubleAccumulator(name) (the older SparkContext.accumulator(initialValue) API has been deprecated since Spark 2.0).
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}

object accumulator01_system {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setAppName("WC")
      .setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)

    val dataRDD: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)))

    // Built-in long accumulator, created and registered in one call
    val sum: LongAccumulator = sc.longAccumulator("sum")

    dataRDD.foreach {
      case (_, count) => sum.add(count)
    }

    // sum.value is read on the Driver: 1 + 2 + 3 + 4 = 10
    println(("a", sum.value))

    sc.stop()
  }
}
Note:
Tasks on the Executor side cannot read an accumulator's value (for example, calling sum.value on the Executor side does not return the final value). This is why the accumulator is called a distributed, write-only shared variable.
Update accumulators inside action operators, not transformations. How many times a transformation operator executes depends on how many jobs reference it, so if one Spark application has several action operators, an accumulator updated inside a transformation may be applied more than once, producing a wrong result. An accumulator that must be absolutely reliable, both under failures and under repeated computation, therefore belongs inside an action operator such as foreach().
For accumulators used in action operators, Spark guarantees that each task's update is applied to the accumulator only once per job.
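The double-counting pitfall can be sketched without Spark at all. In the hypothetical snippet below, the mutable counter sum stands in for an accumulator, the mapped function stands in for a transformation that updates it as a side effect, and calling mapped twice stands in for two action operators re-running the same (uncached) transformation:

```scala
var sum = 0L // stands in for an accumulator

// A "transformation" that updates the shared counter as a side effect
def mapped(data: Seq[Int]): Seq[Int] =
  data.map { x => sum += x; x * 2 }

val data = Seq(1, 2, 3)
mapped(data) // first action triggers the computation
mapped(data) // a second action re-runs the transformation (no caching)

println(sum) // 12, not the expected 6: each re-run adds the values again
```

Moving the update into a single action (or caching the transformed RDD) avoids the repeated application.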
2. Custom Accumulators
Steps to define a custom accumulator:
(1) Extend AccumulatorV2 and set the input and output type parameters.
(2) Override its 6 abstract methods (isZero, copy, reset, add, merge, value).
(3) Register the accumulator before use: sc.register(accumulator, "accumulator name")
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object accumulator03_define {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setAppName("WC")
      .setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)

    val rdd: RDD[String] = sc.makeRDD(
      List("Hello", "Hello", "Hello", "Hello", "Spark", "Spark", "Hive", "Hive"), 2)

    // Create the accumulator
    val acc: MyAccumulator = new MyAccumulator()
    // Register it with the SparkContext
    sc.register(acc, "wordcount")

    // Use the accumulator inside an action operator
    rdd.foreach(word => acc.add(word))

    // Read the merged result on the Driver, e.g. Map(Hello -> 4, Hive -> 2)
    println(acc.value)

    sc.stop()
  }
}

// Input type: String (one word); output type: mutable.Map[word, count]
class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {

  // The accumulated state: a mutable Map
  var map: mutable.Map[String, Long] = mutable.Map[String, Long]()

  // Zero state: the map holds no data
  override def isZero: Boolean = map.isEmpty

  // Copy this accumulator, including its current state
  override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
    val newAcc = new MyAccumulator()
    newAcc.map = map.clone()
    newAcc
  }

  // Reset the accumulator to its zero state
  override def reset(): Unit = map.clear()

  // Add one element: only count words starting with "H"
  override def add(v: String): Unit = {
    if (v.startsWith("H")) {
      map(v) = map.getOrElse(v, 0L) + 1
    }
  }

  // Merge another accumulator (e.g. from another partition) into this one
  override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
    other.value.foreach {
      case (word, count) => map(word) = map.getOrElse(word, 0L) + count
    }
  }

  // The current value, read on the Driver
  override def value: mutable.Map[String, Long] = map
}
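The merge step of the custom accumulator above can be sketched in plain Scala, with no Spark required. Here driverMap and partitionMap are hypothetical intermediate results, standing in for the Driver-side accumulator and one task's copy sent back from an Executor:

```scala
import scala.collection.mutable

// Hypothetical per-side word counts before merging
val driverMap = mutable.Map("Hello" -> 2L, "Hive" -> 1L)
val partitionMap = mutable.Map("Hello" -> 2L, "Hive" -> 1L)

// The same merge logic as in the custom accumulator's merge method
partitionMap.foreach { case (word, count) =>
  driverMap(word) = driverMap.getOrElse(word, 0L) + count
}

println(driverMap) // counts combined: Hello -> 4, Hive -> 2
```

Because merge may be called once per task, it must be associative and commutative for the final Driver-side value to be well defined.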