
Adding a column to a Spark DataFrame

2022-07-06 00:28:00 The south wind knows what I mean
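
The snippets below all refer to the same, unspecified context. As a point of reference only, a minimal hypothetical setup could look like this; the names ss, input, targetColumns, targetColname, critValueL and critValueR are assumptions inferred from the code, not part of the original post.

import org.apache.spark.sql.SparkSession

// Hypothetical setup shared by the examples: a SparkSession, a one-column
// numeric DataFrame, the target column name, and two cut-off values
val ss = SparkSession.builder().appName("add-column-demo").master("local[*]").getOrCreate()
import ss.implicits._

val input = Seq(1.0, 2.5, 3.7, 9.9).toDF("value")
val targetColumns = "value"   // column to select / rename
val targetColname = "value"   // same column, as referenced in the SQL example
val critValueL = 2.0          // lower cut-off
val critValueR = 8.0          // upper cut-off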

Method 1: use createDataFrame; the new column is added while building the RDD and the schema.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StringType

// Map each row to (value, flag): "F" when the value lies outside [critValueL, critValueR], otherwise "T"
val trdd = input.select(targetColumns).rdd.map { x =>
  val v = x.get(0).toString.toDouble
  if (v > critValueR || v < critValueL) Row(v, "F") else Row(v, "T")
}
// Extend the selected schema with a nullable string column "flag"
val schema = input.select(targetColumns).schema.add("flag", StringType, true)
val sample3 = ss.createDataFrame(trdd, schema).distinct().withColumnRenamed(targetColumns, "idx")

Method 2: use withColumn; the new-column logic is wrapped in a UDF.

import org.apache.spark.sql.functions.udf

// Returns "F" when the value lies outside [critValueL, critValueR], otherwise "T"
// (the argument is taken as Double, matching the conversion used in Method 1)
val code: Double => String = (arg: Double) =>
  if (arg > critValueR || arg < critValueL) "F" else "T"
val addCol = udf(code)
val sample3 = input.select(targetColumns)
  .withColumn("flag", addCol(input(targetColumns)))
  .withColumnRenamed(targetColumns, "idx")
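
As a side note (not part of the original post), the same flag column can usually be added without a UDF by using the built-in when/otherwise column expressions; a minimal sketch under the same assumed names:

import org.apache.spark.sql.functions.{col, when}

// Same rule as the UDF above, expressed with built-in column expressions
val sample3b = input.select(targetColumns)
  .withColumn("flag",
    when(col(targetColumns) > critValueR || col(targetColumns) < critValueL, "F")
      .otherwise("T"))
  .withColumnRenamed(targetColumns, "idx")

Built-in expressions keep the logic visible to the Catalyst optimizer, whereas a UDF is a black box to it.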

Method 3: use SQL; the new-column logic is written directly in the SQL statement.

// Register the selected column as a temp view, then build the flag column in SQL
input.select(targetColumns).createOrReplaceTempView("tmp")
val sample3 = ss.sqlContext.sql(
  s"select distinct $targetColname as idx, " +
  s"case when $targetColname > $critValueR then 'F' " +
  s"when $targetColname < $critValueL then 'F' else 'T' end as flag from tmp")

Method 4: the three approaches above all add a column derived from a condition. To add a unique sequence-number column instead, use monotonically_increasing_id.

// Method 4: add a sequence-number column
import org.apache.spark.sql.functions.monotonically_increasing_id
val inputnew = input.withColumn("idx", monotonically_increasing_id())
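
Note that monotonically_increasing_id only guarantees unique, increasing values; the ids are not consecutive, because each partition gets its own id range. If consecutive numbers are needed, one option (an assumption, not part of the original post) is row_number over a window:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{monotonically_increasing_id, row_number}

// Consecutive 1, 2, 3, ... numbering; the temporary id preserves the original row order.
// Note: a window without partitionBy pulls all rows into a single partition.
val inputConsecutive = input
  .withColumn("mono_id", monotonically_increasing_id())
  .withColumn("idx", row_number().over(Window.orderBy("mono_id")))
  .drop("mono_id")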

Copyright notice: this article was written by [The south wind knows what I mean]; please include a link to the original when reposting.
https://yzsam.com/2022/187/202207060022444336.html