Spark ML

2019-04-27  枫隐_5f5f

The MLlib package (pyspark.mllib) operates on RDDs.
The ML package (pyspark.ml) operates on DataFrames.

The ML package is built around three main kinds of components:

1. Transformers

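A transformer converts data by appending a new column to a DataFrame.
When you derive from the abstract Transformer class, each new transformer class has to implement the .transform method (in PySpark, the abstract _transform hook that the public .transform() calls). It takes the DataFrame to be transformed and returns a new DataFrame.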

The pyspark.ml.feature module provides many ready-made transformers, such as Binarizer, Tokenizer and VectorAssembler.
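As a rough illustration of this contract, the following toy sketch models a DataFrame as a list of dicts in plain Python. It is not the PySpark API. `ToyTransformer` and `ToyBinarizer` are hypothetical classes, loosely modeled on the behavior of `pyspark.ml.feature.Binarizer`, where values above the threshold become 1.0 and all others become 0.0. The `SMOKER` column is made up for the example.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List

Row = Dict[str, Any]  # toy stand-in for a DataFrame row


class ToyTransformer(ABC):
    """Hypothetical transformer contract: take a dataset, return a new one."""

    @abstractmethod
    def transform(self, rows: List[Row]) -> List[Row]:
        ...


class ToyBinarizer(ToyTransformer):
    """Appends outputCol: 1.0 if inputCol > threshold, else 0.0."""

    def __init__(self, inputCol: str, outputCol: str, threshold: float = 0.0):
        self.inputCol = inputCol
        self.outputCol = outputCol
        self.threshold = threshold

    def transform(self, rows: List[Row]) -> List[Row]:
        # Build new rows instead of mutating the input, the way a DataFrame
        # transformation returns a new DataFrame.
        return [
            {**row, self.outputCol: 1.0 if row[self.inputCol] > self.threshold else 0.0}
            for row in rows
        ]


data = [{"CIG_BEFORE": 0}, {"CIG_BEFORE": 12}]
out = ToyBinarizer("CIG_BEFORE", "SMOKER").transform(data)
print(out)  # [{'CIG_BEFORE': 0, 'SMOKER': 0.0}, {'CIG_BEFORE': 12, 'SMOKER': 1.0}]
```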

2. Estimators

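Estimators are the models that predict or classify observations. An estimator is trained with .fit(), which returns a fitted model. That model is itself a transformer and appends its predictions as new columns. The main families are: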

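Classification models (pyspark.ml.classification)

Regression models (pyspark.ml.regression)

Clustering models (pyspark.ml.clustering)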
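The fit/transform split can be sketched the same way in plain Python. This is not the PySpark API. The hypothetical `ToyThresholdClassifier` is an estimator whose `fit` learns a cut-off, namely the midpoint between the two class means of one feature. It returns a `ToyThresholdModel`, which is itself a transformer that appends a `prediction` column. This simple rule stands in for a real model such as logistic regression.

```python
from statistics import mean
from typing import Any, Dict, List

Row = Dict[str, Any]


class ToyThresholdModel:
    """Fitted model: a transformer that appends a prediction column."""

    def __init__(self, featureCol: str, threshold: float):
        self.featureCol = featureCol
        self.threshold = threshold

    def transform(self, rows: List[Row]) -> List[Row]:
        return [
            {**row, "prediction": 1.0 if row[self.featureCol] > self.threshold else 0.0}
            for row in rows
        ]


class ToyThresholdClassifier:
    """Estimator: fit() learns a threshold from labeled rows and returns a model."""

    def __init__(self, featureCol: str, labelCol: str):
        self.featureCol = featureCol
        self.labelCol = labelCol

    def fit(self, rows: List[Row]) -> ToyThresholdModel:
        pos = [r[self.featureCol] for r in rows if r[self.labelCol] == 1]
        neg = [r[self.featureCol] for r in rows if r[self.labelCol] == 0]
        # Midpoint between the class means; assumes class 1 has the larger mean.
        return ToyThresholdModel(self.featureCol, (mean(pos) + mean(neg)) / 2)


train = [{"x": 1.0, "y": 0}, {"x": 2.0, "y": 0}, {"x": 8.0, "y": 1}, {"x": 10.0, "y": 1}]
model = ToyThresholdClassifier("x", "y").fit(train)
print(model.threshold)  # 5.25
preds = model.transform([{"x": 3.0}, {"x": 7.0}])
print(preds)  # [{'x': 3.0, 'prediction': 0.0}, {'x': 7.0, 'prediction': 1.0}]
```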

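3. Pipelines

A pipeline represents the whole process from transformation to estimation. It chains transformers and estimators into an ordered list of stages that is fitted as a single unit.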
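Below is a minimal sketch of how a pipeline is fitted, again using toy list-of-dicts data. The stages run in order, and each estimator is fitted on the output of the stages before it. The result is a model made only of transformers. `ToyPipeline`, `AddSquare` and `MeanCenterer` are hypothetical names. Spark's `Pipeline.fit` follows the same idea and returns a `PipelineModel`, but this is not its code.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


class AddSquare:
    """Toy transformer: appends <col>_SQ."""

    def __init__(self, col: str):
        self.col = col

    def transform(self, rows: List[Row]) -> List[Row]:
        return [{**r, self.col + "_SQ": r[self.col] ** 2} for r in rows]


class CenterModel:
    """Fitted toy model: appends <col>_C = value - learned mean."""

    def __init__(self, col: str, mu: float):
        self.col, self.mu = col, mu

    def transform(self, rows: List[Row]) -> List[Row]:
        return [{**r, self.col + "_C": r[self.col] - self.mu} for r in rows]


class MeanCenterer:
    """Toy estimator: fit() learns a column mean and returns a CenterModel."""

    def __init__(self, col: str):
        self.col = col

    def fit(self, rows: List[Row]) -> CenterModel:
        return CenterModel(self.col, sum(r[self.col] for r in rows) / len(rows))


class ToyPipelineModel:
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows: List[Row]) -> List[Row]:
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows: List[Row]) -> ToyPipelineModel:
        fitted = []
        for stage in self.stages:
            # Estimators are fitted on the data as transformed so far;
            # plain transformers are kept as they are.
            t = stage.fit(rows) if hasattr(stage, "fit") else stage
            fitted.append(t)
            rows = t.transform(rows)
        return ToyPipelineModel(fitted)


train = [{"x": 0}, {"x": 1}, {"x": 2}, {"x": 3}]
model = ToyPipeline([AddSquare("x"), MeanCenterer("x_SQ")]).fit(train)
out = model.transform([{"x": 4}])
print(out)  # [{'x': 4, 'x_SQ': 16, 'x_SQ_C': 12.5}]
```

The estimator only ever sees `x_SQ` because the transformer before it created that column. This ordering is what a pipeline guarantees at both fit and transform time.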
Demo: putting it into practice

from pyspark.sql.types import *
from pyspark.sql import SparkSession
import pyspark.ml.feature as ft
spark = SparkSession.builder \
    .appName("testname") \
    .getOrCreate()

# read data
labels = [
    ('INFANT_ALIVE_AT_REPORT', IntegerType()),
    ('BIRTH_PLACE', StringType()),
    ('MOTHER_AGE_YEARS',IntegerType()),
    ('FATHER_COMBINED_AGE',IntegerType()),
    ('CIG_BEFORE',IntegerType()),
    ('CIG_1_TRI',IntegerType()),
    ('CIG_2_TRI',IntegerType()),
    ('CIG_3_TRI',IntegerType()),
    ('MOTHER_HEIGHT_IN',IntegerType()),
    ('MOTHER_PRE_WEIGHT',IntegerType()),
    ('MOTHER_DELIVERY_WEIGHT',IntegerType()),
    ('MOTHER_WEIGHT_GAIN',IntegerType()),
    ('DIABETES_PRE',IntegerType()),
    ('DIABETES_GEST',IntegerType()),
    ('HYP_TENS_PRE',IntegerType()),
    ('HYP_TENS_GEST',IntegerType()),
    ('PREV_BIRTH_PRETERM',IntegerType())  # must be numeric for VectorAssembler
    ]

schema = StructType([StructField(e[0],e[1],False) for e in labels])
births = spark.read.csv("file:///home/njliu/prc/pyspark/05/births_train.csv.gz",header=True,schema=schema)


# transformation
# cast BIRTH_PLACE from StringType to IntegerType so it can be one-hot encoded
births = births.withColumn('BIRTH_PLACE_INT',births["BIRTH_PLACE"].cast(IntegerType()))

# one-hot encode BIRTH_PLACE_INT (a Transformer in Spark 2.x, an Estimator in Spark 3.x; the Pipeline handles either)
encoder = ft.OneHotEncoder(inputCol="BIRTH_PLACE_INT", outputCol="BIRTH_PLACE_VEC")

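# assemble the numeric columns (everything except the label and the raw BIRTH_PLACE string)
# plus the encoded birth-place vector into a single "features" vector column
featuresCreator = ft.VectorAssembler(inputCols=[col[0] for col in labels[2:]] + [encoder.getOutputCol()], outputCol="features")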
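To see what the two feature stages above produce, here is a toy plain-Python sketch. It uses dense lists instead of Spark's (usually sparse) vectors. It assumes four birth-place categories purely for illustration. It mirrors the default `dropLast=True` of `OneHotEncoder`, under which the last category is encoded as all zeros. `one_hot` and `assemble` are hypothetical helpers. `assemble` concatenates scalars and vectors in `inputCols` order, the way `VectorAssembler` does.

```python
from typing import List, Union


def one_hot(index: int, num_categories: int, drop_last: bool = True) -> List[float]:
    # With drop_last, the vector has num_categories - 1 slots and the
    # last category is represented by all zeros.
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if index < size:
        vec[index] = 1.0
    return vec


def assemble(*parts: Union[int, float, List[float]]) -> List[float]:
    # Concatenate scalar columns and vector columns, in order, into one vector.
    out: List[float] = []
    for p in parts:
        if isinstance(p, list):
            out.extend(float(v) for v in p)
        else:
            out.append(float(p))
    return out


# Made-up values: MOTHER_AGE_YEARS=29, FATHER_COMBINED_AGE=31, BIRTH_PLACE_INT=1
print(one_hot(1, 4))                    # [0.0, 1.0, 0.0]
print(one_hot(3, 4))                    # [0.0, 0.0, 0.0]
print(assemble(29, 31, one_hot(1, 4)))  # [29.0, 31.0, 0.0, 1.0, 0.0]
```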


#create a model
import pyspark.ml.classification as cl
logistic = cl.LogisticRegression(maxIter=10, regParam=0.01, labelCol="INFANT_ALIVE_AT_REPORT")

# chain the stages into a pipeline
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[encoder,featuresCreator,logistic])

# split into training (70%) and test (30%) sets, then fit the whole pipeline on the training set
births_train,births_test = births.randomSplit([0.7,0.3],seed=666)

model = pipeline.fit(births_train)
test_model = model.transform(births_test)

#evaluate the performance
import pyspark.ml.evaluation as ev
evaluator = ev.BinaryClassificationEvaluator(rawPredictionCol='probability',labelCol="INFANT_ALIVE_AT_REPORT")
print (evaluator.evaluate(test_model,{evaluator.metricName:"areaUnderROC"}))
print (evaluator.evaluate(test_model,{evaluator.metricName:"areaUnderPR"}))
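Some intuition for the first metric: areaUnderROC equals the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one, with ties counting one half. With `rawPredictionCol='probability'`, the score is the predicted probability of the positive class. The pairwise function below sketches that definition on made-up scores. It is not how Spark computes the metric, since Spark works from the ROC curve. It runs in O(P·N) time and only suits tiny inputs. `roc_auc` is a hypothetical helper.

```python
from typing import Sequence


def roc_auc(scores: Sequence[float], labels: Sequence[int]) -> float:
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    wins = 0.0
    # Compare every positive with every negative example.
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


scores = [0.9, 0.8, 0.35, 0.6, 0.2]  # made-up positive-class probabilities
labels = [1, 1, 1, 0, 0]
print(roc_auc(scores, labels))  # 5 of 6 positive/negative pairs ranked correctly
```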

# save the unfitted pipeline (its stages and their parameters, no learned weights)
pipelinePath = "./lr_pipeline"
pipeline.write().overwrite().save(pipelinePath)

# load the pipeline definition; it has to be fitted again before it can make predictions
loadedPipeline = Pipeline.load(pipelinePath)
loadedPredictions = loadedPipeline.fit(births_train).transform(births_test)

# save the fitted model as a PipelineModel (the fitted stages with their learned parameters)
from pyspark.ml import PipelineModel
modelPath = "./lr_PipelineModel"
model.write().overwrite().save(modelPath)

loadedPipelineModel = PipelineModel.load(modelPath)
test_reloadedModel = loadedPipelineModel.transform(births_test)


# hyperparameter tuning: grid search with k-fold cross-validation
import pyspark.ml.tuning as tune
logistic = cl.LogisticRegression(labelCol = "INFANT_ALIVE_AT_REPORT")
grid = tune.ParamGridBuilder() \
    .addGrid(logistic.maxIter,[2,10,50]) \
    .addGrid(logistic.regParam,[0.01,0.05,0.3]) \
    .build()
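`ParamGridBuilder` builds the Cartesian product of the listed values, so the grid above holds 3 × 3 = 9 parameter maps. Below is the same product in plain Python, with dicts standing in for Spark's ParamMaps. The order Spark uses may differ.

```python
from itertools import product

max_iter = [2, 10, 50]
reg_param = [0.01, 0.05, 0.3]

# One dict per combination, like one ParamMap per grid point.
param_maps = [{"maxIter": m, "regParam": r} for m, r in product(max_iter, reg_param)]
print(len(param_maps))  # 9
print(param_maps[0])    # {'maxIter': 2, 'regParam': 0.01}
```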

evaluator = ev.BinaryClassificationEvaluator(rawPredictionCol="probability",labelCol="INFANT_ALIVE_AT_REPORT")

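# CrossValidator uses 3 folds by default (numFolds)
cv = tune.CrossValidator(estimator=logistic, estimatorParamMaps=grid, evaluator=evaluator)
# fit the feature stages once; cross-validation then tunes only the logistic regression
pipeline = Pipeline(stages=[encoder, featuresCreator])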
data_transformer = pipeline.fit(births_train)
cvModel = cv.fit(data_transformer.transform(births_train))

data_test = data_transformer.transform(births_test)
results = cvModel.transform(data_test)

print (evaluator.evaluate(results,{evaluator.metricName:"areaUnderROC"}))
print (evaluator.evaluate(results,{evaluator.metricName:"areaUnderPR"}))

# pair each parameter combination with its average cross-validation metric and show the best one
results = [
    ([{key.name: paramValue} for key, paramValue in params.items()], metric)
    for params, metric in zip(cvModel.getEstimatorParamMaps(), cvModel.avgMetrics)
]
print(sorted(results, key=lambda el: el[1], reverse=True)[0])
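The loop behind `CrossValidator` and the best-parameter extraction above can be sketched in plain Python. Each parameter map is trained on k-1 folds and scored on the held-out fold. The k scores are averaged, which is the counterpart of `avgMetrics`, and the map with the highest average wins. In this toy version, folds are assigned round-robin rather than randomly. `fit_and_score` and its `threshold_accuracy` implementation are hypothetical stand-ins for fitting logistic regression and running the evaluator.

```python
from statistics import mean
from typing import Callable, Dict, List, Sequence, Tuple

Point = Tuple[float, int]  # (feature value, label)


def k_fold_indices(n: int, k: int) -> List[List[int]]:
    # Deterministic round-robin folds; Spark assigns folds randomly (seeded).
    return [[i for i in range(n) if i % k == f] for f in range(k)]


def cross_validate(data: Sequence[Point], grid: List[Dict[str, float]],
                   fit_and_score: Callable[[Dict[str, float], list, list], float],
                   k: int = 3) -> List[float]:
    folds = k_fold_indices(len(data), k)
    avg_metrics = []
    for params in grid:
        scores = []
        for fold in folds:
            held_out = set(fold)
            train = [data[i] for i in range(len(data)) if i not in held_out]
            valid = [data[i] for i in fold]
            scores.append(fit_and_score(params, train, valid))
        avg_metrics.append(mean(scores))  # one average per parameter map
    return avg_metrics


def threshold_accuracy(params, train, valid):
    # Hypothetical model with no learned state (train is ignored):
    # predict 1 when x > t and report accuracy on the validation fold.
    t = params["t"]
    return mean(1.0 if (x > t) == (y == 1) else 0.0 for x, y in valid)


data = [(0.1, 0), (0.25, 0), (0.35, 1), (0.8, 1), (0.7, 1), (0.2, 0)]
grid = [{"t": 0.1}, {"t": 0.3}, {"t": 0.5}]
avg = cross_validate(data, grid, threshold_accuracy, k=3)
best = max(zip(grid, avg), key=lambda pm: pm[1])
print(avg)   # roughly [0.667, 1.0, 0.833]
print(best)  # ({'t': 0.3}, 1.0)
```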


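# train-validation split: each parameter combination is fitted once on a single random
# train/validation split of the training data (trainRatio, 0.75 by default) instead of k folds
# select the 5 features most strongly associated with the label (chi-squared test)
selector = ft.ChiSqSelector(numTopFeatures=5, featuresCol=featuresCreator.getOutputCol(), outputCol="selectedFeatures", labelCol="INFANT_ALIVE_AT_REPORT")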
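`ChiSqSelector` scores each feature with a chi-squared test of independence against the label, treating the feature's values as categories. The statistic itself is easy to compute by hand, and the sketch below does so for two made-up binary features. Spark ranks features using this test. For features with the same number of categories, a larger statistic corresponds to a smaller p-value, so ranking by the statistic, as the sketch does, gives the same order. `chi_square` is a hypothetical helper, not a Spark function.

```python
from collections import Counter
from typing import Hashable, Sequence


def chi_square(feature: Sequence[Hashable], label: Sequence[Hashable]) -> float:
    # Pearson statistic: sum over cells of (observed - expected)^2 / expected,
    # where expected = row total * column total / n.
    n = len(feature)
    joint = Counter(zip(feature, label))
    f_counts = Counter(feature)
    l_counts = Counter(label)
    stat = 0.0
    for f, fc in f_counts.items():
        for l, lc in l_counts.items():
            expected = fc * lc / n
            observed = joint.get((f, l), 0)
            stat += (observed - expected) ** 2 / expected
    return stat


label = [1, 1, 1, 0, 0, 0]
features = {
    "informative": [1, 1, 1, 0, 0, 0],  # identical to the label
    "noise": [1, 0, 1, 0, 1, 0],
}
scores = {name: chi_square(values, label) for name, values in features.items()}
ranked = sorted(scores.items(), key=lambda kv: kv[1], reverse=True)
print(ranked)  # informative (6.0) ranks above noise (about 0.667)
```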

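logistic = cl.LogisticRegression(labelCol="INFANT_ALIVE_AT_REPORT", featuresCol="selectedFeatures")
# rebuild the grid for this new estimator: a ParamMap only applies to the instance whose params it was built from
grid = tune.ParamGridBuilder() \
    .addGrid(logistic.maxIter, [2, 10, 50]) \
    .addGrid(logistic.regParam, [0.01, 0.05, 0.3]) \
    .build()
pipeline = Pipeline(stages=[encoder, featuresCreator, selector])
data_transformer = pipeline.fit(births_train)
tvs = tune.TrainValidationSplit(estimator=logistic, estimatorParamMaps=grid, evaluator=evaluator)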

tvsModel = tvs.fit(data_transformer.transform(births_train))
data_test = data_transformer.transform(births_test)
results = tvsModel.transform(data_test)

print (evaluator.evaluate(results,{evaluator.metricName:"areaUnderROC"}))
print (evaluator.evaluate(results,{evaluator.metricName:"areaUnderPR"}))
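The main difference from `CrossValidator` is cost. `TrainValidationSplit` splits the data once (`trainRatio`, 0.75 by default) and fits each parameter map a single time, whereas 3-fold cross-validation fits each map three times. The plain-Python sketch below performs a seeded random split of that kind on placeholder rows. It also counts the fits made during the search for the 9-map grid. `train_validation_split` is a hypothetical helper. Like Spark's random splitting, it only approximately honors the ratio.

```python
import random
from typing import List, Sequence, Tuple


def train_validation_split(rows: Sequence[int], train_ratio: float = 0.75,
                           seed: int = 42) -> Tuple[List[int], List[int]]:
    # Each row goes to the training part with probability train_ratio,
    # so the split sizes are only approximately 75/25.
    rng = random.Random(seed)
    train, valid = [], []
    for r in rows:
        (train if rng.random() < train_ratio else valid).append(r)
    return train, valid


rows = list(range(1000))  # placeholder rows
train, valid = train_validation_split(rows)
print(len(train), len(valid))  # roughly 750 and 250

n_param_maps = 9             # 3 maxIter values x 3 regParam values
cv_fits = n_param_maps * 3   # CrossValidator with numFolds=3
tvs_fits = n_param_maps * 1  # TrainValidationSplit
print("fits during the search:", cv_fits, "vs", tvs_fits)
```

The single split is cheaper but noisier, because every parameter map is judged on one validation set instead of an average over folds.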