Broadcast variables and accumulators in Spark
2022-07-06 21:44:00 【Big data Xiaochen】
Broadcast variables
Broadcast variables send data shared by the Driver process to every task of every Executor process on the worker nodes.
Without broadcast variables, the Driver ships a separate copy of the shared data to each 【Task】 by default, which puts heavy pressure on the network.
With broadcast variables, the Driver sends the shared data to each 【Executor】 only once, and all 【Task】s inside that Executor reuse the same object.
Make sure the shared object is 【serializable】, because any data transferred across nodes must be serializable.
On the Driver side, broadcast the shared object to each Executor:
bc = sc.broadcast(shared_object)
On the Executor side, read it with:
bc.value
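A minimal runnable sketch of this pattern (the lookup dictionary and all names below are illustrative, not from the original article):

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("broadcast_demo").setMaster("local[*]")
sc = SparkContext(conf=conf)

lookup = {1: "a", 2: "b", 3: "c"}  # shared object living on the Driver
bc = sc.broadcast(lookup)          # shipped once per Executor, not once per Task

rdd = sc.parallelize([1, 2, 3, 2, 1])
# Every task reads bc.value instead of receiving its own copy of lookup
print(rdd.map(lambda k: bc.value.get(k, "?")).collect())  # ['a', 'b', 'c', 'b', 'a']

sc.stop()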
Accumulators
An accumulator accumulates the same variable across all Executors in the cluster.
Spark currently supports only 【add】-style accumulation.
There are 3 built-in accumulators: 【LongAccumulator】, 【DoubleAccumulator】, 【CollectionAccumulator】.
How to use the integer accumulator
Define an integer accumulator on the Driver side and give it an initial value:
acc = sc.accumulator(0)
On the Executor side, add 1 on each update:
acc += 1  or  acc.add(1)
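Putting those two lines together, a minimal self-contained sketch (values are illustrative) before the full example below:

from pyspark import SparkConf, SparkContext

sc = SparkContext(conf=SparkConf().setAppName("acc_demo").setMaster("local[*]"))

acc = sc.accumulator(0)
# foreach is an Action, so the adds run immediately on the Executors
sc.parallelize([1, 2, 3, 4]).foreach(lambda x: acc.add(x))
print(acc.value)  # 10, read back on the Driver
sc.stop()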
Full example combining a broadcast variable and an accumulator:
# -*- coding:utf-8 -*-
# Desc: word count with a broadcast symbol list and a symbol accumulator
import os
import re
from pyspark import SparkConf, SparkContext

os.environ['SPARK_HOME'] = '/export/server/spark'
PYSPARK_PYTHON = "/root/anaconda3/bin/python3.8"
# When multiple Python versions exist, failing to pin one is likely to cause errors
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON

if __name__ == '__main__':
    # 1- Create the SparkContext context object
    conf = SparkConf().setAppName("8_sharevalue").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    # 2- Load the data file into an RDD
    file_rdd = sc.textFile("file:///export/pyworkspace/pyspark_sz26/pyspark-sparkcore-3.1.2/data/accumulator_broadcast_data.txt")

    # 3- Define an accumulator; later, add 1 whenever a special symbol is met
    acc = sc.accumulator(0)

    # 4- Put the special symbols in a list
    list_v = [",", ".", "!", "#", "$", "%"]

    # 5- Broadcast the list to every Executor
    bc = sc.broadcast(list_v)

    # 6- Data cleaning: filter out blank lines
    filtered_rdd = file_rdd.filter(lambda line: len(line.strip()) > 0)

    # 7- Split the RDD on generalized whitespace characters
    str_rdd = filtered_rdd.flatMap(lambda line: re.split('\\s+', line))

    # 8- On a special symbol, add 1 to the accumulator and drop the symbol,
    #    forming a new RDD that contains only words
    def filter_func(word):
        # Use the global accumulator defined on the Driver
        global acc
        list_m = bc.value
        if word in list_m:
            acc += 1
            return False
        else:
            return True

    word_rdd = str_rdd.filter(filter_func)
    # Cache here: purpose 1 is RDD reuse, purpose 2 is to keep the
    # accumulator from being counted twice
    word_rdd.cache()

    print('''Note 1 - Accumulation is lazy and needs an Action to trigger it.
At this point word_rdd has had no Action yet, so the count is still zero:''', acc.value)

    # 9- Run wordcount on the RDD above
    wordcount_10 = word_rdd.map(lambda word: (word, 1)) \
        .reduceByKey(lambda x, y: x + y) \
        .sortBy(lambda x: x[1], ascending=False) \
        .take(10)

    # 10- Print the wordcount word-frequency statistics
    print('wordcount word-frequency statistics:', wordcount_10)

    # 11- Print the number of symbols
    print('Number of symbols:', acc.value)
    print('Total number of words:', word_rdd.count())
    print('Number of symbols:', acc.value)
    print('''Note 2 - word_rdd has 2 Actions, take and count.
Without cache persistence on word_rdd, the first take triggers the computation
and the accumulator result is 8; the second count loads and computes again,
accumulating further and doubling the result to 16.
After cache() persistence, the second count reads the data straight from the
cache, avoiding the recomputation, so the accumulator does not double.
''')
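To isolate the caching point above, here is a minimal sketch (separate from the article's example; the data and names are illustrative) showing how a second Action recomputes the lineage and doubles an accumulator when no cache is used:

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("acc_recompute_demo").setMaster("local[*]")
sc = SparkContext(conf=conf)

acc = sc.accumulator(0)

def count_elem(x):
    global acc
    acc += 1          # side effect inside a transformation
    return x

rdd = sc.parallelize(range(8)).map(count_elem)

rdd.collect()         # first Action: the map runs once, acc.value == 8
rdd.count()           # second Action: without cache() the map runs again
print(acc.value)      # 16, not 8 -- every element passed through the map twice

# Calling rdd.cache() before the first Action keeps acc.value at 8,
# because the second Action reads the cached partitions instead.
sc.stop()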