Spark accumulator
2022-07-06 02:04:00 【Diligent ls】
1. System accumulator
Accumulator: a distributed, shared, write-only variable. (Executors cannot read each other's data.)
An accumulator aggregates variable information from the Executor side to the Driver side. For a variable defined in the Driver, each task on the Executor side gets a new copy of it. After the tasks update their copies, the copies are sent back to the Driver, which merges them.
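
The copy, update, and merge cycle described above can be sketched in plain Scala without any Spark dependency. This is only an illustration of the idea, not Spark's implementation: SimpleSumAccumulator, AccumulatorSketch, and runTasks are hypothetical names, and each inner Seq stands in for one partition handled by one task.

```scala
// Plain-Scala sketch (no Spark) of the copy / add / merge cycle.
// All names here are hypothetical and exist only for illustration.
final class SimpleSumAccumulator(private var total: Long = 0L) {
  // A task starts from a fresh zero-valued copy, never from the driver's running total.
  def copyAndReset(): SimpleSumAccumulator = new SimpleSumAccumulator(0L)
  // Called inside a task to update its local copy.
  def add(v: Long): Unit = total += v
  // Called on the driver to fold a task's copy into the driver's value.
  def merge(other: SimpleSumAccumulator): Unit = total += other.total
  def value: Long = total
}

object AccumulatorSketch {
  // Each inner Seq plays the role of one partition processed by one task.
  def runTasks(driverAcc: SimpleSumAccumulator, partitions: Seq[Seq[Int]]): Long = {
    val taskCopies = partitions.map { partition =>
      val local = driverAcc.copyAndReset()
      partition.foreach(n => local.add(n))
      local // "sent back" to the driver when the task finishes
    }
    taskCopies.foreach(driverAcc.merge) // the driver merges every task's copy
    driverAcc.value
  }

  def main(args: Array[String]): Unit = {
    val result = runTasks(new SimpleSumAccumulator(), Seq(Seq(1, 2), Seq(3, 4)))
    println(result) // 10
  }
}
```

Because each task only ever sees its own copy, reading the value inside a task gives a partial result at best, which is why the variable is treated as write-only on the Executor side.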

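Definition: the example creates the accumulator with SparkContext.longAccumulator(name). (The older SparkContext.accumulator(initialValue) method has been deprecated since Spark 2.0.)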
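import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator

object accumulator01_system {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setAppName("WC")
      .setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    val dataRDD: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)))
    // Create a named Long accumulator on the Driver
    val sum: LongAccumulator = sc.longAccumulator("sum")
    // Update the accumulator inside an action (foreach)
    dataRDD.foreach {
      case (word, count) => sum.add(count)
    }
    // Read the merged value on the Driver; prints (a,10)
    println(("a", sum.value))
    sc.stop()
  }
}
Note: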
Tasks on the Executor side cannot read the accumulator's value (for example, calling sum.value on the Executor side does not return the accumulator's final value). This is why an accumulator is described as a distributed, shared, write-only variable.
Accumulators should be updated inside action operators. A transformation operator is recomputed for every job that depends on it, so if a Spark application has several action operators, an accumulator updated inside a transformation may be updated more than once and give a wrong result. If you want an accumulator that stays reliable under both failures and recomputation, update it inside an action operator such as foreach().
For accumulators updated inside action operators, Spark guarantees that each task's update to each accumulator is applied only once, so restarted tasks do not update the value again.
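
The double-update problem can be reproduced with a small sketch. It assumes a local Spark setup like the one used elsewhere in this post; the object name AccumulatorInTransformation, the method countAfterTwoActions, and the accumulator name "seen" are made up for the illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: the names below are illustrative and not part of the original examples.
object AccumulatorInTransformation {
  // Returns the accumulator value after two actions run on the same mapped RDD.
  def countAfterTwoActions(sc: SparkContext): Long = {
    val seen = sc.longAccumulator("seen")
    val mapped = sc.makeRDD(List(1, 2, 3, 4), 2).map { n =>
      seen.add(1) // update inside a transformation: runs on every recomputation
      n * 2
    }
    mapped.count()   // first job: the map runs over 4 elements
    mapped.collect() // second job recomputes the map over the same 4 elements
    seen.value
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("AccPitfall").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try println(countAfterTwoActions(sc)) // 8 in a local run, although the RDD has only 4 elements
    finally sc.stop()
  }
}
```

Moving the update into a single foreach, as in the system accumulator example, avoids the recomputation and gives a count of 4.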
2. Custom accumulator
Steps for a custom accumulator
(1) Extend AccumulatorV2 and set the input and output type parameters
(2) Override its 6 abstract methods
(3) Register the custom accumulator before using it: sc.register(accumulator, "accumulator name")
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable

object accumulator03_define {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setAppName("WC")
      .setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    val rdd: RDD[String] = sc.makeRDD(List("Hello", "Hello", "Hello", "Hello", "Spark", "Spark", "Hive", "Hive"), 2)
    // Create the accumulator
    val acc: myaccumulator = new myaccumulator()
    // Register the accumulator
    sc.register(acc, "wordcount")
    // Use the accumulator inside an action
    rdd.foreach { word =>
      acc.add(word)
    }
    // Get the accumulated result; it contains Hello -> 4 and Hive -> 2
    println(acc.value)
    sc.stop()
  }
}

class myaccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
  // The output data set: a mutable Map from word to count
  val map: mutable.Map[String, Long] = mutable.Map[String, Long]()

  // The accumulator is in its initial (zero) state when the map is empty
  override def isZero: Boolean = map.isEmpty

  // Copy the accumulator, including its current contents
  override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
    val newAcc = new myaccumulator()
    newAcc.map ++= map
    newAcc
  }

  // Reset the accumulator
  override def reset(): Unit = map.clear()

  // Add data: only words starting with "H" are counted
  override def add(v: String): Unit = {
    if (v.startsWith("H")) {
      map(v) = map.getOrElse(v, 0L) + 1
    }
  }

  // Merge another accumulator's counts into this one
  override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
    other.value.foreach {
      case (word, count) => map(word) = map.getOrElse(word, 0L) + count
    }
  }

  // The value of the accumulator
  override def value: mutable.Map[String, Long] = map
}