当前位置:网站首页>Spark中groupByKey() 和 reduceByKey() 和combineByKey()
Spark中groupByKey() 和 reduceByKey() 和combineByKey()
2022-07-05 05:50:00 【YaoYong_BigData】
一、groupByKey:
在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD,也是对每个key进行操作,但只生成一个sequence,groupByKey本身不能自定义函数,需要先用groupByKey生成RDD,然后才能对此RDD通过map进行自定义函数操作。
作用:
GroupByKey 算子的主要作用是按照 Key 分组, 和 ReduceByKey 有点类似, 但是 GroupByKey 并不求聚合, 只是列举 Key 对应的所有 Value。
注意点:
GroupByKey 是一个 Shuffled。
GroupByKey 和 ReduceByKey 不同, 因为需要列举 Key 对应的所有数据, 所以无法在 Map 端做 Combine, 所以 GroupByKey 的性能并没有 ReduceByKey 好。
二、reduceByKey:
是对key的value进行merge操作,在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置,最重要的是它能够在本地先进行merge操作,并且merge操作可以通过函数自定义。
作用:
首先按照 Key 分组生成一个 Tuple, 然后针对每个组执行
reduce
算子。
调用:
def reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]
参数:
func → 执行数据处理的函数, 传入两个参数, 一个是当前值, 一个是局部汇总, 这个函数需要有一个输出, 输出就是这个 Key 的汇总结果。
注意点:
ReduceByKey 只能作用于 Key-Value 型数据, Key-Value 型数据在当前语境中特指 Tuple2。
ReduceByKey 是一个需要 Shuffled 的操作。
和其它的 Shuffled 相比, ReduceByKey是高效的, 因为类似 MapReduce 的, 在 Map 端有一个 Cominer, 这样 I/O 的数据便会减少。
三、combineByKey()
spark根据key做聚合的aggregateByKey,groupByKey,reduceByKey等常用算子其底层本质都是调用combineByKey实现的,即都是combineByKey的一种特殊情况,下面介绍下combineByKey算子:
源码有两种方式:
/**
*
* @param createCombiner
* @param mergeValue
* @param mergeCombiners
* @tparam C
* @return
*/
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
/**
*
* @param createCombiner
* @param mergeValue
* @param mergeCombiners
* @param partitioner
* @param mapSideCombine
* @param serializer
* @tparam C
* @return
*/
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
partitioner, mapSideCombine, serializer)(null)
}
}
参数说明:
1、createCombiner:V => C ,分区内创建组合函数。这个函数把当前的值作为参数,此时我们可以对其做些附加操作(类型转换)并把它返回 (这一步类似于初始化操作)。
说明:同一分区内,同一个key,只有第一次出现的value才会运行此函数。
2、mergeValue: (C, V) => C,分区内合并值函数。该函数把元素V合并到之前的元素C(createCombiner)上 (这个操作在每个分区内进行)。
说明:同一分区内,同一个key,非第一次出现value运行此函数,其中C为上次的结果,V为本次值。
3、mergeCombiners: (C, C) => C,多分区合并组合器函数。该函数把2个元素C合并。
说明:这个操作在不同分区间进行。
4、partitioner:自定义分区数,默认为HashPartitioner。
5、mapSideCombine:是否在map端进行Combine操作,默认为true。
过一下图,逻辑应该就清晰了:
补充一个更详细的图:
四、实例
/**
* ------输出结果------
* (tim,1)
* (lucy,1)
* (Hello,3)
* (lily,1)
*/
@Test
def reduceByKeyTest():Unit = {
// 1. 创建 RDD
val rdd1 = sc.parallelize(Seq("Hello lily", "Hello lucy", "Hello tim"))
// 2. 处理数据
val rdd2 = rdd1.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
// 3. 得到结果
val result = rdd2.collect()
result.foreach(println(_))
// 4. 关闭sc
sc.stop()
}
/**
* ------输出结果------
* (a,CompactBuffer(1, 1))
* (b,CompactBuffer(1))
*/
@Test
def groupByKey(): Unit = {
val rdd: RDD[(String, Int)] = sc.parallelize(Seq(("a", 1), ("a", 1), ("b", 1)))
val rdd1: RDD[(String, Iterable[Int])] = rdd.groupByKey()
val rdd2: Array[(String, Iterable[Int])] = rdd1.collect()
rdd2.foreach(println(_))
}
/**
* ------输出结果------
* (zhangsan,97.33333333333333)
* (lisi,97.5)
* ---求每个学生的平均成绩
*/
@Test
def combineByKey(): Unit = {
// 1. 准备集合
val rdd: RDD[(String, Double)] = sc.parallelize(Seq(
("zhangsan", 99.0),
("zhangsan", 96.0),
("lisi", 97.0),
("lisi", 98.0),
("zhangsan", 97.0))
)
// 2. 算子操作
// 2.1. createCombiner 转换数据
// 2.2. mergeValue 分区上的聚合
// 2.3. mergeCombiners 把所有分区上的结果再次聚合, 生成最终结果
val combineResult = rdd.combineByKey(
createCombiner = (curr: Double) => (curr, 1),
mergeValue = (curr: (Double, Int), nextValue: Double) => (curr._1 + nextValue, curr._2 + 1),
mergeCombiners = (curr: (Double, Int), agg: (Double, Int)) => (curr._1 + agg._1, curr._2 + agg._2)
)
/**
* ------输出结果------
* (zhangsan,(292.0,3))
* (lisi,(195.0,2))
*/
//val resultRDD = combineResult
val resultRDD = combineResult.map( item => (item._1, item._2._1 / item._2._2) )
// 3. 获取结果, 打印结果
resultRDD.collect().foreach(println(_))
}
边栏推荐
- leetcode-9:回文数
- Sword finger offer 35 Replication of complex linked list
- Educational Codeforces Round 107 (Rated for Div. 2) E. Colorings and Dominoes
- [jailhouse article] performance measurements for hypervisors on embedded ARM processors
- 卷积神经网络简介
- Acwing 4300. Two operations
- One question per day 1447 Simplest fraction
- 网络工程师考核的一些常见的问题:WLAN、BGP、交换机
- Brief introduction to tcp/ip protocol stack
- 注解与反射
猜你喜欢
【云原生】微服务之Feign自定义配置的记录
Time of process
[practical skills] how to do a good job in technical training?
Acwing 4300. Two operations
Introduction and experience of wazuh open source host security solution
Dichotomy, discretization, etc
Implement an iterative stack
Dynamic planning solution ideas and summary (30000 words)
剑指 Offer 53 - II. 0~n-1中缺失的数字
Analysis of backdoor vulnerability in remote code execution penetration test / / phpstudy of national game title of national secondary vocational network security B module
随机推荐
过拟合与正则化
Common optimization methods
全排列的代码 (递归写法)
2022年貴州省職業院校技能大賽中職組網絡安全賽項規程
每日一题-无重复字符的最长子串
[cloud native] record of feign custom configuration of microservices
Daily question 1342 Number of operations to change the number to 0
API related to TCP connection
QT判断界面当前点击的按钮和当前鼠标坐标
Light a light with stm32
[practical skills] how to do a good job in technical training?
卷积神经网络——卷积层
Analysis of backdoor vulnerability in remote code execution penetration test / / phpstudy of national game title of national secondary vocational network security B module
【实战技能】非技术背景经理的技术管理
Sword finger offer 05 Replace spaces
kubeadm系列-01-preflight究竟有多少check
卷积神经网络简介
Dynamic planning solution ideas and summary (30000 words)
Typical use cases for knapsacks, queues, and stacks
Solution to game 10 of the personal field