Spark_DSL
2022-08-02 14:05:00 【大学生爱编程】
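1. Spark SQL
Spark SQL can read data from CSV, JSON, and JDBC sources.
It supports SQL that is largely compatible with HQL, as well as the DSL.
DataFrame: a table structure built on top of RDDs; it is ultimately converted into RDD operations for execution.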
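import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
//1 SparkSession: the unified entry point in newer Spark versions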
val spark: SparkSession =SparkSession.builder()
.master("local")
.appName("sql")
.getOrCreate()
//2 Read the data into a DataFrame; a DF is like a table
val linesDF=spark
.read
.format("csv") //specify the data format
.schema("lines STRING") //specify the column name and type
.option("sep","\t") //specify the separator
.load("data/words.txt")
//3 Register the DF as a view; only then can it be queried with SQL
linesDF.createOrReplaceTempView("lines")
linesDF.printSchema() //print the table schema
//4 Write SQL (HQL-compatible)
val resultDF:DataFrame=spark.sql(
"""
|select word,count(1) as num from(
|select explode(split(lines,',')) as word from
|lines) as t1
|group by word
|""".stripMargin)
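resultDF.show() //show() prints only the first 20 rows by default
//5 Save the result; the relative path resolves against the default file system (HDFS when the cluster is configured with it, otherwise the local disk)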
resultDF
.write
.format("csv")
.option("sep","\t")
.mode(SaveMode.Overwrite)
.save("data/wc")
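The query's semantics can be checked without a cluster. Below is a plain-Scala stand-in (ordinary collections, no Spark; the function name `wordCount` is hypothetical) for what `explode(split(lines, ','))` followed by `group by word` and `count(1)` computes on an in-memory list of lines. It illustrates the result only, not how Spark executes the query.

```scala
// Plain-Scala sketch of the word-count query: split + explode, then group by + count(1).
// An in-memory Seq[String] stands in for the "lines" column of linesDF.
def wordCount(lines: Seq[String]): Map[String, Long] =
  lines
    .flatMap(_.split(",", -1)) // split + explode: one element per word; limit -1 keeps trailing empty strings, as Spark's split does
    .groupBy(identity)         // group by word
    .map { case (word, occurrences) => word -> occurrences.size.toLong } // count(1) per group
```

For example, `wordCount(Seq("spark,java", "spark"))` gives `spark -> 2` and `java -> 1`. The `-1` limit matters because Java's `String.split` drops trailing empty strings by default, while Spark's `split` keeps them.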
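2. DSL example
DSL style: the DSL is written as method calls on a DataFrame, chained from top to bottom, in a code-oriented rather than SQL-oriented way.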
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
//Build the Spark entry point
val spark: SparkSession =SparkSession.builder()
.master("local")
.appName("sql")
.getOrCreate()
//Read the data into a DataFrame (like a table)
val linesDF: DataFrame =spark
.read
.format("csv")
.schema("lines STRING")
.option("sep","\t")
.load("data/words.txt")
//The DSL needs no temporary view; it is a SQL-like style that sits between plain code and SQL
import org.apache.spark.sql.functions._
import spark.implicits._
//Equivalent to running select directly on linesDF; the arguments are column objects written as $"column name"
val resultDF: DataFrame =linesDF.select(explode(split($"lines",",")) as "word")
.groupBy($"word")
.agg(count($"word") as "c") //count occurrences of each word
resultDF.show()
//Save the data
resultDF
.write
.format("csv")
.option("sep","\t")
.mode(SaveMode.Overwrite)
.save("data/wc1")
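3. Parsing JSON and CSV files with the DSL
Add the dependency (Spark's built-in json source used below does not itself need fastjson):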
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.79</version>
</dependency>
Parse JSON-formatted data directly.
Create the Spark SQL environment:
import org.apache.spark.sql.{DataFrame, SparkSession}
val spark: SparkSession =SparkSession
.builder()
.master("local")
.appName("dsl")
.config("spark.sql.shuffle.partitions",1) //one shuffle partition is enough for small local data
.getOrCreate()
//Read a JSON file: no separator is needed, only the file format and path
val studentDF: DataFrame =spark.read
.format("json")
.load("data/students1.json")
//Read a CSV file
val scoreDF: DataFrame =spark
.read
.format("csv")
.option("sep", ",")
.schema("sid STRING,cid STRING,sco DOUBLE") //specify column names and types
.load("data/score.txt")
1. studentDF.printSchema() prints the table schema:
root
|-- age: long (nullable = true)
|-- clazz: string (nullable = true)
|-- gender: string (nullable = true)
|-- id: string (nullable = true)
|-- name: string (nullable = true)
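2. studentDF.show(100) prints 100 rows (the default is 20)
3. studentDF.show(false) prints full column values instead of truncating values longer than 20 characters
4. DSL functions
1. select picks data; like a transformation operator on an RDD, it is lazy and returns a DataFrame
2. Aggregate functions cannot be mixed with ordinary columns in select; for grouped aggregation, put them inside agg() after groupBy()
studentDF.select("id","age").show(150) //pick out two columns and show them
studentDF.selectExpr("name","age+1 as age1").show(50) //pass SQL expressions to compute on columns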
+------+----+
| name|age1|
+------+----+
|施笑槐| 23|
|吕金鹏| 25|
|单乐蕊| 23|
3. Import the implicit conversions to refer to columns as column objects ($"column name")
import spark.implicits._
studentDF.select($"id",$"age").show()
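4. Import Spark's built-in functions to apply them to column objects
import org.apache.spark.sql.functions._
studentDF.select($"id",substring($"clazz",1,2) as "cc").show() //first 2 characters of clazz; substring positions are 1-based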
5. Filter rows with where
studentDF.where("gender='女' and age=23").show()
+---+--------+------+----------+------+
|age| clazz|gender| id| name|
+---+--------+------+----------+------+
| 23|文科六班| 女|1500100007|尚孤风|
| 23|文科一班| 女|1500100016|潘访烟|
| 23|理科二班| 女|1500100052|居初兰|
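6. Group and aggregate with groupBy(); aggregate functions go inside agg()
Grouping and aggregation are used together; the result contains only the grouping columns and the aggregated columns.
studentDF.groupBy($"clazz")
.agg(count($"clazz") as "num",round(avg($"age")) as "avgAge") //count takes a column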
.show(50)
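To make the shape of that result concrete, here is a plain-Scala sketch (no Spark; the `Student` case class and the `classStats` function are hypothetical stand-ins for `studentDF` and the query). It produces one entry per class holding the count and the rounded average age. Spark's `round` rounds HALF_UP, so the sketch mirrors that with `BigDecimal`.

```scala
import scala.math.BigDecimal.RoundingMode

// Hypothetical row type mirroring studentDF's columns (id, name, age, gender, clazz).
final case class Student(id: String, name: String, age: Long, gender: String, clazz: String)

// Sketch of groupBy($"clazz").agg(count($"clazz") as "num", round(avg($"age")) as "avgAge"):
// one entry per class holding (num, avgAge).
def classStats(students: Seq[Student]): Map[String, (Long, Double)] =
  students.groupBy(_.clazz).map { case (clazz, members) =>
    val avgAge = members.map(_.age).sum.toDouble / members.size
    // round() with 0 decimal places, HALF_UP, returned as a double
    val rounded = BigDecimal(avgAge).setScale(0, RoundingMode.HALF_UP).toDouble
    clazz -> (members.size.toLong, rounded)
  }
```

A class with ages 22 and 23 averages 22.5, which HALF_UP rounds to 23.0.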
7. Sorting: orderBy()
Count the students in each class and sort in descending order
studentDF
.groupBy($"clazz")
.agg(count($"clazz") as "num")
.orderBy($"num".desc)
.show()
8. Joining tables: join
val joinDF: DataFrame =studentDF.join(scoreDF,$"id"===$"sid","inner")
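An inner join keeps only the pairs whose keys match. The plain-Scala sketch below (no Spark; the `StudentRow`/`ScoreRow` types and `innerJoin` are hypothetical) shows which pairs `$"id" === $"sid"` with `"inner"` produces. It does not describe how Spark physically executes the join.

```scala
// Hypothetical row types: students keyed by id, scores keyed by sid.
final case class StudentRow(id: String, clazz: String)
final case class ScoreRow(sid: String, cid: String, sco: Double)

// Sketch of studentDF.join(scoreDF, $"id" === $"sid", "inner"):
// every (student, score) pair with matching keys; unmatched rows on either side are dropped.
def innerJoin(students: Seq[StudentRow], scores: Seq[ScoreRow]): Seq[(StudentRow, ScoreRow)] = {
  val scoresBySid = scores.groupBy(_.sid) // index the score rows by join key
  for {
    s  <- students
    sc <- scoresBySid.getOrElse(s.id, Seq.empty)
  } yield (s, sc)
}
```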
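9. withColumn("new column name", row_number() over Window.partitionBy(...).orderBy(...))
/**
* Find the top ten students in each class:
* first compute each student's total score, then rank within each class.
* withColumn adds a new column to the DF; Window must be imported
*/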
import org.apache.spark.sql.expressions.Window
joinDF
.groupBy($"id",$"clazz")
.agg(sum($"sco") as "sumSco")
.withColumn("row",row_number() over Window.partitionBy($"clazz").orderBy($"sumSco".desc))
.where($"row"<=10)
.show()
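The group-then-rank pipeline above can be sketched with plain Scala collections (no Spark) to show what each step contributes. The input is a hypothetical `Seq[(id, clazz, sco)]` standing in for `joinDF`, and `topNPerClass` is a made-up name. One caveat carries over from Spark: `row_number` assigns distinct numbers even to tied totals, and Spark does not define which tied row comes first.

```scala
// Sketch of: groupBy(id, clazz).agg(sum(sco) as sumSco)
//   .withColumn("row", row_number() over Window.partitionBy(clazz).orderBy(sumSco.desc))
//   .where(row <= n)
// Input rows are hypothetical (id, clazz, sco) triples standing in for joinDF.
def topNPerClass(rows: Seq[(String, String, Double)], n: Int): Seq[(String, String, Double, Int)] = {
  val totals = rows
    .groupBy { case (id, clazz, _) => (id, clazz) }                    // groupBy($"id", $"clazz")
    .map { case ((id, clazz), rs) => (id, clazz, rs.map(_._3).sum) }   // sum($"sco") as sumSco
    .toSeq
  totals
    .groupBy(_._2)                                                     // partitionBy($"clazz")
    .toSeq
    .sortBy(_._1)                                                      // deterministic class order for the output
    .flatMap { case (_, members) =>
      members
        .sortBy(r => -r._3)                                            // orderBy($"sumSco".desc)
        .zipWithIndex
        .map { case ((id, clazz, total), i) => (id, clazz, total, i + 1) } // row_number() starts at 1
        .filter(_._4 <= n)                                             // where($"row" <= n)
    }
}
```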
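4. DataSource
4.1 csv: for a file without a header, column names and types must be specified manually
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
val spark: SparkSession =SparkSession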
.builder()
.master("local")
.appName("sql")
.config("spark.sql.shuffle.partitions",1)
.getOrCreate()
val csvDF: DataFrame =spark
.read
.format("csv")
.schema("id STRING,name STRING,age INT,gender STRING,clazz STRING")
.option("sep",",")
.load("data/students.txt")
csvDF.show()
import spark.implicits._
import org.apache.spark.sql.functions._
//Count the students in each class and save the result to files
csvDF
.groupBy($"clazz")
.agg(count($"clazz") as "num")
.write
.format("csv")
.option("sep",",")
.mode(SaveMode.Overwrite)
.save("data/clazzNum1")
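4.2 Reading and writing json and parquet
/**
* Reading JSON: no column names need to be specified when parsing
* Each line must be one complete JSON object; pretty-printed multi-line JSON needs option("multiLine", true)
* Every record carries its column names, so storage is larger, but a record missing a field does not affect the others
*/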
val jsonDF: DataFrame =spark
.read
.format("json")
.load("data/students1.json")
//Count students per gender and save the result as JSON
jsonDF
.groupBy($"gender")
.agg(count($"gender") as "g")
.write
.format("json")
.mode(SaveMode.Overwrite)
.save("data/gender_num")
---------------------------------------------------------------
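//Save the data above as parquet (a compressed columnar format that stores the schema; the compression ratio depends on the data's information entropy)
//Not human-readable; trades compute time for storage space
jsonDF
.write
.format("parquet")
.mode(SaveMode.Overwrite)
.save("data/students")
//Read parquet data; the schema is embedded, so no columns need to be specified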
val parquetDF: DataFrame =spark
.read
.format("parquet")
.load("data/students")
parquetDF.printSchema()
parquetDF.show(30)
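4.3 Reading data over JDBC
//Requires the MySQL JDBC driver on the classpath; specify the format, driver, database table, user name and password
val jdbcDF: DataFrame = spark
.read
.format("jdbc")
.option("url", "jdbc:mysql://master:3306")
.option("driver", "com.mysql.jdbc.Driver") //Connector/J 5.x class name; Connector/J 8.x uses com.mysql.cj.jdbc.Driver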
.option("dbtable", "bigdata.students")
.option("user", "root")
.option("password", "123456")
.load()
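5. Converting between RDD and DF
RDD to DF
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
//Spark entry point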
val spark: SparkSession =SparkSession
.builder()
.appName("rdd")
.master("local")
.getOrCreate()
val sc: SparkContext =spark.sparkContext //get the SparkContext
//Read the data and turn each line into a tuple
val studentsRDD: RDD[(String, String, String, String, String)] =sc.textFile("data/students.txt")
.map(line=>line.split(","))
.map{
case Array(id:String,name:String,age:String,gender:String,clazz:String)=>
(id,name,age,gender,clazz)
}
//Import the implicit conversions so that toDF can be called
import spark.implicits._
val studentsDF: DataFrame =studentsRDD.toDF("id","name","age","gender","clazz")
studentsDF.show(30)
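DF to RDD
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
//SparkSession environment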
val spark: SparkSession = SparkSession
.builder()
.appName("rdd")
.master("local")
.getOrCreate()
//Read the file
val studentDF: DataFrame = spark
.read
.format("json")
.load("data/students.json")
//Convert the DF to an RDD of Row
import spark.implicits._
val studentRDD: RDD[Row] = studentDF.rdd
//Get values by column name
val stuRDD: RDD[(String, String, Long, String, String)] = studentRDD.map((row: Row) => {
val id: String = row.getAs[String]("id")
val name: String = row.getAs[String]("name")
val age: Long = row.getAs[Long]("age")
val gender: String = row.getAs[String]("gender")
val clazz: String = row.getAs[String]("clazz")
(id, name, age, gender, clazz)
})
//Get values by pattern matching
val caseRDD: RDD[(String, String, Long, String, String)] = studentRDD.map {
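//Field order must match the schema: JSON schema inference orders columns alphabetically (age, clazz, gender, id, name)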
case Row(age: Long, clazz: String, gender: String, id: String, name: String) =>
(id, name, age, gender, clazz)
}
6. Window functions
row_number
rank
sum
count
avg
lag
lead
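Group-by aggregation collapses each group to one row and keeps only the grouping and aggregated columns; a window function (partitioning) keeps every row and column and adds one new column.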
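The difference can be sketched with plain Scala collections (no Spark; both function names are hypothetical). `groupTotals` mirrors `groupBy(...).agg(sum(...))`, and `windowTotals` mirrors `sum(...) over Window.partitionBy(...)` without an orderBy.

```scala
// groupBy + agg: one entry per key; only the key and the aggregate survive.
def groupTotals(rows: Seq[(String, Double)]): Map[String, Double] =
  rows.groupBy(_._1).map { case (k, rs) => k -> rs.map(_._2).sum }

// sum(...) over Window.partitionBy(key) with no orderBy: every input row is kept
// and the partition total is appended as an extra column.
def windowTotals(rows: Seq[(String, Double)]): Seq[(String, Double, Double)] = {
  val totals = groupTotals(rows)
  rows.map { case (k, v) => (k, v, totals(k)) }
}
```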
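6.1 Two ways to use ordering with sum over
//Find the top ten students in the grade
//With orderBy inside the window, the result is a running (cumulative) sum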
val joinDF: DataFrame =studentDF.join(scoreDF,$"id"===$"sid")
joinDF
.withColumn("sumSco",sum($"sco") over Window.partitionBy($"id").orderBy($"sco"))
.show()
+----------+------+---+------+--------+----------+-------+-----+------+
| id| name|age|gender| clazz| sid| cid| sco|sumSco|
+----------+------+---+------+--------+----------+-------+-----+------+
|1500100001|施笑槐| 22| 女|文科六班|1500100001|1000003| 0.0| 0.0|
|1500100001|施笑槐| 22| 女|文科六班|1500100001|1000002| 5.0| 5.0|
|1500100001|施笑槐| 22| 女|文科六班|1500100001|1000004| 29.0| 34.0|
|1500100001|施笑槐| 22| 女|文科六班|1500100001|1000006| 52.0| 86.0|
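This is why the first variant produces a running total. When a window has `orderBy` but no explicit frame, Spark uses a frame from the start of the partition up to the current row, measured by value (RANGE), so rows with equal `sco` share one total. The following is a plain-Scala sketch for a single student's partition (no Spark; `runningSum` is a hypothetical name):

```scala
// Sketch of sum($"sco") over Window.partitionBy($"id").orderBy($"sco") for ONE partition.
// Default frame with orderBy: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW,
// so rows with equal sco (peers) get the same running total.
def runningSum(scores: Seq[Double]): Seq[(Double, Double)] = {
  val sorted = scores.sorted
  sorted.map(s => s -> sorted.filter(_ <= s).sum) // sum of every value up to and including this one
}
```

With the four scores shown in the table above for 1500100001 (0, 5, 29, 52), it reproduces the running totals 0, 5, 34 and 86.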
//Open the window without orderBy (the whole-partition sum), then sort the result
val joinDF: DataFrame =studentDF.join(scoreDF,$"id"===$"sid")
joinDF
.withColumn("sumSco",sum($"sco") over Window.partitionBy($"id"))
.orderBy($"sumSco".desc)
.limit(60) //top ten students, 6 score rows each
.show(60)
+----------+------+---+------+--------+----------+-------+-----+------+
| id| name|age|gender| clazz| sid| cid| sco|sumSco|
+----------+------+---+------+--------+----------+-------+-----+------+
|1500100929|满慕易| 21| 女|理科三班|1500100929|1000001|144.0| 630.0|
|1500100929|满慕易| 21| 女|理科三班|1500100929|1000002|138.0| 630.0|
|1500100929|满慕易| 21| 女|理科三班|1500100929|1000003| 88.0| 630.0|
|1500100929|满慕易| 21| 女|理科三班|1500100929|1000007| 91.0| 630.0|
|1500100929|满慕易| 21| 女|理科三班|1500100929|1000008| 99.0| 630.0|
|1500100929|满慕易| 21| 女|理科三班|1500100929|1000009| 70.0| 630.0|
|1500100080|巫景彰| 21| 男|理科五班|1500100080|1000001|142.0| 628.0|
|1500100080|巫景彰| 21| 男|理科五班|1500100080|1000002|149.0| 628.0|
|1500100080|巫景彰| 21| 男|理科五班|1500100080|1000003|123.0| 628.0|
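6.2 count
//Find students who passed every subject. With orderBy (e.g. partition by sid, order by cid, each cid once per sid), count becomes a running count like row_number, and only its last value equals the partition total; without orderBy, as here, every row gets the partition total directly
//subjectDF: subject table with columns cid and ssco (full marks); its loading code is not shown here
scoreDF
//join with the subject table
.join(subjectDF, "cid")
//drop failing scores (below 60% of full marks)
.where($"sco" >= $"ssco" * 0.6)
//count each student's passed subjects
.withColumn("jige", count($"sid") over Window.partitionBy($"sid"))
//keep students who passed all 6 subjects
.where($"jige" === 6)
.show(100)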
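Here is the same filter, count and keep logic in plain Scala (no Spark). The score tuples, the `fullMarks` map standing in for `subjectDF`'s `cid`/`ssco` columns, and `passedAll` are hypothetical. Instead of hardcoding 6, the sketch compares against the number of subjects. It returns the qualifying student ids, while the DataFrame version keeps all passed score rows of those students.

```scala
// Hypothetical inputs: scores as (sid, cid, sco); full marks per subject as cid -> ssco.
def passedAll(scores: Seq[(String, String, Double)], fullMarks: Map[String, Double]): Set[String] = {
  // join(subjectDF, "cid") + where(sco >= ssco * 0.6); unknown cids drop out like an inner join
  val passed = scores.filter { case (_, cid, sco) =>
    fullMarks.get(cid).exists(ssco => sco >= ssco * 0.6)
  }
  passed
    .groupBy(_._1)                                                   // Window.partitionBy($"sid")
    .collect { case (sid, rs) if rs.size == fullMarks.size => sid } // count($"sid") equals number of subjects
    .toSet
}
```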
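6.3 avg
//Find students whose total score is above the grade average (computed with window functions)
First compute each student's total, then open a window per grade (arts or science, taken from the first two characters of clazz) to compute the grade average.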
joinDF
.withColumn("sumSco",sum($"sco") over Window.partitionBy($"id"))
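.withColumn("avgSco",avg($"sumSco") over Window.partitionBy(substring($"clazz",1,2)))
//.where($"sumSco" > $"avgSco") //uncomment to keep only students above their grade average
.show(6000)
+----------+------+---+------+--------+----------+-------+-----+------+------------------+
| id| name|age|gender| clazz| sid| cid| sco|sumSco| avgSco|
+----------+------+---+------+--------+----------+-------+-----+------+------------------+
|1500100999|钟绮晴| 23| 女|文科五班|1500100999|1000004| 48.0| 371.0|374.00766283524905|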
|1500100999|钟绮晴| 23| 女|文科五班|1500100999|1000005| 41.0| 371.0|374.00766283524905|
|1500100999|钟绮晴| 23| 女|文科五班|1500100999|1000006| 10.0| 371.0|374.00766283524905|
|1500100003|单乐蕊| 22| 女|理科六班|1500100003|1000001| 48.0| 359.0| 370.9769392033543|
|1500100003|单乐蕊| 22| 女|理科六班|1500100003|1000002|132.0| 359.0| 370.9769392033543|
|1500100003|单乐蕊| 22| 女|理科六班|1500100003|1000003| 41.0| 359.0| 370.9769392033543|
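One subtlety in this window: `avg($"sumSco")` runs over the joined rows, so each student's total is counted once per score row. That matches the per-student average only when every student has the same number of rows, which seems to hold here (section 6.2 assumes 6 subjects per student). The plain-Scala sketch below (no Spark; the input map and both function names are hypothetical) makes the difference visible:

```scala
// Hypothetical input: studentId -> (total score, number of score rows for that student in joinDF).

// What avg($"sumSco") over the window sees: each total repeated once per score row.
def rowWeightedAvg(students: Map[String, (Double, Int)]): Double = {
  val rows = students.values.toSeq.flatMap { case (total, k) => Seq.fill(k)(total) }
  rows.sum / rows.size
}

// The plain average of one total per student.
def perStudentAvg(students: Map[String, (Double, Int)]): Double =
  students.values.map(_._1).sum / students.size
```

With equal row counts the two agree; with 6 rows for one student and 2 for another, the row-weighted average leans toward the first student's total.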
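6.4 lag: takes the value from a row a given number of rows before the current row; the window must be ordered (orderBy) and is usually partitioned as well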
joinDF
.groupBy($"id",$"clazz") //groupBy output contains only the grouping and aggregated columns, so clazz is added to the grouping keys to keep it
.agg(sum($"sco" ) as "sumSco")
.withColumn("rm",row_number() over Window.partitionBy($"clazz").orderBy($"sumSco".desc)) //note that desc is applied to the orderBy column
.show(1000)
+----------+--------+------+---+
| id| clazz|sumSco| rm|
+----------+--------+------+---+
|1500100308|文科一班| 628.0| 1|
|1500100875|文科一班| 595.0| 2|
|1500100943|文科一班| 580.0| 3|
|1500100871|文科一班| 569.0| 4|
//As above, clazz is included in groupBy so that it survives the aggregation
//**lag requires an ordered window**
joinDF
.groupBy($"id",$"clazz")
.agg(sum($"sco" ) as "sumSco")
.withColumn("rm",row_number() over Window.partitionBy($"clazz").orderBy($"sumSco".desc))
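.withColumn("headSumSco",lag($"sumSco",1,750) over Window.partitionBy($"clazz").orderBy($"sumSco".desc)) //previous row's total; the first row of each class gets 750, apparently the maximum possible total
.withColumn("cha",$"sumSco"-$"headSumSco") //gap to the previous student's total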
.show(1000)
+----------+--------+------+---+----------+------+
| id| clazz|sumSco| rm|headSumSco| cha|
+----------+--------+------+---+----------+------+
|1500100308|文科一班| 628.0| 1| 750.0|-122.0|
|1500100875|文科一班| 595.0| 2| 628.0| -33.0|
|1500100943|文科一班| 580.0| 3| 595.0| -15.0|
|1500100871|文科一班| 569.0| 4| 580.0| -11.0|
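The `lag` step can be mirrored in plain Scala for one class partition (no Spark; `lagDiff` is a hypothetical name). The sketch sorts descending, shifts the totals down by one row, fills the first slot with the default, then subtracts.

```scala
// Sketch, for ONE class partition, of:
//   lag($"sumSco", 1, default) over Window.partitionBy($"clazz").orderBy($"sumSco".desc)
//   followed by cha = sumSco - headSumSco
// Returns (sumSco, headSumSco, cha) in descending order of sumSco.
def lagDiff(totals: Seq[Double], default: Double): Seq[(Double, Double, Double)] = {
  val ordered = totals.sortBy(x => -x)             // orderBy($"sumSco".desc)
  val previous = default +: ordered.dropRight(1)   // lag by 1 row; the first row gets the default
  ordered.zip(previous).map { case (cur, prev) => (cur, prev, cur - prev) }
}
```

Given the four totals shown above for 文科一班 (628, 595, 580, 569) and the default 750, it reproduces the `headSumSco` and `cha` columns of the table.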