Scala110-combineByKey
2022-07-25 15:07:00 【Weigtang 406 team】
Intro
combineByKey applies to key-value (pair) RDDs and is used to compute per-key statistics. The quickest way to understand it is through concrete examples, so let's dive in.
Generate the data
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import scala.collection.mutable.{ListBuffer, ArrayBuffer}
val builder = SparkSession
.builder()
.appName("learningScala")
.config("spark.executor.heartbeatInterval","60s")
.config("spark.network.timeout","120s")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.kryoserializer.buffer.max","512m")
.config("spark.dynamicAllocation.enabled", false)
.config("spark.sql.inMemoryColumnarStorage.compressed", true)
.config("spark.sql.inMemoryColumnarStorage.batchSize", 10000)
.config("spark.sql.broadcastTimeout", 600)
.config("spark.sql.autoBroadcastJoinThreshold", -1)
.config("spark.sql.crossJoin.enabled", true)
.master("local[*]")
val spark = builder.getOrCreate()
import spark.implicits._
spark.sparkContext.setLogLevel("ERROR")
builder: org.apache.spark.sql.SparkSession.Builder = org.apache.spark.sql.SparkSession$Builder@...
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@...
import spark.implicits._
val df = Seq(
(1, "male", "18" ,"2019-01-01 11:45:50"),
(2, "female", "37" ,"2019-01-02 11:55:50"),
(3, "male", "21" ,"2019-01-21 11:45:50"),
(4, "female", "44" ,"2019-02-01 12:45:50"),
(5, "male", "40" ,"2019-01-15 10:40:50")
).toDF("id","sex","age", "createtime_str")
df: org.apache.spark.sql.DataFrame = [id: int, sex: string ... 2 more fields]
df.show(truncate=false)
+---+------+---+-------------------+
|id |sex |age|createtime_str |
+---+------+---+-------------------+
|1 |male |18 |2019-01-01 11:45:50|
|2 |female|37 |2019-01-02 11:55:50|
|3 |male |21 |2019-01-21 11:45:50|
|4 |female|44 |2019-02-01 12:45:50|
|5 |male |40 |2019-01-15 10:40:50|
+---+------+---+-------------------+
df.rdd.map(r => {
val sex = r.getAs[String]("sex")
val age = r.getAs[String]("age")
(sex,age.toInt)
}).foreach(println)
(male,18)
(female,37)
(male,21)
(female,44)
(male,40)
A map turns each row into a (sex, age) key-value pair.
combineByKey
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
}
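The three arguments play distinct roles: createCombiner initializes a combiner of type C the first time a key is seen within a partition, mergeValue folds further values for that key into the partition-local combiner, and mergeCombiners merges combiners for the same key across partitions. A minimal word-count sketch (assuming the spark session created above; the toy data is purely illustrative):
// Word count expressed with combineByKey.
val words = spark.sparkContext.parallelize(Seq("a", "b", "a", "c", "b", "a"))
words.map(w => (w, 1)).combineByKey(
  (v: Int) => v,                // createCombiner: first count seen for a key in a partition
  (c: Int, v: Int) => c + v,    // mergeValue: fold further values into the partition-local combiner
  (c1: Int, c2: Int) => c1 + c2 // mergeCombiners: merge combiners for the same key across partitions
).foreach(println)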
Average age by gender
First example: compute the average age of users of each gender.
df.rdd.map(r => {
  (r.getAs[String]("sex"), r.getAs[String]("age").toInt)}).combineByKey(
  (v) => (v, 1), // createCombiner: the first value seen for a key in a partition becomes (value, 1)
  (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), // mergeValue: same partition, same key: add the value and bump the count
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // mergeCombiners: same key across partitions: add sums and counts
).foreach(println)
(female,(81,2))
(male,(79,3))
- Step 1: the first time a key is seen in a partition, createCombiner turns the value into (value, 1); call this accumulator acc.
- Step 2: when the same key appears again in the same partition, mergeValue runs: acc._1 + value, acc._2 + 1, i.e. a running sum and a running count.
- Step 3: for the same key across different partitions, mergeCombiners runs: the sums are added, and so are the counts.
- The result per key is an intermediate (sum, count) pair; with dividend and divisor in hand, one more map yields the average (a partition-level sketch follows, then the full computation).
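To see the partition boundary at work, here is a small sketch (assuming the spark session above; numSlices = 2 and the hand-built pairs are purely illustrative). glom shows which pairs land in which partition, so you can trace where mergeValue stops and mergeCombiners takes over:
// Force two partitions so that both mergeValue (within a partition)
// and mergeCombiners (across partitions) are exercised.
val pairs = spark.sparkContext.parallelize(
  Seq(("male", 18), ("male", 21), ("female", 37), ("female", 44), ("male", 40)),
  numSlices = 2)
// Print the partition layout.
pairs.glom().collect().foreach(p => println(p.mkString(", ")))
// The same (sum, count) combine as in the main example.
pairs.combineByKey(
  (v: Int) => (v, 1),
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)
).foreach(println)
Back on the real data, the final map divides sum by count: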
df.rdd.map(r => {
  (r.getAs[String]("sex"), r.getAs[String]("age").toInt)}).combineByKey(
  (v) => (v, 1), // createCombiner: the first value seen for a key in a partition becomes (value, 1)
  (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), // mergeValue: same partition, same key
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // mergeCombiners: same key across partitions
).map(k => (k._1, k._2._1.toFloat / k._2._2)) // sum / count = average
.foreach(println)
(female,40.5)
(male,26.333334)
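For comparison, the same average can be written with aggregateByKey, which is built on the same machinery; this is a sketch against the same df, where the zero value (0, 0) seeds the per-partition accumulator and the two function arguments correspond to mergeValue and mergeCombiners:
df.rdd
  .map(r => (r.getAs[String]("sex"), r.getAs[String]("age").toInt))
  .aggregateByKey((0, 0))(
    (acc, v) => (acc._1 + v, acc._2 + 1), // within a partition: add the value, bump the count
    (a, b) => (a._1 + b._1, a._2 + b._2)  // across partitions: add sums and counts
  )
  .mapValues { case (sum, cnt) => sum.toFloat / cnt } // sum / count = average
  .foreach(println)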
Collecting the ages for each gender
df.rdd.map(r => {
  (r.getAs[String]("sex")
  , r.getAs[String]("age").toInt)})
  .combineByKey(
    (v) => ArrayBuffer(v), // createCombiner: the first value seen for a key starts a new ArrayBuffer
    (acc: ArrayBuffer[Int], v) => acc += v, // mergeValue: same partition, same key: append the value
    (acc1: ArrayBuffer[Int], acc2: ArrayBuffer[Int]) => acc1 ++= acc2 // mergeCombiners: same key across partitions: concatenate the buffers
  ).foreach(println)
(female,ArrayBuffer(37, 44))
(male,ArrayBuffer(18, 21, 40))
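For reference, collecting every value per key is essentially what groupByKey does; a sketch of the equivalent (it returns an Iterable[Int] per key rather than an ArrayBuffer[Int]):
df.rdd
  .map(r => (r.getAs[String]("sex"), r.getAs[String]("age").toInt))
  .groupByKey()
  .foreach(println)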
As before, one more map computes the mean, and other statistics, such as the maximum, can be stacked on top.
df.rdd.map(r => {
  (r.getAs[String]("sex")
  , r.getAs[String]("age").toInt)})
  .combineByKey(
    (v) => ArrayBuffer(v), // createCombiner: start a new ArrayBuffer
    (acc: ArrayBuffer[Int], v) => acc += v, // mergeValue: append within a partition
    (acc1: ArrayBuffer[Int], acc2: ArrayBuffer[Int]) => acc1 ++= acc2 // mergeCombiners: concatenate across partitions
  ).map(k => (k._1, k._2.sum.toFloat / k._2.length, k._2.max)).foreach(println)
(female,40.5,44)
(male,26.333334,40)
df.rdd.map(r => {
  (r.getAs[String]("sex")
  , r.getAs[String]("age").toInt)})
  .combineByKey(
    (v) => ArrayBuffer(v), // createCombiner: start a new ArrayBuffer
    (acc: ArrayBuffer[Int], v) => acc += v, // mergeValue: append within a partition
    (acc1: ArrayBuffer[Int], acc2: ArrayBuffer[Int]) => acc1 ++= acc2 // mergeCombiners: concatenate across partitions
  ).map(k => (k._1, k._2.sum.toFloat / k._2.length, k._2.max)).toDF("sex", "avgAge", "MaxAge").show(truncate = false)
+------+---------+------+
|sex |avgAge |MaxAge|
+------+---------+------+
|male |26.333334|40 |
|female|40.5 |44 |
+------+---------+------+
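As a cross-check, the same summary can be produced through the DataFrame API; a sketch using avg and max from org.apache.spark.sql.functions (already imported above). Note that avg returns a Double rather than a Float, so the decimals differ slightly:
df.withColumn("age", $"age".cast("int"))
  .groupBy("sex")
  .agg(avg("age").alias("avgAge"), max("age").alias("MaxAge"))
  .show(truncate = false)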
2021-03-11 Jiulong lake, Jiangning District, Nanjing