当前位置:网站首页>pyspark --- 统计多列的众数并一次返回
pyspark --- 统计多列的众数并一次返回
2022-08-03 05:29:00 【WGS.】
df = ss.createDataFrame([{
'newid': 'DDD1', 'time_h': 10, 'city': '北京', 'model': '华为'},
{
'newid': 'DDD1', 'time_h': 10, 'city': '北京', 'model': '华为'},
{
'newid': 'DDD1', 'time_h': 20, 'city': '北京', 'model': '小米'},
{
'newid': 'DDD1', 'time_h': 2, 'city': '上海', 'model': '苹果'},
{
'newid': 'DDD1', 'time_h': 3, 'city': '青岛', 'model': '华为'},
{
'newid': 'www1', 'time_h': 20, 'city': '青岛', 'model': '华为'}])\
.select(*['newid', 'city', 'model', 'time_h'])
df.show()
df.createOrReplaceTempView('info')
sqlcommn = "select {}, count(*) as `cnt` from info group by {} having count(*) >= (select max(count) from (select count(*) count from info group by {}) c)"
strsql = """ select city, model, time_h from ({}) sub_city left join ({}) sub_model on 1=1 left join ({}) sub_time_h on 1=1 """.format(
sqlcommn.format('city', 'city', 'city'),
sqlcommn.format('model', 'model', 'model'),
sqlcommn.format('time_h', 'time_h', 'time_h')
)
df = ss.sql(strsql)
df.show()
all_modes_dict = {
}
cols = ['city', 'model', 'time_h']
modeCol = ['city_mode', 'model_mode', 'time_h_mode']
for c in modeCol:
all_modes_dict[c] = []
for row in df.select(['city', 'model', 'time_h']).collect():
for mc in range(len(modeCol)):
all_modes_dict.get(modeCol[mc]).append(row[cols[mc]])
for k, v in all_modes_dict.items():
all_modes_dict[k] = set(v)
# 只取一个
# all_modes_dict[k] = list(set(v))[0]
all_modes_dict
global all_modes_dict_bc
all_modes_dict_bc = sc.broadcast(all_modes_dict)
+-----+----+-----+------+
|newid|city|model|time_h|
+-----+----+-----+------+
| DDD1|北京| 华为| 10|
| DDD1|北京| 华为| 10|
| DDD1|北京| 小米| 20|
| DDD1|上海| 苹果| 2|
| DDD1|青岛| 华为| 3|
| www1|青岛| 华为| 20|
+-----+----+-----+------+
+----+-----+------+
|city|model|time_h|
+----+-----+------+
|北京| 华为| 10|
|北京| 华为| 20|
+----+-----+------+
{'city_mode': {'北京'}, 'model_mode': {'华为'}, 'time_h_mode': {10, 20}}
udf方式填充众数
spark版本如果是2.4的话,多次调用这个sql和udf方式填充众数会报错,可考虑用fillna方式
# UserDefinedFunction
def deal_na_udf(row_col, modekey):
if row_col is None or row_col == '':
row_col = all_modes_dict_bc.value[modekey]
return row_col
ludf = UserDefinedFunction(lambda row_col, modekey: deal_na_udf(row_col, modekey))
# # lambda udf
# ludf = fn.udf(lambda row_col, modekey: all_modes_dict_bc.value[modekey] if row_col is None or row_col == '' else row_col)
for i in range(len(colnames)):
df = df.withColumn(colnames[i], ludf(fn.col(colnames[i]), fn.lit(modeCol[i])))
finall方式填充众数
# 将 空串 替换为 null
# # 方法1
# for column in df.columns:
# trimmed = fn.trim(fn.col(column))
# df = df.withColumn(column, fn.when(fn.length(trimmed) != 0, trimmed).otherwise(None))
# 方法2
df = df.replace(to_replace='', value=None, subset=['time_h', 'model', 'city'])
for i in range(len(colnames)):
df = df.fillna({
colnames[i]: all_modes_dict_bc.value[modeCol[i]]})
还可以以词频统计的方式:
rdds = df.select('city').rdd.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)\
.sortBy(lambda x: x[1], False).take(3)
# examples: [(Row(city=None), 2), (Row(city='北京'), 2), (Row(city='杭州'), 1)]
边栏推荐
- process.env环境变量配置方式(配置环境变量区分开发环境和生产环境)
- Zabbix历史数据清理(保留以往每个项目每天一条数据)
- 计算机网络高频面试考点
- 【Personal summary】Key points of MES system development/management
- Oracle 11g silent install
- MySql的安装配置超详细教程与简单的建库建表方法
- 【干货分享】PCB 板变形原因!不看不知道
- Oracle数据文件收缩_最佳实践_超简单方法
- AR路由器如何配置Portal认证(二层网络)
- contos install php-ffmpeg and tp5.1 using plugin
猜你喜欢

Prometheus monitors container, pod, email alerts

ESXI中损坏虚拟机数据如何找回

Use of Alibaba Cloud SMS Service (create, test notes)

【OpenStack云平台】搭建openstack云平台

【项目案例】配置小型网络WLAN基本业务示例

Podman一篇就学会

TFS (Azure conversation) prohibit people checked out at the same time

计算机网络高频面试考点

【DIoU CIoU】DIoU和CIoU损失函数理解及代码实现

Prometheus监控容器、pod、邮件告警
随机推荐
Oracle 11g silent install
【IoU loss】IoU损失函数理解
MySQL的安装(详细教程)
Oracle数据文件收缩_最佳实践_超简单方法
Command errored out with exit status 1类似问题解决方案
记一次postgresql中使用正则表达式
一家可靠的HDI板厂,需要具备哪些基本条件?
【dllogger bug】AttributeError: module ‘dllogger‘ has no attribute ‘StdOutBackend‘
Prometheus monitors container, pod, email alerts
linux安装redis
SQL——左连接(Left join)、右连接(Right join)、内连接(Inner join)
JUC并发编程深入浅出!
MySQL的on duplicate key update 的使用
Cesium加载离线地图和离线地形
【经验分享】配置用户通过Console口登录设备示例
mysql的配置文件(my.ini或者 my.cnf)所在位置
MySQL的 DDL和DML和DQL的基本语法
ESXI中损坏虚拟机数据如何找回
torch.nn.modules.activation.ReLU is not a Module subclass
MySQL master-slave replication