当前位置:网站首页>spark学习笔记(六)——sparkcore核心编程-RDD行动算子
spark学习笔记(六)——sparkcore核心编程-RDD行动算子
2022-07-27 01:31:00 【一个人的牛牛】
行动算子-触发作业的执行(runjob)
创建activeJob,提交并执行
目录
RDD转换:对RDD功能的补充和封装,将旧的RDD包装成为新的RDD;
RDD行动:触发任务的调度和作业的执行。
(1)reduce
函数签名:def reduce(f: (T, T) => T): T
函数说明:聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据。
def main(args: Array[String]): Unit = {
//TODO 创建环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
//TODO 行动算子 action-operator————reduce
val rdd = sc.makeRDD(List(1,2,3,4,5,6))
val A = rdd.reduce(_ + _) //相加
println(A)
//TODO 关闭环境
sc.stop()
}
(2)collect
函数签名:def collect(): Array[T]
函数说明:在驱动程序中,以数组Array的形式返回数据集的所有元素。
def main(args: Array[String]): Unit = {
//TODO 创建环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
//TODO 行动算子 action-operator————collect
val rdd = sc.makeRDD(List(1,2,3,4,5,6))
//采集
val ints = rdd.collect()
println(ints.mkString("."))
//TODO 关闭环境
sc.stop()
}
(3)count
函数签名:def count(): Long
函数说明:返回RDD中元素的个数。
def main(args: Array[String]): Unit = {
//TODO 创建环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
//TODO 行动算子 action-operator————count
val rdd = sc.makeRDD(List(1,2,3,4,5,6))
val l = rdd.count()
println(l)
//TODO 关闭环境
sc.stop()
}
(4)first
函数签名:def first(): T
函数说明:返回RDD中的第一个元素。
def main(args: Array[String]): Unit = {
//TODO 创建环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
//TODO 行动算子 action-operator————first
val rdd = sc.makeRDD(List(1,2,3,4,5,6))
val firstRDD = rdd.first()
println(firstRDD)
//TODO 关闭环境
sc.stop()
}
(5)take
函数签名:def take(num: Int): Array[T]
函数说明:返回一个由RDD的前n个元素组成的数组。
def main(args: Array[String]): Unit = {
//TODO 创建环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
//TODO 行动算子 action-operator————take
val rdd = sc.makeRDD(List(1,2,3,4,5,6))
val takeRDD = rdd.take(4)
println(takeRDD.mkString("."))
//TODO 关闭环境
sc.stop()
}
(6)takeOrdered
函数签名:def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
函数说明:返回该RDD排序后的前n个元素组成的数组。
def main(args: Array[String]): Unit = {
//TODO 创建环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
//TODO 行动算子 action-operator————takeOrdered
val rdd = sc.makeRDD(List(1, 4, 5, 2, 3, 6))
val takeRDD = rdd.takeOrdered(4)
println(takeRDD.mkString("."))
//TODO 关闭环境
sc.stop()
}
(7)aggregate
函数签名:def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
函数说明:分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合 。
def main(args: Array[String]): Unit = {
//TODO 创建环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
//TODO 行动算子 action-operator————aggregate
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
val aggregateRDD = rdd.aggregate(5)(_ + _, _ + _)
//1+2+3+5=11 4+5+6+5=20 11+20+5=36
println(aggregateRDD)
//TODO 关闭环境
sc.stop()
}
(8)fold
函数签名:def fold(zeroValue: T)(op: (T, T) => T): T
函数说明:折叠操作,aggregate的简化版操作。
def main(args: Array[String]): Unit = {
//TODO 创建环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
//TODO 行动算子 action-operator————fold
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6),2)
val foldRDD = rdd.fold(5)(_+_)
//1+2+3+5=11 4+5+6+5=20 11+20+5=36
println(foldRDD)
//TODO 关闭环境
sc.stop()
}
(9)countByKey
函数签名:def countByKey(): Map[K, Long]
函数说明:统计每种key的个数。
def main(args: Array[String]): Unit = {
//TODO 创建环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
//TODO 行动算子 action-operator————countByKey
//countByValue
println("countByValue")
val rdd = sc.makeRDD(List(1,2,3,4,1,1,2),2)
val countRDD = rdd.countByValue()
println(countRDD)
//countByKey
println("countByKey")
val rdd2 = sc.makeRDD(List(
("A",1),("B",1),("A",2),("C",3)
))
val countRDD2 = rdd2.countByKey()
println(countRDD2)
//TODO 关闭环境
sc.stop()
}
(10)save 相关算子
函数签名:
def saveAsTextFile(path: String): Unit
def saveAsObjectFile(path: String): Unit
def saveAsSequenceFile(
path: String,
codec: Option[Class[_ <: CompressionCodec]] = None): Unit
函数说明:将数据保存到不同格式的文件中。
def main(args: Array[String]): Unit = {
//TODO 创建环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
//TODO 行动算子 action-operator————save
val rdd = sc.makeRDD(List(
("A", 1), ("B", 1), ("C", 1), ("A", 2), ("B", 2),("C",2)
),2)
rdd.saveAsTextFile("output")
rdd.saveAsObjectFile("output1")
rdd.saveAsSequenceFile("output2")
//TODO 关闭环境
sc.stop()
}
(11)foreach
函数签名:def foreach(f: T => Unit): Unit = withScope { val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF)) }
函数说明:分布式遍历RDD中的每一个元素,调用指定函数。
def main(args: Array[String]): Unit = {
//TODO 创建环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
//TODO 行动算子 action-operator————foreach
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
rdd.collect().foreach(println)
println(">>>>>>>>>>>>>")
rdd.foreach(println) // 从内存打印
//TODO 关闭环境
sc.stop()
}
本文仅仅是学习笔记的记录!!!
边栏推荐
- 二叉树(北京邮电大学机试题)(DAY 85)
- Boom 3D全新2022版音频增强应用程序App
- Functions that should be selected for URL encoding and decoding
- On the prototype of constructor
- 自己梳理的LocalDateTime的工具类
- 易灵思T35 FPGA驱动LVDS显示屏
- 在线问题反馈模块实战(十五):实现在线更新反馈状态功能
- [binary search simple question] leetcode 35. search insertion position, 69. Square root of X, 367. Effective complete square, 441. Arrange coins
- 朴素贝叶斯——文档分类
- MySQL:互联网公司常用分库分表方案汇总
猜你喜欢

How big is the bandwidth of the Tiktok server for hundreds of millions of people to brush at the same time?

Post responsibilities of safety officer and environmental protection officer

商城小程序项目完整源码(微信小程序)

深度学习——词汇embedded、Beam Search

关于OpenFeign的源码分析

记录一次,php程序访问系统文件访问错误的问题

OpenTelemetry 在服务网格架构下的最佳实践

队列达到最大长度代码实战

HCIP第十四天笔记

毕业2年转行软件测试获得12K+,不考研月薪过万的梦想实现了
随机推荐
[二分查找简单题] LeetCode 35. 搜索插入位置,69. x 的平方根,367. 有效的完全平方数,441. 排列硬币
CAS deployment and successful login jump address
Oracle有没有分布式数据库?
周全的照护 解析LYRIQ锐歌电池安全设计
Worthington过氧化物酶活性的6种测定方法
数模1232
毕业2年转行软件测试获得12K+,不考研月薪过万的梦想实现了
围圈报数(北理工机试题)(DAY 83)
最大连续子序列(DAY 77)
数据湖(二十):Flink兼容Iceberg目前不足和Iceberg与Hudi对比
spark:计算不同分区中相同key的平均值(入门级-简单实现)
【flask】服务端获取客户端请求的文件
基于.NetCore开发博客项目 StarBlog - (16) 一些新功能 (监控/统计/配置/初始化)
pip3 设置阿里云
[动态规划简单题] LeetCode 53. 最大子数组和
2513: 小勇学分数(公约数问题)
Skywalking系列学习之告警通知源码分析
Source code analysis of warning notification for skywalking series learning
Use the most primitive method to manually implement the common 20 array methods
be based on. NETCORE development blog project starblog - (16) some new functions (monitoring / statistics / configuration / initialization)