pyspark --- count the mode of multiple columns and return it at once
2022-08-03 07:02:00 【WGS.】
df = ss.createDataFrame([
    {'newid': 'DDD1', 'time_h': 10, 'city': '北京', 'model': '华为'},
    {'newid': 'DDD1', 'time_h': 10, 'city': '北京', 'model': '华为'},
    {'newid': 'DDD1', 'time_h': 20, 'city': '北京', 'model': '小米'},
    {'newid': 'DDD1', 'time_h': 2, 'city': '上海', 'model': '苹果'},
    {'newid': 'DDD1', 'time_h': 3, 'city': '青岛', 'model': '华为'},
    {'newid': 'www1', 'time_h': 20, 'city': '青岛', 'model': '华为'}]) \
    .select(*['newid', 'city', 'model', 'time_h'])
df.show()
df.createOrReplaceTempView('info')
sqlcommn = "select {}, count(*) as `cnt` from info group by {} having count(*) >= (select max(count) from (select count(*) count from info group by {}) c)"
strsql = """ select city, model, time_h from ({}) sub_city left join ({}) sub_model on 1=1 left join ({}) sub_time_h on 1=1 """.format(
sqlcommn.format('city', 'city', 'city'),
sqlcommn.format('model', 'model', 'model'),
sqlcommn.format('time_h', 'time_h', 'time_h')
)
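Each of the three sub-queries is just the sqlcommn template with one column name substituted: group by that column and keep every value whose count reaches the maximum count, so ties are kept. Printing one instance makes this concrete (output wrapped here for readability):

print(sqlcommn.format('city', 'city', 'city'))
# select city, count(*) as `cnt` from info group by city
# having count(*) >= (select max(count) from (select count(*) count from info group by city) c)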
df = ss.sql(strsql)
df.show()
# collect each column's mode value(s) into one dict
all_modes_dict = {}
cols = ['city', 'model', 'time_h']
modeCol = ['city_mode', 'model_mode', 'time_h_mode']
for c in modeCol:
    all_modes_dict[c] = []
for row in df.select(['city', 'model', 'time_h']).collect():
    for mc in range(len(modeCol)):
        all_modes_dict.get(modeCol[mc]).append(row[cols[mc]])
for k, v in all_modes_dict.items():
    all_modes_dict[k] = set(v)
    # take only one value
    # all_modes_dict[k] = list(set(v))[0]
all_modes_dict

# broadcast the dict so executors can read it inside the UDF below
global all_modes_dict_bc
all_modes_dict_bc = sc.broadcast(all_modes_dict)
+-----+----+-----+------+
|newid|city|model|time_h|
+-----+----+-----+------+
| DDD1|北京| 华为| 10|
| DDD1|北京| 华为| 10|
| DDD1|北京| 小米| 20|
| DDD1|上海| 苹果| 2|
| DDD1|青岛| 华为| 3|
| www1|青岛| 华为| 20|
+-----+----+-----+------+
+----+-----+------+
|city|model|time_h|
+----+-----+------+
|北京| 华为| 10|
|北京| 华为| 20|
+----+-----+------+
{'city_mode': {'北京'}, 'model_mode': {'华为'}, 'time_h_mode': {10, 20}}
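For readers who prefer the DataFrame API to the string-built SQL, a minimal sketch of the same per-column mode computation (ties kept) could look like the following; column_modes is a hypothetical helper name, not part of the original post:

import pyspark.sql.functions as fn

def column_modes(frame, colname):
    # count occurrences per value, then keep every value whose count equals the maximum
    counts = frame.groupBy(colname).count()
    max_cnt = counts.agg(fn.max('count').alias('m')).collect()[0]['m']
    return {r[colname] for r in counts.filter(fn.col('count') == max_cnt).collect()}

# yields the same structure as all_modes_dict above, e.g. {'city_mode': {'北京'}, ...}
all_modes_dict = {c + '_mode': column_modes(df, c) for c in ['city', 'model', 'time_h']}

Each column triggers its own small job here, whereas the joined SQL above answers all three columns in a single query.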
UDF way to fill the mode
If the Spark version is 2.4, calling this SQL plus the UDF several times to fill the mode may raise an error; in that case consider the fillna approach instead.
# UserDefinedFunction
from pyspark.sql.functions import UserDefinedFunction
import pyspark.sql.functions as fn

def deal_na_udf(row_col, modekey):
    # replace null / empty string with the broadcast mode for this column
    # NOTE: for filling, the broadcast dict should hold a single value per key
    # (the "take only one value" variant above), not a whole set
    if row_col is None or row_col == '':
        row_col = all_modes_dict_bc.value[modekey]
    return row_col

ludf = UserDefinedFunction(lambda row_col, modekey: deal_na_udf(row_col, modekey))

# # lambda udf
# ludf = fn.udf(lambda row_col, modekey: all_modes_dict_bc.value[modekey] if row_col is None or row_col == '' else row_col)

for i in range(len(cols)):
    df = df.withColumn(cols[i], ludf(fn.col(cols[i]), fn.lit(modeCol[i])))
fillna way to fill the mode
# first replace empty strings with null
# # Method 1
# for column in df.columns:
#     trimmed = fn.trim(fn.col(column))
#     df = df.withColumn(column, fn.when(fn.length(trimmed) != 0, trimmed).otherwise(None))

# Method 2
df = df.replace(to_replace='', value=None, subset=['time_h', 'model', 'city'])

for i in range(len(cols)):
    df = df.fillna({cols[i]: all_modes_dict_bc.value[modeCol[i]]})
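Note that fillna only accepts scalar values (int, float, str, bool), so for this to work the broadcast dict must hold a single mode per key (the "take only one value" variant shown earlier) rather than a set. A minimal sketch that picks one mode per column on the driver first; single_mode is a hypothetical name, not from the original post:

# fillna cannot take a set; sorted() just makes the choice among tied modes deterministic
single_mode = {modeCol[i]: sorted(all_modes_dict_bc.value[modeCol[i]])[0] for i in range(len(cols))}
df = df.fillna({cols[i]: single_mode[modeCol[i]] for i in range(len(cols))})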
The mode can also be obtained with word-frequency-style counting on the RDD:
rdds = df.select('city').rdd.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b) \
    .sortBy(lambda x: x[1], False).take(3)
# examples: [(Row(city=None), 2), (Row(city='北京'), 2), (Row(city='杭州'), 1)]
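Because the keys above are whole Row objects, a small variant (not from the original post) maps to the raw column value instead and pulls out the single most frequent one directly:

# key on the column value itself, count, then take the pair with the highest count
city_mode = df.select('city').rdd \
    .map(lambda r: (r['city'], 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .max(key=lambda x: x[1])[0]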