Spark Learning Notes (VI) -- Spark Core Programming: RDD Action Operators
2022-07-27 03:24:00 【One's cow】
Action operators trigger job execution (runJob): each action creates an ActiveJob, then submits and runs it.
RDD transformations: extend and wrap the functionality of an RDD, packaging an old RDD into a new one;
RDD actions: trigger task scheduling and job execution.
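A minimal sketch of the difference, assuming a live SparkContext named sc: a transformation such as map only wraps the RDD and records lineage, and nothing is computed until an action such as collect calls runJob.

val mapped = sc.makeRDD(List(1, 2, 3)).map(_ * 2) // transformation: lazy, just builds lineage
println(mapped.collect().mkString(","))           // action: submits an ActiveJob and prints 2,4,6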
(1)reduce
Function signature: def reduce(f: (T, T) => T): T
Function description: Aggregates all elements of the RDD, first aggregating the data within each partition, then aggregating the partial results across partitions.
import org.apache.spark.{SparkConf, SparkContext} // required by every example in this article

def main(args: Array[String]): Unit = {
  //TODO Set up the environment
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  val sc = new SparkContext(sparkConf)
  //TODO Action operator: reduce
  val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6))
  val sum = rdd.reduce(_ + _) // sum all elements
  println(sum) // 21
  //TODO Shut down the environment
  sc.stop()
}
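reduce accepts any associative, commutative function, not just addition; a small sketch (reusing the rdd above) that finds the maximum:

val max = rdd.reduce((a, b) => math.max(a, b)) // pairwise max within, then across, partitions
println(max) // 6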
(2)collect
Function signature: def collect(): Array[T]
Function description: Returns all elements of the dataset to the driver program as an Array.
def main(args: Array[String]): Unit = {
  //TODO Set up the environment
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  val sc = new SparkContext(sparkConf)
  //TODO Action operator: collect
  val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6))
  val ints = rdd.collect() // gather every element to the driver as an Array
  println(ints.mkString("."))
  //TODO Shut down the environment
  sc.stop()
}
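Note that collect pulls the entire dataset into driver memory, so it is only safe for small RDDs. For a quick, bounded peek there is take (section (5) below) or takeSample; a sketch of the latter:

val sample = rdd.takeSample(withReplacement = false, num = 3) // at most 3 elements reach the driver
println(sample.mkString("."))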
(3)count
Function signature: def count(): Long
Function description: Returns the number of elements in the RDD.
def main(args: Array[String]): Unit = {
  //TODO Set up the environment
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  val sc = new SparkContext(sparkConf)
  //TODO Action operator: count
  val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6))
  val cnt = rdd.count()
  println(cnt) // 6
  //TODO Shut down the environment
  sc.stop()
}
(4)first
Function signature: def first(): T
Function description: Returns the first element of the RDD.
def main(args: Array[String]): Unit = {
  //TODO Set up the environment
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  val sc = new SparkContext(sparkConf)
  //TODO Action operator: first
  val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6))
  val first = rdd.first() // an Int, not an RDD: actions return plain values
  println(first) // 1
  //TODO Shut down the environment
  sc.stop()
}
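first is effectively take(1) under the hood: it reads only the leading element of the first non-empty partition instead of scanning the whole RDD. A sketch of the equivalent call:

println(rdd.take(1).head) // 1, same result as rdd.first()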
(5)take
Function signature: def take(num: Int): Array[T]
Function description: Returns an array of the first n elements of the RDD.
def main(args: Array[String]): Unit = {
  //TODO Set up the environment
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  val sc = new SparkContext(sparkConf)
  //TODO Action operator: take
  val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6))
  val taken = rdd.take(4) // Array(1, 2, 3, 4)
  println(taken.mkString("."))
  //TODO Shut down the environment
  sc.stop()
}
(6)takeOrdered
Function signature: def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
Function description: Returns an array of the first n elements of the RDD after sorting.
def main(args: Array[String]): Unit = {
  //TODO Set up the environment
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  val sc = new SparkContext(sparkConf)
  //TODO Action operator: takeOrdered
  val rdd = sc.makeRDD(List(1, 4, 5, 2, 3, 6))
  val smallest = rdd.takeOrdered(4) // Array(1, 2, 3, 4): sorted ascending by default
  println(smallest.mkString("."))
  //TODO Shut down the environment
  sc.stop()
}
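The implicit Ordering parameter can be overridden; a sketch (reusing the rdd above) that returns the four largest elements instead of the smallest:

val top4 = rdd.takeOrdered(4)(Ordering[Int].reverse)
println(top4.mkString(".")) // 6.5.4.3

Spark also provides top(num), which is shorthand for takeOrdered with the reversed ordering.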
(7)aggregate
Function signature: def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
Function description: Aggregates the data within each partition starting from the initial value (seqOp), then aggregates the per-partition results across partitions, again starting from the initial value (combOp). The zero value therefore participates once per partition and once more in the cross-partition merge.
def main(args: Array[String]): Unit = {
  //TODO Set up the environment
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  val sc = new SparkContext(sparkConf)
  //TODO Action operator: aggregate
  val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
  val aggregated = rdd.aggregate(5)(_ + _, _ + _)
  // partition 0: 5+1+2+3 = 11; partition 1: 5+4+5+6 = 20; merge: 5+11+20 = 36
  println(aggregated) // 36
  //TODO Shut down the environment
  sc.stop()
}
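Unlike reduce, the result type U of aggregate may differ from the element type T; a sketch (same rdd as above) that computes a (sum, count) pair in a single pass, from which the average follows:

val (sum, count) = rdd.aggregate((0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),  // seqOp: fold each element into the partition accumulator
  (a, b) => (a._1 + b._1, a._2 + b._2)   // combOp: merge the per-partition accumulators
)
println(sum.toDouble / count) // 21 / 6 = 3.5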
(8)fold
Function signature: def fold(zeroValue: T)(op: (T, T) => T): T
Function description: A folding operation; a simplified version of aggregate in which the intra-partition and inter-partition functions are the same and the result type equals the element type.
def main(args: Array[String]): Unit = {
  //TODO Set up the environment
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  val sc = new SparkContext(sparkConf)
  //TODO Action operator: fold
  val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
  val folded = rdd.fold(5)(_ + _)
  // partition 0: 5+1+2+3 = 11; partition 1: 5+4+5+6 = 20; merge: 5+11+20 = 36
  println(folded) // 36
  //TODO Shut down the environment
  sc.stop()
}
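A small sketch of the relationship: fold is aggregate with seqOp and combOp collapsed into one function, so with a neutral zero value the two calls below are equivalent:

println(rdd.fold(0)(_ + _))             // 21
println(rdd.aggregate(0)(_ + _, _ + _)) // 21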
(9)countByKey
Function signature: def countByKey(): Map[K, Long]
Function description: Counts the number of occurrences of each key (available on key-value RDDs).
def main(args: Array[String]): Unit = {
  //TODO Set up the environment
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  val sc = new SparkContext(sparkConf)
  //TODO Action operator: countByKey (and the related countByValue)
  // countByValue counts how often each distinct element occurs
  println("countByValue")
  val rdd = sc.makeRDD(List(1, 2, 3, 4, 1, 1, 2), 2)
  val valueCounts = rdd.countByValue()
  println(valueCounts) // e.g. Map(1 -> 3, 2 -> 2, 3 -> 1, 4 -> 1)
  // countByKey counts how often each key occurs in a key-value RDD
  println("countByKey")
  val rdd2 = sc.makeRDD(List(
    ("A", 1), ("B", 1), ("A", 2), ("C", 3)
  ))
  val keyCounts = rdd2.countByKey()
  println(keyCounts) // e.g. Map(A -> 2, B -> 1, C -> 1)
  //TODO Shut down the environment
  sc.stop()
}
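Both countByValue and countByKey materialize the full result Map on the driver, so they suit a modest number of distinct keys. For large key spaces the usual alternative keeps the counts distributed via reduceByKey; a sketch reusing rdd2 above:

val counts = rdd2.map { case (k, _) => (k, 1L) }.reduceByKey(_ + _)
counts.collect().foreach(println) // (A,2), (B,1), (C,1): same counts, distributed until collect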
(10) save-related operators
Function signatures:
def saveAsTextFile(path: String): Unit
def saveAsObjectFile(path: String): Unit
def saveAsSequenceFile(
    path: String,
    codec: Option[Class[_ <: CompressionCodec]] = None): Unit
Function description: Save the data to files in different formats. saveAsTextFile and saveAsObjectFile work on any RDD, while saveAsSequenceFile is only available on RDDs of key-value pairs.
def main(args: Array[String]): Unit = {
  //TODO Set up the environment
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  val sc = new SparkContext(sparkConf)
  //TODO Action operator: save
  val rdd = sc.makeRDD(List(
    ("A", 1), ("B", 1), ("C", 1), ("A", 2), ("B", 2), ("C", 2)
  ), 2)
  rdd.saveAsTextFile("output")      // plain text, one element per line
  rdd.saveAsObjectFile("output1")   // Java-serialized objects
  rdd.saveAsSequenceFile("output2") // Hadoop SequenceFile (key-value RDDs only)
  //TODO Shut down the environment
  sc.stop()
}
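A sketch of reading the three outputs back, assuming the paths written above (note that Spark refuses to write to an output directory that already exists, so the save calls must run first on a clean directory):

val text = sc.textFile("output")                   // each element back as a String line
val objs = sc.objectFile[(String, Int)]("output1") // deserialized (String, Int) pairs
val seq  = sc.sequenceFile[String, Int]("output2") // read via the Writable converters
println((text.count(), objs.count(), seq.count())) // (6,6,6)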
(11)foreach
Function signature (as implemented in RDD.scala):
def foreach(f: T => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
Function description: Traverses every element of the RDD in a distributed manner, calling the given function on each element; the function runs on the executors, not on the driver.
def main(args: Array[String]): Unit = {
  //TODO Set up the environment
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  val sc = new SparkContext(sparkConf)
  //TODO Action operator: foreach
  val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
  rdd.collect().foreach(println) // collect first: prints on the driver, in order
  println(">>>>>>>>>>>>>")
  rdd.foreach(println) // distributed: prints on the executors, order not guaranteed
  //TODO Shut down the environment
  sc.stop()
}
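A related sketch: foreachPartition hands each partition's whole Iterator to the function, which is the usual pattern when per-partition setup is needed (for example, one database connection per partition rather than one per element):

rdd.foreachPartition { iter =>
  // hypothetical per-partition setup (e.g. opening a connection) would go here
  iter.foreach(x => println(s"element: $x")) // runs on the executor owning this partition
  // per-partition teardown here
}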
This article is just a record of my learning notes!