[spark] (task8) Building pipelines in Spark ML

2022-06-10 12:58:00 Mountain peak evening view

1. The ML Pipeline machine learning workflow

With a small number of samples, you can do the ML modeling directly in Python. When large data sets are involved, you can use Spark for distributed in-memory computing. Although Spark's native language is Scala, if you write in Python you can use PySpark to build machine learning pipelines.

1.1 ML Pipeline Construction process

Spark also has the MLlib machine learning library, which is more complex than ML Pipeline. Let's look at how ML Pipeline builds a machine learning workflow:

  • Data preparation: organize the feature values and the variable to predict into a DataFrame
  • Build the machine learning Pipeline:
    • StringIndexer: converts text categorical features into numbers
    • OneHotEncoder: converts numeric categorical features into sparse vectors
    • VectorAssembler: combines all feature fields into a single Vector field
    • DecisionTreeClassifier: trains and produces the model
  • Training: call pipeline.fit() on the training set to produce a pipelineModel
  • Prediction: call pipelineModel.transform() on the test set to produce the predictions

1.2 ML Pipeline Components

Note: some PySpark components are not exactly the same as the Python components of the same name:

  • DataFrame: the data format handled by the Spark ML machine learning API. It can be created from text files, RDDs, or Spark SQL. It is similar in concept to a Python (pandas) DataFrame, but its methods are completely different.
  • Transformer: its .transform method converts one DataFrame into another DataFrame.
  • Estimator: its .fit method takes a DataFrame and produces a Transformer.
  • Pipeline: chains Transformers and Estimators together to build an ML workflow.
  • Parameter: the API through which the Transformers and Estimators above share parameters.
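The fit/transform contract between these components can be illustrated in plain Python. This is a toy sketch, not Spark's implementation: every class name here (`ToyIndexer`, `ToyIndexerModel`, `ToyPipeline`, `ToyPipelineModel`) is hypothetical, rows are plain dicts rather than a DataFrame, and the indexer orders values alphabetically whereas Spark's StringIndexer orders by frequency by default.

```python
class ToyIndexerModel:
    """Transformer: adds an index column using a mapping learned earlier."""

    def __init__(self, input_col, output_col, mapping):
        self.input_col = input_col
        self.output_col = output_col
        self.mapping = mapping

    def transform(self, rows):
        # Return new rows; the input rows are left untouched.
        return [{**row, self.output_col: self.mapping[row[self.input_col]]}
                for row in rows]


class ToyIndexer:
    """Estimator: fit() learns a string -> index mapping and returns a Transformer."""

    def __init__(self, input_col, output_col):
        self.input_col = input_col
        self.output_col = output_col

    def fit(self, rows):
        distinct = sorted({row[self.input_col] for row in rows})
        mapping = {value: float(i) for i, value in enumerate(distinct)}
        return ToyIndexerModel(self.input_col, self.output_col, mapping)


class ToyPipelineModel:
    """Fitted pipeline: every stage is a Transformer, applied in order."""

    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


class ToyPipeline:
    """Pipeline: fits each Estimator in turn, passing transformed data along."""

    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            if hasattr(stage, "fit"):      # Estimator -> Transformer
                stage = stage.fit(rows)
            rows = stage.transform(rows)   # feed the next stage
            fitted.append(stage)
        return ToyPipelineModel(fitted)


train = [{"color": "red"}, {"color": "blue"}, {"color": "red"}]
model = ToyPipeline([ToyIndexer("color", "color_idx")]).fit(train)
result = model.transform([{"color": "blue"}, {"color": "red"}])
print(result)
# [{'color': 'blue', 'color_idx': 0.0}, {'color': 'red', 'color_idx': 1.0}]
```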

2. Taking GBDT as an example

2.0 Introduction to GBTs

In Spark, GBDT is called GBTs (gradient-boosted trees). Because it is built on decision trees (Decision Tree, DT), it is also called GBDT. Spark's GBDT algorithm exists in both the ml package and the mllib package:

  • the mllib package is based on RDDs;
  • the ml package works on DataFrames, and its GBDT is split into classification and regression.

Because RDD-based code is more common in actual production environments, the GBTs in the mllib package are often used directly; the version in the ml package is briefly explained here.


  • pipeline: structurally, a Pipeline contains one or more PipelineStages. Each PipelineStage completes one task, such as data set processing and transformation, model training, parameter setting, or prediction. ML provides definitions and implementations of PipelineStage for different types of problems.
  • transformer: a PipelineStage that turns one DataFrame into another. For example, a model can take a test DataFrame that has no prediction labels, label it, and produce another DataFrame with prediction labels; such a result set can then be used to visualize the analysis results.
  • estimator: operates on the DataFrame's data to generate a transformer; this is the stage that has the fit step.

2.1 Loading libsvm data
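LIBSVM is a sparse text format: each line holds a label followed by index:value pairs with 1-based indices, and Spark shifts the indices to 0-based when it loads the file. That is why the features column in this section prints as (size, [indices], [values]). The sketch below is plain Python rather than Spark's loader; `parse_libsvm_line` and the sample line are hypothetical and only illustrate the mapping.

```python
def parse_libsvm_line(line, num_features):
    """Parse one LIBSVM line into (label, (size, indices, values))."""
    parts = line.split()
    label = float(parts[0])
    indices, values = [], []
    for item in parts[1:]:
        idx, val = item.split(":")
        indices.append(int(idx) - 1)  # LIBSVM indices are 1-based; shift to 0-based
        values.append(float(val))
    return label, (num_features, indices, values)


label, sparse = parse_libsvm_line("1 3:0.5 7:2.0", num_features=10)
print(label, sparse)
# 1.0 (10, [2, 6], [0.5, 2.0])
```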

# gbdt_test
import findspark
findspark.init()
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, VectorIndexer

# Create (or reuse) a SparkSession
spark = SparkSession.builder.appName("gbdt_test").getOrCreate()

file_path = "file:///home/hadoop/development/RecSys/data"
# Originally wrapped as: def gradientBoostedTreeClassifier(data="data/sample_libsvm_data.txt")
# Assumption: sample_libsvm_data.txt sits under file_path; adjust to your own layout.
data_path = file_path + "/sample_libsvm_data.txt"

# GBDT classifier
# Load the data set in LIBSVM format
data = spark.read.format("libsvm").load(data_path)
# Index the labels
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Index the feature vector; maxCategories=4 is an assumed value
# (features with more than 4 distinct values are treated as continuous)
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
data.show(n=3)

"""
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(692,[127,128,129...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[124,125,126...|
+-----+--------------------+
only showing top 3 rows
"""

2.2 Building the pipeline

# Split the data into a training set and a test set
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# show() prints the table itself and returns None, so these prints display "None" after each table
#print("Training set:\n", trainingData.show(n = 1), "\n")
#print("Test set:\n", testData.show(n = 1))

# Use 10 base classifiers (maxIter=10)
gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxIter=10)
print("gbt_test:\n", gbt, "\n")

# Build the modeling pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, gbt])
print("pipeline:\n", type(pipeline), "\n")

# Fit every stage on the training set
model = pipeline.fit(trainingData)

# Make predictions on the test set
predictions = model.transform(testData)

# Show the first 5 rows
predictions.select("prediction", "indexedLabel", "features").show(5)

# Compare predicted labels with true labels to compute the test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")

# Evaluate accuracy on the predictions
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
# The fitted GBT model is the third stage of the pipeline
gbtModel = model.stages[2]
print('gbtModelSummary: ', gbtModel)  # Model summary

The above is a simple GBDT classification task: it uses 10 base classifiers and, following the boosting strategy, optimizes along the negative gradient. The output is as follows; Test Error = 0.12 corresponds to an accuracy of 88%:

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(692,[95,96,97,12...|
+-----+--------------------+
only showing top 1 row

 Training set :
 None 

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(692,[123,124,125...|
+-----+--------------------+
only showing top 1 row

 Test set :
 None
gbt_test:
 GBTClassifier_eafe5d3c8749 

pipeline:
 <class 'pyspark.ml.pipeline.Pipeline'> 

+----------+------------+--------------------+
|prediction|indexedLabel|            features|
+----------+------------+--------------------+
|       1.0|         1.0|(692,[123,124,125...|
|       1.0|         1.0|(692,[124,125,126...|
|       1.0|         1.0|(692,[126,127,128...|
|       1.0|         1.0|(692,[129,130,131...|
|       1.0|         1.0|(692,[150,151,152...|
+----------+------------+--------------------+
only showing top 5 rows

Test Error = 0.12
gbtModelSummary:  GBTClassificationModel: uid = GBTClassifier_eafe5d3c8749, numTrees=10, numClasses=2, numFeatures=692
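The idea of boosting along the negative gradient can be sketched in plain Python. This is an illustration under simplifying assumptions, not Spark's algorithm: it uses squared-error loss (whose negative gradient is simply the residual) and one-split stumps on a single feature, whereas GBTClassifier uses a logistic loss and full decision trees. The names `fit_stump` and `gradient_boost` and the toy data are hypothetical.

```python
def fit_stump(xs, targets):
    """Fit a one-split regression stump that minimizes squared error."""
    best = None
    for threshold in sorted(set(xs)):
        left = [t for x, t in zip(xs, targets) if x <= threshold]
        right = [t for x, t in zip(xs, targets) if x > threshold]
        if not left or not right:
            continue  # a split must leave points on both sides
        left_mean = sum(left) / len(left)
        right_mean = sum(right) / len(right)
        error = (sum((t - left_mean) ** 2 for t in left)
                 + sum((t - right_mean) ** 2 for t in right))
        if best is None or error < best[0]:
            best = (error, threshold, left_mean, right_mean)
    _, threshold, left_mean, right_mean = best
    return lambda x: left_mean if x <= threshold else right_mean


def gradient_boost(xs, ys, n_trees=10, learning_rate=0.5):
    """Each round fits a stump to the negative gradient of the loss."""
    base = sum(ys) / len(ys)
    preds = [base] * len(xs)
    trees = []
    for _ in range(n_trees):
        # For loss 0.5 * (y - p)^2 the negative gradient is the residual y - p.
        residuals = [y - p for y, p in zip(ys, preds)]
        tree = fit_stump(xs, residuals)
        trees.append(tree)
        preds = [p + learning_rate * tree(x) for x, p in zip(xs, preds)]
    return lambda x: base + learning_rate * sum(tree(x) for tree in trees)


xs = [1, 2, 3, 4, 5, 6]
ys = [0, 0, 0, 1, 1, 1]
boosted = gradient_boost(xs, ys, n_trees=10)
print(boosted(2), boosted(5))
```

With 10 rounds the two printed predictions end up close to 0 and 1 respectively; each round shrinks the remaining residual by the learning rate.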

