2022-08-03 07:02:00 WGS.

# Build the demo DataFrame from one dict per record.
# `ss` is created outside this snippet (presumably the active SparkSession).
df = ss.createDataFrame([
    {'newid': 'DDD1', 'time_h': 10, 'city': '北京', 'model': '华为'},
    {'newid': 'DDD1', 'time_h': 20, 'city': '北京', 'model': '小米'},
    {'newid': 'DDD1', 'time_h': 2, 'city': '上海', 'model': '苹果'},
    {'newid': 'DDD1', 'time_h': 3, 'city': '青岛', 'model': '华为'},
    {'newid': 'www1', 'time_h': 20, 'city': '青岛', 'model': '华为'},
]).select('newid', 'city', 'model', 'time_h')  # pin the column order


# Template returning every value of one column whose row count equals the
# highest row count of that column, i.e. all modes, ties included.
# All three placeholders receive the same column name.
# NOTE: count(*) also counts the NULL group, so NULL can come out as a mode.
sqlcommn = "select {}, count(*) as `cnt` from info group by {} having count(*) >= (select max(count) from (select count(*) count from info group by {}) c)"

# Combine the per-column mode sub-queries with `left join ... on 1=1`, which
# pairs every mode of one column with every mode of the others.
strsql = """ select city, model, time_h from ({}) sub_city left join ({}) sub_model on 1=1 left join ({}) sub_time_h on 1=1 """.format(
    sqlcommn.format('city', 'city', 'city'),
    sqlcommn.format('model', 'model', 'model'),
    sqlcommn.format('time_h', 'time_h', 'time_h'),
)

# NOTE(review): the query reads a view/table named `info`, which is not
# registered anywhere in the visible code; presumably the data frame must be
# exposed first, e.g. via df.createOrReplaceTempView('info') - confirm.
# NOTE: this rebinds `df` to the mode table produced by the query.
df = ss.sql(strsql)

# Columns of the mode table and the dictionary key each one is stored under
# (the two lists are parallel).
cols = ['city', 'model', 'time_h']
modeCol = ['city_mode', 'model_mode', 'time_h_mode']

# Start with one empty bucket per mode key.
all_modes_dict = {mode_key: [] for mode_key in modeCol}

# Append each collected row's value to the bucket of its column.
for row in df.select(['city', 'model', 'time_h']).collect():
    for mode_key, col_name in zip(modeCol, cols):
        all_modes_dict[mode_key].append(row[col_name])

# The join repeats values across rows, so de-duplicate every bucket.
for k, v in all_modes_dict.items():
    all_modes_dict[k] = set(v)
    # To keep a single mode per column instead:
    # all_modes_dict[k] = list(set(v))[0]


# `global` has no effect at module scope; it is kept so the snippet still
# binds the module-level name if it is moved into a function body.
global all_modes_dict_bc
# Broadcast the dictionary through `sc` (defined outside this snippet).
all_modes_dict_bc = sc.broadcast(all_modes_dict)
| DDD1|北京| 华为|    10|
| DDD1|北京| 小米|    20|
| DDD1|上海| 苹果|     2|
| DDD1|青岛| 华为|     3|
| www1|青岛| 华为|    20|

|北京| 华为|    10|
|北京| 华为|    20|

{'city_mode': {'北京'}, 'model_mode': {'华为'}, 'time_h_mode': {10, 20}}

UDF way to fill the mode

If the Spark version is 2.4, repeatedly calling this SQL together with the UDF to fill in the mode reports an error; consider using the fillna approach instead.

# UserDefinedFunction
def deal_na_udf(row_col, modekey):
    """Return ``row_col``, or the stored mode when the cell is missing.

    Args:
        row_col: Cell value; ``None`` and ``''`` are treated as missing.
        modekey: Key into the broadcast dictionary ``all_modes_dict_bc``.

    Returns:
        ``row_col`` unchanged when it is present. Otherwise the entry stored
        under ``modekey``. If that entry is a collection of tied modes, the
        smallest non-``None`` element is returned, so the fill value is a
        deterministic scalar; an empty collection yields ``None``.

    Raises:
        KeyError: If ``modekey`` is not present in the broadcast dictionary.
    """
    if row_col is not None and row_col != '':
        return row_col

    mode = all_modes_dict_bc.value[modekey]
    if isinstance(mode, (set, frozenset, list, tuple)):
        # A column cell cannot hold a whole collection; pick one tied mode.
        candidates = [m for m in mode if m is not None]
        return min(candidates) if candidates else None
    return mode
# Wrap the filler so it can be applied to DataFrame columns.
ludf = UserDefinedFunction(lambda cell, key: deal_na_udf(cell, key))

# Alternative: the same logic as a single lambda UDF.
# ludf = fn.udf(lambda row_col, modekey: all_modes_dict_bc.value[modekey] if row_col is None or row_col == '' else row_col)

# Overwrite each column in place, passing the matching mode key as a literal.
# `colnames` comes from outside this snippet and is indexed in parallel with
# `modeCol`.
for idx, name in enumerate(colnames):
    filled = ludf(fn.col(name), fn.lit(modeCol[idx]))
    df = df.withColumn(name, filled)

fillna way to fill the mode

# Replace empty strings with null before filling.
# Method 1 (alternative): trim every column and null out zero-length values.
# for column in df.columns:
#     trimmed = fn.trim(fn.col(column))
#     df = df.withColumn(column, fn.when(fn.length(trimmed) != 0, trimmed).otherwise(None))

# Method 2: replace '' with null in the columns that will be filled.
df = df.replace(to_replace='', value=None, subset=['time_h', 'model', 'city'])

# Build one {column: fill value} mapping and fill every column in one call.
# `colnames` comes from outside this snippet and is indexed in parallel with
# `modeCol`.
fill_values = {}
for i, name in enumerate(colnames):
    mode = all_modes_dict_bc.value[modeCol[i]]
    if isinstance(mode, (set, frozenset, list, tuple)):
        # fillna needs a scalar per column; pick one tied mode
        # deterministically.
        candidates = [m for m in mode if m is not None]
        mode = min(candidates) if candidates else None
    if mode is not None:
        fill_values[name] = mode

if fill_values:
    df = df.fillna(fill_values)

The mode can also be found with a word-count style frequency count:

# Count how often each `city` row occurs, then keep the three most frequent.
city_counts = (
    df.select('city')
    .rdd
    .map(lambda record: (record, 1))
    .reduceByKey(lambda left, right: left + right)
)
rdds = city_counts.sortBy(lambda pair: pair[1], ascending=False).take(3)
# examples: [(Row(city=None), 2), (Row(city='北京'), 2), (Row(city='杭州'), 1)]
# As the example shows, rows whose city is None are counted as well.
