Spark accumulators, broadcast variables, and getting started with SparkSQL
2022-07-06 17:40:00 【Bald Second Senior brother】
Accumulators
Why use accumulators: in Spark, if you accumulate into an ordinary variable, the computation fails because a task cannot change the original variable on the Driver; it only works on its own copy. With an accumulator, a shared value can be updated globally, which is why accumulators are also called global writable variables.
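To make the problem concrete, here is a minimal sketch (the object and variable names are made up for illustration): summing into a plain local variable inside foreach does not change the driver-side value, which is exactly what the accumulator shown below fixes.

import org.apache.spark.{SparkConf, SparkContext}

object ClosureCopySketch { // name made up for illustration
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("pitfall"))
    val dataRDD = sc.makeRDD(List(1, 2, 3, 4), 2)

    var sum = 0
    // Each task receives and updates its own copy of `sum`;
    // on a cluster the driver-side variable is never changed, so this usually prints 0
    dataRDD.foreach(i => sum += i)
    println("sum = " + sum)

    sc.stop()
  }
}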
Usage:
import org.apache.spark.{SparkConf, SparkContext}

object Spark_leijia {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("word")
    val sc = new SparkContext(conf)

    val dataRDD = sc.makeRDD(List(1, 2, 3, 4), 2)

    // Accumulator: a shared variable
    // Create the accumulator object
    val accumulator = sc.longAccumulator
    var sum = 0 // a plain local variable like this would not be updated across tasks

    dataRDD.foreach {
      case i =>
        // Add to the accumulator inside each task
        accumulator.add(i)
    }

    // Read the merged value back on the driver
    println("sum=" + accumulator.value)
  }
}
Note: although the accumulator is called a global write-only variable, it can in fact be read; it is simply used mostly for writes, with the merged value usually read only on the driver.
Broadcast variables
Why use broadcast variables: Spark ships an independent copy of every closure variable with each task, so the Driver sends one copy per task. When there are many tasks this consumes a lot of memory and network resources. Broadcast variables solve this by distributing a single read-only copy per executor, which is why they are also called global readable variables.
Using broadcast variables:
// Define a broadcast variable
val a = 3
val broadcast = sc.broadcast(a)
// Read it back
val c = broadcast.value
Note: as its other name suggests, a broadcast variable can only be read once it has been defined; it cannot be overwritten.
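A slightly fuller sketch (the lookup map and all names here are made up for illustration): a small dictionary is broadcast once per executor and then read inside a transformation through .value.

import org.apache.spark.{SparkConf, SparkContext}

object BroadcastSketch { // name made up for illustration
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("broadcast"))

    // A small lookup table that every task needs
    val idToName = Map(1 -> "zhangsan", 2 -> "lisi")
    // Ship one read-only copy per executor instead of one per task
    val bc = sc.broadcast(idToName)

    val ids = sc.makeRDD(List(1, 2, 1, 2), 2)
    // Read the broadcast value inside the transformation
    val names = ids.map(id => bc.value.getOrElse(id, "unknown"))
    names.collect().foreach(println)

    sc.stop()
  }
}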
SparkSQL
What is SparkSQL: Spark SQL is, like Hive, a SQL layer born to replace hand-written MapReduce. It provides two programming abstractions, DataFrame and DataSet, and also acts as a distributed SQL query engine.
Advantages: easy integration, unified data access, Hive compatibility, standard data connectivity (JDBC/ODBC).
DataFrame
A DataFrame is a distributed data container built on top of RDDs. Compared with an RDD it adds schema information, so the data looks like a two-dimensional table, much like a Hive table.

Using a DataFrame:

When building a DataFrame you either map over the RDD's data to attach a schema, or read a file that already carries structure, such as a JSON file:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf().setMaster("local[*]").setAppName("Spark_Sql1")
val session = new SparkSession.Builder().config(conf).getOrCreate()

// Read the data and build a DataFrame
val frame = session.read.json("D:\\spark-class\\in\\user.json")

// Query the data with SQL syntax:
// first register the DataFrame as a temporary view
frame.createOrReplaceTempView("xxx")
session.sql("select * from xxx").show
//frame.show()

session.stop()
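Besides registering a temporary view and writing SQL, the same DataFrame can also be queried through the DataFrame DSL. A minimal sketch that continues the snippet above, assuming the JSON file has name and age columns (the file contents are not shown in the article):

import org.apache.spark.sql.functions.col

// DSL-style query on the DataFrame built above;
// the "name" and "age" column names are assumed, not taken from the article
frame.select(col("name"), col("age"))
  .filter(col("age") > 20)
  .show()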
DataSet
A DataSet is very similar to a DataFrame, but a DS adds a concrete data type on top of a DF, so each row can be queried as an object.
Using a DS: working with a DS is much like working with a DF, except that you must specify the data type when creating it, usually through a case class.
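For example, a minimal sketch (illustrative only; object and class names are made up) of building a DataSet directly from a local collection of case class instances:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object DataSetSketch { // name made up for illustration
  case class User(id: Int, name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("ds-sketch")
    val session = new SparkSession.Builder().config(conf).getOrCreate()
    import session.implicits._

    // The case class supplies both the schema and the element type of the DataSet
    val ds = Seq(User(1, "zhangsan", 21), User(2, "lisi", 24)).toDS()
    ds.show()

    session.stop()
  }
}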
Converting between RDD, DF and DS

Converting among the three is a matter of adding or removing type and schema information through specific functions.
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object Spark_SQL2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("Spark_Sql1")
    val session = new SparkSession.Builder().config(conf).getOrCreate()

    // The implicit conversion rules must be imported before converting
    import session.implicits._

    // Convert an RDD to a DF
    val listRDD = session.sparkContext.makeRDD(List((1, "zhangsan", 21), (2, "lisi", 24)))
    val dataFrame = listRDD.toDF("id", "name", "age")

    // Convert the DF to a DS
    val dataSet = dataFrame.as[User]

    // Convert the DS back to a DF
    val dataFrame1 = dataSet.toDF()

    // Convert the DF to an RDD
    val rdd = dataFrame1.rdd
    rdd.foreach(row => {
      println(row.getString(1))
    })
  }

  /**
   * Case class
   */
  case class User(id: Int, name: String, age: Int)
}
An RDD can also be converted directly to a DS by mapping its elements to the case class:

// The enclosing object name is assumed; the original snippet only showed the method body
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object Spark_SQL3 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("word")
    val session = new SparkSession.Builder().config(conf).getOrCreate()
    import session.implicits._

    // Create an RDD
    val listRDD = session.sparkContext.makeRDD(List((1, "zhangsan", 21), (2, "lisi", 24)))

    // Convert the RDD to a DS by mapping each tuple to the case class
    val userRDD = listRDD.map {
      case (id, name, age) => User(id, name, age)
    }
    userRDD.toDS().show()
  }

  /**
   * Case class
   */
  case class User(id: Int, name: String, age: Int)
}
Custom aggregate functions
Weakly typed: no concrete data type is specified; the input and buffer are described by row schemas.
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, LongType, StructType}

object Spark_SQL4 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("word")
    val session = new SparkSession.Builder().config(conf).getOrCreate()
    import session.implicits._

    // Create an RDD
    val listRDD = session.sparkContext.makeRDD(List((1, "zhangsan", 21), (2, "lisi", 24)))

    val udaf = new MyAgeAvg
    // Register the aggregate function
    session.udf.register("avgAGE", udaf)

    // Use it
    val rdd = listRDD.map {
      case (id, name, age) => User(id, name, age)
    }
    rdd.toDS().createOrReplaceTempView("user")
    session.sql("select avgAGE(age) from user").show()
  }

  /**
   * Case class
   */
  case class User(id: Int, name: String, age: Int)

  /**
   * Custom weakly typed aggregate function
   * 1. Extend UserDefinedAggregateFunction
   * 2. Implement its methods
   */
  class MyAgeAvg extends UserDefinedAggregateFunction {
    // Structure of the function's input data
    override def inputSchema: StructType = {
      new StructType().add("age", LongType)
    }

    // Structure of the buffer used during the computation
    override def bufferSchema: StructType = {
      new StructType().add("sum", LongType).add("count", LongType)
    }

    // Return type of the function
    override def dataType: DataType = DoubleType

    // Whether the function is deterministic
    override def deterministic: Boolean = true

    // Initialize the buffer before the computation
    override def initialize(buffer: MutableAggregationBuffer): Unit = {
      buffer(0) = 0L
      buffer(1) = 0L
    }

    // Update the buffer with an input row
    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      buffer(0) = buffer.getLong(0) + input.getLong(0)
      buffer(1) = buffer.getLong(1) + 1
    }

    // Merge buffers from multiple nodes
    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
      buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
      buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
    }

    // Compute the final result
    override def evaluate(buffer: Row): Any = {
      buffer.getLong(0).toDouble / buffer.getLong(1)
    }
  }
}
Strongly typed: the data types are specified explicitly, via case classes.
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

object Spark_SQL5 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("word")
    val session = new SparkSession.Builder().config(conf).getOrCreate()
    import session.implicits._

    // Create an RDD
    val listRDD = session.sparkContext.makeRDD(List((1, "zhangsan", 21), (2, "lisi", 24)))

    val udaf = new MyAgeAvgClass
    // Turn the aggregate function into a column that can be queried
    val avg = udaf.toColumn.name("avgAge")

    // Use it
    val rdd = listRDD.map {
      case (id, name, age) => User(id, name, age)
    }
    val ds = rdd.toDS()
    ds.select(avg).show
  }

  /**
   * Case class
   */
  case class User(id: Int, name: String, age: Int)

  /**
   * Buffer type
   */
  case class AvgBuff(var sum: Int, var count: Int)

  /**
   * Custom strongly typed aggregate function
   * 1. Extend Aggregator and set its type parameters
   * 2. Implement its methods
   */
  class MyAgeAvgClass extends Aggregator[User, AvgBuff, Double] {
    // Initialize the buffer
    override def zero: AvgBuff = AvgBuff(0, 0)

    // Aggregate one input element into the buffer
    override def reduce(b: AvgBuff, a: User): AvgBuff = {
      b.sum = b.sum + a.age
      b.count = b.count + 1
      b
    }

    // Merge two buffers
    override def merge(b1: AvgBuff, b2: AvgBuff): AvgBuff = {
      b1.sum = b1.sum + b2.sum
      b1.count = b1.count + b2.count
      b1
    }

    // Compute the final result
    override def finish(reduction: AvgBuff): Double = {
      reduction.sum.toDouble / reduction.count
    }

    override def bufferEncoder: Encoder[AvgBuff] = Encoders.product
    override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
  }
}
SparkSession:
SparkSession provides a unified entry point for users. It was introduced to replace direct use of SparkContext for convenience: different Spark APIs used to require different contexts, so SparkSession was created to integrate them.
Advantages:

Provides a unified entry point for using Spark functionality

Lets users write programs with the DataFrame and Dataset APIs

Reduces the number of concepts users need to understand, making it easier to interact with Spark

Removes the need to explicitly create a SparkConf, SparkContext or SQLContext when interacting with Spark; these objects are already encapsulated inside SparkSession, as sketched below
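A minimal sketch of obtaining a SparkSession through the builder API (the object name, master and appName values are placeholders):

import org.apache.spark.sql.SparkSession

object SessionSketch { // name made up for illustration
  def main(args: Array[String]): Unit = {
    // One unified entry point; no explicit SparkConf, SparkContext or SQLContext required
    val session = SparkSession.builder()
      .master("local[*]")
      .appName("session-sketch")
      .getOrCreate()

    // The underlying SparkContext is still reachable when RDD APIs are needed
    val sc = session.sparkContext
    println(sc.appName)

    session.stop()
  }
}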