pyspark --- computing the mode of multiple columns and returning them all at once
2022-08-03 05:29:00 【WGS.】
from pyspark.sql import SparkSession
import pyspark.sql.functions as fn
from pyspark.sql.functions import UserDefinedFunction

ss = SparkSession.builder.getOrCreate()  # the post refers to the SparkSession as `ss`
sc = ss.sparkContext                     # and to the SparkContext as `sc`

df = ss.createDataFrame([
    {'newid': 'DDD1', 'time_h': 10, 'city': '北京', 'model': '华为'},
    {'newid': 'DDD1', 'time_h': 10, 'city': '北京', 'model': '华为'},
    {'newid': 'DDD1', 'time_h': 20, 'city': '北京', 'model': '小米'},
    {'newid': 'DDD1', 'time_h': 2, 'city': '上海', 'model': '苹果'},
    {'newid': 'DDD1', 'time_h': 3, 'city': '青岛', 'model': '华为'},
    {'newid': 'www1', 'time_h': 20, 'city': '青岛', 'model': '华为'}])\
    .select(*['newid', 'city', 'model', 'time_h'])
df.show()
df.createOrReplaceTempView('info')
# For one column: group by it and keep the value(s) whose count equals the max count (ties included)
sqlcommn = "select {}, count(*) as `cnt` from info group by {} having count(*) >= (select max(count) from (select count(*) count from info group by {}) c)"
# Cross-join the three single-column results so all the modes come back in one result set
strsql = """ select city, model, time_h from ({}) sub_city left join ({}) sub_model on 1=1 left join ({}) sub_time_h on 1=1 """.format(
    sqlcommn.format('city', 'city', 'city'),
    sqlcommn.format('model', 'model', 'model'),
    sqlcommn.format('time_h', 'time_h', 'time_h')
)
df = ss.sql(strsql)
df.show()
all_modes_dict = {}
cols = ['city', 'model', 'time_h']
modeCol = ['city_mode', 'model_mode', 'time_h_mode']
for c in modeCol:
    all_modes_dict[c] = []
# Collect every mode value from the cross-joined result into its per-column list
for row in df.select(['city', 'model', 'time_h']).collect():
    for mc in range(len(modeCol)):
        all_modes_dict.get(modeCol[mc]).append(row[cols[mc]])
for k, v in all_modes_dict.items():
    all_modes_dict[k] = set(v)
    # keep only one value instead of the full set
    # all_modes_dict[k] = list(set(v))[0]
all_modes_dict

global all_modes_dict_bc
all_modes_dict_bc = sc.broadcast(all_modes_dict)
+-----+----+-----+------+
|newid|city|model|time_h|
+-----+----+-----+------+
| DDD1|北京| 华为| 10|
| DDD1|北京| 华为| 10|
| DDD1|北京| 小米| 20|
| DDD1|上海| 苹果| 2|
| DDD1|青岛| 华为| 3|
| www1|青岛| 华为| 20|
+-----+----+-----+------+
+----+-----+------+
|city|model|time_h|
+----+-----+------+
|北京| 华为| 10|
|北京| 华为| 20|
+----+-----+------+
{'city_mode': {'北京'}, 'model_mode': {'华为'}, 'time_h_mode': {10, 20}}
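The string-formatted SQL above can feel opaque, so here is a minimal DataFrame-API sketch (not from the original post) that computes the same per-column modes; it assumes `raw_df` is a name for the original `info` data and keeps ties:

from pyspark.sql import Window

# For each column: count each value, rank the counts descending, keep rank 1 (ties included)
modes = {}
for c in ['city', 'model', 'time_h']:
    counts = raw_df.groupBy(c).count()
    w = Window.orderBy(fn.desc('count'))
    top = counts.withColumn('rnk', fn.rank().over(w)).filter(fn.col('rnk') == 1)
    modes[c + '_mode'] = {r[c] for r in top.collect()}
# modes -> e.g. {'city_mode': {'北京'}, 'model_mode': {'华为'}, 'time_h_mode': {10, 20}}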
Filling nulls with the mode via a UDF
If your Spark version is 2.4, calling this SQL repeatedly together with the UDF-based fill may throw errors; consider using the fillna approach instead.
# UserDefinedFunction
def deal_na_udf(row_col, modekey):
    if row_col is None or row_col == '':
        row_col = all_modes_dict_bc.value[modekey]
    return row_col

ludf = UserDefinedFunction(lambda row_col, modekey: deal_na_udf(row_col, modekey))

# # lambda udf
# ludf = fn.udf(lambda row_col, modekey: all_modes_dict_bc.value[modekey] if row_col is None or row_col == '' else row_col)
for i in range(len(cols)):
    df = df.withColumn(cols[i], ludf(fn.col(cols[i]), fn.lit(modeCol[i])))
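A quick, hypothetical check of the UDF fill (the test data and the name `test_df` are not from the original); it assumes the broadcast dict holds a single scalar per column, i.e. the commented "keep only one value" variant above:

# Hypothetical data with null cells to verify the fill behaves as expected
test_df = ss.createDataFrame(
    [('DDD1', None, '华为', '10'), ('DDD1', '北京', None, None)],
    ['newid', 'city', 'model', 'time_h'])
for i in range(len(cols)):
    test_df = test_df.withColumn(cols[i], ludf(fn.col(cols[i]), fn.lit(modeCol[i])))
test_df.show()  # null cells should now show the broadcast mode for that column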
Filling nulls with the mode via fillna
# Replace empty strings with null
# # Method 1
# for column in df.columns:
#     trimmed = fn.trim(fn.col(column))
#     df = df.withColumn(column, fn.when(fn.length(trimmed) != 0, trimmed).otherwise(None))
# Method 2
df = df.replace(to_replace='', value=None, subset=['time_h', 'model', 'city'])
for i in range(len(cols)):
    df = df.fillna({cols[i]: all_modes_dict_bc.value[modeCol[i]]})
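Note that fillna only accepts scalar replacement values (str/int/float/bool), so for this path the broadcast dict must hold one value per column rather than a set; a small sketch of the "keep only one value" variant:

# Collapse each mode set to a single (arbitrary) value before broadcasting,
# assuming any one of the tied modes is acceptable
single_modes = {k: list(v)[0] for k, v in all_modes_dict.items()}
all_modes_dict_bc = sc.broadcast(single_modes)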
The mode can also be computed in word-count style, by counting value frequencies on the RDD:
rdds = df.select('city').rdd.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)\
    .sortBy(lambda x: x[1], False).take(3)
# examples: [(Row(city=None), 2), (Row(city='北京'), 2), (Row(city='杭州'), 1)]
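A small sketch (not in the original) of pulling the mode value(s) back out of that result — each key is a Row, so it has to be unpacked, and null keys can be skipped if nulls should not count as a mode:

# `rdds` is a plain Python list after take(3)
pairs = [(r['city'], cnt) for r, cnt in rdds if r['city'] is not None]
max_cnt = max(cnt for _, cnt in pairs)
city_modes = [v for v, cnt in pairs if cnt == max_cnt]  # e.g. ['北京']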