当前位置:网站首页>pyspark学习笔记
pyspark学习笔记
2022-07-23 05:38:00 【我是女生,我不做程序媛】
spark框架:MapReduce

【注:reduce的个数不一定和key的个数相等,可能n个key对应m个reduce】
shuffle:处于一个宽依赖,可以实现类似混洗的功能,将相同的 Key 分发至同一个 Reducer上进行处理。
搭建sc环境:
from pyspark.sql import SparkSession
from pyspark.sql import Row
spark dataframe
spark dataframe与pandas的dataframe不同,是两种不同的数据类型,具有不同的函数和使用方法。
- 建立spark dataframe: df=spark_session.sql(‘sql’)
- 将spark dataframe转化为二维列表: df.collect()
外层数组是每一行数据(Row),里层数组是一行中每一列(Column)的数据
spark rdd
rdd是spark中的基础数据单元,每个rdd被分为多个分区,可以包含Python、Java、Scala中任意类型的对象。
创建rdd
- 读取外部数据集
lines = sc.textFile(“README.md”) - 分发驱动器程序中的对象集合(如list、set)
lst=[(‘Alice’,1),(‘Bob’,2)]
rdd = sc.parallelize( lst )
3. 从spark dataframe转化
rdd = spark.createDataFrame( lst ,[‘name’,‘age’] ).rdd
rdd基本操作
转化操作transformation
转化操作不进行实际计算和存储,只是记录计算的步骤(即惰性计算):
① map:将函数作用在rdd的每个元素中,函数返回结果作为结果rdd的值。
nums = sc.parallelize([1,2,3,4])
squared = nums.map(lambda x: x*x).collect()
② filter:将函数作用在每个rdd元素,返回符合函数条件的rdd
pythonlines = lines.filter(lambda line: “Python” in line)
③ flatMap:将函数作用在迭代器rdd上,将所有迭代器的返回值塞到同一个迭代器中返回
lines = sc.parallelize([”hello world“,“hi”])
words = lines.flatMap(lambda line: line.split(" "))
words.first() #返回“hello”行动操作action
行动操作进行实际计算,得出结果返回到驱动器程序中或者并存储到外部存储(如HDFS)中:
①reduce:接收两个同类型元素并将计算结果返回
sum = rdd.reduce(lambda x,y: x+y)
② pythonlines.first()
③ rdd.take(2) #取rdd的2个值,以列表的形式返回[(‘Alice’,1),(‘Bob’,2)]
④pythonlines.collect() #取全部值,会把整个rdd拉取到driver上,慎用,最好用.first(), take(n)等取代
⑤ pythonlines.count() #计数持久化
Spark rdd惰性求值,每次调用行动操作时都会将前面的依赖重新计算一边,为了避免重复计算,可以将rdd持久化。
result = nums.map(lambda x: x*x)
result.persist() # 可以选择持久化级别:MEMORY_ONLY,MEMORY_ONLY_SER,MEMORY_AND_DISK,MEMORY_AND_DISK_SER,DISK_ONLY
print(result.count())
print(result.first())
result.unpersist()遍历dataframe形式的rdd
rdd是不可iterable的类型,不可以用for循环遍历
①rdd_result = rdd.map(function)
②rdd_result = rdd.mapPartitions(function)
def function(iter):
from multiprocessing.pool import ThreadPool*
results = []
def f (r ):
nonlocal results
results += [ something ]
# 如果r是row类型,即Row(id=' ',set=[]),可以取r['id'],r['set']等
# 如果r是tuple类型,即(Row(id=' ',set=[ ]),Row(id=' ',set=[ ])),可以取r[0],r[1],r[0]['id'],r[0]['set']等
with ThreadPool(8) as p:
p.map(f, [r for r in iter])
# r是rdd的一行
return results
map和mapPartition的区别:
对于有3个元素,1个分区的rdd:
map是对rdd中的每一个元素进行操作,操作一次的结果作为一个Row,操作3次;
mapPartitions则是对rdd中的每个分区的迭代器进行操作,操作一次的结果为3个Row,操作1次。
上例中,map的返回结果为[Row(_1=324, _2=[‘a’, ‘b’, ‘c’]), Row(_1=100, _2=[‘e’, ‘g’]), Row(_1=555, _2=[‘a’, ‘e’, ‘m’])]
mapPartitions的返回结果为[Row(id=324, set=[‘a’, ‘b’, ‘c’]), Row(id=100, set=[‘e’, ‘g’]), Row(id=555, set=[‘a’, ‘e’, ‘m’])]
向spark传递函数
python
- lambda函数
word = rdd.filter(lambda s : “error” in s) - 定义局部函数
#自定义query函数,rdd作为参数传入
def function(iter):
pass
#将返回结果转化为df
df = rdd.mapPartitions(query).toDF()
注意1:若在rdd函数中传递了某个对象的成员,spark会把成员所在整个对象都序列化发到工作节点上,传递的东西会比想象中大得多,如:
rdd.filter(lambda x: self.query in x) #会把整个self都保存到局部变量
query=self.query()
rdd.filter(lambda x: query in x)
注意2:在function中定义的print语句不会输出到主机终端,会在其他工作节点输出,因此不要在调用函数内部输出调试。
Scala
java
边栏推荐
猜你喜欢

Activiti工作流使用之Activiti-app的安装及流程创建

Huawei executives talk about the 35 year old crisis. How can programmers overcome the worry of age?

Data Lake: viewing data lake from data warehouse

QT style syntax definition access, can check the data, can be automatically generated

Notifier Nordic fire engine power supply maintenance and daily maintenance

Compare the advantages and disadvantages of RDB and AOF modes of redis

C1 -- vivado configuration vs code text editor environment 2022-07-21

Filter in MATLAB

Project deployment (simplified version)

Markdown common syntax records
随机推荐
Data Lake: introduction to Apache iceberg
面试必备之数据库专题
Single sign on - how to unify the expiration time of session between authentication server and client
C#的partial用法
Partial usage of C #
img标签设置height和width无效
Web server failed to start. Port 8080 was already in use.
[swift bug] Xcode prompt error running playground: failed to prepare for communication with playground
Huawei executives talk about the 35 year old crisis. How can programmers overcome the worry of age?
FFmpeg 音频编码
pycharm占用c盘
十年架构五年生活-01毕业之初
【无标题】
LearnOpenGL - Introduction
An accident caused by MySQL misoperation, and "high availability" is not working well
C1--Vivado配置VS Code文本编辑器环境2022-07-21
知识点回顾
8、曲面几何
Mysql database foundation
JDBC learning and simple encapsulation