Learning Spark [8] - MLlib Library - Linear Regression

2021-02-01  屹然1ran

Machine Learning Pipelines

The concept of a pipeline shows up in many machine learning frameworks as a way to organize and manipulate data. In MLlib, the Pipeline API provides an interface, built on top of DataFrames, for managing a machine learning workflow.

MLlib Terminology

Transformer: accepts a DataFrame as input and, via .transform(), returns a new DataFrame with one or more columns appended. Transformers are typically used to prepare data so that it can be fed into a model.

Estimator: learns parameters from a DataFrame via .fit() and returns a Model; that Model can itself be regarded as a Transformer.

Pipeline: organizes a series of transformers and estimators into a single workflow. Calling .fit() on a Pipeline returns a PipelineModel, which can likewise be regarded as a Transformer.
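As a minimal sketch of the Estimator/Transformer distinction (illustrative only; it assumes a DataFrame df with a string column room_type, not the book's running example):

# In Python
from pyspark.ml.feature import StringIndexer

# StringIndexer is an Estimator: .fit() learns the category-to-index mapping
indexer = StringIndexer(inputCol='room_type', outputCol='room_typeIndex')
indexerModel = indexer.fit(df)  # returns a Model, which is a Transformer

# the fitted Model transforms a DataFrame, appending the new index column
indexed = indexerModel.transform(df)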

Example

The dataset is the San Francisco housing dataset from Inside Airbnb. It contains information on Airbnb rentals, such as the number of bedrooms, location, reviews, and so on. Our goal is to predict the nightly rental price of a listing.
When we first receive the data, we need to remove anomalous or outlying values (for example, rentals priced below $0) and clean the data (for example, converting data types); that preprocessing is not covered in detail here.
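For reference, a minimal sketch of what such cleaning might look like (the rawDF name and its string-typed price column, e.g. "$170.00", are assumptions for illustration):

# In Python
from pyspark.sql.functions import col, translate

cleanDF = (rawDF
           # convert data type: strip '$' and ',' then cast price to double
           .withColumn('price', translate(col('price'), '$,', '').cast('double'))
           # remove outliers/invalid values: drop non-positive prices
           .filter(col('price') > 0))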

# In Python
from pyspark.sql import SparkSession

# create a new SparkSession (or reuse an existing one)
spark = (SparkSession
         .builder
         .appName('SparkSQLExampleApp')
         # .master("local[*]")
         # .config("spark.sql.catalogImplementation","hive")
         # .enableHiveSupport()
         .getOrCreate())

path = '.../sf-airbnb-clean.parquet'

airbnb = spark.read.parquet(path)
len(airbnb.columns)
34

The dataset has 34 columns; let's select a few of them to get a rough overview of the data.

airbnb.select('neighbourhood_cleansed', 'room_type', 'bedrooms', 'bathrooms',
              'number_of_reviews', 'price').show(5)
+----------------------+---------------+--------+---------+-----------------+-----+
|neighbourhood_cleansed|      room_type|bedrooms|bathrooms|number_of_reviews|price|
+----------------------+---------------+--------+---------+-----------------+-----+
|      Western Addition|Entire home/apt|     1.0|      1.0|            180.0|170.0|
|        Bernal Heights|Entire home/apt|     2.0|      1.0|            111.0|235.0|
|        Haight Ashbury|   Private room|     1.0|      4.0|             17.0| 65.0|
|        Haight Ashbury|   Private room|     1.0|      4.0|              8.0| 65.0|
|      Western Addition|Entire home/apt|     2.0|      1.5|             27.0|785.0|
+----------------------+---------------+--------+---------+-----------------+-----+
only showing top 5 rows

Creating Training and Test Sets

# Split into train and test sets
trainDF, testDF = airbnb.randomSplit([.8, .2], seed=42)  # seed fixes the randomness for reproducibility
print(f'train size: {trainDF.count()}, test size: {testDF.count()}')
train size: 5780, test size: 1366
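One caveat worth noting (my addition, not from the original post): even with a fixed seed, the exact rows randomSplit assigns to each side can change if the DataFrame's partitioning changes. To reuse the identical split across runs, one option is to persist the two halves:

# In Python
# hypothetical output paths; adjust to your environment
trainDF.write.mode('overwrite').parquet('/tmp/sf-airbnb-train.parquet')
testDF.write.mode('overwrite').parquet('/tmp/sf-airbnb-test.parquet')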

Simple Linear Regression

To build a linear regression model in Spark, we first need to combine all of the input features into a single vector column. To start, let's pick just one feature: the number of bedrooms. We can use the VectorAssembler transformer for this.

from pyspark.ml.feature import VectorAssembler 

vecAssembler = VectorAssembler(inputCols=["bedrooms"], outputCol="features") 
vecTrainDF = vecAssembler.transform(trainDF) 
vecTrainDF.select("bedrooms", "features", "price").show(10)
+--------+--------+-----+
|bedrooms|features|price|
+--------+--------+-----+
|     1.0|   [1.0]|200.0|
|     1.0|   [1.0]|130.0|
|     1.0|   [1.0]| 95.0|
|     1.0|   [1.0]|250.0|
|     3.0|   [3.0]|250.0|
|     1.0|   [1.0]|115.0|
|     1.0|   [1.0]|105.0|
|     1.0|   [1.0]| 86.0|
|     1.0|   [1.0]|100.0|
|     2.0|   [2.0]|220.0|
+--------+--------+-----+
only showing top 10 rows

Now that we have the feature (number of bedrooms) and the label (price), we can move on to the next step: building the model.

from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol = 'features', labelCol = 'price')
lrModel = lr.fit(vecTrainDF)

Here lr is an Estimator; calling fit() on it returns lrModel, a Model that holds the parameters learned from the training set, i.e. the slope a and intercept b in the linear equation y = ax + b. The actual result is as follows:

a = round(lrModel.coefficients[0], 2)
b = round(lrModel.intercept, 2)
print(f'The formula for the linear regression is price = {a} * num_of_bedrooms + {b}')
The formula for the linear regression is price = 123.68 * num_of_bedrooms + 47.51
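If you want to inspect the fit on the training data directly, the fitted model also exposes a training summary (this inspection step is my addition, not part of the book's walkthrough):

# In Python
summary = lrModel.summary  # LinearRegressionTrainingSummary
print(f'training RMSE: {summary.rootMeanSquaredError:.2f}')
print(f'training R2: {summary.r2:.2f}')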

Building a Pipeline

Building a pipeline keeps the code manageable: it spares us the verbose steps above, so we can train the model and evaluate it on the test set more elegantly.

from pyspark.ml import Pipeline

# transformer config
vecAssembler = VectorAssembler(inputCols=["bedrooms"], outputCol="features") 
# model config
lr = LinearRegression(featuresCol = 'features', labelCol = 'price')
# model training
pipelineModel = Pipeline(stages = [vecAssembler, lr]).fit(trainDF)

# pipelineModel can now itself be invoked as a transformer, so it is easy to evaluate the model on the test set
predDF = pipelineModel.transform(testDF)
predDF.select('bedrooms', 'features', 'price', 'prediction').show(10)
+--------+--------+------+------------------+
|bedrooms|features| price|        prediction|
+--------+--------+------+------------------+
|     1.0|   [1.0]|  85.0|171.18598011578285|
|     1.0|   [1.0]|  45.0|171.18598011578285|
|     1.0|   [1.0]|  70.0|171.18598011578285|
|     1.0|   [1.0]| 128.0|171.18598011578285|
|     1.0|   [1.0]| 159.0|171.18598011578285|
|     2.0|   [2.0]| 250.0|294.86172649777757|
|     1.0|   [1.0]|  99.0|171.18598011578285|
|     1.0|   [1.0]|  95.0|171.18598011578285|
|     1.0|   [1.0]| 100.0|171.18598011578285|
|     1.0|   [1.0]|2010.0|171.18598011578285|
+--------+--------+------+------------------+
only showing top 10 rows
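A fitted PipelineModel can also be saved to disk and reloaded later, which is convenient when serving the model; a minimal sketch (the path is hypothetical):

# In Python
from pyspark.ml import PipelineModel

pipelineModel.write().overwrite().save('/tmp/lr-pipeline-model')  # hypothetical path
savedModel = PipelineModel.load('/tmp/lr-pipeline-model')
savedPredDF = savedModel.transform(testDF)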

Multiple Linear Regression & Dummy Variables

To handle discrete (categorical) values among the input variables, we can convert each such variable into dummy variables.
In Spark, the OneHotEncoder estimator implements this conversion, working together with StringIndexer, which first maps the string categories to numeric indices.
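To make the encoding concrete, here is a toy sketch (the three-row DataFrame is an illustration, not the Airbnb data): StringIndexer first maps each category to a numeric index, and OneHotEncoder then turns that index into a sparse 0/1 vector.

# In Python
from pyspark.ml.feature import OneHotEncoder, StringIndexer

toyDF = spark.createDataFrame(
    [('Private room',), ('Entire home/apt',), ('Shared room',)],
    ['room_type'])

indexer = StringIndexer(inputCol='room_type', outputCol='room_typeIndex')
encoder = OneHotEncoder(inputCols=['room_typeIndex'], outputCols=['room_typeOHE'])

indexed = indexer.fit(toyDF).transform(toyDF)
encoder.fit(indexed).transform(indexed).show(truncate=False)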

from pyspark.ml.feature import OneHotEncoder, StringIndexer

# identify the categorical (string) columns for dummy-variable encoding
categoricalCols = [field for (field, dataType) in trainDF.dtypes if dataType == 'string']

indexOutputCols = [x + 'Index' for x in categoricalCols]
oheOutputCols = [x + 'OHE' for x in categoricalCols]

# map each string category to a numeric index; skip rows with unseen labels
stringIndexer = StringIndexer(inputCols=categoricalCols,
                              outputCols=indexOutputCols,
                              handleInvalid='skip')

# expand each index column into a sparse one-hot vector
oheEncoder = OneHotEncoder(inputCols=indexOutputCols,
                           outputCols=oheOutputCols)

# keep the numeric columns, excluding the label 'price'
numericCols = [field for (field, dataType) in trainDF.dtypes
               if dataType == 'double' and field != 'price']
assemblerInputs = oheOutputCols + numericCols

vecAssembler = VectorAssembler(inputCols = assemblerInputs,
                               outputCol = 'features')

lr = LinearRegression(labelCol = 'price', featuresCol = 'features')

pipeline = Pipeline(stages = [stringIndexer, oheEncoder, vecAssembler, lr])

pipelineModel = pipeline.fit(trainDF)
predDF = pipelineModel.transform(testDF)

Evaluating Model Performance

Commonly used metrics for evaluating model performance include RMSE (root mean-square error) and R^2.
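For reference, writing y_i for the observed label, ŷ_i for the prediction, and ȳ for the mean label over the n test rows, the two metrics are (in LaTeX notation):

\mathrm{RMSE} = \sqrt{\frac{1}{n}\sum_{i=1}^{n}(y_i - \hat{y}_i)^2},
\qquad
R^2 = 1 - \frac{\sum_{i=1}^{n}(y_i - \hat{y}_i)^2}{\sum_{i=1}^{n}(y_i - \bar{y})^2}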

from pyspark.ml.evaluation import RegressionEvaluator
regressionEvaluator = RegressionEvaluator(
    predictionCol = 'prediction',
    labelCol = 'price',
    metricName = 'rmse')
rmse = regressionEvaluator.evaluate(predDF)
print(f'RMSE is {round(rmse,2)}')
RMSE is 220.56
r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF) 
print(f"R2 is {round(r2, 2)}")
R2 is 0.16

The R^2 is quite low, so let's try a log transformation of the dependent variable (price).

# log-transform y
from pyspark.sql.functions import log, col

logTrainDF = trainDF.withColumn('price', log(col('price')))
logTestDF = testDF.withColumn('price', log(col('price')))

log_pipelineModel = pipeline.fit(logTrainDF)
predDF = log_pipelineModel.transform(logTestDF)

r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF) 
print(f"R2 is {round(r2, 2)}")
R2 is 0.57
predDF.select('features', 'price', 'prediction').show(10)
+--------------------+------------------+------------------+
|            features|             price|        prediction|
+--------------------+------------------+------------------+
|(98,[0,3,6,22,43,...| 4.442651256490317| 4.644425529745689|
|(98,[0,3,6,22,43,...|3.8066624897703196| 4.223594858687562|
|(98,[0,3,6,22,43,...| 4.248495242049359| 4.248280556674246|
|(98,[0,3,6,12,42,...| 4.852030263919617|3.8921581128135756|
|(98,[0,3,6,12,43,...|5.0689042022202315| 4.608476041020452|
|(98,[0,3,6,12,43,...| 5.521460917862246| 5.365868119786427|
|(98,[0,3,6,11,42,...|  4.59511985013459| 5.084838593929874|
|(98,[0,3,6,31,42,...| 4.553876891600541| 5.008339179369244|
|(98,[0,3,6,28,42,...| 4.605170185988092| 4.154386449584621|
|(98,[0,3,6,28,43,...| 7.605890001053122| 5.434322576497891|
+--------------------+------------------+------------------+
only showing top 10 rows
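One thing to keep in mind: after the log transform, both price and prediction are on the log scale. To interpret predictions in dollars, exponentiate them back (a minimal sketch; the expPrediction column name is my own choice):

# In Python
from pyspark.sql.functions import col, exp

predDF.select('price', 'prediction',
              exp(col('prediction')).alias('expPrediction')).show(5)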

Reference
Learning Spark, 2nd Edition: Lightning-Fast Data Analytics, by Jules S. Damji, Brooke Wenig, Tathagata Das, and Denny Lee (O'Reilly, 2020)
