Spark SQL
2022-06-12 09:55:00 【A pig without food】
$"age" + 1, 'age + 1, column("age") + 1, col("age") + 1
Here $, ', col, and column are equivalent ways to reference a column.
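As a minimal sketch, assuming the personDF DataFrame built in the steps below (in spark-shell, spark.implicits._ is imported automatically, enabling the $"..." and 'symbol syntax), the four forms are interchangeable:
import org.apache.spark.sql.functions.{col, column}
// All four expressions denote the same column arithmetic:
personDF.select($"age" + 1).show()
personDF.select('age + 1).show()
personDF.select(col("age") + 1).show()
personDF.select(column("age") + 1).show()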
DSL
3.4.1.1 Step 1: Create a text file
Create a text file under the /export/servers/ path on Linux:
cd /export/servers/
vim person.txt
Write the following into the file:
1 zhangsan 20
2 lisi 29
3 wangwu 25
4 zhaoliu 30
5 tianqi 35
6 kobe 40
3.4.1.2 Step 2: Define an RDD
Use spark-shell to enter the Spark client:
cd /export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/
bin/spark-shell --master local[2]
val lineRDD = sc.textFile("file:///export/servers/person.txt").map(x => x.split(" "))
3.4.1.3 Step 3: Define a case class (sample class)
case class Person(id:Int,name:String,age:Int)
3.4.1.4 Step 4: Associate the RDD with the case class
val personRDD = lineRDD.map(x => Person(x(0).toInt,x(1),x(2).toInt))
3.4.1.5 Step 5: Convert the RDD to a DataFrame (DF)
val personDF = personRDD.toDF
Note: a DF can also be converted back to an RDD; just call the rdd method on the DF directly:
scala> personDF.rdd.collect
res38: Array[org.apache.spark.sql.Row] = Array([1,zhangsan,20], [2,lisi,29], [3,wangwu,25], [4,zhaoliu,30], [5,tianqi,35], [6,kobe,40])
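The screenshot that originally followed here is lost; as a hedged sketch, these standard DataFrame calls show what you would typically run next in the same spark-shell session:
personDF.printSchema()                                    // id: int, name: string, age: int
personDF.show()                                           // prints the six rows as a table
personDF.filter($"age" > 25).select("name", "age").show() // DSL-style query
personDF.createOrReplaceTempView("person")
spark.sql("select name, age from person where age > 25").show() // SQL-style query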
DataSet: DataFrame + a generic type
Problem: suppose we now need word segmentation. Data of type String can be split, but a DataFrame carries no generic type, so it cannot tell whether a row holds a Person or a String.
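A minimal sketch of the difference, assuming the SparkSession and imports from the full example below:
val df: DataFrame = spark.read.text("data/input/words.txt")
// df.flatMap(x => x.split(" "))        // does not compile: each element is an untyped Row
val ds: Dataset[String] = df.as[String] // as[String] attaches the generic type
ds.flatMap(x => x.split(" "))           // compiles: elements are known to be Strings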
Case study: implement WordCount with both APIs (DSL and SQL)
Requirement: count the occurrences of each word in a text file and show the five most frequent ones.
Steps:
- 1- Prepare the SparkSession environment
- 2- Read the data
- 3- Flatten the data with flatMap
- 4- Run the aggregation
package cn.itcast.sparksql
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions._
/**
 * DESC:
 * 1- Prepare the SparkSession environment
 * 2- Read the data
 * 3- Flatten the data with flatMap
 * 4- Run the aggregation
 */
object _10DSLSQLWordCount {
def main(args: Array[String]): Unit = {
//1- Prepare the SparkSession environment
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")
val spark: SparkSession = SparkSession.builder()
.config(conf)
.getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
import spark.implicits._
//2- Read the data
//val valueRDD: RDD[String] = sc.textFile("data/input/words.txt")
val df: DataFrame = spark.read.text("data/input/words.txt")
//spark.read.textFile("data/input/words.txt")
//3- Flatten the data with flatMap
val dataDS: Dataset[String] = df.as[String]
val strDS: Dataset[String] = dataDS.flatMap(x => x.split("\\s+"))
//4- Run the aggregation ---------DSL
println("======================DSL-=========================")
strDS.printSchema()
//root
//|-- value: string (nullable = true)
strDS.groupBy($"value")
.count()
.orderBy($"count".desc)
.limit(5)
.show()
//
strDS.groupBy($"value")
.count()
.sort($"count".desc)
.limit(5)
.show()
//
strDS.groupBy($"value")
.agg(count("value").alias("count"))
.sort($"count".desc)
.limit(5)
.show()
//
strDS.groupBy($"value")
// agg also accepts a Map of column -> aggregate function, e.g.
// Map("age" -> "max", "expense" -> "sum")
.agg(Map("value" -> "count"))
.sort($"count(value)".desc)
.limit(5)
.show()
//SQL
println("======================SQL-=========================")
strDS.createOrReplaceTempView("table_view")
spark.sql(
"""
|select value,count(value) as count
|from table_view
|group by value
|order by count desc
|limit 5
|""".stripMargin).show()
spark.stop()
}
}
Conversions between RDD, DataFrame, and Dataset
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
 * DESC: demonstrate the conversions among RDD, DataFrame, and Dataset
 */
case class People2(id: Int, name: String, age: Int)
object _09RddDFDS {
def main(args: Array[String]): Unit = {
//1- Prepare the SparkSession
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")
val spark: SparkSession = SparkSession.builder()
.config(conf)
.getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
//2- Read the data file
val rdd1: RDD[String] = sc.textFile("data/input/sql/people1.txt")
//3- Convert the data to a DataFrame
val peopleRDD: RDD[People2] = rdd1.map(_.split("\\s+")).map(filed => People2(filed(0).toInt, filed(1), filed(2).toInt))
// Converting an RDD to a DF requires an implicit conversion
import spark.implicits._
//1- From RDD to DF (there are 4 ways; see the sketch after this code block)
val peopleDF: DataFrame = peopleRDD.toDF()
//2- From DF to RDD
peopleDF.rdd.collect().foreach(println)
//[1,zhangsan,20]
//[2,lisi,29]
//3- From RDD to DS
val dataDS: Dataset[People2] = peopleRDD.toDS()
//+---+--------+---+
//| id|    name|age|
//+---+--------+---+
//|  1|zhangsan| 20|
//|  2|    lisi| 29|
//|  3|  wangwu| 25|
//4- From DS to RDD
dataDS.rdd.collect().foreach(println)
//People2(1,zhangsan,20)
//People2(2,lisi,29)
//People2(3,wangwu,25)
//5- From DF to DS
val peopleDS: Dataset[People2] = peopleDF.as[People2]
peopleDS.show()
//6- From DS to DF
peopleDS.toDF().show()
//4- Close the SparkSession
spark.close()
}
}
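The comment above mentions four ways to build a DF from an RDD, but only toDF() is shown. Below is a hedged sketch of the other common routes, assuming the same spark, sc, peopleRDD, and People2 definitions as in the code above:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
// a) toDF with explicit column names, on an RDD of tuples
val tupleRDD = sc.parallelize(Seq((1, "zhangsan", 20), (2, "lisi", 29)))
val df1 = tupleRDD.toDF("id", "name", "age")
// b) createDataFrame on an RDD of case-class instances (schema inferred by reflection)
val df2 = spark.createDataFrame(peopleRDD)
// c) createDataFrame with an RDD[Row] plus an explicit StructType schema
val rowRDD = tupleRDD.map { case (id, name, age) => Row(id, name, age) }
val schema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("name", StringType),
  StructField("age", IntegerType)))
val df3 = spark.createDataFrame(rowRDD, schema)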
UDF programming
The essence is simple: a UDF is just a function implementing a one-to-one mapping, similar in effect to map.
Usage steps:
Register it through the session: spark.udf.register(the name of the function, the function with each parameter's type)
Pick the UDF interface matching the number of parameters: for example, a function with two parameters uses UDF2.
spark.udf.register() is the fixed entry point.
// Register by instantiating the UDF1 interface (needs
// org.apache.spark.sql.api.java.UDF1 and org.apache.spark.sql.types.DataTypes)
spark.udf.register("smalltoUpper2", new UDF1[String, String] {
  override def call(t1: String): String =
    t1.toUpperCase()
}, DataTypes.StringType)
// Register via a lambda expression
spark.udf.register("smalltoUpper1", (a1: String) => {
  a1.toUpperCase()
})
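A hedged usage sketch: once registered, the name is callable from SQL, and callUDF exposes it in the DSL. This assumes a Dataset strDS with a value column, as in the WordCount example above; the view name t_words is introduced here for illustration:
import org.apache.spark.sql.functions.callUDF
strDS.createOrReplaceTempView("t_words")
spark.sql("select smalltoUpper1(value) as upper_value from t_words").show()
strDS.select(callUDF("smalltoUpper2", $"value").as("upper_value")).show()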