当前位置:网站首页>spark(standalone,yarn)

spark(standalone,yarn)

2022-08-02 14:05:00 大学生爱编程

1.安装及设置

1、上传解压,配置环境变量 配置bin目录
tar -xvf spark-2.4.5-bin-hadoop2.7.tgz
mv spark-2.4.5-bin-hadoop2.7 spark-2.4.5
配置环境变量
vim /etc/profile
2、
-修改配置文件 conf/ cp spark-env.sh.template spark-env.sh
增加配置
export SPARK_MASTER_IP=master
export SPARK_MASTER_PORT=7077
export SPARK_WORKER_CORES=2
export SPARK_WORKER_INSTANCES=1
export SPARK_WORKER_MEMORY=2g
export JAVA_HOME=/usr/local/soft/jdk1.8.0_171

-增加从节点配置 删除localhost
cp slaves.template slaves
node1
node2

增加从节点

3、复制到其它节点
scp -r spark-2.4.5 node1:pwd
scp -r spark-2.4.5 node2:pwd

4、在主节点执行启动命令,启动命令和Hadoop是一样的,所以在sbin目录下启动
启动集群,在master中执行
./sbin/start-all.sh

5、 http://master:8080/ 访问spark ui

相关依赖
<dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.12</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.11.12</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>2.11.12</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>
    </dependencies>
打包编译pom文件
 <build>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <!-- Scala Compiler -->
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

        </plugins>
    </build>

2.单机

**clint模式**
日志在本地输出,一般用于上线前测试(bin/下执行)

需要进入到spark-examples_2.11-2.4.5.jar 包所在的目录下执行
cd /usr/local/soft/spark-2.4.5/examples/jars

算圆周率:
spark-submit
 --class org.apache.spark.examples.SparkPi
 --master spark://master:7077
 --executor-memory 512m
 --total-executor-cores 1 
 spark-examples_2.11-2.4.5.jar 100
**cluster模式**
上线使用,不会再本地打印日志,但是要注释掉local,注释掉local后默认的读取文件路径时hdfs上的
在网页可以查看结果
spark-submit
 --class org.apache.spark.examples.SparkPi          
 --master spark://master:7077
 --executor-memory 512M
 --total-executor-cores 1
 --deploy-mode cluster 
 spark-examples_2.11-2.4.5.jar 100
项目打包上传到服务器
spark-submit --class 主类名 --master spark://master:7077 包名

3.整合Yarn(yarn的资源运行spark任务)

**停止spark集群**       在spark sbin目录下执行   ./stop-all.sh

spark整合yarn只需要在一个节点整合, **可以**删除node1 和node2中所有的spark 文件

1、增加hadoop 配置文件地址   
  vim spark-env.sh
  增加export HADOOP_CONF_DIR=/usr/local/soft/hadoop-2.7.6/etc/hadoop

2、往yarn提交任务需要增加两个配置  yarn-site.xml(/usr/local/soft/hadoop-2.7.6/etc/hadoop/yarn-site.xml)
先关闭yarn    stop-all.sh
cd /usr/local/soft/hadoop-2.7.6/etc/hadoop

vim yarn-site.xml
增加配置
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>

4、同步到其他节点,重启yarn
scp -r yarn-site.xml node1:`pwd`
scp -r yarn-site.xml node2:`pwd`

启动yarn

clint模式 日志本地输出,用于上线前测试或者用于一次性任务,clint提交大量任务会导致网卡

spark-submit
--class org.apache.spark.examples.SparkPi
--master yarn-client
spark-examples_2.11-2.4.5.jar 100

java API删除hdfs文件
在代码中可以在存文件之前手动删除一次已存在的路径
val configuration=new Configuration()
val filesystem:FileSystem=FileSystem.get(configuration)
if (fileSystem.exists(new Path("路径"))){
fileSystem.delete(new Path("路径"),    true)
}

cluster模式 不在本地打印日志,减少io,Driver随机在一台nodeManager中启动,一般用于上线前

spark-submit 
--class org.apache.spark.examples.SparkPi 
--master yarn-cluster 
spark-examples_2.11-2.4.5.jar 100

获取yarn程序执行日志  执行成功之后才能获取到
yarn logs -applicationId application_1657871301549_0005(application的id)
端口号:

spark
master:8080

hdfs webui
http://node1:50070

yarn ui
http://node1:8088

4.任务进程介绍

Executor:执行器,task在其中执行,缓存数据 
Task:自己写的spark算子的代码逻辑封装的线程对象
DriverProgram:新建SparkContext程序,spark的控制程序,负责将task发送到executor中去执行
executor的数量,内存,和core都可以手动设置:
--num-executors 1
--executor-cores 1        ( 其数量决定同时可以执行的task的数量)
--executor-memory 1G

5.资源调度与任务调度

5.1资源调度(两张图)

spark和mr的区别:
1.都是大数据计算引擎,spark可以将数据缓存在内存中进行计算
2.spark:shuffle的结果流经RDD中,可以不落地的
mr:前一次shuffle的结果落地后才能开启下一次计算
3.可以使用Scala语言的特性,简洁,函数式编程
4.spark是粗粒度的资源调度,mr是细粒度的
spark与mr资源申请的区别:
spark是粗粒度的资源调度,在任务执行之前将所需要的全部资源申请下来,task在执行的时候不需要单独申请资源
task启动快所以job任务执行也快,但是spark需要等待到最后一个task执行完成才会释放资源,会占用额外的资源,导致浪费

mr是细粒度的资源申请,在每一个task执行之前单独去yarn申请资源,执行完成就释放资源
task启动慢所以job执行也慢,但不会占用额外资源

在这里插入图片描述
在这里插入图片描述

5.2任务调度流程 执行task

1.yarn-clint申请资源流程:
clint本地启动driver程序,向yarn的resourceManager申请资源启动applicationMaster,rm随机分配一个NodeManager节点启动am,nm上的am向rm申请启动Executor(可以多个),executor会反向注册给driver端
2.宽窄依赖,划分stage(切分任务),每个stage中是可以进行并行计算的task
3.yarn-clint任务调度流程:
当代码中遇到actions算子时会开始任务调度,构建DAG有向无环图,DAGSchedule根据宽窄依赖将DAG有向无环图切分为多个stage,将stage以taskSet的形式发送给taskScheduler,taskScheduler将taskSet中的task发送到Executor(线程池)中去执行,会尽量将task发送到数据所在的节点运行
1.重试机制:
如果task执行失败,TaskScheduler重试3次
如果还失败,DAGScheduler重试Stage4次
如果是shuffle文件找不到导致异常,taskScheduler不负责重试task,而是DAGSchedule重试上一个stage
2.推测执行:如果spark发现有的task执行很慢,会再发送一个task去竞争
yarn-clint模式任务调度是由Driver负责,资源调度是由ApplicationMaster负责
yarn-cluster模式资源调度和任务调度都是由ApplicationMastrt(和Driver不做区分)负责

6.累加器和小坑

6.1一般方法

  1. 算子内的代码会被封装成task(线程对象)发送到executor中执行
    算子外的代码运行在driver端
  2. executor和driver属于不同的JVM,所以在算子内修改算子外的一个普通变量不会生效
  3. 当在算子内使用算子外的一个普通变量时,这个变量会以变量副本的形式封装到task中,将task发送到Executor中去,可以用但是不能修改原始变量
 var count = 0
    studentsRDD.foreach(stu => {
    count += 1
    println(count)     //从1打印到1000
    })
println(s"count:$count")   //结果为0

6.2累加器和小坑

//1、定义累加器
    val accumulator: LongAccumulator = sc.longAccumulator
    val mapRDD: RDD[String] = studentsRDD.map(stu => {
      accumulator.add(1)     //2、对累累加器进行累加
      stu
    })
//3、再Driver端读取累加结果
    println(s"accumulator:${accumulator.value}")
spark代码中RDD不能嵌套使用,在算子内不能使用RDD,RDD内的算子被封装成task发送给executor

studentsRDD.foreach(stu => {
      studentsRDD.foreach(stu => {
        println(stu)
      })
    })
在算子内不能使用SparkContext:task需要在网络中传输但是SparkContext不能序列化

 studentsRDD.foreach(stu => {
      val scoreRDD: RDD[String] = sc.textFile("/data/score.txt")
      scoreRDD.foreach(println)
    })
  }

6.3广播变量

1.在算子内使用算子外的普通变量,普通变量会以变量副本的形式封装到task中,将task发送到executor中执行(如果RDD有10个分区,会产生10个task,汇总产生10个变量副本),会增加网络io
2.Executor的数量比task数量少的时候,会出现同一个executor重复使用同一个变量,在driver端定义广播变量,将driver端的广播变量广播到Executor端,然后在executor内获取广播变量,每个executor中只会有一个变量副本,副本数就会减少。
3.广播变量在spark中应用于map join,大表关联小表,不产生shuffle

广播变量定义到算子内:
// 读取学生表,以学号作为key构建一个map集合
     val studentMap: Map[String, String] =Source.fromFile("data/students.txt")
      .getLines().toList
      .map(stu=>{
        val id: String =stu.split(",")(0)   //取出id
        (id,stu)
      }).toMap
     val scoresRDD: RDD[String] =sc.textFile("data/score.txt",10)
//关联学生表和分数表,使用学号到学生表的map集合中查询学生的信息
     val broadCastMap: Broadcast[Map[String, String]] =sc.broadcast(studentMap)    //广播

      val joinRDD: RDD[(String, String)] =scoresRDD.map(sco=>{
      val id: String =sco.split(",")(0)
      val map: Map[String, String] = broadCastMap.value  //使用
      val studentInfo: String =map.getOrElse(id,"默认值")
      (id,studentInfo)
    })
joinRDD.foreach(println)

6.4进程功能

Executor中都有BM:

BlockManager:负责管理Executor中磁盘和内存的数据
1.广播变量和累加器
2.Spark缓存数据
3.shuffle过程中的文件
ConnectionManager:负责创建网络连接
BlockTransforService:负责拉去数据
MemoryManager:管理内存数据
DiskManager:管理磁盘数据

Driver中:

BlockManagerMaster中:
ConnectionManager:负责创建网络连接
BlockTransforService:负责拉去数据
MemoryManager:管理内存数据
DiskManager:管理磁盘数据

原网站

版权声明
本文为[大学生爱编程]所创,转载请带上原文链接,感谢
https://blog.csdn.net/qq_45409791/article/details/125808292