Spark accumulator and broadcast variables and beginners of sparksql
2022-07-06 17:40:00 【Bald Second Senior brother】
Accumulators
Why use accumulators: in Spark, if you perform an accumulation without an accumulator, the tasks running on the executors cannot modify the original driver-side variable, so the driver never sees the result. With an accumulator, tasks can write into a shared global value, which is why the accumulator is also called a globally writable variable.
Usage:
```scala
object Spark_leijia {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("word")
    val sc = new SparkContext(conf)

    val dataRDD = sc.makeRDD(List(1, 2, 3, 4), 2)

    // Accumulator: a shared variable
    // Create the accumulator object
    val accumulator = sc.longAccumulator
    var sum = 0
    dataRDD.foreach {
      case i => {
        // Add to the accumulator inside the task
        accumulator.add(i)
      }
    }
    println("sum=" + accumulator.value)
  }
}
```
Note: although the accumulator is also called a global write-only variable, it is readable as well; writing is simply its most common operation, and the value is read back on the driver through accumulator.value.
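To make the motivation above concrete, here is a minimal sketch (not from the original article; the object and variable names are illustrative) contrasting a plain driver-side variable with a longAccumulator:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorMotivation {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("acc-demo"))
    val dataRDD = sc.makeRDD(List(1, 2, 3, 4), 2)

    // A plain variable is captured by value in the task closures:
    // each task updates its own copy, and the driver-side value stays 0.
    var plainSum = 0
    dataRDD.foreach(i => plainSum += i)
    println("plainSum = " + plainSum) // prints 0

    // An accumulator is merged back to the driver, so the result is correct.
    val acc = sc.longAccumulator("sum")
    dataRDD.foreach(i => acc.add(i))
    println("accumulator = " + acc.value) // prints 10

    sc.stop()
  }
}
```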
Broadcast variables
Why use broadcast variables: by default, every task that uses a driver-side variable receives its own copy of it, so the driver ships one copy per task. When there are many tasks this wastes a lot of network and memory resources. A broadcast variable is instead distributed once to each executor and shared by all of its tasks for reading, which is why it is also called a globally readable variable.
The use of broadcast variables
```scala
// Define a broadcast variable
val a = 3
val broadcast = sc.broadcast(a)

// Read the value back
val c = broadcast.value
```
Note: as its other name suggests, a broadcast variable can only be read once it has been defined; it cannot be overwritten.
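As an illustration (a sketch, not from the original article; the lookup-table data and names are assumed), a typical use is shipping a small lookup table to the executors once instead of once per task:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("broadcast-demo"))

    // Small lookup table that every task needs
    val idToName = Map(1 -> "zhangsan", 2 -> "lisi")
    val idToNameBC = sc.broadcast(idToName)

    val orders = sc.makeRDD(List((1, 100.0), (2, 250.0), (1, 80.0)))

    // Each task reads the broadcast value instead of receiving its own copy of the map
    val withNames = orders.map { case (id, amount) =>
      (idToNameBC.value.getOrElse(id, "unknown"), amount)
    }

    withNames.collect().foreach(println)
    sc.stop()
  }
}
```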
SparkSQL
What is SparkSQL: Spark SQL is to Spark roughly what Hive is to Hadoop; both provide a SQL layer and were born to replace hand-written MapReduce. It offers two programming abstractions, DataFrame and Dataset, and also works as a distributed SQL query engine.
Advantages: easy integration, unified data access, Hive compatibility, and standard data connections (JDBC/ODBC).
DataFrame
A DataFrame is a distributed data container built on top of RDDs. Compared with an RDD it adds structure (a schema) to the data, making it similar to a two-dimensional table, much like a table in Hive.
Using a DataFrame:
When a DataFrame is built, you either map over the RDD's records to attach the structure yourself, or read a file format that already carries structure, such as a JSON file. The JSON path is shown first; a sketch of the mapping path follows the code.
```scala
val conf = new SparkConf().setMaster("local[*]").setAppName("Spark_Sql1")
val session = new SparkSession.Builder().config(conf).getOrCreate()

// Read the data and build a DataFrame
val frame = session.read.json("D:\\spark-class\\in\\user.json")

// Query the data with SQL syntax:
// register the DataFrame as a temporary view (table)
frame.createOrReplaceTempView("xxx")
session.sql("select * from xxx").show
// frame.show()

session.stop()
```
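The other path mentioned above, mapping an RDD to attach the structure yourself, might look like the following sketch (not from the original article; the column names and input file layout are assumptions):

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Assumed input: a text file with lines like "1,zhangsan,21"
val session = SparkSession.builder()
  .master("local[*]").appName("rdd-to-df").getOrCreate()

val lineRDD = session.sparkContext.textFile("D:\\spark-class\\in\\user.txt")

// Map each line to a Row that matches the schema attached below
val rowRDD = lineRDD.map(_.split(",")).map(a => Row(a(0).toInt, a(1), a(2).toInt))

val schema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("name", StringType),
  StructField("age", IntegerType)
))

val userDF = session.createDataFrame(rowRDD, schema)
userDF.show()
```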
DataSet
A Dataset (DS) is very similar to a DataFrame (DF), but it adds a concrete element type on top of the DataFrame, so the data can be queried as typed objects.
Using a Dataset is similar to using a DataFrame, except that the element type must be specified when it is created; this is usually done with a case class (see the sketch below).
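A minimal sketch of creating a Dataset directly from a case class (not from the original article; the User class and sample values are assumptions):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object DataSetDemo {
  case class User(id: Int, name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("ds-demo")
    val session = new SparkSession.Builder().config(conf).getOrCreate()
    import session.implicits._

    // toDS() becomes available on local collections once the implicits are imported
    val ds = Seq(User(1, "zhangsan", 21), User(2, "lisi", 24)).toDS()

    // Rows keep their type, so they can be queried as objects
    ds.filter(u => u.age > 22).show()

    session.stop()
  }
}
```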
Converting between RDD, DF and DS
Converting among the three is a matter of adding or removing type and structure information through specific functions:
```scala
object Spark_SQL2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("Spark_Sql1")
    val session = new SparkSession.Builder().config(conf).getOrCreate()

    // The implicit conversion rules must be imported before converting
    val listRDD = session.sparkContext.makeRDD(List((1, "zhangsan", 21), (2, "lisi", 24)))
    import session.implicits._

    // RDD -> DF
    val dataFrame = listRDD.toDF("id", "name", "age")

    // DF -> DS
    val dataSet = dataFrame.as[User]

    // DS -> DF
    val dataFrame1 = dataSet.toDF()

    // DF -> RDD
    val rdd = dataFrame1.rdd
    rdd.foreach(row => {
      println(row.getString(1))
    })
  }

  /**
   * Case class
   */
  case class User(id: Int, name: String, age: Int)
}
```
```scala
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("word")
  val session = new SparkSession.Builder().config(conf).getOrCreate()
  import session.implicits._

  // Create the RDD
  val listRDD = session.sparkContext.makeRDD(List((1, "zhangsan", 21), (2, "lisi", 24)))

  // RDD -> DS
  val userRDD = listRDD.map {
    case (id, name, age) => {
      User(id, name, age)
    }
  }
  userRDD.toDS().show()
}

/**
 * Case class
 */
case class User(id: Int, name: String, age: Int)
```
Custom aggregate functions
Weakly typed: the input is handled as untyped Rows; no concrete data type is specified.
```scala
object Spark_SQL4 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("word")
    val session = new SparkSession.Builder().config(conf).getOrCreate()
    import session.implicits._

    // Create the RDD
    val listRDD = session.sparkContext.makeRDD(List((1, "zhangsan", 21), (2, "lisi", 24)))

    val udaf = new MyAgeAvg
    // Register the aggregate function
    session.udf.register("avgAGE", udaf)

    // Use it
    val rdd = listRDD.map {
      case (id, name, age) => {
        User(id, name, age)
      }
    }
    rdd.toDS().createOrReplaceTempView("user")
    session.sql("select avgAGE(age) from user").show()
  }

  /**
   * Case class
   */
  case class User(id: Int, name: String, age: Int)

  /**
   * Custom weakly typed aggregate function
   * 1. Extend UserDefinedAggregateFunction
   * 2. Implement its methods
   */
  class MyAgeAvg extends UserDefinedAggregateFunction {
    // Structure of the function's input data
    override def inputSchema: StructType = {
      new StructType().add("age", LongType)
    }

    // Structure of the buffer used during the computation
    override def bufferSchema: StructType = {
      new StructType().add("sum", LongType).add("count", LongType)
    }

    // Data type returned by the function
    override def dataType: DataType = DoubleType

    // Whether the function is deterministic
    override def deterministic: Boolean = true

    // Initialize the buffer before the computation
    override def initialize(buffer: MutableAggregationBuffer): Unit = {
      buffer(0) = 0L
      buffer(1) = 0L
    }

    // Update the buffer with one input row
    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      buffer(0) = buffer.getLong(0) + input.getLong(0)
      buffer(1) = buffer.getLong(1) + 1
    }

    // Merge buffers coming from multiple nodes
    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
      buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
      buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
    }

    // Compute the final result
    override def evaluate(buffer: Row): Any = {
      buffer.getLong(0).toDouble / buffer.getLong(1)
    }
  }
}
```
Strongly typed: the data types are specified explicitly, through the Aggregator's type parameters.
```scala
object Spark_SQL5 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("word")
    val session = new SparkSession.Builder().config(conf).getOrCreate()
    import session.implicits._

    // Create the RDD
    val listRDD = session.sparkContext.makeRDD(List((1, "zhangsan", 21), (2, "lisi", 24)))

    val udaf = new MyAgeAvgClass
    // Convert the aggregate function into a column that can be queried
    val avg = udaf.toColumn.name("avgAge")

    // Use it
    val rdd = listRDD.map {
      case (id, name, age) => {
        User(id, name, age)
      }
    }
    val ds = rdd.toDS()
    ds.select(avg).show
  }

  /**
   * Case class
   */
  case class User(id: Int, name: String, age: Int)

  /**
   * Buffer type
   */
  case class AvgBuff(var sum: Int, var count: Int)

  /**
   * Custom strongly typed aggregate function
   * 1. Extend Aggregator and set the generic types
   * 2. Implement its methods
   */
  class MyAgeAvgClass extends Aggregator[User, AvgBuff, Double] {
    // Initialize the buffer
    override def zero: AvgBuff = AvgBuff(0, 0)

    // Aggregate one input value into the buffer
    override def reduce(b: AvgBuff, a: User): AvgBuff = {
      b.sum = b.sum + a.age
      b.count = b.count + 1
      b
    }

    // Merge two buffers
    override def merge(b1: AvgBuff, b2: AvgBuff): AvgBuff = {
      b1.sum = b1.sum + b2.sum
      b1.count = b1.count + b2.count
      b1
    }

    // Compute the final result
    override def finish(reduction: AvgBuff): Double = {
      reduction.sum.toDouble / reduction.count
    }

    override def bufferEncoder: Encoder[AvgBuff] = Encoders.product

    override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
  }
}
```
SparkSession:
SparkSession provides a unified entry point for users; it was introduced to replace SparkContext for convenience. Because the RDD API and the other Spark APIs used to require different contexts, SparkSession was created to integrate them into one object.
Advantages:
Provides users with a single entry point to Spark's functionality
Allows users to write programs with the DataFrame and Dataset APIs
Reduces the number of concepts users need to understand, making it easier to interact with Spark
When interacting with Spark you no longer need to explicitly create SparkConf, SparkContext, or SQLContext; these objects are all encapsulated inside SparkSession (a minimal sketch follows below)
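The sketch below (not from the original article; the object and view names are assumed) shows one SparkSession serving the SQL, DataFrame/Dataset, and RDD APIs:

```scala
import org.apache.spark.sql.SparkSession

object SparkSessionDemo {
  def main(args: Array[String]): Unit = {
    // One builder call replaces creating SparkConf / SparkContext / SQLContext by hand
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("session-demo")
      .config("spark.sql.shuffle.partitions", "4")
      .getOrCreate()

    import spark.implicits._

    // SQL / DataFrame / Dataset APIs are available directly on the session
    val df = Seq((1, "zhangsan", 21), (2, "lisi", 24)).toDF("id", "name", "age")
    df.createOrReplaceTempView("user")
    spark.sql("select avg(age) from user").show()

    // The underlying SparkContext is still reachable when the RDD API is needed
    val rdd = spark.sparkContext.makeRDD(List(1, 2, 3, 4))
    println(rdd.sum())

    spark.stop()
  }
}
```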