Spark practice questions + answers
2022-08-01 21:32:00 【xiexiexie0520】
Spark Practice Questions
Data
The data for these practice questions can be downloaded free of charge from the links below.
Create a data directory in the project and place the files in it (the expected layout of each file is summarized after the list):
- Student information table (data/students.txt): https://download.csdn.net/download/holiday0520/86268311
- Student score table (data/score.txt): https://download.csdn.net/download/holiday0520/86268316
- Subject table (data/subject.txt): https://download.csdn.net/download/holiday0520/86268317
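Judging from how the answer code parses each file (split on commas) and from the case classes Stu, Sco and Sub at the end of the listing, the files are plain comma-separated text: students.txt carries student id, name, age, gender and class; score.txt carries student id, subject id and score; subject.txt carries subject id, subject name and full score. This layout is inferred from the code, not from the downloads themselves.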
Dependencies
Dependencies to add to the pom.xml file (a note on version compatibility follows the snippet):
<dependencies>
    <!-- https://mvnrepository.com/artifact/junit/junit -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13.2</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.12</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-compiler</artifactId>
        <version>2.11.12</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-reflect</artifactId>
        <version>2.11.12</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.4.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.4.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.11</artifactId>
        <version>2.4.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.4.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-mllib_2.11</artifactId>
        <version>2.4.5</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <!-- Scala Compiler -->
        <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <version>2.15.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
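Note that the project's Scala version (2.11.12) must match the _2.11 suffix of the Spark artifacts: Spark builds are not binary compatible across Scala minor versions, so pairing these Spark 2.4.5 artifacts built for Scala 2.11 with Scala 2.12 will not work.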
Practice Questions
1. List the scores in every subject for the top ten students of the grade [student id, name, class, subject, score]
2. Find the students whose total score is higher than the grade average [student id, name, class, total score]
3. Find the students who passed every subject [student id, name, class, subject, score]
4. Find the top three students in each class [student id, name, class, score]
5. Find the 100 students with the most unbalanced scores across subjects [student id, name, class, subject, score]
Answers
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.junit.{Before, Test}
// Practice questions
class Demo24_Student {

  var stuRDD: RDD[Stu] = _
  var scoRDD: RDD[Sco] = _
  var subRDD: RDD[Sub] = _
  var sc: SparkContext = _
  def filterWithIdListAndPrint(ids: List[String]): Unit = {
    // Broadcast the id list
    val broIds: Broadcast[List[String]] = sc.broadcast(ids)
    // Turn stuRDD into key-value form so it can be joined
    val studentKVRDD: RDD[(String, (String, String))] = stuRDD
      .filter(stu => broIds.value.contains(stu.id))
      .map(stu => (stu.id, (stu.name, stu.clazz)))
    // Turn subRDD into key-value form so it can be joined
    val subjectKVRDD: RDD[(String, String)] = subRDD.map(sub => (sub.subId, sub.subName))
    // Join the score table with the student table (filter first, then join)
    scoRDD
      .filter(sco => broIds.value.contains(sco.id))
      .map(sco => (sco.id, sco))
      .join(studentKVRDD)
      .map {
        case (id: String, (sco: Sco, (name: String, clazz: String))) =>
          (sco.subId, (id, name, clazz, sco.score))
      }
      .join(subjectKVRDD)
      .map {
        case (subId: String, ((id: String, name: String, clazz: String, score: Int), subName: String)) =>
          s"$id,$name,$clazz,$subName,$score"
      }
      .sortBy(s => s.split(",")(0)) // sort the output by id to make it easier to read
      .foreach(println)
  }
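  // Performance note: broadcasting the id list lets every task run the contains check locally,
  // so the score and student tables can be shrunk before the joins instead of shuffling everything.
  // List.contains is a linear scan; for a large id list, broadcasting ids.toSet would make each
  // lookup effectively constant time.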
  @Before
  def init(): Unit = {
    // Read the three files, build the corresponding case class objects and turn them into RDDs
    sc = new SparkContext(
      new SparkConf()
        .setAppName("Demo_Student")
        .setMaster("local")
    )
    stuRDD = sc.textFile("data/students.txt")
      .map(line => {
        val splits: Array[String] = line.split(",")
        Stu(splits(0), splits(1), splits(2).toInt, splits(3), splits(4))
      })
    scoRDD = sc.textFile("data/score.txt")
      .map(line => {
        val splits: Array[String] = line.split(",")
        Sco(splits(0), splits(1), splits(2).toInt)
      })
    subRDD = sc.textFile("data/subject.txt")
      .map(line => {
        val splits: Array[String] = line.split(",")
        Sub(splits(0), splits(1), splits(2).toInt)
      })
  }
  @Test
  // Sanity check: print a few records from each RDD
  def printRDD(): Unit = {
    stuRDD.take(10).foreach(println)
    scoRDD.take(10).foreach(println)
    subRDD.take(10).foreach(println)
  }
  @Test
  // 1. List the scores in every subject for the top ten students of the grade [student id, name, class, subject, score]
  def question1(): Unit = {
    // Use scoRDD to compute each student's total score, sort in descending order,
    // take the ids of the top 10 students, then join them with the student and subject tables
    val top10Ids: List[String] = scoRDD
      .map(sco => (sco.id, sco.score))
      .reduceByKey(_ + _) // total score per student
      .sortBy(kv => kv._2, ascending = false) // sort by total score in descending order
      .map(kv => kv._1) // the total itself is no longer needed, keep only the id
      .take(10) // top ten
      .toList
    // The inline join logic originally written here (broadcast the ids, join with the
    // student and subject tables, print) was factored out into filterWithIdListAndPrint above.
    filterWithIdListAndPrint(top10Ids)
  }
  @Test
  // 2. Find the students whose total score is higher than the grade average [student id, name, class, total score]
  def question2(): Unit = {
    // Compute the grade-level average of the total scores, then keep the students above it
    val sumScoreRDD: RDD[(String, Int)] = scoRDD
      .map(sco => (sco.id, sco.score))
      .reduceByKey(_ + _)
    // Cache the RDD because it is used more than once
    sumScoreRDD.cache()
    val sumScoreAndCnt: (Int, Int) = sumScoreRDD
      .map(kv => (1, kv._2))
      .aggregateByKey((0, 0))(
        (u: (Int, Int), sumScore: Int) => (u._1 + sumScore, u._2 + 1),
        (u1, u2) => (u1._1 + u2._1, u1._2 + u2._2)
      )
      .collect()(0)._2
    // Grade-level average of the total scores
    val avgSumScore: Double = sumScoreAndCnt._1.toDouble / sumScoreAndCnt._2
    println(avgSumScore)
    // Keep only the students whose total score is above the average
    val passSumScoreRDD: RDD[(String, Int)] = sumScoreRDD
      .filter(kv => kv._2 > avgSumScore)
    passSumScoreRDD.cache()
    // Collect the ids of those students
    val passIDs: List[String] = passSumScoreRDD
      .map(kv => kv._1)
      .collect()
      .toList
    val broadPassIDs: Broadcast[List[String]] = sc.broadcast(passIDs)
    // Turn stuRDD into key-value form so it can be joined
    val studentKVRDD: RDD[(String, (String, String))] = stuRDD
      .filter(stu => broadPassIDs.value.contains(stu.id))
      .map(stu => (stu.id, (stu.name, stu.clazz)))
    passSumScoreRDD
      .join(studentKVRDD)
      .map {
        case (id: String, (sumScore: Int, (name: String, clazz: String))) =>
          s"$id,$name,$clazz,$sumScore"
      }
      .foreach(println)
    // Release the cached RDDs
    sumScoreRDD.unpersist()
    passSumScoreRDD.unpersist()
  }
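  // Simpler alternative for the average step above: mapping the totals to an RDD[Double] exposes
  // Spark's DoubleRDDFunctions, so sumScoreRDD.map(_._2.toDouble).mean() should yield the same
  // grade-level average as the manual (sum, count) aggregation.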
  @Test
  // 3. Find the students who passed every subject [student id, name, class, subject, score]
  def question3(): Unit = {
    // Map the subject table to (subject id, full score) so the pass line (60% of full score) can be computed
    val subjectKVRDD: RDD[(String, Int)] = subRDD
      .map(sub => (sub.subId, sub.subScore))
    // Find the ids of the students who passed every subject
    val passAllSubIds: List[String] = scoRDD
      .map(sco => (sco.subId, sco))
      .join(subjectKVRDD)
      .filter {
        case (subId: String, (sco: Sco, subScore: Int)) =>
          sco.score >= subScore * 0.6
      }
      .map {
        case (subId: String, (sco: Sco, subScore: Int)) =>
          (sco.id, 1)
      }
      .reduceByKey(_ + _) // number of subjects each student passed
      .filter(kv => kv._2 == 6) // keep the students who passed all 6 subjects
      .map(_._1) // keep only the id
      .collect()
      .toList
    filterWithIdListAndPrint(passAllSubIds)
  }
  @Test
  // 4. Find the top three students in each class [student id, name, class, score]
  def question4(): Unit = {
    // Turn stuRDD into key-value form so it can be joined
    val studentKVRDD: RDD[(String, (String, String))] = stuRDD
      .map(stu => (stu.id, (stu.name, stu.clazz)))
    scoRDD
      .map(sco => (sco.id, sco.score))
      .reduceByKey(_ + _) // total score per student
      .join(studentKVRDD)
      .map {
        case (id: String, (sumScore: Int, (name: String, clazz: String))) =>
          (id, name, clazz, sumScore)
      }
      .groupBy(t4 => t4._3)
      .flatMap {
        case (clazz: String, t4: Iterable[(String, String, String, Int)]) =>
          // take the top three of the class
          t4.toList.sortBy(t4 => -t4._4).take(3)
      }
      .map(t4 => s"${t4._1},${t4._2},${t4._3},${t4._4}")
      .foreach(println)
  }
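  // Scalability note: groupBy gathers all students of a class into memory on a single task, which
  // is fine at this data size; for much larger data, a Spark SQL window function such as
  // row_number() over (partition by clazz order by sumScore desc) would avoid the per-key grouping.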
  @Test
  // 5. Find the 100 students with the most unbalanced scores across subjects [student id, name, class, subject, score]
  def question5(): Unit = {
    // Find the ids of the 100 students whose scores are most unbalanced
    // Pipeline: normalize the scores -> variance -> sort -> take the top 100
    // Map the subject table to (subject id, full score) so scores can be normalized
    val subjectKVRDD: RDD[(String, Int)] = subRDD
      .map(sub => (sub.subId, sub.subScore))
    val top100ids: List[String] = scoRDD
      .map(sco => (sco.subId, sco))
      .join(subjectKVRDD)
      .map {
        case (subId: String, (sco: Sco, subScore: Int)) =>
          (sco.id, sco.score * 100.0 / subScore) // normalize each score to a 0-100 scale
      }
      .groupByKey()
      .map(kv => {
        val id: String = kv._1
        val scores: Iterable[Double] = kv._2
        val avgScore: Double = scores.sum / scores.size
        // variance of the normalized scores: the larger it is, the more uneven the student's subjects
        val variance: Double = scores
          .map(score => {
            Math.pow(score - avgScore, 2)
          }).sum / scores.size
        (id, variance)
      })
      .sortBy(-_._2)
      .map(_._1)
      .take(100)
      .toList
    filterWithIdListAndPrint(top100ids)
  }
}
case class Stu(id: String, name: String, age: Int, gender: String, clazz: String)
case class Sco(id: String, subId: String, score: Int)
case class Sub(subId: String, subName: String, subScore: Int)
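A side note on question2: the (sum, count) accumulation via aggregateByKey is the step that trips people up most often. The snippet below is a minimal standalone sketch of that pattern that can be pasted into spark-shell; it assumes an existing SparkContext named sc, and the student totals are made-up numbers rather than values from the real score.txt.

// Minimal sketch of the (sum, count) average pattern from question2.
// Assumes spark-shell provides `sc`; the totals below are invented sample values.
val totals = sc.parallelize(Seq(("s1", 480), ("s2", 390), ("s3", 510)))
val (sum, cnt) = totals
  .map(kv => (1, kv._2)) // key everything by the constant 1 so all totals end up in one group
  .aggregateByKey((0, 0))(
    (u, score) => (u._1 + score, u._2 + 1), // partition-local: add the score, bump the count
    (u1, u2) => (u1._1 + u2._1, u1._2 + u2._2) // merge partial (sum, count) pairs across partitions
  )
  .collect()(0)._2
val avgSumScore = sum.toDouble / cnt // 460.0 for the sample values above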