
Persistence / caching of RDDs in Spark

2022-07-06 21:44:00 Big data Xiaochen

  • During a chain of RDD transformations, you may want to reuse an intermediate RDD several times, for example by outputting it more than once. By default, every Action triggers a new job, and each job reloads and recomputes the data from scratch, which wastes time. If that intermediate RDD's data is persisted to a specific storage medium such as 【memory】, 【disk】, or 【off-heap memory】, the RDD is computed only once, which improves program performance.

  • Calling cache/persist on an RDD is a 【lazy】 operation: it only takes effect after an 【Action】 operator is triggered (count is commonly used). The RDD's data is then persisted to memory or disk, and later operations read the data directly from memory or disk.

  • The following three calls all persist only to 【memory】:

    • rdd.persist()

    • rdd.cache()

    • rdd.persist(StorageLevel.MEMORY_ONLY)

  • More storage levels

    • rdd.persist(level : StorageLevel)

    • StorageLevel

 

    • _ONLY: store the data only in 【memory】 or only on 【disk】

    • _2: keep 【2】 replicas of the persisted data

    • _SER: 【serialize】 the RDD's elements to compress them, which also makes them easier to transfer over the network.

    • MEMORY_AND_DISK_SER_2: 【serialize】 the data and store it in memory; if 【memory】 is not enough, spill the rest to 【disk】, and keep 2 replicas.

  • Releasing the cache / persisted data

    • When the cached RDD's data is no longer used, consider releasing the resources (a combined sketch of persisting with a storage level and then unpersisting follows this list):

    • rdd.unpersist()
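A minimal combined sketch of the two points above, assuming an existing SparkContext sc and an RDD named rdd. Note that PySpark always serializes data on the Python side, so its StorageLevel constants do not include separate _SER variants; MEMORY_AND_DISK_2 is the closest PySpark equivalent of the Scala-side MEMORY_AND_DISK_SER_2:

from pyspark import StorageLevel

# Persist to memory, spill to disk when memory is not enough, keep 2 replicas
rdd.persist(StorageLevel.MEMORY_AND_DISK_2)

# persist() is lazy: an Action such as count() actually materializes the cache
rdd.count()

# ... reuse rdd in further Actions here; they read from the cache ...

# Release the cached data once the RDD is no longer needed
rdd.unpersist()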

       

# -*- coding:utf-8 -*-
# Desc: RDD caching/persistence demo — compute pv and uv from an Apache access log

import os
import re
import time

from pyspark import SparkConf, SparkContext

os.environ['SPARK_HOME'] = '/export/server/spark'
PYSPARK_PYTHON = "/root/anaconda3/bin/python3.8"
# When multiple Python versions exist, failing to specify one is likely to cause errors
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON

if __name__ == '__main__':
    # 1. Create the SparkContext context object
    conf = SparkConf().setAppName("2_rdd_from_external").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    # 2. Read the file
    rdd = sc.textFile("file:///export/pyworkspace/pyspark_sz26/pyspark-sparkcore-3.1.2/data/apache.log")
    # 3. Clean each line and extract the ip; \s+ matches any whitespace (tab, space, newline, \r\t)
    rdd2 = rdd.map(lambda line: re.split(r'\s+', line)[0])
    # Cache/persist rdd2; cache() is lazy, so trigger it with an Action
    rdd2.cache()
    rdd2.count()
    # 4. Compute pv and print it (this job reads rdd2 from the cache)
    pv = rdd2.count()
    print('pv=', pv)
    # 5. Compute uv and print it (this job also reuses the cached rdd2)
    uv = rdd2.distinct().count()
    print('uv=', uv)
    # Keep the application alive so the cached RDD can be inspected in the Spark UI Storage tab
    time.sleep(600)
    sc.stop()



When to use cache/persist?

  • When the RDD is reused 【many】 times

  • When the computation that produces the RDD is 【complex and expensive】 (for example, the data comes in through 【JDBC】) and the RDD is used many times. A sketch of this pattern follows below.
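A minimal, self-contained sketch of the first rule. The expensive_parse function is a hypothetical stand-in for a costly per-element transformation; with cache(), the expensive work runs once instead of once per Action:

# -*- coding:utf-8 -*-
import time
from pyspark import SparkConf, SparkContext

# Hypothetical stand-in for an expensive per-element computation (heavy parsing, lookups, ...)
def expensive_parse(x):
    time.sleep(0.01)
    return x * 2

if __name__ == '__main__':
    conf = SparkConf().setAppName("when_to_cache").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    expensive_rdd = sc.parallelize(range(1000)).map(expensive_parse)
    # Reused by several Actions: without cache(), each Action re-runs expensive_parse
    # over the whole dataset; with cache(), it runs only once.
    expensive_rdd.cache()
    print(expensive_rdd.count())  # first Action: computes and caches
    print(expensive_rdd.sum())    # later Actions are served from the cache
    print(expensive_rdd.max())

    expensive_rdd.unpersist()
    sc.stop()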
