当前位置:网站首页>Spark累加器和广播变量

Spark累加器和广播变量

2022-06-24 06:39:00 Angryshark_128

累加器

累加器有些类似Redis的计数器,但要比计数器强大,不仅可以用于计数,还可以用来累加求和、累加合并元素等。

假设我们有一个word.txt文本,我们想要统计该文本中单词“sheep”的行数,我们可以直接读取文本filter过滤然后计数。

sc.textFile("word.txt").filter(_.contains("sheep")).count()

假设我们想分别统计文本中单词"sheep""wolf"的行数,如果按照上述方法需要计算两次

sc.textFile("word.txt").filter(_.contains("sheep")).count()
sc.textFile("word.txt").filter(_.contains("wolf")).count()

如果要分别统计100个单词的行数,则要计算100次

如果使用累加器,则只需要读一次即可

val count1=sc.acccumlator(0)
val count2=sc.acccumlator(0)
...

def processLine(line:String):Unit{
    
   if(line.contains("sheep")){
       count1+=1
   }
   
   if(line.contains("wolf")){
       count2+=1
   }
   
   ...
}


sc.textFile("word.txt").foreach(processLine(_))

不仅Int类型可以累加,Long、Double、Collection也可以累加,还可以进行自定义,而且这个变量可以在Spark的WebUI界面看到。

注意:累加器只能在Driver端定义和读取,不能在Executor端读取

广播变量

广播变量允许缓存一个只读的变量在每台机器(worker)上面,而不是每个任务(task)保存一份备份。利用广播变量能够以一种更有效率的方式将一个大数据量输入集合的副本分配给每个节点。

广播变量通过两个方面提高数据共享效率:

(1)集群中每个节点(物理机器)只有一个副本,默认的闭包是每个任务一个副本;

(2)广播传输是通过BT下载模式实现的,也就是P2P下载,在集群多的情况下,可以极大地提高数据传输速率。广播变量修改后,不会反馈到其他节点。

val list=sc.parallize(0 to 10)
val brdList=sc.broadcast(list)

sc.textFile("test.txt").filter(brdList.value.contains(_.toInt)).foreach(println)

使用时,需注意:

(1)适用于小变量分发,对于动则几十M的变量,每个任务都发送一次既消耗内存,也浪费时间

(2)广播变量只能在driver端定义,在Executor端读取,Executor不能修改

原网站

版权声明
本文为[Angryshark_128]所创,转载请带上原文链接,感谢
https://pandora.blog.csdn.net/article/details/111998322