当前位置:网站首页>Spark特征工程-归一化 和 分桶
Spark特征工程-归一化 和 分桶
2022-08-02 03:28:00 【Code_LT】
归一化 和 分桶
val double2vec: UserDefinedFunction = udf {
(value: Double) => org.apache.spark.ml.linalg.Vectors.dense(value) }
def ratingFeatures(samples:DataFrame): Unit ={
samples.printSchema()
samples.show(10)
//利用打分表ratings计算电影的平均分、被打分次数等数值型特征
val movieFeatures = samples.groupBy(col("movieId"))
.agg(count(lit(1)).as("ratingCount"),
avg(col("rating")).as("avgRating"),
variance(col("rating")).as("ratingVar"))
.withColumn("avgRatingVec", double2vec(col("avgRating")))
movieFeatures.show(10)
//分桶处理,创建QuantileDiscretizer进行分桶,将打分次数这一特征分到100个桶中
val ratingCountDiscretizer = new QuantileDiscretizer()
.setInputCol("ratingCount")
.setOutputCol("ratingCountBucket")
.setNumBuckets(100)
//归一化处理,创建MinMaxScaler进行归一化,将平均得分进行归一化
val ratingScaler = new MinMaxScaler()
.setInputCol("avgRatingVec")
.setOutputCol("scaleAvgRating")
//创建一个pipeline,依次执行两个特征处理过程
val pipelineStage: Array[PipelineStage] = Array(ratingCountDiscretizer, ratingScaler)
val featurePipeline = new Pipeline().setStages(pipelineStage)
val movieProcessedFeatures = featurePipeline.fit(movieFeatures).transform(movieFeatures)
//打印最终结果
movieProcessedFeatures.show(10)
注释:
spark的vector深入浅出
Spark MLIB的Normalizer、StandardScaler、MinMaxScaler、RobustScaler
Spark MLIB的Normalizer、StandardScaler、MinMaxScaler-2
分桶
Bucketizer
Bucketizer将连续的特征列转换成特征桶(buckets)列。这些桶由用户指定。它拥有一个splits参数。 例如商城的人群,觉得把人分为50以上和50以下太不精准了,应该分为20岁以下,20-30岁,30-40岁,36-50岁,50以上,那么就得用到数值离散化的处理方法了。
splits:如果有n+1个splits,那么将有n个桶。桶将由split x和split y共同确定,它的值范围为[x,y),如果是最后 一个桶,范围将是[x,y]。splits应该严格递增。负无穷和正无穷必须明确的提供用来覆盖所有的双精度值,否则,超出splits的值将会被 认为是一个错误。
注意,如果你并不知道目标列的上界和下界,你应该添加Double.NegativeInfinity和Double.PositiveInfinity作为边界从而防止潜在的 超过边界的异常。下面是程序调用的例子
object BucketizerDemo {
def main(args: Array[String]): Unit = {
var spark = SparkSession.builder().appName("BucketizerDemo").master("local[2]").getOrCreate();
val array = Array((1,13.0),(2,16.0),(3,23.0),(4,35.0),(5,56.0),(6,44.0))
//将数组转为DataFrame
val df = spark.createDataFrame(array).toDF("id","age")
// 设定边界,分为5个年龄组:[0,20),[20,30),[30,40),[40,50),[50,正无穷)
// 注:人的年龄当然不可能正无穷,这里演示正无穷PositiveInfinity的用法,负无穷是NegativeInfinity。
val splits = Array(0, 20, 30, 40, 50, Double.PositiveInfinity)
//初始化Bucketizer对象并进行设定:setSplits是设置我们的划分依据
val bucketizer = new Bucketizer().setSplits(splits).setInputCol("age").setOutputCol("bucketizer_feature")
//transform方法将DataFrame二值化。
val bucketizerdf = bucketizer.transform(df)
//show是用于展示结果
bucketizerdf.show
}
}
+---+----+------------------+
| id| age|bucketizer_feature|
+---+----+------------------+
| 1|13.0| 0.0|
| 2|16.0| 0.0|
| 3|23.0| 1.0|
| 4|35.0| 2.0|
| 5|56.0| 4.0|
| 6|44.0| 3.0|
+---+----+------------------+
QuantileDiscretizer
QuantileDiscretizer:按分位数,对给出的数据列进行离散化分箱处理。
和Bucketizer(分箱处理)一样也是:将连续数值特征转换为离散类别特征。实际上Class QuantileDiscretizer extends Bucketizer。
- 参数1:不同的是这里不再自己定义splits(分类标准),而是定义分几箱(段)就可以了。QuantileDiscretizer自己调用函数计算分位数,并完成离散化。
- 参数2: 另外一个参数是精度,如果设置为0,则计算最精确的分位数,这是一个高时间代价的操作,且上下边界将设置为正负无穷,覆盖所有实数范围。
分位数(Quantile),亦称分位点,是指将一个随机变量的概率分布范围分为几个等份的数值点,常用的有中位数(即二分位数)、四分位数、百分位数等。
package com.home.spark.ml
import org.apache.spark.SparkConf
import org.apache.spark.ml.feature.QuantileDiscretizer
import org.apache.spark.sql.SparkSession
/** * @Description: 分位数离散器 * QuantileDiscretizer接收具有连续特征的列,并输出具有合并分类特征的列。按分位数,对给出的数据列进行离散化分箱处理。 * 箱数由numBuckets参数设置。 * 例如,如果输入的不同值太少而无法创建足够的不同分位数,则所使用的存储桶的数量可能会小于该值。 * * NaN值:在QuantileDiscretizer拟合过程中,将从柱中除去NaN值。这将产生一个Bucketizer模型进行预测。 * 在转换期间,Bucketizer在数据集中找到NaN值时将引发错误,但是用户也可以通过设置handleInvalid选择保留还是删除数据集中的NaN值。 * 如果用户选择保留NaN值,则将对其进行特殊处理并将其放入自己的存储桶中, * 例如,如果使用4个存储桶,则将非NaN数据放入存储桶[0-3]中,但NaN将被存储放在一个特殊的桶中[4]。 * * 算法:分箱范围是使用近似算法选择的(有关详细说明,请参见aboutQuantile的文档)。 * 可以使用relativeError参数控制近似精度。设置为零时,将计算精确的分位数(注意:计算精确的分位数是一项昂贵的操作)。 * 分箱的上下边界将是-Infinity和+ Infinity,覆盖所有实数值。 * **/
object Ex_QuantileDiscretizer {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf(true).setMaster("local[2]").setAppName("spark ml")
val spark = SparkSession.builder().config(conf).getOrCreate()
val data = Array((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2))
val df = spark.createDataFrame(data).toDF("id", "hour")
val discretizer = new QuantileDiscretizer()
.setInputCol("hour")
.setOutputCol("result")
.setNumBuckets(3)
val result = discretizer.fit(df).transform(df)
result.show(false)
spark.stop()
}
}
+---+----+------+
|id |hour|result|
+---+----+------+
|0 |18.0|2.0 |
|1 |19.0|2.0 |
|2 |8.0 |1.0 |
|3 |5.0 |1.0 |
|4 |2.2 |0.0 |
+---+----+------+
决策树分桶
业界常用的是决策树分桶,待后续补充
讨论
如何决定分桶数?
边栏推荐
猜你喜欢
随机推荐
如何在正则表达式里表达可能存在也可能不存在的内容?
SGDP(2)——声纳寻宝游戏
Go中的一些优化笔记,简约而不简单
Out of memory error on GPU 0. Cannot allocate xxxGB memory on GPU 0, available memory is only xxx
对账、结账、错账更正方法、划线更正法、红字更正法、补充登记法
Binder机制详解(三)
RecyclerView使用和原理解析
链动2+1模式开发系统
关于我的项目-微信公众号~
Laravel 验证唯一时排除修改时的数据
无源域适应(SFDA)方向的领域探究和论文复现(第二部分)
元宇宙是一个炒作的科幻概念,还是互联网发展的下半场?
会计凭证概述、原始凭证、原始凭证的种类、原始凭证的基本内容、原始凭证的填制要求、原始凭证的审核
The first time to tear the code by hand, how to solve the problem of full arrangement
一文理解分布式开发中的服务治理
英语每日打卡
Syncthing文件同步方案完全攻略(亲测有效)
深度学习理论:model.fit 函数参数详解
Glide使用及原理分析
蓝桥杯:国二选手经验贴 附蓝桥杯历年真题









