Spark filter
2022-07-31 09:02:00 【JAVA becomes a god】
Filter out the records you don't need

    rdd.filter(x => {
      val arr = x.split(",")
      val jg = arr(3) // price column
      // Keep the record only when the price is not blank.
      // Compare string content with isEmpty or ==, not eq: eq tests reference identity.
      jg.nonEmpty
    })

map for grouping

    .map(x => {
      // Build a (type, price) pair so that records can be grouped by type
      val arr = x.split(",")
      val tp = arr(2) // product type
      val jg = arr(3).toFloat // price
      (tp, jg)
    })

Convert to a HashMap: append .collectAsMap() to the RDD

    .groupByKey().map(x => {
      val tp = x._1 // group key (product type)
      var sum = 0.0 // running total
      for (y <- x._2) {
        sum += y // accumulate the prices of the same type
      }
      val avg = sum / x._2.size // mean price of the type
      (tp, avg)
    }).collectAsMap() // collect the (type, mean) tuples into a Map: the key is the type, the value is the mean

    arr(3) = map(tp).toString // look up the mean for this type in the map
    arr.mkString(",") // join the array back into a comma-separated string and return it

Case study: filling blank prices with the category mean
    1,AAA,Beverage,3.5,Wuhan
    2, BBB, drinks, 2.5, Wuhan
    3, CCC, Tobacco and Alcohol, 3.5, Shanghai
    11, OOO, Tobacco and Alcohol, 5.75, Shanghai
    4, DDD, drinks, 12.5, Shanghai
    5, EEE, drinks, 22.5, Wuhan
    6, FFF, Tobacco and Alcohol, 7.5, Shanghai
    11, OOO, drinks, 8.5, Wuhan
    7, GGG, drinks, 4.5, Wenzhou
    8, HHH, Tobacco and Alcohol, 9.5, Guiyang
    9, JJJ, Tobacco and Alcohol, 2.5, Nanning
    10, KKK, drinks, 5.5, Nanning

    package spark

    import org.apache.spark.sql.SparkSession

    object MeanFill {
      def main(args: Array[String]): Unit = {
        val sparkSession = SparkSession.builder().master("local").appName("Mean Fill").getOrCreate()
        val sc = sparkSession.sparkContext
        val rdd = sc.textFile("src/product.txt")

        // 1. Calculate the mean price of each category
        val map = rdd.filter(x => {
          val arr = x.split(",")
          val jg = arr(3) // price column
          jg.nonEmpty // drop records whose price is blank
        }).map(x => {
          // Build a (type, price) pair so that records can be grouped by type
          val arr = x.split(",")
          val tp = arr(2)
          val jg = arr(3).toFloat
          (tp, jg)
        }).groupByKey().map(x => {
          val tp = x._1 // group key (product type)
          var sum = 0.0 // running total
          for (y <- x._2) {
            sum += y // accumulate the prices of the same type
          }
          val avg = sum / x._2.size // mean price of the type
          (tp, avg)
        }).collectAsMap() // collect the (type, mean) tuples into a Map

        // 2. Fill in the blank values
        rdd.map(x => {
          val arr = x.split(",")
          val jg = arr(3) // price column
          val tp = arr(2) // product type
          if (jg.isEmpty) {
            arr(3) = map(tp).toString // look up the mean for this type in the map
          }
          arr.mkString(",") // join the array back into a comma-separated string and return it
        }).saveAsTextFile("data/out1")

        sparkSession.stop()
      }
    }
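To check the two-pass logic by hand without starting a Spark session, the same idea can be sketched with plain Scala collections. This is only an illustration under assumptions, not the code from the post: `MeanFillSketch` and `fillBlanks` are hypothetical names, the three sample rows are made up for the example, and fields are trimmed before comparison. Unlike the Spark program, a row whose category has no known price is left blank instead of throwing.

```scala
// Spark-free sketch of the mean-fill logic, using plain Scala collections.
// MeanFillSketch and fillBlanks are hypothetical names used only for this example.
object MeanFillSketch {

  /** Fills a blank price (column 3) with the mean price of the row's category (column 2). */
  def fillBlanks(lines: Seq[String]): Seq[String] = {
    // limit = -1 keeps trailing empty fields, so every row keeps all of its columns
    val rows = lines.map(_.split(",", -1).map(_.trim))

    // Pass 1: mean price per category, computed only from rows that have a price
    val means: Map[String, Double] = rows
      .filter(_(3).nonEmpty)
      .groupBy(_(2))
      .map { case (category, group) =>
        category -> group.map(_(3).toDouble).sum / group.size
      }

    // Pass 2: replace blank prices; a row whose category has no known mean stays blank
    rows.map { fields =>
      if (fields(3).isEmpty) means.get(fields(2)).foreach(m => fields(3) = m.toString)
      fields.mkString(",")
    }
  }

  def main(args: Array[String]): Unit = {
    // Made-up sample rows: the second row has a blank price
    val sample = Seq(
      "1,AAA,drinks,2.5,Wuhan",
      "2,BBB,drinks,,Shanghai",
      "3,CCC,drinks,7.5,Wuhan"
    )
    fillBlanks(sample).foreach(println)
  }
}
```

With these three rows the blank price in the second row becomes 5.0, the mean of 2.5 and 7.5, and the other two rows are printed unchanged.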