Adding a Column to a Spark DataFrame
2022-07-25 15:15:00 【The south wind knows what I mean】
Contents
- Method 1: use createDataFrame; the new-column logic is built while constructing the RDD and the schema
- Method 2: use withColumn; the new-column logic lives in a UDF
- Method 3: use SQL; the new-column logic is written directly in the SQL statement
- Method 4: the three methods above all add a computed flag column; to add a unique sequence number, use monotonically_increasing_id
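All four snippets below reference names that the post never defines: ss (a SparkSession), input (the source DataFrame), targetColumns (a single source column name, despite the plural), targetColname (apparently the same column, used in Method 3), and the thresholds critValueL and critValueR. A minimal sketch of that assumed setup, with purely illustrative values:

import org.apache.spark.sql.SparkSession

// Assumed context for the snippets below; all names and values are illustrative.
val ss = SparkSession.builder().appName("AddColumnDemo").master("local[*]").getOrCreate()
import ss.implicits._

val targetColumns = "value"                 // name of the source column
val targetColname = targetColumns           // Method 3 presumably means the same column
val (critValueL, critValueR) = (10.0, 90.0) // lower and upper thresholds
val input = Seq(5.0, 42.0, 95.0).toDF(targetColumns)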
Method 1: use createDataFrame; the new-column logic is built while constructing the RDD and the schema
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StringType

// Flag each value as "F" if it falls outside [critValueL, critValueR], else "T"
val trdd = input.select(targetColumns).rdd.map { x =>
  val v = x.get(0).toString.toDouble
  if (v > critValueR || v < critValueL) Row(v, "F")
  else Row(v, "T")
}
// Extend the selected column's schema with a nullable string column "flag"
val schema = input.select(targetColumns).schema.add("flag", StringType, true)
val sample3 = ss.createDataFrame(trdd, schema)
  .distinct()
  .withColumnRenamed(targetColumns, "idx")
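With the illustrative setup above (5.0 and 95.0 fall outside [10.0, 90.0]), the result would look like the following; row order after distinct() is not guaranteed:

sample3.show()
// +----+----+
// | idx|flag|
// +----+----+
// | 5.0|   F|
// |42.0|   T|
// |95.0|   F|
// +----+----+

Note that the Row objects must line up positionally with the schema: createDataFrame does no name-based matching here, so a type mismatch only surfaces at runtime.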
Method 2: use withColumn; the new-column logic lives in a UDF
import org.apache.spark.sql.functions.udf

// Wrap the same judgment in a UDF and attach the result with withColumn
val code: Int => String = (arg: Int) =>
  if (arg > critValueR || arg < critValueL) "F" else "T"
val addCol = udf(code)
val sample3 = input.select(targetColumns)
  .withColumn("flag", addCol(input(targetColumns)))
  .withColumnRenamed(targetColumns, "idx")
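One inconsistency worth noting: this UDF declares its argument as Int, while Method 1 parses the value as Double. If the source column actually holds doubles (as in the assumed setup), Spark may silently cast the column to Int, truncating the fraction before the comparison. A minimal variant typed for Double, matching Method 1:

// Same judgment, typed for a Double column to match Method 1
val codeD: Double => String = v => if (v > critValueR || v < critValueL) "F" else "T"
val addColD = udf(codeD)
val sample3d = input.select(targetColumns)
  .withColumn("flag", addColD(input(targetColumns)))
  .withColumnRenamed(targetColumns, "idx")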
Method 3: use SQL; the new-column logic is written directly in the SQL statement
// Register the selected column as a temp view and express the flag as a CASE WHEN
input.select(targetColumns).createOrReplaceTempView("tmp")
val sample3 = ss.sqlContext.sql("select distinct " + targetColname +
  " as idx, case when " + targetColname + " > " + critValueR + " then 'F'" +
  " when " + targetColname + " < " + critValueL + " then 'F' else 'T' end as flag from tmp")
Method 4: the three methods above all add a computed flag column; to add a unique sequence number instead, use monotonically_increasing_id
// Method 4: add a unique sequence-number column
import org.apache.spark.sql.functions.monotonically_increasing_id
val inputnew = input.withColumn("idx", monotonically_increasing_id())
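A caveat worth knowing: monotonically_increasing_id() guarantees ids that are unique and increasing, but not consecutive, because the partition id is encoded in the upper bits of the value. If a gap-free 0, 1, 2, ... sequence is required, one common workaround (not from the original post) is to rank over the generated ids; note that an unpartitioned window pulls all rows into a single partition:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{monotonically_increasing_id, row_number}

// Consecutive ids via row_number; the global sort forces a single partition
val w = Window.orderBy(monotonically_increasing_id())
val inputSeq = input.withColumn("idx", row_number().over(w) - 1)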