当前位置:网站首页>pyspark --- 统计多列的众数并一次返回
pyspark --- 统计多列的众数并一次返回
2022-08-03 05:29:00 【WGS.】
df = ss.createDataFrame([{
'newid': 'DDD1', 'time_h': 10, 'city': '北京', 'model': '华为'},
{
'newid': 'DDD1', 'time_h': 10, 'city': '北京', 'model': '华为'},
{
'newid': 'DDD1', 'time_h': 20, 'city': '北京', 'model': '小米'},
{
'newid': 'DDD1', 'time_h': 2, 'city': '上海', 'model': '苹果'},
{
'newid': 'DDD1', 'time_h': 3, 'city': '青岛', 'model': '华为'},
{
'newid': 'www1', 'time_h': 20, 'city': '青岛', 'model': '华为'}])\
.select(*['newid', 'city', 'model', 'time_h'])
df.show()
df.createOrReplaceTempView('info')
sqlcommn = "select {}, count(*) as `cnt` from info group by {} having count(*) >= (select max(count) from (select count(*) count from info group by {}) c)"
strsql = """ select city, model, time_h from ({}) sub_city left join ({}) sub_model on 1=1 left join ({}) sub_time_h on 1=1 """.format(
sqlcommn.format('city', 'city', 'city'),
sqlcommn.format('model', 'model', 'model'),
sqlcommn.format('time_h', 'time_h', 'time_h')
)
df = ss.sql(strsql)
df.show()
all_modes_dict = {
}
cols = ['city', 'model', 'time_h']
modeCol = ['city_mode', 'model_mode', 'time_h_mode']
for c in modeCol:
all_modes_dict[c] = []
for row in df.select(['city', 'model', 'time_h']).collect():
for mc in range(len(modeCol)):
all_modes_dict.get(modeCol[mc]).append(row[cols[mc]])
for k, v in all_modes_dict.items():
all_modes_dict[k] = set(v)
# 只取一个
# all_modes_dict[k] = list(set(v))[0]
all_modes_dict
global all_modes_dict_bc
all_modes_dict_bc = sc.broadcast(all_modes_dict)
+-----+----+-----+------+
|newid|city|model|time_h|
+-----+----+-----+------+
| DDD1|北京| 华为| 10|
| DDD1|北京| 华为| 10|
| DDD1|北京| 小米| 20|
| DDD1|上海| 苹果| 2|
| DDD1|青岛| 华为| 3|
| www1|青岛| 华为| 20|
+-----+----+-----+------+
+----+-----+------+
|city|model|time_h|
+----+-----+------+
|北京| 华为| 10|
|北京| 华为| 20|
+----+-----+------+
{'city_mode': {'北京'}, 'model_mode': {'华为'}, 'time_h_mode': {10, 20}}
udf方式填充众数
spark版本如果是2.4的话,多次调用这个sql和udf方式填充众数会报错,可考虑用fillna方式
# UserDefinedFunction
def deal_na_udf(row_col, modekey):
if row_col is None or row_col == '':
row_col = all_modes_dict_bc.value[modekey]
return row_col
ludf = UserDefinedFunction(lambda row_col, modekey: deal_na_udf(row_col, modekey))
# # lambda udf
# ludf = fn.udf(lambda row_col, modekey: all_modes_dict_bc.value[modekey] if row_col is None or row_col == '' else row_col)
for i in range(len(colnames)):
df = df.withColumn(colnames[i], ludf(fn.col(colnames[i]), fn.lit(modeCol[i])))
finall方式填充众数
# 将 空串 替换为 null
# # 方法1
# for column in df.columns:
# trimmed = fn.trim(fn.col(column))
# df = df.withColumn(column, fn.when(fn.length(trimmed) != 0, trimmed).otherwise(None))
# 方法2
df = df.replace(to_replace='', value=None, subset=['time_h', 'model', 'city'])
for i in range(len(colnames)):
df = df.fillna({
colnames[i]: all_modes_dict_bc.value[modeCol[i]]})
还可以以词频统计的方式:
rdds = df.select('city').rdd.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)\
.sortBy(lambda x: x[1], False).take(3)
# examples: [(Row(city=None), 2), (Row(city='北京'), 2), (Row(city='杭州'), 1)]
边栏推荐
- 2021-06-14
- Zabbix历史数据清理(保留以往每个项目每天一条数据)
- mysql事务与多版本并发控制
- Composer require 报错 Installation failed, reverting ./composer.json and ./composer.lock to their ...
- 使用Powershell批量导入Task
- prometheus 监控mysql数据库
- Use of Alibaba Cloud SMS Service (create, test notes)
- 【DIoU CIoU】DIoU和CIoU损失函数理解及代码实现
- linux安装mysql
- 5 个开源的 Rust Web 开发框架,你选择哪个?
猜你喜欢
随机推荐
PCB 多层板为什么都是偶数层?
【应届生租房】应届生如何租房以及注意事项
【干货分享】PCB 板变形原因!不看不知道
【IoU loss】IoU损失函数理解
【nohup】nohup命令的简单使用
502 bad gateway原因、解决方法
Oracle 11g silent install
单节点部署 gpmall 商城系统(二)
sql中 exists的用法
MySQL中的行锁
JUC并发编程深入浅出!
【EA Price strategy OC1】以实时价格为依据的EA,首月翻仓!】
ORM框架:Dapper的使用
C#操作FTP上传文件后检查上传正确性
我的Go+语言初体验——祝福留言小系统,让她也可以感受到你的祝福
Scala 高阶(七):集合内容汇总(上篇)
MySQL的 DDL和DML和DQL的基本语法
TFS(Azure DevOps)禁止多人同时签出
Cesium加载离线地图和离线地形
在Zabbix5.4上使用ODBC监控Oracle数据库









