当前位置:网站首页>spark过滤器
spark过滤器
2022-07-31 08:59:00 【JAVA百练成神】
//过滤你不需要的字段
rdd.filter(x=>{//过滤器
val arr =x.split(",")
val jg =arr(3)
if(jg.eq("")){//如果为空那就不要了
false
}else{
true
}
})
map进行分组
map(x=>{//按照同类型分组
val arr =x.split(",")
val tp =arr(2)
val jg =arr(3).toFloat
(tp,jg)
转hasmap,在rdd后面跟一个 }).collectAsMap()即可
}).groupByKey().map(x=>{
val tp =x._1 //group分组
var sum =0.0 //求和
for (y<- x._2){
sum+=y//累加同类型的价格
}
val age =sum/x._2.size//计算均值
(tp,age)
}).collectAsMap()//把元组转成hasmap,value就是map值
arr(3)=map.get(tp).toString//取出map中对应的值
arr.mkString(",")//把这个数组用字符串逗号链接然后返回
求商品平均值填空案例
1,AAA,饮料,3.5,武汉
2,BBB,饮料,2.5,武汉
3,CCC,烟酒,3.5,上海
11,OOO,烟酒,5.75,上海
4,DDD,饮料,12.5,上海
5,EEE,饮料,22.5,武汉
6,FFF,烟酒,7.5,上海
11,OOO,饮料,8.5,武汉
7,GGG,饮料,4.5,温州
8,HHH,烟酒,9.5,贵阳
9,JJJ,烟酒,2.5,南宁
10,KKK,饮料,5.5,南宁
package spark
import org.apache.spark.sql.SparkSession
object 均值填充 {
def main(args: Array[String]): Unit = {
val sparkSession =SparkSession.builder().master("local")
.appName("均值填充").getOrCreate()
val sc =sparkSession.sparkContext
val rdd =sc.textFile("src/product.txt")
//1.计算各个类别的均值
val map =rdd.filter(x=>{//过滤器
val arr =x.split(",")
val jg =arr(3)
if(jg.equals("")){//如果为空那就不要了
false
}else{
true
}
}).map(x=>{//按照同类型分组
val arr =x.split(",")
val tp =arr(2)
val jg =arr(3).toFloat
(tp,jg)
}).groupByKey().map(x=>{
val tp =x._1 //group分组
var sum =0.0 //求和
for (y<- x._2){
sum+=y//累加同类型的价格
}
val age =sum/x._2.size//计算均值
(tp,age)
}).collectAsMap()//把元组转成hasmap,value就是map值
//完成空值填充
rdd.map(x=>{
val arr=x.split(",")
val age =arr(3)
val tp =arr(2)//取出商品类型
if (age.equals("")){
arr(3)=map.get(tp).get.toString//取出map中对应的值
}
arr.mkString(",")//把这个数组用字符串逗号链接然后返回
}).saveAsTextFile("data/out1")
sparkSession.stop()
}
}
边栏推荐
猜你喜欢

skynet中一条消息从取出到处理完整流程(源码刨析)

【Unity】编辑器扩展-01-拓展Project视图

【小程序项目开发--京东商城】uni-app之自定义搜索组件(上)-- 组件UI

How on one machine (Windows) to install two MYSQL database

MySQL安装常见报错处理大全
Hematemesis summarizes thirteen experiences to help you create more suitable MySQL indexes

MySQL 视图(详解)

A, MySQL principle of master-slave replication
![[Cloud native] Introduction and use of Feign of microservices](/img/39/05cf7673155954c90e75a8a2eecd96.jpg)
[Cloud native] Introduction and use of Feign of microservices

【RISC-V】risc-v架构学习笔记(架构初学)
随机推荐
期刊会议排名、信息检索网站推荐以及IEEE Latex模板下载
JS中原型和原型链的详细讲解(附代码示例)以及 new关键字具体做了什么的详细讲解
SQL statement knowledge
JSP exception对象简介说明
[MySQL exercises] Chapter 3 Common data types in MySQL
How on one machine (Windows) to install two MYSQL database
Docker-compose安装mysql
JSP config对象的简介说明
【云原生&微服务五】Ribbon负载均衡策略之随机ThreadLocalRandom
SQL连接表(内连接、左连接、右连接、交叉连接、全外连接)
Vue项目通过node连接MySQL数据库并实现增删改查操作
【机器学习】用特征量重要度(feature importance)解释模型靠谱么?怎么才能算出更靠谱的重要度?
MySQL----多表查询
模块化规范
mysql安装教程【安装版】
【小程序项目开发-- 京东商城】uni-app之自定义搜索组件(下) -- 搜索历史
刷题《剑指Offer》day05
MySQL 日期时间类型精确到毫秒
第八章 、接口
Flutter Paystack implements all options