Learning Spark [8] - MLlib库 - 线性
机器学习数据管道(Machine Learning Pipeline)
Pipeline的概念,在很多机器学习的模型中都存在,是一个种整理以及操控数据的方法。在MLlib中,Pipeline API提供了一个在dataframe之上,管理机器学习工作流的接口。
MLlib术语
- Transformer
输入一个Dataframe,并输出一个包含了一些新列的Dataframe。Transformer不会从数据中学习到参数,且只会引用一些rule-based规则去转换数据。函数为transform()。
Transformer主要用于清洗数据,以便于数据可以用于模型之中。
- Estimator
Estimator会从数据中学习到参数,根据函数fit()返回一个Model
该Model便可以被认定为一个Transformer
- Pipeline
将一个系列的Transformer和Estimator整理为一个模型。Pipeline自身同样为Estimator,函数为pipeline.fit(),并返回一个PipelineModel。
PipelineModel同样可以被认定为Transformer
例子
数据集为San Francisco housing dataset from Inside Airbnb。该数据集包括了Airbnb的租房的信息,例如卧室间数、位置、评价等等。我们的目标为预测一间夜的出租价格。
在起初拿到数据后,我们需要对数据的异常值或者离群值进行去除(例如:小于$0的出租价),并进行数据清洗(例如转换数据类型),等等,再次不过多赘述。
# In Python
from pyspark.sql import SparkSession
# create new spark instance
spark = (SparkSession
.builder
.appName('SparkSQLExampleApp')
# .master("local[*]")
# .config("spark.sql.catalogImplementation","hive")
# .enableHiveSupport()
.getOrCreate())
path = '.../sf-airbnb-clean.parquet'
airbnb = spark.read.parquet(path)
len(airbnb.columns)
34
该数据集有34个列,我们选择几列来拥有一些大概的数据概览。
airbnbDF.select('neighbourhood_cleansed', 'room_type', 'bedrooms', 'bathrooms',
'number_of_reviews', 'price').show(5)
+----------------------+---------------+--------+---------+-----------------+-----+
|neighbourhood_cleansed| room_type|bedrooms|bathrooms|number_of_reviews|price|
+----------------------+---------------+--------+---------+-----------------+-----+
| Western Addition|Entire home/apt| 1.0| 1.0| 180.0|170.0|
| Bernal Heights|Entire home/apt| 2.0| 1.0| 111.0|235.0|
| Haight Ashbury| Private room| 1.0| 4.0| 17.0| 65.0|
| Haight Ashbury| Private room| 1.0| 4.0| 8.0| 65.0|
| Western Addition|Entire home/apt| 2.0| 1.5| 27.0|785.0|
+----------------------+---------------+--------+---------+-----------------+-----+
only showing top 5 rows
创建训练集与测试集
![](https://img.haomeiwen.com/i25043032/ba166be3acf35438.png)
# Split in Train and Test
trainDF, testDF = airbnb.randomSplit([.8, .2], seed = 42) # seed is for set the randomness
print(f'train size: {trainDF.count()}, test size: {testDF.count()}')
train size: 5780, test size: 1366
单元线性回归
在Spark中建立线性回归模型,我们需要先将所有的自变量合并到一个向量里。我们先只选择一个自变量:卧室间数。我们可以使用vectorAssembler Transofrmer来进行这个操作。
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols=["bedrooms"], outputCol="features")
vecTrainDF = vecAssembler.transform(trainDF)
vecTrainDF.select("bedrooms", "features", "price").show(10)
+--------+--------+-----+
|bedrooms|features|price|
+--------+--------+-----+
| 1.0| [1.0]|200.0|
| 1.0| [1.0]|130.0|
| 1.0| [1.0]| 95.0|
| 1.0| [1.0]|250.0|
| 3.0| [3.0]|250.0|
| 1.0| [1.0]|115.0|
| 1.0| [1.0]|105.0|
| 1.0| [1.0]| 86.0|
| 1.0| [1.0]|100.0|
| 2.0| [2.0]|220.0|
+--------+--------+-----+
only showing top 10 rows
在拥有的自变量(卧室间数)和因变量(价格)后,我们就可以进行下一步,构建模型。
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol = 'price')
lrModel = lr.fit(vecTrainDF)
这里的lrModel就是一个estimator,它包含了使用训练集训练出来的参数,也是就是线性方程y=ax+b中的a。实际的结果如下
a = round(lrModel.coefficients[0], 2)
b = round(lrModel.intercept, 2)
print(f'The formula for the linear regression is price = {a} * num_of_bedrooms + {b}')
The formula for the linear regression is price = 123.68 * num_of_bedrooms + 47.51
建立Pipeline(管道)
建立pipeline可以更好管理代码,我们就省去了上述冗长的代码,从而更优雅的训练模型或者使用测试集来测试模型的性能。
from pyspark.ml import Pipeline
# transformer config
vecAssembler = VectorAssembler(inputCols=["bedrooms"], outputCol="features")
# model config
lr = LinearRegression(featuresCol = 'features', labelCol = 'price')
# model training
pipelineModel = Pipeline(stages = [vecAssembler, lr]).fit(trainDF)
# At this time, pipelineModel已经自动可以以transformer来调用,所以便可以很便利的使用测试集来测试模型性能。
predDF = pipelineModel.transform(testDF)
predDF.select('bedrooms', 'features', 'price', 'prediction').show(10)
+--------+--------+------+------------------+
|bedrooms|features| price| prediction|
+--------+--------+------+------------------+
| 1.0| [1.0]| 85.0|171.18598011578285|
| 1.0| [1.0]| 45.0|171.18598011578285|
| 1.0| [1.0]| 70.0|171.18598011578285|
| 1.0| [1.0]| 128.0|171.18598011578285|
| 1.0| [1.0]| 159.0|171.18598011578285|
| 2.0| [2.0]| 250.0|294.86172649777757|
| 1.0| [1.0]| 99.0|171.18598011578285|
| 1.0| [1.0]| 95.0|171.18598011578285|
| 1.0| [1.0]| 100.0|171.18598011578285|
| 1.0| [1.0]|2010.0|171.18598011578285|
+--------+--------+------+------------------+
only showing top 10 rows
多元线性回归&虚拟变量
为了解决自变量中的离散值,我们可以讲该变量转换为Dummy Variable(虚拟变量)。
在Spark中,函数OneHotEncoder()可以实现这一转换。
from pyspark.ml.feature import OneHotEncoder, StringIndexer
# dummy variable
categoricalCols = [field for (field, dataTypes) in trainDF.dtypes if dataTypes == 'string']
indexOutputCols = [x + 'Index' for x in categoricalCols]
oheOutputCols = [x + 'OHE' for x in categoricalCols]
stringIndexer = StringIndexer(inputCols=categoricalCols,
outputCols=indexOutputCols,
handleInvalid = 'skip')
oheEncoder = OneHotEncoder(inputCols = indexOutputCols,
outputCols = oheOutputCols)
numericCols = [field for (field, dataTypes) in trainDF.dtypes if ((dataTypes == 'double') & (field != 'price'))]
assemblerInputs = oheOutputCols + numericCols
vecAssembler = VectorAssembler(inputCols = assemblerInputs,
outputCol = 'features')
lr = LinearRegression(labelCol = 'price', featuresCol = 'features')
pipeline = Pipeline(stages = [stringIndexer, oheEncoder, vecAssembler, lr])
pipelineModel = pipeline.fit(trainDF)
predDF = pipelineModel.transform(testDF)
predDF.select('features', 'price', 'prediction').show(10)
+--------------------+------------------+------------------+
| features| price| prediction|
+--------------------+------------------+------------------+
|(98,[0,3,6,22,43,...| 4.442651256490317| 4.644425529745689|
|(98,[0,3,6,22,43,...|3.8066624897703196| 4.223594858687562|
|(98,[0,3,6,22,43,...| 4.248495242049359| 4.248280556674246|
|(98,[0,3,6,12,42,...| 4.852030263919617|3.8921581128135756|
|(98,[0,3,6,12,43,...|5.0689042022202315| 4.608476041020452|
|(98,[0,3,6,12,43,...| 5.521460917862246| 5.365868119786427|
|(98,[0,3,6,11,42,...| 4.59511985013459| 5.084838593929874|
|(98,[0,3,6,31,42,...| 4.553876891600541| 5.008339179369244|
|(98,[0,3,6,28,42,...| 4.605170185988092| 4.154386449584621|
|(98,[0,3,6,28,43,...| 7.605890001053122| 5.434322576497891|
+--------------------+------------------+------------------+
only showing top 10 rows
测试模型性能
常用模型性能的测试指标有:RMSE(root mean-square error),和R^2
from pyspark.ml.evaluation import RegressionEvaluator
regressionEvaluator = RegressionEvaluator(
predictionCol = 'prediction',
labelCol = 'price',
metricName = 'rmse')
rmse = regressionEvaluator.evaluate(predDF)
print(f'RMSE is {round(rmse,2)}')
RMSE is 220.56
r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
print(f"R2 is {round(r2, 2)}")
R2 is 0.16
R2过小,我们尝试对因变量进行对数转换。
# log-transform y
from pyspark.sql.functions import log, col
logTrainDF = trainDF.withColumn('price', log(col('price')))
logTestDF = testDF.withColumn('price', log(col('price')))
log_pipelineModel = pipeline.fit(logTrainDF)
predDF = log_pipelineModel.transform(logTestDF)
r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
print(f"R2 is {round(r2, 2)}")
R2 is 0.57
predDF.select('features', 'price', 'prediction').show(10)
+--------------------+------------------+------------------+
| features| price| prediction|
+--------------------+------------------+------------------+
|(98,[0,3,6,22,43,...| 4.442651256490317| 4.644425529745689|
|(98,[0,3,6,22,43,...|3.8066624897703196| 4.223594858687562|
|(98,[0,3,6,22,43,...| 4.248495242049359| 4.248280556674246|
|(98,[0,3,6,12,42,...| 4.852030263919617|3.8921581128135756|
|(98,[0,3,6,12,43,...|5.0689042022202315| 4.608476041020452|
|(98,[0,3,6,12,43,...| 5.521460917862246| 5.365868119786427|
|(98,[0,3,6,11,42,...| 4.59511985013459| 5.084838593929874|
|(98,[0,3,6,31,42,...| 4.553876891600541| 5.008339179369244|
|(98,[0,3,6,28,42,...| 4.605170185988092| 4.154386449584621|
|(98,[0,3,6,28,43,...| 7.605890001053122| 5.434322576497891|
+--------------------+------------------+------------------+
only showing top 10 rows
Reference
Learning Spark 2nd - Lightning Fast Big Data Analysis by Jules S. Damji, Brooke Wenig, Tathagata Das, and Denny Lee