Spark accumulator
2022-07-06 02:04:00 【Diligent ls】
1. System accumulators
Accumulator: a distributed, shared, write-only variable. (Executors cannot read each other's data.)
An accumulator aggregates variable information from the Executor side back to the Driver. The variable is defined in the Driver; each task on the Executor side receives a fresh copy of it, updates that copy, and sends it back to the Driver, where the copies are merged.
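The copy-and-merge flow described above can be sketched in plain Scala, without Spark. This is only an illustration of the idea: SimpleLongAcc, runTask and simulate are hypothetical names invented for this sketch and are not part of the Spark API, and each inner Seq stands in for one partition.

```scala
// Hypothetical stand-in for a Spark accumulator; NOT part of the Spark API.
final class SimpleLongAcc {
  private var total = 0L
  def add(v: Long): Unit = { total += v }
  def merge(other: SimpleLongAcc): Unit = { total += other.value }
  def value: Long = total
}

object AccumulatorMechanismSketch {
  // Each "task" works on its own zero-valued copy and hands it back.
  def runTask(partition: Seq[Int]): SimpleLongAcc = {
    val local = new SimpleLongAcc
    partition.foreach(v => local.add(v.toLong))
    local
  }

  // The "driver" owns the accumulator and merges every copy returned by a task.
  def simulate(partitions: Seq[Seq[Int]]): Long = {
    val driverAcc = new SimpleLongAcc
    partitions.map(runTask).foreach(driverAcc.merge)
    driverAcc.value
  }

  def main(args: Array[String]): Unit = {
    println(simulate(Seq(Seq(1, 2), Seq(3, 4)))) // prints 10
  }
}
```

Note that a task only ever sees its own partial total; the full sum exists only after the driver has merged all copies, which is why the variable is called write-only from the Executor's point of view.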
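Definition: the example below creates the accumulator with SparkContext.longAccumulator(name). The older SparkContext.accumulator(initialValue) method has been deprecated since Spark 2.0.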
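import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator

object accumulator01_system {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setAppName("WC")
      .setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    val dataRDD: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)))
    // Create a named Long accumulator on the Driver
    val sum: LongAccumulator = sc.longAccumulator("sum")
    // Update the accumulator inside an action operator
    dataRDD.foreach {
      case (word, count) =>
        sum.add(count)
    }
    // Read the merged value on the Driver
    println(("a", sum.value))
    sc.stop()
  }
}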
Notes:
Tasks on the Executor side cannot read the accumulator's value (for example, calling sum.value on an Executor does not return the accumulator's final value). This is why the accumulator is described as a distributed, shared, write-only variable.
Update accumulators inside action operators. A transformation operator is re-executed by every job that depends on it, so if a Spark application has several action operators, an accumulator updated inside a transformation may be updated more than once and produce a wrong result. For an accumulator that stays reliable under both failures and recomputation, put the update in an action operator such as foreach().
For accumulators updated in action operators, Spark applies each task's update to each accumulator only once.
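The double-update problem with transformation operators can be reproduced with a small experiment. This is a minimal sketch, assuming a local Spark setup like the one used in this article; the object name AccumulatorInTransformation and the helper run are made up for the illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorInTransformation {
  // Runs two actions on the same un-cached RDD whose map() updates the
  // accumulator, then returns the accumulator value seen by the Driver.
  def run(sc: SparkContext): Long = {
    val counter = sc.longAccumulator("counter")
    val doubled = sc.makeRDD(List(1, 2, 3, 4)).map { x =>
      counter.add(1L) // update inside a transformation
      x * 2
    }
    doubled.count()   // job 1 evaluates the map
    doubled.collect() // job 2 evaluates the map again, since nothing is cached
    counter.value
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("AccInMap").setMaster("local[*]")
    val sc = new SparkContext(conf)
    // With 4 elements and two jobs, the counter is expected to reach 8, not 4.
    try println(run(sc))
    finally sc.stop()
  }
}
```

Caching the mapped RDD before the first action, or moving the update into foreach(), are the usual ways to avoid the extra updates.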
2. Custom accumulators
Steps for a custom accumulator:
(1) Extend AccumulatorV2 and set the input and output type parameters.
(2) Override its 6 abstract methods.
(3) Register the accumulator before using it: sc.register(accumulator, "accumulator name")
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable

object accumulator03_define {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setAppName("WC")
      .setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    val rdd: RDD[String] = sc.makeRDD(List("Hello", "Hello", "Hello", "Hello", "Spark", "Spark", "Hive", "Hive"), 2)
    // Create the accumulator
    val acc: myaccumulator = new myaccumulator()
    // Register the accumulator
    sc.register(acc, "wordcount")
    // Use the accumulator
    rdd.foreach { word =>
      acc.add(word)
    }
    // Get the accumulated result on the Driver
    println(acc.value)
    sc.stop()
  }
}

class myaccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
  // The output data set: a mutable Map from word to count
  val map: mutable.Map[String, Long] = mutable.Map[String, Long]()

  // The accumulator is in its initial (zero) state when the map is empty
  override def isZero: Boolean = map.isEmpty

  // Copy the accumulator, including its current contents
  override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
    val newAcc = new myaccumulator()
    newAcc.map ++= map
    newAcc
  }

  // Reset the accumulator to its zero state
  override def reset(): Unit = map.clear()

  // Add data: only words starting with "H" are counted
  override def add(v: String): Unit = {
    if (v.startsWith("H")) {
      map(v) = map.getOrElse(v, 0L) + 1
    }
  }

  // Merge another accumulator's counts into this one
  override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
    val map2: mutable.Map[String, Long] = other.value
    map2.foreach {
      case (word, count) =>
        map(word) = map.getOrElse(word, 0L) + count
    }
  }

  // The value of the accumulator
  override def value: mutable.Map[String, Long] = map
}
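The add and merge logic of a custom accumulator can also be checked on its own, before running it on an RDD. The sketch below is an assumption-laden illustration: WordCountAcc and MergeContractCheck are hypothetical names, the class mirrors the "starts with H" filter used in this article, and it relies on AccumulatorV2's methods being callable on an unregistered instance with spark-core on the classpath. Each inner Seq plays the role of one partition.

```scala
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable

// Hypothetical accumulator used only for this check.
class WordCountAcc extends AccumulatorV2[String, mutable.Map[String, Long]] {
  private val counts = mutable.Map[String, Long]()

  override def isZero: Boolean = counts.isEmpty
  override def copy(): WordCountAcc = {
    val c = new WordCountAcc
    c.counts ++= counts // a copy keeps the current contents
    c
  }
  override def reset(): Unit = counts.clear()
  override def add(word: String): Unit = {
    if (word.startsWith("H")) counts(word) = counts.getOrElse(word, 0L) + 1L
  }
  override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
    other.value.foreach { case (w, n) => counts(w) = counts.getOrElse(w, 0L) + n }
  }
  override def value: mutable.Map[String, Long] = counts
}

object MergeContractCheck {
  // Simulates the task-side copies by hand: one zero-valued copy per
  // partition, filled locally and then merged into the driver-side instance.
  def mergedCounts(partitions: Seq[Seq[String]]): Map[String, Long] = {
    val driverAcc = new WordCountAcc
    partitions.foreach { part =>
      val taskAcc = driverAcc.copyAndReset() // copy() followed by reset()
      part.foreach(taskAcc.add)
      driverAcc.merge(taskAcc)
    }
    driverAcc.value.toMap
  }

  def main(args: Array[String]): Unit = {
    println(mergedCounts(Seq(Seq("Hello", "Hello", "Spark"), Seq("Hello", "Hive"))))
  }
}
```

For the input in main, the merged result should contain Hello with a count of 3 and Hive with a count of 1; Spark is dropped by the filter.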