当前位置:网站首页>spark学习笔记(六)——sparkcore核心编程-RDD行动算子
spark学习笔记(六)——sparkcore核心编程-RDD行动算子
2022-07-27 01:31:00 【一个人的牛牛】
行动算子-触发作业的执行(runjob)
创建activeJob,提交并执行
目录
RDD转换:对RDD功能的补充和封装,将旧的RDD包装成为新的RDD;
RDD行动:触发任务的调度和作业的执行。
(1)reduce
函数签名:def reduce(f: (T, T) => T): T
函数说明:聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据。
def main(args: Array[String]): Unit = {
//TODO 创建环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
//TODO 行动算子 action-operator————reduce
val rdd = sc.makeRDD(List(1,2,3,4,5,6))
val A = rdd.reduce(_ + _) //相加
println(A)
//TODO 关闭环境
sc.stop()
}
(2)collect
函数签名:def collect(): Array[T]
函数说明:在驱动程序中,以数组Array的形式返回数据集的所有元素。
def main(args: Array[String]): Unit = {
//TODO 创建环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
//TODO 行动算子 action-operator————collect
val rdd = sc.makeRDD(List(1,2,3,4,5,6))
//采集
val ints = rdd.collect()
println(ints.mkString("."))
//TODO 关闭环境
sc.stop()
}
(3)count
函数签名:def count(): Long
函数说明:返回RDD中元素的个数。
def main(args: Array[String]): Unit = {
//TODO 创建环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
//TODO 行动算子 action-operator————count
val rdd = sc.makeRDD(List(1,2,3,4,5,6))
val l = rdd.count()
println(l)
//TODO 关闭环境
sc.stop()
}
(4)first
函数签名:def first(): T
函数说明:返回RDD中的第一个元素。
def main(args: Array[String]): Unit = {
//TODO 创建环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
//TODO 行动算子 action-operator————first
val rdd = sc.makeRDD(List(1,2,3,4,5,6))
val firstRDD = rdd.first()
println(firstRDD)
//TODO 关闭环境
sc.stop()
}
(5)take
函数签名:def take(num: Int): Array[T]
函数说明:返回一个由RDD的前n个元素组成的数组。
def main(args: Array[String]): Unit = {
//TODO 创建环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
//TODO 行动算子 action-operator————take
val rdd = sc.makeRDD(List(1,2,3,4,5,6))
val takeRDD = rdd.take(4)
println(takeRDD.mkString("."))
//TODO 关闭环境
sc.stop()
}
(6)takeOrdered
函数签名:def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
函数说明:返回该RDD排序后的前n个元素组成的数组。
def main(args: Array[String]): Unit = {
//TODO 创建环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
//TODO 行动算子 action-operator————takeOrdered
val rdd = sc.makeRDD(List(1, 4, 5, 2, 3, 6))
val takeRDD = rdd.takeOrdered(4)
println(takeRDD.mkString("."))
//TODO 关闭环境
sc.stop()
}
(7)aggregate
函数签名:def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
函数说明:分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合 。
def main(args: Array[String]): Unit = {
//TODO 创建环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
//TODO 行动算子 action-operator————aggregate
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
val aggregateRDD = rdd.aggregate(5)(_ + _, _ + _)
//1+2+3+5=11 4+5+6+5=20 11+20+5=36
println(aggregateRDD)
//TODO 关闭环境
sc.stop()
}
(8)fold
函数签名:def fold(zeroValue: T)(op: (T, T) => T): T
函数说明:折叠操作,aggregate的简化版操作。
def main(args: Array[String]): Unit = {
//TODO 创建环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
//TODO 行动算子 action-operator————fold
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6),2)
val foldRDD = rdd.fold(5)(_+_)
//1+2+3+5=11 4+5+6+5=20 11+20+5=36
println(foldRDD)
//TODO 关闭环境
sc.stop()
}
(9)countByKey
函数签名:def countByKey(): Map[K, Long]
函数说明:统计每种key的个数。
def main(args: Array[String]): Unit = {
//TODO 创建环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
//TODO 行动算子 action-operator————countByKey
//countByValue
println("countByValue")
val rdd = sc.makeRDD(List(1,2,3,4,1,1,2),2)
val countRDD = rdd.countByValue()
println(countRDD)
//countByKey
println("countByKey")
val rdd2 = sc.makeRDD(List(
("A",1),("B",1),("A",2),("C",3)
))
val countRDD2 = rdd2.countByKey()
println(countRDD2)
//TODO 关闭环境
sc.stop()
}
(10)save 相关算子
函数签名:
def saveAsTextFile(path: String): Unit
def saveAsObjectFile(path: String): Unit
def saveAsSequenceFile(
path: String,
codec: Option[Class[_ <: CompressionCodec]] = None): Unit
函数说明:将数据保存到不同格式的文件中。
def main(args: Array[String]): Unit = {
//TODO 创建环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
//TODO 行动算子 action-operator————save
val rdd = sc.makeRDD(List(
("A", 1), ("B", 1), ("C", 1), ("A", 2), ("B", 2),("C",2)
),2)
rdd.saveAsTextFile("output")
rdd.saveAsObjectFile("output1")
rdd.saveAsSequenceFile("output2")
//TODO 关闭环境
sc.stop()
}
(11)foreach
函数签名:def foreach(f: T => Unit): Unit = withScope { val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF)) }
函数说明:分布式遍历RDD中的每一个元素,调用指定函数。
def main(args: Array[String]): Unit = {
//TODO 创建环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
//TODO 行动算子 action-operator————foreach
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
rdd.collect().foreach(println)
println(">>>>>>>>>>>>>")
rdd.foreach(println) // 从内存打印
//TODO 关闭环境
sc.stop()
}
本文仅仅是学习笔记的记录!!!
边栏推荐
- Worthington果胶酶的特性及测定方案
- How big is the bandwidth of the Tiktok server for hundreds of millions of people to brush at the same time?
- 积分发放带给商家的两个帮助
- OC-消息机制
- 水仙花数(DAY 78)
- My crawler notes (VII) blog traffic +1 through Crawlers
- localStorage与sessionStorage
- Skywalking系列学习之告警通知源码分析
- [binary search simple question] leetcode 35. search insertion position, 69. Square root of X, 367. Effective complete square, 441. Arrange coins
- Jmeter分布式压测
猜你喜欢

CAS部署使用以及登录成功跳转地址

Alibaba cloud technology expert Yang Zeqiang: Construction of observability on elastic computing cloud

在线问题反馈模块实战(十五):实现在线更新反馈状态功能

5、 MFC view windows and documents

MySQL:互联网公司常用分库分表方案汇总

延时队列的几种实现姿势?日常必备技能!

Comprehensive care analysis lyriq Ruige battery safety design

Yilingsi T35 FPGA drives LVDS display screen

安全员及环保员岗位职责

【学习笔记之菜Dog学C】字符串+内存函数
随机推荐
The EXE compiled by QT is started with administrator privileges
围圈报数(北理工机试题)(DAY 83)
[binary search simple question] leetcode 35. search insertion position, 69. Square root of X, 367. Effective complete square, 441. Arrange coins
[hash table] question collection
HCIP第十三天笔记
“date: write error: No space left on device”解决
Common questions and answers of software testing interview (divergent thinking, interface, performance, concept,)
Win10/win11 lossless expansion of C disk space, cross disk consolidation of C and e disks
196. 删除重复的电子邮箱
Hcip day 14 notes
How to visit the latest version of burpsuite pro in vain
The most complete basic knowledge of software testing in the whole network (a must for beginners)
Call jshaman's Web API interface to realize JS code encryption.
My crawler notes (VII) blog traffic +1 through Crawlers
Best practices of opentelemetry in service grid architecture
[栈和队列简单题] LeetCode 232. 用栈实现队列,225. 用队列实现栈
[SQL简单题] LeetCode 627. 变更性别
Leetcode 207. curriculum (July 26, 2022)
朴素贝叶斯——文档分类
Oracle有没有分布式数据库?