Broadcast variables and accumulators in spark
2022-07-06 21:44:00 【Big data Xiaochen】
Broadcast variables
Broadcast variables send shared data from the Driver process to every task of the Executor processes on the worker nodes.
Without broadcast variables, the Driver by default ships a copy of the shared data to every 【Task】, which puts heavy pressure on the network.
With broadcast variables, the Driver sends the shared data to each 【Executor】 only once, and all 【Task】s inside that Executor reuse the same object.
Make sure the shared object is 【serializable】, because data transferred across nodes must be serializable.
On the Driver, broadcast the shared object to each Executor (Scala syntax shown here):
val bc = sc.broadcast(sharedObject)
Inside an Executor, read it with:
bc.value
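In PySpark, which is what the full example later in this post uses, a minimal sketch looks like the following; the stop-word list and the sample RDD here are made up purely for illustration:

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("broadcast_demo").setMaster("local[*]")
sc = SparkContext(conf=conf)

# Driver side: ship the shared list once per Executor instead of once per Task
stop_words = ["a", "an", "the"]                    # hypothetical shared data
bc = sc.broadcast(stop_words)

# Executor side: every Task reads the same copy through bc.value
words = sc.parallelize(["a", "quick", "fox", "the", "dog"])
print(words.filter(lambda w: w not in bc.value).collect())   # ['quick', 'fox', 'dog']

sc.stop()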
Accumulators
All Executors in the cluster accumulate into one and the same variable.
Spark currently supports only the 【add】 (accumulate) operation.
There are three built-in accumulators: 【LongAccumulator】, 【DoubleAccumulator】 and 【CollectionAccumulator】.
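The three names above come from the Scala/Java API (created with sc.longAccumulator, sc.doubleAccumulator and sc.collectionAccumulator). In PySpark the rough equivalents go through sc.accumulator, with a custom AccumulatorParam for the collection case; the sketch below uses class and variable names of my own choosing:

from pyspark import SparkContext, AccumulatorParam

sc = SparkContext("local[*]", "accumulator_types_demo")

long_acc = sc.accumulator(0)      # integer counterpart of LongAccumulator
double_acc = sc.accumulator(0.0)  # float counterpart of DoubleAccumulator

# Collection-style accumulator built from a custom AccumulatorParam
class ListParam(AccumulatorParam):
    def zero(self, value):
        return []
    def addInPlace(self, acc1, acc2):
        acc1.extend(acc2)
        return acc1

list_acc = sc.accumulator([], ListParam())

sc.parallelize([1, 2, 3]).foreach(lambda x: list_acc.add([x]))
print(sorted(list_acc.value))     # [1, 2, 3]
sc.stop()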
How to use the integer accumulator
On the Driver, define an integer accumulator and give it an initial value:
acc = sc.accumulator(0)
On the Executor side, add 1 to it each time (see the sketch below):
acc += 1 or acc.add(1)
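Put together as a minimal standalone sketch (the numbers in the RDD are made up; the full word-count example follows below):

from pyspark import SparkContext

sc = SparkContext("local[*]", "accumulator_demo")

acc = sc.accumulator(0)                 # defined on the Driver with initial value 0

def count_evens(x):
    global acc                          # `acc += 1` rebinds the name, so declare it global
    if x % 2 == 0:
        acc += 1                        # the increments happen inside the Executors

sc.parallelize(range(10)).foreach(count_evens)   # an Action triggers the accumulation
print(acc.value)                        # 5, read back on the Driver
sc.stop()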
Full example:
# -*- coding:utf-8 -*-
# Desc: word count with a broadcast variable and an accumulator
import os
import re
from pyspark import SparkConf, SparkContext

os.environ['SPARK_HOME'] = '/export/server/spark'
PYSPARK_PYTHON = "/root/anaconda3/bin/python3.8"
# When multiple Python versions exist, not specifying one is likely to cause an error
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON

if __name__ == '__main__':
    # 1- Create the SparkContext object
    conf = SparkConf().setAppName("8_sharevalue").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    # 2- Load the data file into an RDD
    file_rdd = sc.textFile("file:///export/pyworkspace/pyspark_sz26/pyspark-sparkcore-3.1.2/data/accumulator_broadcast_data.txt")

    # 3- Define the accumulator; it is incremented by 1 whenever a special symbol is met later
    acc = sc.accumulator(0)

    # 4- Put the special symbols into a list
    list_v = [",", ".", "!", "#", "$", "%"]

    # 5- Broadcast the list to every Executor
    bc = sc.broadcast(list_v)

    # 6- Data cleaning: filter out blank lines
    filtered_rdd = file_rdd.filter(lambda line: len(line.strip()) > 0)

    # 7- Split the RDD on generalized whitespace characters
    str_rdd = filtered_rdd.flatMap(lambda line: re.split('\\s+', line))

    # 8- When a special symbol is met, add 1 to the accumulator and drop the symbol,
    #    so the new RDD contains words only
    def filter_func(word):
        # `acc += 1` rebinds the name, so the global declaration is required
        global acc
        list_m = bc.value
        if word in list_m:
            acc += 1
            return False
        else:
            return True

    word_rdd = str_rdd.filter(filter_func)
    # Cache here: 1) to reuse the RDD, 2) to keep the accumulator from counting twice
    word_rdd.cache()
    print('''Note 1 - The accumulation is lazily evaluated and needs an Action to trigger it.
word_rdd has not run any Action yet, so the count is still zero:''', acc.value)

    # 9- Run wordcount on the RDD above
    wordcount_10 = word_rdd.map(lambda word: (word, 1)) \
        .reduceByKey(lambda x, y: x + y) \
        .sortBy(lambda x: x[1], ascending=False) \
        .take(10)

    # 10- Print the wordcount top-10 result
    print('wordcount top 10:', wordcount_10)

    # 11- Print the number of symbols
    print('Number of symbols:', acc.value)
    print('Total number of words:', word_rdd.count())
    print('Number of symbols:', acc.value)
    print('''Note 2 - word_rdd is used by 2 Actions, take and count.
Without caching word_rdd, the first take triggers the computation and the accumulator ends at 8;
the second count recomputes the lineage from scratch, the accumulator keeps adding, and the result doubles to 16.
With cache persistence, the second count reads the data straight from the cache,
the recomputation is avoided, and the accumulator does not double.''')