
Broadcast variables and accumulators in Spark

2022-07-06 21:44:00 Big data Xiaochen

Broadcast variables

  • Send the shared data of the Driver process to every task of each Executor process on the worker nodes.

    Without broadcast variables, the Driver by default distributes a copy of the shared data to every 【Task】, which puts heavy pressure on the network.

    With broadcast variables, the Driver sends the shared data to each 【Executor】 only once, and all 【Task】s in that Executor reuse the same object.

  • Make sure the shared object is 【serializable】, because data transferred across nodes must be serializable.

  • On the Driver side, broadcast the shared object to each Executor:

bc = sc.broadcast(shared_object)

  On the Executor side, read the value with:

bc.value
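
A minimal end-to-end sketch of this pattern (the lookup dictionary country_map and the sample data are illustrative assumptions, not from the original article):

# -*- coding:utf-8 -*-
# Minimal broadcast-variable sketch; the lookup table and sample codes are made up for illustration
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("broadcast_demo").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    # Shared data lives on the Driver; broadcast sends it once per Executor instead of once per Task
    country_map = {"CN": "China", "US": "United States", "JP": "Japan"}
    bc = sc.broadcast(country_map)

    codes_rdd = sc.parallelize(["CN", "US", "JP", "CN"])
    # Inside each task, read the shared object through bc.value
    names = codes_rdd.map(lambda code: bc.value.get(code, "Unknown")).collect()
    print(names)  # ['China', 'United States', 'Japan', 'China']

    sc.stop()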

Accumulators

  • All Executors in the cluster accumulate into the same shared variable.

  • Spark currently only supports the 【add】 (accumulation) operation.

  • There are 3 built-in accumulators: 【LongAccumulator】, 【DoubleAccumulator】, and 【CollectionAccumulator】.

  • How to use an integer accumulator:

    • On the Driver side, define an integer accumulator and give it an initial value:

      acc=sc.accumulator(0)

    • On the Executor side, add 1 each time:

      acc += 1
      or, equivalently: acc.add(1)
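
A minimal, self-contained sketch of the steps above (the even-number condition and the sample data are illustrative assumptions, not from the original article):

# -*- coding:utf-8 -*-
# Minimal integer-accumulator sketch; the even-number condition is made up for illustration
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("accumulator_demo").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    # Defined on the Driver with an initial value of 0
    acc = sc.accumulator(0)

    def count_even(n):
        global acc
        if n % 2 == 0:
            acc += 1  # equivalent to acc.add(1); runs inside the Executor tasks

    # foreach is an Action, so it triggers the accumulation
    sc.parallelize(range(10)).foreach(count_even)
    # .value is read back on the Driver after the Action has run
    print(acc.value)  # 5

    sc.stop()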

A complete example combining an accumulator with a broadcast variable:

# -*- coding:utf-8 -*-
# Desc:This is Code Desc
import os
import json
import re
import time

from pyspark import SparkConf, SparkContext, StorageLevel

os.environ['SPARK_HOME'] = '/export/server/spark'
PYSPARK_PYTHON = "/root/anaconda3/bin/python3.8"
# When multiple Python versions exist, failing to specify one is likely to cause an error
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON

if __name__ == '__main__':
    # 1- Create the SparkContext object
    conf=SparkConf().setAppName("8_sharevalue").setMaster("local[*]")
    sc=SparkContext(conf=conf)
    # 2- Load the data file into an RDD
    file_rdd=sc.textFile("file:///export/pyworkspace/pyspark_sz26/pyspark-sparkcore-3.1.2/data/accumulator_broadcast_data.txt")
    # 3- Define an accumulator; later, add 1 each time a special symbol is encountered
    acc=sc.accumulator(0)
    # 4- Put the special symbols into a list
    list_v=[",", ".", "!", "#", "$", "%"]
    # 5- Broadcast the list to each Executor
    bc=sc.broadcast(list_v)
    # 6- Data cleaning: filter out blank lines
    filtered_rdd=file_rdd.filter(lambda line:len(line.strip())>0)
    # 7- Split each line on generalized whitespace characters
    str_rdd=filtered_rdd.flatMap(lambda line:re.split('\\s+',line))
    # 8- When a special symbol is encountered, add 1 to the accumulator and drop the symbol, leaving a new RDD that contains only words
    def filter_func(str):
        # Declare acc as a global variable so it can be updated here
        global acc
        list_m=bc.value
        if str in list_m:
            acc+=1
            return False
        else:
            return True

    word_rdd=str_rdd.filter(filter_func)
    # Cache here: (1) it lets the RDD be reused, (2) it prevents the accumulator from being incremented repeatedly
    word_rdd.cache()
    print('''Note 1 - Accumulation is lazy and is only triggered by an Action.
     word_rdd has not run any Action yet, so the count is still zero:''', acc.value)

    # 9- Run wordcount on the RDD above
    wordcount_10=word_rdd.map(lambda word:(word,1))\
        .reduceByKey(lambda x,y:x+y)\
        .sortBy(lambda x: x[1], ascending=False) \
        .take(10)
    # 10- Print the wordcount frequency statistics
    print('wordcount top-10 word frequencies:', wordcount_10)
    # 11- Print the number of special symbols
    print('Number of symbols:', acc.value)
    print('Total number of words:', word_rdd.count())
    print('Number of symbols:', acc.value)
    print('''Note 2 - word_rdd has 2 Action operations: take and count.
         Without caching word_rdd, the first take triggers the computation and the accumulator reaches 8;
         the second count reloads and recomputes the RDD, so the accumulation continues and the result doubles to 16.
         With cache() persistence, the second count reads the data straight from the cache,
         avoiding recomputation, so the accumulator does not double.
        ''')

 


Copyright notice
This article was written by [Big data Xiaochen]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/02/202202131122070650.html