Spark SQL
2022-06-12 09:55:00 【A pig without food】
$"age" + 1, 'age + 1, column("age") + 1, col("age") + 1
In the DSL, $"...", the symbol quote ('age), col(), and column() are equivalent ways to reference a column.
DSL
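A minimal sketch of this equivalence, assuming a DataFrame df with an integer age column (df here is illustrative; the personDF built in the steps below would behave the same way):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, column}

val spark = SparkSession.builder().master("local[*]").appName("ColumnRefs").getOrCreate()
import spark.implicits._

val df = spark.range(3).toDF("age")   // illustrative DataFrame with an age column

// All four expressions select the same derived column; only the syntax differs
df.select($"age" + 1).show()
df.select('age + 1).show()
df.select(col("age") + 1).show()
df.select(column("age") + 1).show()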
3.4.1.1 Step 1: Create a text file
On Linux, create a text file under the /export/servers/ path:
cd /export/servers/
vim person.txt
File contents:
1 zhangsan 20
2 lisi 29
3 wangwu 25
4 zhaoliu 30
5 tianqi 35
6 kobe 40
3.4.1.2 Step 2: Define an RDD
Use spark-shell to enter the Spark client:
cd /export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/
bin/spark-shell --master local[2]
val lineRDD = sc.textFile("file:///export/servers/person.txt").map(x => x.split(" "))
3.4.1.3 Step 3: Define a case class
case class Person(id:Int,name:String,age:Int)
3.4.1.4 Step 4: Associate the RDD with the case class
val personRDD = lineRDD.map(x => Person(x(0).toInt,x(1),x(2).toInt))
3.4.1.5 Step 5: Convert the RDD to a DataFrame
val personDF = personRDD.toDF
Note: a DataFrame can also be converted back to an RDD; simply call the rdd method on the DataFrame:
scala> personDF.rdd.collect
res38: Array[org.apache.spark.sql.Row] = Array([1,zhangsan,20], [2,lisi,29], [3,wangwu,25], [4,zhaoliu,30], [5,tianqi,35], [6,kobe,40])
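Note that the recovered RDD contains generic Row objects rather than Person instances. A minimal sketch of pulling typed fields back out of those rows, continuing with the personDF built above:

// personDF.rdd is an RDD[Row]; fields are extracted by name or position
val names = personDF.rdd.map(row => row.getAs[String]("name")).collect()
// names: Array[String] = Array(zhangsan, lisi, wangwu, zhaoliu, tianqi, kobe)

val ages = personDF.rdd.map(row => row.getInt(2)).collect()
// ages: Array[Int] = Array(20, 29, 25, 30, 35, 40)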
Dataset: DataFrame + generic type
Problem: suppose you now need to split lines into words. Data typed as String can be split, but a DataFrame carries no generic type: it does not know whether each row holds a Person or a String, only that it is a Row.
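A minimal sketch of the difference, using the same text-file read that appears in the WordCount example below:

val df: DataFrame = spark.read.text("data/input/words.txt")
// A DataFrame is a Dataset[Row]: each element is an untyped Row,
// so String methods such as split are not available directly.

val ds: Dataset[String] = df.as[String]
// A Dataset[String] knows its element type, so String methods work:
val words: Dataset[String] = ds.flatMap(line => line.split("\\s+"))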
Case study: implement WordCount with both APIs (DSL and SQL)
Requirement: count word frequencies and show the top 5.
Steps:
- 1- Prepare the SparkSession environment
- 2- Read the data
- 3- Flatten the data with flatMap
- 4- Perform the aggregation
package cn.itcast.sparksql
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions._
/**
 * DESC:
 * 1- Prepare the SparkSession environment
 * 2- Read the data
 * 3- Flatten the data with flatMap
 * 4- Perform the aggregation
 */
object _10DSLSQLWordCount {
def main(args: Array[String]): Unit = {
//1- Prepare the SparkSession environment
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")
val spark: SparkSession = SparkSession.builder()
.config(conf)
.getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
import spark.implicits._
//2- Read the data
//val valueRDD: RDD[String] = sc.textFile("data/input/words.txt")
val df: DataFrame = spark.read.text("data/input/words.txt")
//spark.read.textFile("data/input/words.txt")
//3- Flatten the data with flatMap
val dataDS: Dataset[String] = df.as[String]
val strDS: Dataset[String] = dataDS.flatMap(x => x.split("\\s+"))
//4- Perform the aggregation ---------DSL
println("======================DSL-=========================")
strDS.printSchema()
//root
//|-- value: string (nullable = true)
strDS.groupBy($"value")
.count()
.orderBy($"count".desc)
.limit(5)
.show()
// Variant 2: sort instead of orderBy (they are aliases of each other)
strDS.groupBy($"value")
.count()
.sort($"count".desc)
.limit(5)
.show()
// Variant 3: explicit agg with the count function and an alias
strDS.groupBy($"value")
.agg(count("value").alias("count"))
.sort($"count".desc)
.limit(5)
.show()
// Variant 4: agg with a Map of column -> aggregate-function name, e.g.
strDS.groupBy($"value")
//Map(
//  "age" -> "max",
//  "expense" -> "sum"
//)
.agg(Map("value" -> "count"))
.sort($"count(value)".desc)
.limit(5)
.show()
//SQL
println("======================SQL-=========================")
strDS.createOrReplaceTempView("table_view")
spark.sql(
"""
|select value,count(value) as count
|from table_view
|group by value
|order by count desc
|limit 5
|""".stripMargin).show()
spark.stop()
}
}
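All four DSL variants above produce the same top-5 result; they differ only in how the aggregate column is expressed. One detail worth noting: agg(Map("value" -> "count")) names the result column count(value), which is why that variant sorts on $"count(value)" instead of $"count".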
Conversions between RDD, DataFrame, and Dataset
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
 * DESC: conversions between RDD, DataFrame, and Dataset
 */
case class People2(id: Int, name: String, age: Int)
object _09RddDFDS {
def main(args: Array[String]): Unit = {
//1- Prepare the SparkSession
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")
val spark: SparkSession = SparkSession.builder()
.config(conf)
.getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
//2- Read the data file
val rdd1: RDD[String] = sc.textFile("data/input/sql/people1.txt")
//3- Convert the data to a DataFrame
val peopleRDD: RDD[People2] = rdd1.map(_.split("\\s+")).map(field => People2(field(0).toInt, field(1), field(2).toInt))
// The RDD-to-DataFrame conversion below requires implicit conversions
import spark.implicits._
//1- RDD to DataFrame (one of four approaches; see the sketch after this code)
val peopleDF: DataFrame = peopleRDD.toDF()
//2- DataFrame to RDD
peopleDF.rdd.collect().foreach(println)
//[1,zhangsan,20]
//[2,lisi,29]
//3- RDD to Dataset
val dataDS: Dataset[People2] = peopleRDD.toDS()
dataDS.show()
//+---+--------+---+
//| id|    name|age|
//+---+--------+---+
//|  1|zhangsan| 20|
//|  2|    lisi| 29|
//|  3|  wangwu| 25|
//4- Dataset to RDD
dataDS.rdd.collect().foreach(println)
//People2(1,zhangsan,20)
//People2(2,lisi,29)
//People2(3,wangwu,25)
//5- DataFrame to Dataset
val peopleDS: Dataset[People2] = peopleDF.as[People2]
peopleDS.show()
//6- Dataset to DataFrame
peopleDS.toDF().show()
//4- Close the SparkSession
spark.close()
}
}
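The comment above mentions four approaches to the RDD-to-DataFrame conversion, but only toDF() on an RDD of case classes is shown. A sketch of two common alternatives, assuming it is placed inside the same main method (after the implicits import) with the same rdd1; the column names here are illustrative:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Alternative 1: toDF with explicit column names on an RDD of tuples
val tupleDF = rdd1.map(_.split("\\s+"))
  .map(f => (f(0).toInt, f(1), f(2).toInt))
  .toDF("id", "name", "age")

// Alternative 2: createDataFrame with an RDD[Row] plus an explicit schema
val rowRDD = rdd1.map(_.split("\\s+")).map(f => Row(f(0).toInt, f(1), f(2).toInt))
val schema = StructType(Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = false)
))
val schemaDF = spark.createDataFrame(rowRDD, schema)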
UDF Programming
The essence is simple: a UDF is just a function that implements a one-to-one mapping from input to output, similar in effect to map.
Usage steps:
Register it through the session: spark.udf.register(the function name, the function itself with each parameter and its type).
Choose the UDFn interface that matches the number of arguments; for example, with two arguments use UDF2 (see the sketch at the end of this section).
spark.udf.register() is the fixed registration entry point.
// Requires: import org.apache.spark.sql.api.java.UDF1
//           import org.apache.spark.sql.types.DataTypes
// 1- Register by writing the function as an anonymous UDF1 implementation;
//    this form takes the return type explicitly
spark.udf.register("smalltoUpper2", new UDF1[String, String] {
  override def call(t1: String): String =
    t1.toUpperCase()
}, DataTypes.StringType)
// 2- Register via a lambda expression; the return type is inferred
spark.udf.register("smalltoUpper1", (a1: String) => {
  a1.toUpperCase()
})
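Once registered, the functions are available from both SQL and the DSL. A minimal usage sketch, assuming a DataFrame df with a string column named value and import spark.implicits._ in scope (df and the view name are illustrative):

import org.apache.spark.sql.functions.callUDF

// SQL: the registered name is visible inside spark.sql
df.createOrReplaceTempView("t")
spark.sql("select smalltoUpper1(value) as upper_value from t").show()

// DSL: invoke the registered function through callUDF
df.select(callUDF("smalltoUpper2", $"value").alias("upper_value")).show()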
spark.udf.register("smalltoUpper2", new UDF1[String,String] {
override def call(t1: String): String =
t1.toUpperCase()
},DataTypes.StringType)
// adopt lambam expression
//2 establish lamba Expression function
spark.udf.register("smalltoUpper1",(a1:String)=>{
a1.toUpperCase()
})
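As noted above, the interface is chosen by argument count. A hypothetical UDF2 sketch with two arguments (the function name and logic are illustrative, not from the original):

import org.apache.spark.sql.api.java.UDF2
import org.apache.spark.sql.types.DataTypes

// Hypothetical two-argument UDF: joins a name and an age into one string
spark.udf.register("nameWithAge", new UDF2[String, Int, String] {
  override def call(name: String, age: Int): String = s"$name-$age"
}, DataTypes.StringType)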