Spark accumulator
2022-07-06 02:04:00 【Diligent ls】
1. System accumulators
Accumulator: a distributed, shared, write-only variable. (Executors cannot read each other's data.)
An accumulator aggregates variable information from the Executor side back to the Driver side. For a variable defined in the Driver, every task on an Executor gets a fresh copy of that variable. After the tasks have updated their copies, the copies are sent back to the Driver, where they are merged.
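The copy, update, and merge cycle described above can be sketched in plain Scala without Spark. This is only a toy model: `ToyLongAccumulator`, `AccumulatorLifecycleSketch`, and the two hard-coded "partitions" are hypothetical names invented for illustration, not part of Spark's API.

```scala
// Toy stand-in for an accumulator; NOT Spark's API, just a model of the idea.
final class ToyLongAccumulator(private var total: Long = 0L) {
  // Each task starts from a fresh, zeroed copy.
  def copyAndReset(): ToyLongAccumulator = new ToyLongAccumulator(0L)
  // Tasks only write to their own copy.
  def add(v: Long): Unit = total += v
  // The driver folds a task's copy back into its own instance.
  def merge(other: ToyLongAccumulator): Unit = total += other.value
  def value: Long = total
}

object AccumulatorLifecycleSketch {
  def run(): Long = {
    val driverAcc = new ToyLongAccumulator()
    // Two lists stand in for the partitions processed by two tasks.
    val partitions = List(List(1L, 2L), List(3L, 4L))
    val taskCopies = partitions.map { part =>
      val local = driverAcc.copyAndReset()
      part.foreach(v => local.add(v))
      local
    }
    // Back on the "driver": merge every task copy into the original.
    taskCopies.foreach(copy => driverAcc.merge(copy))
    driverAcc.value
  }

  def main(args: Array[String]): Unit = println(run()) // prints 10
}
```

Each task copy sees only its own partial sum (3 and 7 here); the total exists only after the merge on the driver side.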
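Definition: create one with SparkContext.longAccumulator(name), as in the example below. The older SparkContext.accumulator(initialValue) method has been deprecated since Spark 2.0.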
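import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}

object accumulator01_system {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setAppName("WC")
      .setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    val dataRDD: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)))
    // Create a named long accumulator on the Driver
    val sum: LongAccumulator = sc.longAccumulator("sum")
    // Update the accumulator inside an action (foreach)
    dataRDD.foreach {
      case (word, count) => sum.add(count)
    }
    // Read the merged value on the Driver; prints (a,10)
    println(("a", sum.value))
    sc.stop()
  }
}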
Notes:
Tasks on the Executor side cannot read the accumulator's value. For example, calling sum.value on an Executor returns only that task's local copy, not the final value of the accumulator. This is why the accumulator is called a distributed, shared, write-only variable.
Update accumulators inside action operators. A transformation operator is re-executed for every job that depends on it, so if a Spark application has several action operators, an accumulator updated inside a transformation may be updated more than once and give a wrong result. If you need an accumulator that stays reliable under both failures and recomputation, update it inside an action operator such as foreach().
For accumulators updated inside action operators, Spark applies each task's update to each accumulator only once per job, so restarted tasks do not update the value again.
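A minimal sketch of the double-counting pitfall described in the notes, assuming a local Spark installation. The object name `AccumulatorDoubleCountSketch` and the accumulator names are hypothetical and chosen only for this illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorDoubleCountSketch {
  // Returns (total updated in a transformation, total updated in an action).
  def run(sc: SparkContext): (Long, Long) = {
    val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)

    // Updated inside map, a transformation.
    val inTransformation = sc.longAccumulator("inTransformation")
    val mapped = rdd.map { x => inTransformation.add(1L); x }
    mapped.count() // first job runs the map
    mapped.count() // second job runs the map again, because nothing is cached

    // Updated inside foreach, an action.
    val inAction = sc.longAccumulator("inAction")
    rdd.foreach(_ => inAction.add(1L))

    val transformationTotal: Long = inTransformation.value
    val actionTotal: Long = inAction.value
    (transformationTotal, actionTotal)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("AccumulatorDoubleCountSketch")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    try println(run(sc)) finally sc.stop()
  }
}
```

In a local run without task failures this should print (8,4): the four elements are counted once per job in the transformation, but only once in the action. Caching `mapped` before the first action would normally avoid the second pass, although cached partitions can still be evicted and recomputed, so that is not a guarantee.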
2. Custom accumulators
Steps for a custom accumulator:
(1) Extend AccumulatorV2 and set the input and output type parameters.
(2) Override its 6 abstract methods: isZero, copy, reset, add, merge, and value.
(3) Register the custom accumulator before using it: sc.register(accumulator, "accumulator name")
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable

object accumulator03_define {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setAppName("WC")
      .setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    val rdd: RDD[String] = sc.makeRDD(List("Hello", "Hello", "Hello", "Hello", "Spark", "Spark", "Hive", "Hive"), 2)
    // Create the accumulator
    val acc: myaccumulator = new myaccumulator()
    // Register the accumulator
    sc.register(acc, "wordcount")
    // Use the accumulator inside an action
    rdd.foreach { word =>
      acc.add(word)
    }
    // Read the accumulated result on the Driver: Hello -> 4, Hive -> 2
    println(acc.value)
    sc.stop()
  }
}

// Counts occurrences of words that start with "H"
class myaccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
  // The output data set: a mutable Map of word -> count
  val map: mutable.Map[String, Long] = mutable.Map[String, Long]()

  // The accumulator is in its initial (zero) state when the map is empty
  override def isZero: Boolean = map.isEmpty

  // Copy the accumulator, including its current contents
  override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
    val newAcc = new myaccumulator()
    newAcc.map ++= map
    newAcc
  }

  // Reset the accumulator
  override def reset(): Unit = map.clear()

  // Add one input value; only words starting with "H" are counted
  override def add(v: String): Unit = {
    if (v.startsWith("H")) {
      map(v) = map.getOrElse(v, 0L) + 1
    }
  }

  // Merge another accumulator's counts into this one
  override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
    other.value.foreach {
      case (word, count) => map(word) = map.getOrElse(word, 0L) + count
    }
  }

  // The current value of the accumulator
  override def value: mutable.Map[String, Long] = map
}