当前位置:网站首页>spark(standalone,yarn)
spark(standalone,yarn)
2022-08-02 14:05:00 【大学生爱编程】
1.安装及设置
1、上传解压,配置环境变量 配置bin目录
tar -xvf spark-2.4.5-bin-hadoop2.7.tgz
mv spark-2.4.5-bin-hadoop2.7 spark-2.4.5
配置环境变量
vim /etc/profile
2、
-修改配置文件 conf/ cp spark-env.sh.template spark-env.sh
增加配置
export SPARK_MASTER_IP=master
export SPARK_MASTER_PORT=7077
export SPARK_WORKER_CORES=2
export SPARK_WORKER_INSTANCES=1
export SPARK_WORKER_MEMORY=2g
export JAVA_HOME=/usr/local/soft/jdk1.8.0_171
-增加从节点配置 删除localhost
cp slaves.template slaves
node1
node2
增加从节点
3、复制到其它节点
scp -r spark-2.4.5 node1:pwd
scp -r spark-2.4.5 node2:pwd
4、在主节点执行启动命令,启动命令和Hadoop是一样的,所以在sbin目录下启动
启动集群,在master中执行
./sbin/start-all.sh
5、 http://master:8080/ 访问spark ui
相关依赖
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.12</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>2.11.12</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>2.11.12</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.5</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.5</version>
</dependency>
</dependencies>
打包编译pom文件
<build>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<!-- Scala Compiler -->
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
2.单机
**clint模式**
日志在本地输出,一般用于上线前测试(bin/下执行)
需要进入到spark-examples_2.11-2.4.5.jar 包所在的目录下执行
cd /usr/local/soft/spark-2.4.5/examples/jars
算圆周率:
spark-submit
--class org.apache.spark.examples.SparkPi
--master spark://master:7077
--executor-memory 512m
--total-executor-cores 1
spark-examples_2.11-2.4.5.jar 100
**cluster模式**
上线使用,不会再本地打印日志,但是要注释掉local,注释掉local后默认的读取文件路径时hdfs上的
在网页可以查看结果
spark-submit
--class org.apache.spark.examples.SparkPi
--master spark://master:7077
--executor-memory 512M
--total-executor-cores 1
--deploy-mode cluster
spark-examples_2.11-2.4.5.jar 100
项目打包上传到服务器
spark-submit --class 主类名 --master spark://master:7077 包名
3.整合Yarn(yarn的资源运行spark任务)
**停止spark集群** 在spark sbin目录下执行 ./stop-all.sh
spark整合yarn只需要在一个节点整合, **可以**删除node1 和node2中所有的spark 文件
1、增加hadoop 配置文件地址
vim spark-env.sh
增加export HADOOP_CONF_DIR=/usr/local/soft/hadoop-2.7.6/etc/hadoop
2、往yarn提交任务需要增加两个配置 yarn-site.xml(/usr/local/soft/hadoop-2.7.6/etc/hadoop/yarn-site.xml)
先关闭yarn stop-all.sh
cd /usr/local/soft/hadoop-2.7.6/etc/hadoop
vim yarn-site.xml
增加配置
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
4、同步到其他节点,重启yarn
scp -r yarn-site.xml node1:`pwd`
scp -r yarn-site.xml node2:`pwd`
启动yarn
clint模式 日志本地输出,用于上线前测试或者用于一次性任务,clint提交大量任务会导致网卡
spark-submit
--class org.apache.spark.examples.SparkPi
--master yarn-client
spark-examples_2.11-2.4.5.jar 100
java API删除hdfs文件
在代码中可以在存文件之前手动删除一次已存在的路径
val configuration=new Configuration()
val filesystem:FileSystem=FileSystem.get(configuration)
if (fileSystem.exists(new Path("路径"))){
fileSystem.delete(new Path("路径"), true)
}
cluster模式 不在本地打印日志,减少io,Driver随机在一台nodeManager中启动,一般用于上线前
spark-submit
--class org.apache.spark.examples.SparkPi
--master yarn-cluster
spark-examples_2.11-2.4.5.jar 100
获取yarn程序执行日志 执行成功之后才能获取到
yarn logs -applicationId application_1657871301549_0005(application的id)
端口号:
spark
master:8080
hdfs webui
http://node1:50070
yarn ui
http://node1:8088
4.任务进程介绍
Executor:执行器,task在其中执行,缓存数据
Task:自己写的spark算子的代码逻辑封装的线程对象
DriverProgram:新建SparkContext程序,spark的控制程序,负责将task发送到executor中去执行
executor的数量,内存,和core都可以手动设置:
--num-executors 1
--executor-cores 1 ( 其数量决定同时可以执行的task的数量)
--executor-memory 1G
5.资源调度与任务调度
5.1资源调度(两张图)
spark和mr的区别:
1.都是大数据计算引擎,spark可以将数据缓存在内存中进行计算
2.spark:shuffle的结果流经RDD中,可以不落地的
mr:前一次shuffle的结果落地后才能开启下一次计算
3.可以使用Scala语言的特性,简洁,函数式编程
4.spark是粗粒度的资源调度,mr是细粒度的
spark与mr资源申请的区别:
spark是粗粒度的资源调度,在任务执行之前将所需要的全部资源申请下来,task在执行的时候不需要单独申请资源
task启动快所以job任务执行也快,但是spark需要等待到最后一个task执行完成才会释放资源,会占用额外的资源,导致浪费
mr是细粒度的资源申请,在每一个task执行之前单独去yarn申请资源,执行完成就释放资源
task启动慢所以job执行也慢,但不会占用额外资源
5.2任务调度流程 执行task
1.yarn-clint申请资源流程:
clint本地启动driver程序,向yarn的resourceManager申请资源启动applicationMaster,rm随机分配一个NodeManager节点启动am,nm上的am向rm申请启动Executor(可以多个),executor会反向注册给driver端
2.宽窄依赖,划分stage(切分任务),每个stage中是可以进行并行计算的task
3.yarn-clint任务调度流程:
当代码中遇到actions算子时会开始任务调度,构建DAG有向无环图,DAGSchedule根据宽窄依赖将DAG有向无环图切分为多个stage,将stage以taskSet的形式发送给taskScheduler,taskScheduler将taskSet中的task发送到Executor(线程池)中去执行,会尽量将task发送到数据所在的节点运行
1.重试机制:
如果task执行失败,TaskScheduler重试3次
如果还失败,DAGScheduler重试Stage4次
如果是shuffle文件找不到导致异常,taskScheduler不负责重试task,而是DAGSchedule重试上一个stage
2.推测执行:如果spark发现有的task执行很慢,会再发送一个task去竞争
yarn-clint模式任务调度是由Driver负责,资源调度是由ApplicationMaster负责
yarn-cluster模式资源调度和任务调度都是由ApplicationMastrt(和Driver不做区分)负责
6.累加器和小坑
6.1一般方法
- 算子内的代码会被封装成task(线程对象)发送到executor中执行
算子外的代码运行在driver端 - executor和driver属于不同的JVM,所以在算子内修改算子外的一个普通变量不会生效
- 当在算子内使用算子外的一个普通变量时,这个变量会以变量副本的形式封装到task中,将task发送到Executor中去,可以用但是不能修改原始变量
var count = 0
studentsRDD.foreach(stu => {
count += 1
println(count) //从1打印到1000
})
println(s"count:$count") //结果为0
6.2累加器和小坑
//1、定义累加器
val accumulator: LongAccumulator = sc.longAccumulator
val mapRDD: RDD[String] = studentsRDD.map(stu => {
accumulator.add(1) //2、对累累加器进行累加
stu
})
//3、再Driver端读取累加结果
println(s"accumulator:${accumulator.value}")
spark代码中RDD不能嵌套使用,在算子内不能使用RDD,RDD内的算子被封装成task发送给executor
studentsRDD.foreach(stu => {
studentsRDD.foreach(stu => {
println(stu)
})
})
在算子内不能使用SparkContext:task需要在网络中传输但是SparkContext不能序列化
studentsRDD.foreach(stu => {
val scoreRDD: RDD[String] = sc.textFile("/data/score.txt")
scoreRDD.foreach(println)
})
}
6.3广播变量
1.在算子内使用算子外的普通变量,普通变量会以变量副本的形式封装到task中,将task发送到executor中执行(如果RDD有10个分区,会产生10个task,汇总产生10个变量副本),会增加网络io
2.Executor的数量比task数量少的时候,会出现同一个executor重复使用同一个变量,在driver端定义广播变量,将driver端的广播变量广播到Executor端,然后在executor内获取广播变量,每个executor中只会有一个变量副本,副本数就会减少。
3.广播变量在spark中应用于map join,大表关联小表,不产生shuffle
广播变量定义到算子内:
// 读取学生表,以学号作为key构建一个map集合
val studentMap: Map[String, String] =Source.fromFile("data/students.txt")
.getLines().toList
.map(stu=>{
val id: String =stu.split(",")(0) //取出id
(id,stu)
}).toMap
val scoresRDD: RDD[String] =sc.textFile("data/score.txt",10)
//关联学生表和分数表,使用学号到学生表的map集合中查询学生的信息
val broadCastMap: Broadcast[Map[String, String]] =sc.broadcast(studentMap) //广播
val joinRDD: RDD[(String, String)] =scoresRDD.map(sco=>{
val id: String =sco.split(",")(0)
val map: Map[String, String] = broadCastMap.value //使用
val studentInfo: String =map.getOrElse(id,"默认值")
(id,studentInfo)
})
joinRDD.foreach(println)
6.4进程功能
Executor中都有BM:
BlockManager:负责管理Executor中磁盘和内存的数据
1.广播变量和累加器
2.Spark缓存数据
3.shuffle过程中的文件
ConnectionManager:负责创建网络连接
BlockTransforService:负责拉去数据
MemoryManager:管理内存数据
DiskManager:管理磁盘数据
Driver中:
BlockManagerMaster中:
ConnectionManager:负责创建网络连接
BlockTransforService:负责拉去数据
MemoryManager:管理内存数据
DiskManager:管理磁盘数据
边栏推荐
猜你喜欢
随机推荐
St. Regis Takeaway Notes - Lecture 05 Getting Started with Redis
Briefly write about the use and experience of PPOCRLabel
Deep learning framework pytorch rapid development and actual combat chapter3
【c】大学生在校学习c语言常见代码
C语言日记 7 输入/输出格式控制
浮点数的运算方法
c语言用scanf出错不安全的解决办法
宏定义问题记录day2
Implementation of redis distributed lock and watchdog
C语言初级—数组元素的增删改查
MySQL知识总结 (三) 索引
ABP,kendo后台接口,新增,查询
加减法运算及其溢出处理
MySQL知识总结 (五) 锁
C语言sizeof和strlen的区别
科创知识年度盛会,中国科创者大会8月6日首场开幕!
2022最新交规记忆重点
【Camera2】由Camera2 特性想到的有关MED(多场景设备互动)的场景Idea
What's wrong with running yolov5 (1) p, r, map are all 0
原码、补码、反码