
Checkpoint of RDD in Spark

2022-07-06 21:44:00 Big data Xiaochen

  • The RDD checkpoint mechanism exists because the storage media supported by cache and persist, memory and disk, are easy to 【lose】, whereas HDFS offers 【high availability】 and 【fault tolerance】. Checkpoint therefore stores the RDD's data on HDFS.

  • So checkpoint also provides persistence, with the added benefit of being 【safe and reliable】.

  • Usage

    • Step 1: 【sc.setCheckpointDir("hdfs://node1:8020/output/ckp/6_checkpoint")】 // specify a directory on HDFS

    • Step 2: 【rdd.checkpoint()】 // call on an RDD that will be reused often or is especially important

  • Example

    • Built directly on the previous example with a small modification: first specify the HDFS directory, then replace persist or cache with checkpoint.

# -*- coding:utf-8 -*-
# Desc: checkpoint an RDD to HDFS, then compute pv and uv

import os
import re
import time

from pyspark import SparkConf, SparkContext

os.environ['SPARK_HOME'] = '/export/server/spark'
PYSPARK_PYTHON = "/root/anaconda3/bin/python3.8"
# When multiple Python versions exist, failing to specify one is likely to cause an error
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON

if __name__ == '__main__':
    # 1 - create the SparkContext context object
    conf = SparkConf().setAppName("2_rdd_from_external").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    sc.setCheckpointDir("hdfs://node1:8020/output/ckp/6_checkpoint")
    # 2 - read the file
    rdd = sc.textFile("file:///export/pyworkspace/pyspark_sz26/pyspark-sparkcore-3.1.2/data/apache.log")
    # 3 - clean each line and extract the ip; \s+ matches any whitespace run (tab, space, newline, \r ...)
    rdd2 = rdd.map(lambda line: re.split(r'\s+', line)[0])
    # checkpoint rdd2 to HDFS; the actual write happens when the first action below runs
    rdd2.checkpoint()
    # 4 - compute pv and print (this count also triggers the checkpoint job)
    pv = rdd2.count()
    print('pv=', pv)
    # 5 - compute uv and print
    uv = rdd2.distinct().count()
    print('uv=', uv)
    time.sleep(600)  # keep the application alive so the web UI can be inspected
    sc.stop()
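Independent of Spark, the cleaning and counting logic in the script above can be checked with plain Python. The sketch below runs the same `re.split(r'\s+', line)[0]` extraction over a few hypothetical Apache-style log lines (the IPs are made up) and computes pv and uv the way `count()` and `distinct().count()` would:

```python
import re

# Hypothetical Apache-style log lines (made-up IPs)
log_lines = [
    '192.168.1.10 - - [06/Jul/2022] "GET /index.html" 200',
    '192.168.1.10 - - [06/Jul/2022] "GET /about.html" 200',
    '192.168.1.20 - - [06/Jul/2022] "GET /index.html" 200',
]

# Same extraction as rdd.map(lambda line: re.split(r'\s+', line)[0])
ips = [re.split(r'\s+', line)[0] for line in log_lines]

pv = len(ips)        # page views: one per request line, like rdd2.count()
uv = len(set(ips))   # unique visitors, like rdd2.distinct().count()

print('pv=', pv)  # pv= 3
print('uv=', uv)  # uv= 2
```

With two requests from the same IP and one from another, pv is 3 while uv is 2.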



  Result: (output screenshot from the original article)

Differences between persistence and checkpoint

  • Storage location: persist and cache keep the RDD's data in 【memory】, on 【disk】, or in 【off-heap memory】, whereas the checkpoint mechanism saves the RDD's data on 【HDFS】.

  • Life cycle: when the application finishes, or when 【unpersist】 is called, the data kept by persist or cache is cleared automatically. The contents of the checkpoint directory, however, are 【not】 cleared automatically; they must be removed manually.

  • Lineage: persist and cache 【keep】 the RDD's lineage, so if a partition's data is lost it can be recomputed from its 【dependencies】. Checkpoint 【does not】 keep the lineage, because even if a partition's data is lost or damaged, it can be read back directly and conveniently from HDFS, which holds another 【2】 replicas.

Copyright notice
This article was created by [Big data Xiaochen]; please include the original link when reposting:
https://yzsam.com/2022/02/202202131122070691.html