当前位置:网站首页>spark学习笔记(六)——sparkcore核心编程-RDD行动算子
spark学习笔记(六)——sparkcore核心编程-RDD行动算子
2022-07-27 01:31:00 【一个人的牛牛】
行动算子-触发作业的执行(runjob)
创建activeJob,提交并执行
目录
RDD转换:对RDD功能的补充和封装,将旧的RDD包装成为新的RDD;
RDD行动:触发任务的调度和作业的执行。
(1)reduce
函数签名:def reduce(f: (T, T) => T): T
函数说明:聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据。
def main(args: Array[String]): Unit = {
//TODO 创建环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
//TODO 行动算子 action-operator————reduce
val rdd = sc.makeRDD(List(1,2,3,4,5,6))
val A = rdd.reduce(_ + _) //相加
println(A)
//TODO 关闭环境
sc.stop()
}
(2)collect
函数签名:def collect(): Array[T]
函数说明:在驱动程序中,以数组Array的形式返回数据集的所有元素。
def main(args: Array[String]): Unit = {
//TODO 创建环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
//TODO 行动算子 action-operator————collect
val rdd = sc.makeRDD(List(1,2,3,4,5,6))
//采集
val ints = rdd.collect()
println(ints.mkString("."))
//TODO 关闭环境
sc.stop()
}
(3)count
函数签名:def count(): Long
函数说明:返回RDD中元素的个数。
def main(args: Array[String]): Unit = {
//TODO 创建环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
//TODO 行动算子 action-operator————count
val rdd = sc.makeRDD(List(1,2,3,4,5,6))
val l = rdd.count()
println(l)
//TODO 关闭环境
sc.stop()
}
(4)first
函数签名:def first(): T
函数说明:返回RDD中的第一个元素。
def main(args: Array[String]): Unit = {
//TODO 创建环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
//TODO 行动算子 action-operator————first
val rdd = sc.makeRDD(List(1,2,3,4,5,6))
val firstRDD = rdd.first()
println(firstRDD)
//TODO 关闭环境
sc.stop()
}
(5)take
函数签名:def take(num: Int): Array[T]
函数说明:返回一个由RDD的前n个元素组成的数组。
def main(args: Array[String]): Unit = {
//TODO 创建环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
//TODO 行动算子 action-operator————take
val rdd = sc.makeRDD(List(1,2,3,4,5,6))
val takeRDD = rdd.take(4)
println(takeRDD.mkString("."))
//TODO 关闭环境
sc.stop()
}
(6)takeOrdered
函数签名:def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
函数说明:返回该RDD排序后的前n个元素组成的数组。
def main(args: Array[String]): Unit = {
//TODO 创建环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
//TODO 行动算子 action-operator————takeOrdered
val rdd = sc.makeRDD(List(1, 4, 5, 2, 3, 6))
val takeRDD = rdd.takeOrdered(4)
println(takeRDD.mkString("."))
//TODO 关闭环境
sc.stop()
}
(7)aggregate
函数签名:def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
函数说明:分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合 。
def main(args: Array[String]): Unit = {
//TODO 创建环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
//TODO 行动算子 action-operator————aggregate
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
val aggregateRDD = rdd.aggregate(5)(_ + _, _ + _)
//1+2+3+5=11 4+5+6+5=20 11+20+5=36
println(aggregateRDD)
//TODO 关闭环境
sc.stop()
}
(8)fold
函数签名:def fold(zeroValue: T)(op: (T, T) => T): T
函数说明:折叠操作,aggregate的简化版操作。
def main(args: Array[String]): Unit = {
//TODO 创建环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
//TODO 行动算子 action-operator————fold
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6),2)
val foldRDD = rdd.fold(5)(_+_)
//1+2+3+5=11 4+5+6+5=20 11+20+5=36
println(foldRDD)
//TODO 关闭环境
sc.stop()
}
(9)countByKey
函数签名:def countByKey(): Map[K, Long]
函数说明:统计每种key的个数。
def main(args: Array[String]): Unit = {
//TODO 创建环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
//TODO 行动算子 action-operator————countByKey
//countByValue
println("countByValue")
val rdd = sc.makeRDD(List(1,2,3,4,1,1,2),2)
val countRDD = rdd.countByValue()
println(countRDD)
//countByKey
println("countByKey")
val rdd2 = sc.makeRDD(List(
("A",1),("B",1),("A",2),("C",3)
))
val countRDD2 = rdd2.countByKey()
println(countRDD2)
//TODO 关闭环境
sc.stop()
}
(10)save 相关算子
函数签名:
def saveAsTextFile(path: String): Unit
def saveAsObjectFile(path: String): Unit
def saveAsSequenceFile(
path: String,
codec: Option[Class[_ <: CompressionCodec]] = None): Unit
函数说明:将数据保存到不同格式的文件中。
def main(args: Array[String]): Unit = {
//TODO 创建环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
//TODO 行动算子 action-operator————save
val rdd = sc.makeRDD(List(
("A", 1), ("B", 1), ("C", 1), ("A", 2), ("B", 2),("C",2)
),2)
rdd.saveAsTextFile("output")
rdd.saveAsObjectFile("output1")
rdd.saveAsSequenceFile("output2")
//TODO 关闭环境
sc.stop()
}
(11)foreach
函数签名:def foreach(f: T => Unit): Unit = withScope { val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF)) }
函数说明:分布式遍历RDD中的每一个元素,调用指定函数。
def main(args: Array[String]): Unit = {
//TODO 创建环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
//TODO 行动算子 action-operator————foreach
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
rdd.collect().foreach(println)
println(">>>>>>>>>>>>>")
rdd.foreach(println) // 从内存打印
//TODO 关闭环境
sc.stop()
}
本文仅仅是学习笔记的记录!!!
边栏推荐
猜你喜欢

图解用户登录验证流程,写得太好了!

Use the most primitive method to manually implement the common 20 array methods

Idea 中添加支持@Data 插件

HCIP第十四天笔记

代码审查金字塔

周全的照护 解析LYRIQ锐歌电池安全设计

Worthington果胶酶的特性及测定方案

impala 执行计划详解

一体式水利视频监控站 遥测终端视频图像水位水质水量流速监测

Practice of online problem feedback module (XV): realize the function of online updating feedback status
随机推荐
Alibaba cloud technology expert Yang Zeqiang: Construction of observability on elastic computing cloud
代码审查金字塔
基于.NetCore开发博客项目 StarBlog - (16) 一些新功能 (监控/统计/配置/初始化)
Naive Bayes -- Document Classification
Comprehensive care analysis lyriq Ruige battery safety design
Hcip 13th day notes
DNS记录类型及相关名词解释
Functions that should be selected for URL encoding and decoding
图解用户登录验证流程,写得太好了!
shell awk
localStorage与sessionStorage
Acwing 2074. Countdown simulation
[simple question of stack and queue] leetcode 232. realize queue with stack, 225. realize stack with queue
朴素贝叶斯——文档分类
安全员及环保员岗位职责
Portraiture5全新升级版磨皮滤镜插件神器
Hcip day 14 notes
CAS deployment and successful login jump address
opiodr aborting process unknown ospid (21745) as a result of ORA-609
OpenTelemetry 在服务网格架构下的最佳实践