Adding a Column to a Spark DataFrame
2022-07-25 15:15:00 【The south wind knows what I mean】
Contents

- Method 1: use createDataFrame; the new column is added while building the RDD and the schema
- Method 2: use withColumn; the new column is computed by a UDF
- Method 3: use SQL; the new column is written directly in the SQL statement
- Method 4: the three methods above add a conditional flag column; to add a unique sequence number instead, use monotonically_increasing_id
Method 1: use createDataFrame; the new column is added while building the RDD and the schema
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StringType

// Map each row to (value, flag): "F" if the value falls outside [critValueL, critValueR], otherwise "T"
val trdd = input.select(targetColumns).rdd.map { x =>
  val v = x.get(0).toString.toDouble
  if (v > critValueR || v < critValueL) Row(v, "F") else Row(v, "T")
}
// Extend the selected schema with the new "flag" column, then rebuild the DataFrame
val schema = input.select(targetColumns).schema.add("flag", StringType, true)
val sample3 = ss.createDataFrame(trdd, schema).distinct().withColumnRenamed(targetColumns, "idx")
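For reference, here is a minimal, self-contained sketch of this approach. The SparkSession spark, the DataFrame df with a single score column, and the thresholds lower and upper are all invented for illustration; they stand in for ss, input, critValueL and critValueR above.

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.StringType

val spark = SparkSession.builder().appName("createDataFrameDemo").master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(5.0, 42.0, 95.0).toDF("score")   // assumed sample data
val (lower, upper) = (10.0, 90.0)             // assumed thresholds

// Build an RDD of Rows that already carries the extra flag value
val flaggedRdd = df.rdd.map { r =>
  val v = r.getDouble(0)
  if (v > upper || v < lower) Row(v, "F") else Row(v, "T")
}
// Extend the original schema with the new column, then rebuild the DataFrame
val schema = df.schema.add("flag", StringType, true)
val flagged = spark.createDataFrame(flaggedRdd, schema)
flagged.show()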
Method 2: use withColumn; the new column is computed by a UDF
import org.apache.spark.sql.functions.udf

// UDF that flags values outside [critValueL, critValueR]
val code: Int => String = (arg: Int) =>
  if (arg > critValueR || arg < critValueL) "F" else "T"
val addCol = udf(code)
val sample3 = input.select(targetColumns)
  .withColumn("flag", addCol(input(targetColumns)))
  .withColumnRenamed(targetColumns, "idx")
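A self-contained sketch of the withColumn plus UDF route, again on invented data (spark, df, score, lower and upper are assumptions, not names from the original code):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession.builder().appName("withColumnDemo").master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(5.0, 42.0, 95.0).toDF("score")   // assumed sample data
val (lower, upper) = (10.0, 90.0)             // assumed thresholds

// The UDF returns "F" for out-of-range values and "T" otherwise
val flagUdf = udf((v: Double) => if (v > upper || v < lower) "F" else "T")
val flagged = df.withColumn("flag", flagUdf($"score"))
flagged.show()

For a simple range check like this, the built-in when/otherwise column functions would also work and avoid the overhead of a UDF.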
Method 3: use SQL; the new column is written directly in the SQL statement
// Register the selected column as a temp view and add the flag column in the SQL statement
input.select(targetColumns).createOrReplaceTempView("tmp")
val sample3 = ss.sqlContext.sql("select distinct " + targetColname +
  " as idx, case when " + targetColname + " > " + critValueR + " then 'F'" +
  " when " + targetColname + " < " + critValueL + " then 'F' else 'T' end as flag from tmp")
Method 4: the three methods above add a flag column based on a condition; to add a unique sequence number instead, use monotonically_increasing_id
// Method 4: add a monotonically increasing id column as "idx"
import org.apache.spark.sql.functions.monotonically_increasing_id
val inputnew = input.withColumn("idx", monotonically_increasing_id)
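A minimal sketch of the sequence-number column on invented data. Note that monotonically_increasing_id generates ids that are unique and monotonically increasing, but not consecutive across partitions:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.monotonically_increasing_id

val spark = SparkSession.builder().appName("idDemo").master("local[*]").getOrCreate()
import spark.implicits._

// Assumed sample data; "value" is just a placeholder column name
val withId = Seq("a", "b", "c").toDF("value")
  .withColumn("idx", monotonically_increasing_id())
withId.show()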