Broadcast variables and accumulators in Spark
2022-07-06 21:44:00 【Big data Xiaochen】
Broadcast variables
Broadcast variables send data shared by the Driver process to every task in the worker-node Executor processes.
Without broadcast variables, the Driver distributes a copy of the shared data to every 【Task】 by default, which puts heavy pressure on the network.
With broadcast variables, the Driver sends each 【Executor】 only one copy of the shared data, and all 【Task】s in that Executor reuse it.
Make sure the shared object is 【serializable】, because data transferred across nodes must be serializable.
On the Driver, broadcast the shared object to every Executor:
val bc = sc.broadcast(sharedObject)
On the Executor, read it with:
bc.value
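For instance, here is a minimal PySpark sketch of the same round trip (the lookup dict and app name are made up for illustration):

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("broadcast_demo").setMaster("local[*]")
sc = SparkContext(conf=conf)

# Driver side: ship one copy of the lookup table to each Executor
lookup = {"a": 1, "b": 2, "c": 3}
bc = sc.broadcast(lookup)

# Executor side: every task reads the same copy through bc.value
rdd = sc.parallelize(["a", "b", "c", "d"])
print(rdd.map(lambda k: bc.value.get(k, 0)).collect())  # [1, 2, 3, 0]

sc.stop()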
Accumulators
All Executors in the cluster accumulate into the same shared variable.
Spark currently supports only the 【add】 operation on accumulators.
There are three built-in accumulators: 【LongAccumulator】, 【DoubleAccumulator】, and 【CollectionAccumulator】.
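Those three classes belong to the Scala/Java API. In PySpark, the same idea is exposed through sc.accumulator, which builds an integer or float accumulator from the initial value; a small sketch under that assumption:

from pyspark import SparkConf, SparkContext

sc = SparkContext(conf=SparkConf().setAppName("acc_demo").setMaster("local[*]"))

long_acc = sc.accumulator(0)      # integer accumulator, analogous to LongAccumulator
double_acc = sc.accumulator(0.0)  # float accumulator, analogous to DoubleAccumulator

def add_both(x):
    long_acc.add(1)    # count elements; .add() does not rebind the name, so no global needed
    double_acc.add(x)  # sum values
    return x

sc.parallelize([1.0, 2.0, 3.0]).map(add_both).count()  # an Action triggers the adds
print(long_acc.value, double_acc.value)  # 3 3.0 -- .value is readable only on the Driver

sc.stop()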
How to use the integer accumulator
On the Driver, define an integer accumulator and assign it an initial value:
acc = sc.accumulator(0)
On the Executor, add 1 each time:
acc += 1 or acc.add(1)
Example:
# -*- coding:utf-8 -*-
# Desc: Broadcast variable and accumulator demo
import os
import re
from pyspark import SparkConf, SparkContext

os.environ['SPARK_HOME'] = '/export/server/spark'
PYSPARK_PYTHON = "/root/anaconda3/bin/python3.8"
# When multiple Python versions exist, not specifying one is likely to cause an error
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON

if __name__ == '__main__':
    # 1 - Create the SparkContext
    conf = SparkConf().setAppName("8_sharevalue").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    # 2 - Load the data file into an RDD
    file_rdd = sc.textFile("file:///export/pyworkspace/pyspark_sz26/pyspark-sparkcore-3.1.2/data/accumulator_broadcast_data.txt")

    # 3 - Define an accumulator; add 1 to it whenever a special symbol is encountered
    acc = sc.accumulator(0)

    # 4 - Put the special symbols in a list
    list_v = [",", ".", "!", "#", "$", "%"]

    # 5 - Broadcast the list to every Executor
    bc = sc.broadcast(list_v)

    # 6 - Clean the data: filter out blank lines
    filtered_rdd = file_rdd.filter(lambda line: len(line.strip()) > 0)

    # 7 - Split each line of the RDD on generalized whitespace
    str_rdd = filtered_rdd.flatMap(lambda line: re.split('\\s+', line))

    # 8 - On a special symbol, add 1 to the accumulator and drop the symbol,
    #     producing a new RDD that contains only words
    def filter_func(word):
        # Declare acc global so that `acc += 1` can rebind it
        global acc
        list_m = bc.value
        if word in list_m:
            acc += 1
            return False
        else:
            return True

    word_rdd = str_rdd.filter(filter_func)

    # Cache here: (1) to reuse the RDD, (2) to keep the accumulator from double-counting
    word_rdd.cache()
    print('''Note 1 - Accumulation is lazy and is only triggered by an Action.
    word_rdd has not run an Action yet, so the count is still zero:''', acc.value)

    # 9 - Run wordcount on the RDD above
    wordcount_10 = word_rdd.map(lambda word: (word, 1)) \
        .reduceByKey(lambda x, y: x + y) \
        .sortBy(lambda x: x[1], ascending=False) \
        .take(10)

    # 10 - Print the word-frequency statistics
    print('wordcount top-10 word frequencies:', wordcount_10)

    # 11 - Print the number of symbols
    print('Number of symbols:', acc.value)
    print('Total number of words:', word_rdd.count())
    print('Number of symbols:', acc.value)
    print('''Note 2 - word_rdd has two Actions, take and count.
    Without caching word_rdd, the first take triggers the computation and the accumulator reads 8;
    the second count recomputes the RDD from scratch, accumulating again and doubling the result to 16.
    With cache, the second count reads straight from the cache, avoids recomputation,
    and the accumulator does not double.''')
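To see Note 2 in isolation, here is a minimal sketch (synthetic data and invented names) that runs two Actions on the same RDD, first without and then with cache:

from pyspark import SparkConf, SparkContext

sc = SparkContext(conf=SparkConf().setAppName("acc_pitfall").setMaster("local[*]"))

# Without cache: every Action recomputes the lineage and re-fires the accumulator
acc = sc.accumulator(0)

def count_evens(x):
    global acc
    if x % 2 == 0:
        acc += 1
    return x

rdd = sc.parallelize(range(10)).map(count_evens)
rdd.count()         # first Action: acc.value is 5
rdd.count()         # second Action recomputes: the accumulator doubles
print(acc.value)    # 10

# With cache: the second Action reads cached partitions, so no re-accumulation
acc2 = sc.accumulator(0)

def count_evens2(x):
    global acc2
    if x % 2 == 0:
        acc2 += 1
    return x

rdd2 = sc.parallelize(range(10)).map(count_evens2).cache()
rdd2.count()        # triggers the computation and fills the cache: acc2.value is 5
rdd2.count()        # served from cache, accumulator untouched
print(acc2.value)   # 5

sc.stop()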