当前位置:网站首页>Spark特征工程-归一化 和 分桶
Spark特征工程-归一化 和 分桶
2022-08-02 03:28:00 【Code_LT】
归一化 和 分桶
val double2vec: UserDefinedFunction = udf {
(value: Double) => org.apache.spark.ml.linalg.Vectors.dense(value) }
def ratingFeatures(samples:DataFrame): Unit ={
samples.printSchema()
samples.show(10)
//利用打分表ratings计算电影的平均分、被打分次数等数值型特征
val movieFeatures = samples.groupBy(col("movieId"))
.agg(count(lit(1)).as("ratingCount"),
avg(col("rating")).as("avgRating"),
variance(col("rating")).as("ratingVar"))
.withColumn("avgRatingVec", double2vec(col("avgRating")))
movieFeatures.show(10)
//分桶处理,创建QuantileDiscretizer进行分桶,将打分次数这一特征分到100个桶中
val ratingCountDiscretizer = new QuantileDiscretizer()
.setInputCol("ratingCount")
.setOutputCol("ratingCountBucket")
.setNumBuckets(100)
//归一化处理,创建MinMaxScaler进行归一化,将平均得分进行归一化
val ratingScaler = new MinMaxScaler()
.setInputCol("avgRatingVec")
.setOutputCol("scaleAvgRating")
//创建一个pipeline,依次执行两个特征处理过程
val pipelineStage: Array[PipelineStage] = Array(ratingCountDiscretizer, ratingScaler)
val featurePipeline = new Pipeline().setStages(pipelineStage)
val movieProcessedFeatures = featurePipeline.fit(movieFeatures).transform(movieFeatures)
//打印最终结果
movieProcessedFeatures.show(10)
注释:
spark的vector深入浅出
Spark MLIB的Normalizer、StandardScaler、MinMaxScaler、RobustScaler
Spark MLIB的Normalizer、StandardScaler、MinMaxScaler-2
分桶
Bucketizer
Bucketizer将连续的特征列转换成特征桶(buckets)列。这些桶由用户指定。它拥有一个splits参数。 例如商城的人群,觉得把人分为50以上和50以下太不精准了,应该分为20岁以下,20-30岁,30-40岁,36-50岁,50以上,那么就得用到数值离散化的处理方法了。
splits:如果有n+1个splits,那么将有n个桶。桶将由split x和split y共同确定,它的值范围为[x,y),如果是最后 一个桶,范围将是[x,y]。splits应该严格递增。负无穷和正无穷必须明确的提供用来覆盖所有的双精度值,否则,超出splits的值将会被 认为是一个错误。
注意,如果你并不知道目标列的上界和下界,你应该添加Double.NegativeInfinity和Double.PositiveInfinity作为边界从而防止潜在的 超过边界的异常。下面是程序调用的例子
object BucketizerDemo {
def main(args: Array[String]): Unit = {
var spark = SparkSession.builder().appName("BucketizerDemo").master("local[2]").getOrCreate();
val array = Array((1,13.0),(2,16.0),(3,23.0),(4,35.0),(5,56.0),(6,44.0))
//将数组转为DataFrame
val df = spark.createDataFrame(array).toDF("id","age")
// 设定边界,分为5个年龄组:[0,20),[20,30),[30,40),[40,50),[50,正无穷)
// 注:人的年龄当然不可能正无穷,这里演示正无穷PositiveInfinity的用法,负无穷是NegativeInfinity。
val splits = Array(0, 20, 30, 40, 50, Double.PositiveInfinity)
//初始化Bucketizer对象并进行设定:setSplits是设置我们的划分依据
val bucketizer = new Bucketizer().setSplits(splits).setInputCol("age").setOutputCol("bucketizer_feature")
//transform方法将DataFrame二值化。
val bucketizerdf = bucketizer.transform(df)
//show是用于展示结果
bucketizerdf.show
}
}
+---+----+------------------+
| id| age|bucketizer_feature|
+---+----+------------------+
| 1|13.0| 0.0|
| 2|16.0| 0.0|
| 3|23.0| 1.0|
| 4|35.0| 2.0|
| 5|56.0| 4.0|
| 6|44.0| 3.0|
+---+----+------------------+
QuantileDiscretizer
QuantileDiscretizer:按分位数,对给出的数据列进行离散化分箱处理。
和Bucketizer(分箱处理)一样也是:将连续数值特征转换为离散类别特征。实际上Class QuantileDiscretizer extends Bucketizer。
- 参数1:不同的是这里不再自己定义splits(分类标准),而是定义分几箱(段)就可以了。QuantileDiscretizer自己调用函数计算分位数,并完成离散化。
- 参数2: 另外一个参数是精度,如果设置为0,则计算最精确的分位数,这是一个高时间代价的操作,且上下边界将设置为正负无穷,覆盖所有实数范围。
分位数(Quantile),亦称分位点,是指将一个随机变量的概率分布范围分为几个等份的数值点,常用的有中位数(即二分位数)、四分位数、百分位数等。
package com.home.spark.ml
import org.apache.spark.SparkConf
import org.apache.spark.ml.feature.QuantileDiscretizer
import org.apache.spark.sql.SparkSession
/** * @Description: 分位数离散器 * QuantileDiscretizer接收具有连续特征的列,并输出具有合并分类特征的列。按分位数,对给出的数据列进行离散化分箱处理。 * 箱数由numBuckets参数设置。 * 例如,如果输入的不同值太少而无法创建足够的不同分位数,则所使用的存储桶的数量可能会小于该值。 * * NaN值:在QuantileDiscretizer拟合过程中,将从柱中除去NaN值。这将产生一个Bucketizer模型进行预测。 * 在转换期间,Bucketizer在数据集中找到NaN值时将引发错误,但是用户也可以通过设置handleInvalid选择保留还是删除数据集中的NaN值。 * 如果用户选择保留NaN值,则将对其进行特殊处理并将其放入自己的存储桶中, * 例如,如果使用4个存储桶,则将非NaN数据放入存储桶[0-3]中,但NaN将被存储放在一个特殊的桶中[4]。 * * 算法:分箱范围是使用近似算法选择的(有关详细说明,请参见aboutQuantile的文档)。 * 可以使用relativeError参数控制近似精度。设置为零时,将计算精确的分位数(注意:计算精确的分位数是一项昂贵的操作)。 * 分箱的上下边界将是-Infinity和+ Infinity,覆盖所有实数值。 * **/
object Ex_QuantileDiscretizer {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf(true).setMaster("local[2]").setAppName("spark ml")
val spark = SparkSession.builder().config(conf).getOrCreate()
val data = Array((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2))
val df = spark.createDataFrame(data).toDF("id", "hour")
val discretizer = new QuantileDiscretizer()
.setInputCol("hour")
.setOutputCol("result")
.setNumBuckets(3)
val result = discretizer.fit(df).transform(df)
result.show(false)
spark.stop()
}
}
+---+----+------+
|id |hour|result|
+---+----+------+
|0 |18.0|2.0 |
|1 |19.0|2.0 |
|2 |8.0 |1.0 |
|3 |5.0 |1.0 |
|4 |2.2 |0.0 |
+---+----+------+
决策树分桶
业界常用的是决策树分桶,待后续补充
讨论
如何决定分桶数?
边栏推荐
猜你喜欢
随机推荐
Go Build报错汇总(持续更新)
对账、结账、错账更正方法、划线更正法、红字更正法、补充登记法
Laravel 登录,中间件和路由分组
18张图,直观理解神经网络、流形和拓扑
2021-09-04 最简单的Golang定时器应用以及最简单的协程入门儿
自定义view实现半圆弧进度条
Kotlin - 延迟初始化和密封类
会计凭证概述、原始凭证、原始凭证的种类、原始凭证的基本内容、原始凭证的填制要求、原始凭证的审核
C# 注释语法
英语每日打卡
synchronized锁原理详解
Glide中图片处理
uniapp发布到微信小程序:分包、删减代码全过程
Go 程序太大了,能要个延迟初始化不?
Jetpack中各个组件简介
关于我的专利、软著~
关于我的数学建模~
3D建模作品
备战金九银十:Android 高级架构师的学习路线及面试题分享
面试知识点整理:Skia 架构的场景渲染