pyspark

pySpark 机器学习库ml入门

2019-10-05  本文已影响0人  井底蛙蛙呱呱呱

在spark中提供了两个机器学习库mllib和ml,mllib的操作是基于RDD的,而ml则是基于DataFrame,是主流机器学习库。

1、ml包的概述

ml包包括三个主要的抽象类:转换器(Transformer)、评估器(Estimator)和管道(Pipeline)。

1.1 转换器

转换器类通过将一个新列附加到DataFrame来转换数据。

从高层次上看,当从转换器的抽象类派生时,每个新的转换器类需要实现.transform()方法。该方法要求传递一个要被转换的DataFrame,该参数通常是第一个也是唯一的一个强制性参数。这在ml包的不同方法中也不尽相同:其他常见的参数有inputCol和outputCol;然而,这些参数通常用一些预定义的值作为默认值,例如inputCol参数的默认值为“features”。

spark.ml.feature中提供了许多转换器,下面做个简要介绍:

1.2 评估器

评估器可以被视为需要评估的统计模型,对你的观测对象做预测或分类。

如果从抽象的评估器类派生,新模型必须实现fit方法,该方法给出的在DataFrame中找到的数据和某些默认或自定义的参数来拟合模型。在pyspark中有很多评估器可用,下面简要介绍下spark中提供的模型。

分类

ML包提供了七种分类(Classification)模型以供选择,从最简单的逻辑回归到一些更复杂的模型,下面作简要的描述:

回归

pyspark ML软件包中有七种可用于回归(Regression)任务的模型。与分类一样,范围从一些基本的回归(如强制线性回归)到更复杂的回归:

聚类

聚类是一系列无监督模型,用于查找数据中的隐含模式。pyspark ML包提供了四种当前最流行的模型:

1.3 管道

pyspark ML中管道的概念用来表示从转换到评估(具有一系列不同阶段)的端到端的过程,这个过程可以对输入的一些原始数据(以DataFrame形式)执行必要的数据加工(转换),最后评估模型。

一个管道可以被认为是由一系列不同阶段组成的。在Pipeline对象上执行fit方法时,所有阶段按照stage参数中指定的顺序执行;stage参数是转换器和评估器对象的列表。管道对象的fit方法执行每个转换器的transform方法和所有评估器的fit方法。

通常,前一阶段的输出会成为下一阶段的输入:当从转换器或评估器抽象类型派生时,需要实现getOutputCol()方法,该方法返回创建对象时指定的outputCol参数的值。

下面通过一些例子来详细介绍ml的用法。

2、例子:使用ML预测婴儿生存几率

在本节中我们将使用ml中的函数方法来预测婴儿生存率,数据可从http://www.tomdrabas.com/data/LearningPySpark/births_transformed.csv.gz下载。

2.1 加载数据
import pyspark.sql.types as typ
from pyspark.ml import Pipeline
import pyspark.ml.classification as cl
import pyspark.ml.evaluation as ev
import pandas as pd
import numpy as np
import os

labels = [('INFANT_ALIVE_AT_REPORT', typ.IntegerType()),
          ('BIRTH_PLACE', typ.StringType()),
          ('MOTHER_AGE_YEARS', typ.IntegerType()),
          ('FATHER_COMBINE_AGE', typ.IntegerType()),
          ('CIG_BEFORE', typ.IntegerType()),
          ('CIG_1_TRI', typ.IntegerType()),
          ('CIG_2_TRI', typ.IntegerType()),
          ('CIG_3_TRI', typ.IntegerType()),
          ('MOTHER_HEIGHT_IN', typ.IntegerType()),
          ('MOTHER_PRE_WEIGHT', typ.IntegerType()),
          ('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),
          ('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
          ('DIABETES_PRE', typ.IntegerType()),
          ('DIABETES_GEST', typ.IntegerType()),
          ('HYP_TENS_PRE', typ.IntegerType()),
          ('HYP_TENS_GEST', typ.IntegerType()),
          ('PREV_BIRTH_PRETERM', typ.IntegerType())
          ]

schema = typ.StructType([
    typ.StructField(e[0], e[1], False) for e in labels
])

births = spark.read.csv(
    '/Users/shexuan/Downloads/births_transformed.csv.gz', header=True, schema=schema)

births.show(3)
+----------------------+-----------+----------------+------------------+----------+---------+---------+---------+----------------+-----------------+----------------------+------------------+------------+-------------+------------+-------------+------------------+
|INFANT_ALIVE_AT_REPORT|BIRTH_PLACE|MOTHER_AGE_YEARS|FATHER_COMBINE_AGE|CIG_BEFORE|CIG_1_TRI|CIG_2_TRI|CIG_3_TRI|MOTHER_HEIGHT_IN|MOTHER_PRE_WEIGHT|MOTHER_DELIVERY_WEIGHT|MOTHER_WEIGHT_GAIN|DIABETES_PRE|DIABETES_GEST|HYP_TENS_PRE|HYP_TENS_GEST|PREV_BIRTH_PRETERM|
+----------------------+-----------+----------------+------------------+----------+---------+---------+---------+----------------+-----------------+----------------------+------------------+------------+-------------+------------+-------------+------------------+
|                     0|          1|              29|                99|         0|        0|        0|        0|              99|              999|                   999|                99|           0|            0|           0|            0|                 0|
|                     0|          1|              22|                29|         0|        0|        0|        0|              65|              180|                   198|                18|           0|            0|           0|            0|                 0|
|                     0|          1|              38|                40|         0|        0|        0|        0|              63|              155|                   167|                12|           0|            0|           0|            0|                 0|
+----------------------+-----------+----------------+------------------+----------+---------+---------+---------+----------------+-----------------+----------------------+------------------+------------+-------------+------------+-------------+------------------+

在这里我们指定DataFrame的schema,限制数据集只有17列。

2.2 创建转换器

在使用模型对数据集进行评估预测前,先要对数据做一些特征转换。

# 创建转换器、评估器
import  pyspark.ml.feature as ft

births = births.withColumn('BIRTH_PLACE_INT', births['BIRTH_PLACE']\
    .cast(typ.IntegerType()))

# birth place使用one-hot编码
encoder = ft.OneHotEncoder(inputCol='BIRTH_PLACE_INT',
                           outputCol='BIRTH_PLACE_VEC')

# 创建单一的列将所有特征整合在一起
featuresCreator = ft.VectorAssembler(
    inputCols=[col[0] for col in labels[2:]] + [encoder.getOutputCol()],
    outputCol='features'
)

# 创建一个评估器
import pyspark.ml.classification as cl

logistic = cl.LogisticRegression(maxIter=10,
                                regParam=0.01,
                                featuresCol=featuresCreator.getOutputCol(),
                                labelCol='INFANT_ALIVE_AT_REPORT')
2.3 创建一个管道、拟合模型

在前面我们已经创建了数据转换器和评估器,现在我们可以通过管道将他们串联起来并方便的进行模型拟合了。

# 创建一个管道
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[encoder, featuresCreator, logistic])

# 拟合模型
birth_train, birth_test = births.randomSplit([0.7,0.3],seed=123)

model = pipeline.fit(birth_train)
test_model = model.transform(birth_test)
2.4 评估模型

在前面我们将数据分为了两部分并通过管道方便的对训练集进行了拟合以及对测试集进行了测试。现在我们可以通过测试集的结果来对模型拟合效果进行评估了。

# 评估模型性能
import pyspark.ml.evaluation as ev

evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol='probability',
    labelCol='INFANT_ALIVE_AT_REPORT'
)

print(evaluator.evaluate(test_model, {evaluator.metricName:'areaUnderROC'}))
print(evaluator.evaluate(test_model, {evaluator.metricName:'areaUnderPR'}))

0.7187355793173213
0.6819691176245866
2.5 保存模型

PySpark不仅允许保存训练好的模型,还可以保存管道结构及所有转换器和评估器的定义。

# 保存模型pipeline
pipelinePath = './infant_oneHotEncoder_Logistic_Pipeline'
pipeline.write().overwrite().save(pipelinePath)

# 重载模型pipeline
loadedPipeline = Pipeline.load(pipelinePath)
loadedPipeline.fit(birth_train).transform(birth_test).take(1)

[Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', ...]

# 保存模型
from pyspark.ml import PipelineModel

modelPath = './infant_oneHotEncoder_LogisticPipelineModel'
model.write().overwrite().save(modelPath)

# 载入模型
loadedPipelineModel = PipelineModel.load(modelPath)
test_reloadedModel = loadedPipelineModel.transform(birth_test)
test_reloadedModel.take(1)

[Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=12, ...]
2.6 超参调优

我们的第一个模型几乎不可能是最好的模型。利用超参调优能帮我们找到模型的最佳参数,如逻辑回归模型所需的最佳迭代次数或决策树的最大深度。

在超参调优时PySpark提供了两种验证方法:K-Fold交叉验证和train-validation(相当于1-Fold)交叉验证。

# 超参调优:grid search和train-validation splitting 

# 网格搜索
import pyspark.ml.tuning as tune

logistic = cl.LogisticRegression(labelCol='INFANT_ALIVE_AT_REPORT')
grid = tune.ParamGridBuilder()\
    .addGrid(logistic.maxIter, [5,10,50])\
    .addGrid(logistic.regParam, [0.01,0.05,0.3])\
    .build()

evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol='probability',
    labelCol='INFANT_ALIVE_AT_REPORT'
)

# 使用K-Fold交叉验证评估各种参数的模型
cv = tune.CrossValidator(
    estimator=logistic,
    estimatorParamMaps=grid,
    evaluator=evaluator,
    numFolds=3
)

# 创建一个构建特征的pipeline
pipeline = Pipeline(stages=[encoder, featuresCreator])
birth_train, birth_test = births.randomSplit([0.7,0.3],seed=123) # 重新打开数据进行处理
data_transformer = pipeline.fit(birth_train)
data_test = data_transformer.transform(birth_test)


# cvModel 返回估计的最佳模型
cvModel = cv.fit(data_transformer.transform(birth_train))
results = cvModel.transform(data_test)

print(evaluator.evaluate(results, {evaluator.metricName:'areaUnderROC'}))
print(evaluator.evaluate(results, {evaluator.metricName:'areaUnderPR'}))

0.735848884034915
0.6959036715961695

使用下面的代码可以查看模型最佳参数:

# 查看最佳模型参数
param_maps = cvModel.getEstimatorParamMaps()
eval_metrics = cvModel.avgMetrics

param_res = []

for params, metric in zip(param_maps, eval_metrics):
    param_metric = {}
    for key, param_val in zip(params.keys(), params.values()):
        param_metric[key.name]=param_val
    param_res.append((param_metric, metric))

sorted(param_res, key=lambda x:x[1], reverse=True)

[({'maxIter': 50, 'regParam': 0.01}, 0.7406291618177623),
 ({'maxIter': 10, 'regParam': 0.01}, 0.735580969909943),
 ({'maxIter': 50, 'regParam': 0.05}, 0.7355100622938429),
 ({'maxIter': 10, 'regParam': 0.05}, 0.7351586303619441),
 ({'maxIter': 10, 'regParam': 0.3}, 0.7248698034708339),
 ({'maxIter': 50, 'regParam': 0.3}, 0.7214679272915997),
 ({'maxIter': 5, 'regParam': 0.3}, 0.7180255703028883),
 ({'maxIter': 5, 'regParam': 0.01}, 0.7179304617840288),
 ({'maxIter': 5, 'regParam': 0.05}, 0.7173397593133481)]

上面使用的使用K-Fold来进行超参调优,K-Fold交叉验证往往非常耗时,使用1-Fold的交叉验证(即将数据集按比例分为训练集合验证集)能大大缩短时间。

# Train-validation划分

# 使用卡方检验选择特征
selector = ft.ChiSqSelector(
    numTopFeatures=5,
    featuresCol=featuresCreator.getOutputCol(),
    outputCol='selectedFeatures',
    labelCol='INFANT_ALIVE_AT_REPORT'
)

logistic = cl.LogisticRegression(labelCol='INFANT_ALIVE_AT_REPORT',
                                featuresCol='selectedFeatures')

pipeline = Pipeline(stages=[encoder, featuresCreator, selector])
data_transformer = pipeline.fit(birth_train)

tvs = tune.TrainValidationSplit(estimator=logistic,
                               estimatorParamMaps=grid,
                               evaluator=evaluator,
                                trainRatio=0.75
                               )

tvsModel = tvs.fit(data_transformer.transform(birth_train))
data_test = data_transformer.transform(birth_test)
results = tvsModel.transform(data_test)

print(evaluator.evaluate(results, {evaluator.metricName:'areaUnderROC'}))
print(evaluator.evaluate(results, {evaluator.metricName:'areaUnderPR'}))

0.6111344483529891
0.5735913338089571

3、使用PySpark ML的其他功能

在上面我们完整的介绍了利用pyspark ml库来进行建模的过程。下面我们介绍一些其他常用的功能。

3.1 特征提取
3.1.1 NLP相关特征提取

如第一部分所述,NGram模型采用标记文本的列表,并生成单词对(或n-gram)。
本例中,我们从pyspark的文档中摘录一段,并介绍如何在将文本传递给NGram模型之前进行清理。

# NLP相关特征提取(NGram模型采用标记文本的列表,并生成单词对或n-gram)

text_data = spark.createDataFrame([
    ['''K-fold cross validation performs model selection by splitting the dataset into a set of non-overlapping 
    randomly partitioned folds which are used as separate training and test datasets e.g., with k=3 folds, 
    K-fold cross validation will generate 3 (training, test) dataset pairs, each of which uses 2/3 of the data 
    for training and 1/3 for testing. Each fold is used as the test set exactly once.'''],
    ['''CrossValidatorModel contains the model with the highest average cross-validation metric across folds and
    uses this model to transform input data. CrossValidatorModel also tracks the metrics for each param map 
    evaluated.'''],
    ['''Creates a copy of this instance with a randomly generated uid and some extra params. This copies the 
    underlying bestModel, creates a deep copy of the embedded paramMap, and copies the embedded and extra 
    parameters over.''']
], ['input'])

# 将文本拆分成单词
tokenizer = ft.RegexTokenizer(inputCol='input',
                              outputCol='input_arr',
                              pattern='\s+|[,.\"]')

# 删掉停用词
stopwords = ft.StopWordsRemover(inputCol=tokenizer.getOutputCol(),
                               outputCol='input_stop')
# 生成ngram词对
ngram = ft.NGram(n=2,
                inputCol=stopwords.getOutputCol(),
                outputCol='nGrams')

# 构建特征pipeline
pipeline = Pipeline(stages=[tokenizer, stopwords, ngram])

data_ngram = pipeline\
    .fit(text_data)\
    .transform(text_data)

data_ngram.show()

+--------------------+--------------------+--------------------+--------------------+
|               input|           input_arr|          input_stop|              nGrams|
+--------------------+--------------------+--------------------+--------------------+
|K-fold cross vali...|[k-fold, cross, v...|[k-fold, cross, v...|[k-fold cross, cr...|
|CrossValidatorMod...|[crossvalidatormo...|[crossvalidatormo...|[crossvalidatormo...|
|Creates a copy of...|[creates, a, copy...|[creates, copy, i...|[creates copy, co...|
+--------------------+--------------------+--------------------+--------------------+
3.1.2 离散连续变量

我们常常需要处理高度非线性连续特征,很难只用一个系数来供给模型。这种情况下,可能难以用一个系数来解释这样的特征与目标之间的关系。有时候,将值划分成分类级别是很有用的。

# 离散连续变量
x = np.arange(0, 100)
x = (x/100.0)*np.pi*4
y = x*np.sin(x/1.764)+20.1234

schema = typ.StructType([typ.StructField('continuous_var', typ.DoubleType(), nullable=False)])
data = spark.createDataFrame([[float(e)] for e in y], schema=schema)
data.show(4)
+------------------+
|    continuous_var|
+------------------+
|           20.1234|
|20.132344452369832|
|20.159087064491775|
|20.203356291885854|
+------------------+


# 使用QuantileDiscretizer模型将连续变量分为五个分类级别
discretizer = ft.QuantileDiscretizer(
    numBuckets=5, 
    inputCol='continuous_var',
    outputCol='discritized'
)

data_discretized = discretizer.fit(data).transform(data)
data_discretized.show(5,truncate=False)
+------------------+-----------+
|continuous_var    |discritized|
+------------------+-----------+
|20.1234           |2.0        |
|20.132344452369832|2.0        |
|20.159087064491775|2.0        |
|20.203356291885854|2.0        |
|20.26470185735763 |2.0        |
+------------------+-----------+
3.1.3 标准化连续变量

标准化连续变量不仅有助于更好地理解特征之间的关系,而且还有助于计算效率,并防止运行到某些数字陷阱。

# 标准化连续变量
# 首先,要创建一个向量代表连续变量(因为它只是一个float)
vectorizer = ft.VectorAssembler(inputCols=['continuous_var'],
                               outputCol='continuous_vec')

normlizer = ft.StandardScaler(inputCol=vectorizer.getOutputCol(),
                             outputCol='normlized',
                             withMean=True,
                             withStd=True)

pipeline = Pipeline(stages=[vectorizer, normlizer])
data_standardized = pipeline.fit(data).transform(data)
data_standardized.show(4)
+------------------+--------------------+--------------------+
|    continuous_var|      continuous_vec|           normlized|
+------------------+--------------------+--------------------+
|           20.1234|           [20.1234]|[0.23429139554502...|
|20.132344452369832|[20.132344452369832]|[0.23630959828688...|
|20.159087064491775|[20.159087064491775]|[0.24234373105179...|
|20.203356291885854|[20.203356291885854]|[0.25233252325644...|
+------------------+--------------------+--------------------+
3.2 聚类

在前面的例子中我们介绍了如何使用pyspark ml库来拟合分类模型,在本节我们将简单介绍pyspark ml库中的聚类模型。

聚类是机器学习中的另一个重要组成部分:通常在现实世界中,我们没有那么幸运具有目标特征,所以需要回到一个无监督的学习范例,来试图从中发掘数据内的模式。

# 聚类
# 使用Kmeans模型在出生数据中查找相似性
import pyspark.ml.clustering as clus

kmeans = clus.KMeans(k=5, featuresCol='features')
pipeline = Pipeline(stages=[encoder, featuresCreator, kmeans])
model = pipeline.fit(birth_train)

test = model.transform(birth_test)
test.groupby('prediction')\
    .agg({'*':'count',
         'MOTHER_HEIGHT_IN':'avg'})\
    .collect()

[Row(prediction=1, avg(MOTHER_HEIGHT_IN)=67.43708609271523, count(1)=453),
 Row(prediction=3, avg(MOTHER_HEIGHT_IN)=67.65714285714286, count(1)=245),
 Row(prediction=4, avg(MOTHER_HEIGHT_IN)=63.92423385728825, count(1)=8843),
 Row(prediction=2, avg(MOTHER_HEIGHT_IN)=84.97315436241611, count(1)=447),
 Row(prediction=0, avg(MOTHER_HEIGHT_IN)=65.45034965034965, count(1)=3575)]
3.3 回归

上面已经介绍过了分类和聚类模型,最后我们再来简单介绍一下回归模型。
在本节中,我们将尝试用给定的一些特征来预测MOTHER_WEIGHT_GAIN。

# 回归
# 使用梯度提升决策树来预测增加的体重
import pyspark.ml.regression as reg

features = ['MOTHER_AGE_YEARS', 'MOTHER_HEIGHT_IN', 'MOTHER_PRE_WEIGHT',
            'DIABETES_PRE', 'DIABETES_GEST', 'HYP_TENS_PRE', 'HYP_TENS_GEST',
            'PREV_BIRTH_PRETERM', 'CIG_BEFORE', 'CIG_1_TRI', 'CIG_2_TRI',
            'CIG_3_TRI']

featuresCreator = ft.VectorAssembler(
    inputCols=[col for col in features[1:]],
    outputCol='features'
)

# 这里使用卡方检验选择前六个最重要的特征
selector = ft.ChiSqSelector(numTopFeatures=6,
                            outputCol='selectedFeatures',
                            labelCol='MOTHER_WEIGHT_GAIN')

regressor = reg.GBTRegressor(maxIter=15,
                            maxDepth=3,
                            labelCol='MOTHER_WEIGHT_GAIN')

pipeline = Pipeline(stages=[featuresCreator, selector, regressor])
weight_gain = pipeline.fit(birth_train)

# 测试集评估
evaluator = ev.RegressionEvaluator(predictionCol='prediction',labelCol='MOTHER_WEIGHT_GAIN')
print(evaluator.evaluate(weight_gain.transform(birth_test),
                        {evaluator.metricName:'r2'}))

0.49363823556949404

虽然模型结果不太好,但是我们的重点不在这里,我们只是为了了解pyspark ml库中回归模型的用法。

参考:

《pyspark 实战指南:利用python和spark构建数据密集型应用并规模化部署》

上一篇下一篇

猜你喜欢

热点阅读