Spark MLlib

2019-04-26  本文已影响0人  枫隐_5f5f

MLlib 包括三类核心机器学习功能：

1 数据准备

特征提取、特征变换、特征选择、分类特征的散列（哈希）以及自然语言处理

2 机器学习算法

回归、分类、聚类

3 实用程序

统计方法（如描述性统计、卡方检验）、线性代数以及模型评估方法

MLlib 训练模型所用的数据结构是由 LabeledPoint 组成的 RDD；每个 LabeledPoint 由两部分组成：标签（label）和特征向量（features）。

from pyspark.sql import SparkSession
from pyspark.sql.types import *
import sys
# Build (or reuse an already running) Spark session used by every step below.
spark = SparkSession.builder.appName("TestName").getOrCreate()

# Column names and Spark types of the births CSV, in file order.
# The two type instances are aliased only to keep the table compact.
_STR = StringType()
_INT = IntegerType()
labels = [
    ('INFANT_ALIVE_AT_REPORT', _STR),
    ('BIRTH_YEAR', _INT),
    ('BIRTH_MONTH', _INT),
    ('BIRTH_PLACE', _STR),
    ('MOTHER_AGE_YEARS', _INT),
    ('MOTHER_RACE_6CODE', _STR),
    ('MOTHER_EDUCATION', _STR),
    ('FATHER_COMBINED_AGE', _INT),
    ('FATHER_EDUCATION', _STR),
    ('MONTH_PRECARE_RECODE', _STR),
    ('CIG_BEFORE', _INT),
    ('CIG_1_TRI', _INT),
    ('CIG_2_TRI', _INT),
    ('CIG_3_TRI', _INT),
    ('MOTHER_HEIGHT_IN', _INT),
    ('MOTHER_BMI_RECODE', _INT),
    ('MOTHER_PRE_WEIGHT', _INT),
    ('MOTHER_DELIVERY_WEIGHT', _INT),
    ('MOTHER_WEIGHT_GAIN', _INT),
    ('DIABETES_PRE', _STR),
    ('DIABETES_GEST', _STR),
    ('HYP_TENS_PRE', _STR),
    ('HYP_TENS_GEST', _STR),
    ('PREV_BIRTH_PRETERM', _STR),
    ('NO_RISK', _STR),
    ('NO_INFECTIONS_REPORTED', _STR),
    ('LABOR_IND', _STR),
    ('LABOR_AUGM', _STR),
    ('STEROIDS', _STR),
    ('ANTIBIOTICS', _STR),
    ('ANESTHESIA', _STR),
    ('DELIV_METHOD_RECODE_COMB', _STR),
    ('ATTENDANT_BIRTH', _STR),
    ('APGAR_5', _INT),
    ('APGAR_5_RECODE', _STR),
    ('APGAR_10', _INT),
    ('APGAR_10_RECODE', _STR),
    ('INFANT_SEX', _STR),
    ('OBSTETRIC_GESTATION_WEEKS', _INT),
    ('INFANT_WEIGHT_GRAMS', _INT),
    ('INFANT_ASSIST_VENTI', _STR),
    ('INFANT_ASSIST_VENTI_6HRS', _STR),
    ('INFANT_NICU_ADMISSION', _STR),
    ('INFANT_SURFACANT', _STR),
    ('INFANT_ANTIBIOTICS', _STR),
    ('INFANT_SEIZURES', _STR),
    ('INFANT_NO_ABNORMALITIES', _STR),
    ('INFANT_ANCEPHALY', _STR),
    ('INFANT_MENINGOMYELOCELE', _STR),
    ('INFANT_LIMB_REDUCTION', _STR),
    ('INFANT_DOWN_SYNDROME', _STR),
    ('INFANT_SUSPECTED_CHROMOSOMAL_DISORDER', _STR),
    ('INFANT_NO_CONGENITAL_ANOMALIES_CHECKED', _STR),
    ('INFANT_BREASTFED', _STR),
]
# Every field is declared non-nullable (third StructField argument).
# NOTE(review): confirm the CSV really contains no empty fields; the reader
# may not enforce this flag for file sources.
schema = StructType([StructField(name, dtype, False) for name, dtype in labels])

# Load the training CSV using the explicit schema above (first line is a header).
births = spark.read.csv(
    "file:///home/njliu/prc/pyspark/05/births_train.csv.gz",
    header=True,
    schema=schema,
)

# The label column first, followed by the candidate predictors.
selected_feas = [
    "INFANT_ALIVE_AT_REPORT",
    "BIRTH_PLACE",
    "MOTHER_AGE_YEARS",
    "FATHER_COMBINED_AGE",
    "CIG_BEFORE",
    "CIG_1_TRI",
    "CIG_2_TRI",
    "CIG_3_TRI",
    "MOTHER_HEIGHT_IN",
    "MOTHER_PRE_WEIGHT",
    "MOTHER_DELIVERY_WEIGHT",
    "MOTHER_WEIGHT_GAIN",
    "DIABETES_PRE",
    "DIABETES_GEST",
    "HYP_TENS_PRE",
    "HYP_TENS_GEST",
    "PREV_BIRTH_PRETERM",
]

births_trim = births.select(selected_feas)

# Recoding tables keyed by scheme name. "YNU": Yes -> 1, No -> 0, Unknown -> 0.
recode_dictionary = dict(YNU=dict(Y=1, N=0, U=0))

import pyspark.sql.functions as func

def recode(col,key):
    """Map the raw value ``col`` to its numeric code in the ``key`` table.

    Used through the ``rec_integer`` UDF. Values not present in the table
    (including None) raise KeyError, exactly as a plain dict lookup does.
    """
    table = recode_dictionary[key]
    return table[col]

def correct_cig(feat):
    """Return a Column expression for ``feat`` with the code 99 replaced by 0.

    99 is presumably the dataset's "unknown" marker (TODO confirm). Null
    values also become 0: ``null != 99`` evaluates to null, so the
    ``otherwise`` branch is taken.
    """
    column = func.col(feat)
    is_known = column != 99
    return func.when(is_known, column).otherwise(0)


# Wrap the Python ``recode`` function as a Spark UDF that returns integers.
rec_integer = func.udf(recode, IntegerType())

# Replace the 99 code with 0 in each cigarette-count column (see correct_cig).
births_transformed = births_trim
for _cig_col in ("CIG_BEFORE", "CIG_1_TRI", "CIG_2_TRI", "CIG_3_TRI"):
    births_transformed = births_transformed.withColumn(_cig_col, correct_cig(_cig_col))

# (name, type) pairs of the trimmed frame's columns.
cols = [(field.name, field.dataType) for field in births_trim.schema]

# Y/N/U flag columns = string columns in which the value "Y" occurs at least once.
YNU_cols = []
for name, dtype in cols:
    if not isinstance(dtype, StringType):
        continue
    # Query the same frame whose schema we iterate. first() stops at the first
    # match instead of shuffling and collecting every distinct value.
    has_y = births_trim.filter(func.col(name) == "Y").first() is not None
    if has_y:
        YNU_cols.append(name)

def _ynu_or_passthrough(column_name):
    """Build the select() expression for one column.

    Y/N/U flag columns are recoded through ``rec_integer`` and keep their
    name; every other column is passed through by name unchanged.
    """
    if column_name in YNU_cols:
        return rec_integer(column_name, func.lit("YNU")).alias(column_name)
    return column_name


exprs_YNU = [_ynu_or_passthrough(c) for c in births_transformed.columns]

births_transformed = births_transformed.select(exprs_YNU)

# Summary statistics of the numeric features via MLlib colStats.
import pyspark.mllib.stat as st
import numpy as np
numerical_cols = [
    "MOTHER_AGE_YEARS",
    "FATHER_COMBINED_AGE",
    "CIG_BEFORE",
    "CIG_1_TRI",
    "CIG_2_TRI",
    "CIG_3_TRI",
    "MOTHER_HEIGHT_IN",
    "MOTHER_PRE_WEIGHT",
    "MOTHER_DELIVERY_WEIGHT",
    "MOTHER_WEIGHT_GAIN",
]

# One plain list of values per row, in numerical_cols order.
numeric_rdd = births_transformed.select(numerical_cols).rdd.map(list)
mllib_stats = st.Statistics.colStats(numeric_rdd)
means = mllib_stats.mean()
variances = mllib_stats.variance()
for idx, name in enumerate(numerical_cols):
    # colStats reports variance; print its square root (standard deviation).
    print ("{0}:\t{1:.2f}\t{2:.2f}".format(name, means[idx], np.sqrt(variances[idx])))

# Categorical columns = every column not listed as numeric (the label is included).
_numeric_names = set(numerical_cols)
categorical_cols = [name for name in births_transformed.columns if name not in _numeric_names]
categorical_rdd = births_transformed.select(categorical_cols).rdd.map(list)

# Optional (disabled): print the value frequencies of each categorical column.
#for i,col in enumerate(categorical_cols):
#    agg = categorical_rdd.groupBy(lambda row:row[i]).map(lambda row:(row[0],len(row[1])))
#    print (col,sorted(agg.collect(),key=lambda el:el[1],reverse=True))

# Correlation matrix of the numeric features. Report every strongly correlated
# pair as a candidate for dropping one of the two features.
corrs = st.Statistics.corr(numeric_rdd)
print (corrs)
CORR_THRESHOLD = 0.5
for i, row_corrs in enumerate(corrs):
    for j, r in enumerate(row_corrs):
        # Skip the diagonal. Compare |r| so that strong *negative*
        # correlations (equally redundant features) are reported too.
        if j != i and abs(r) > CORR_THRESHOLD:
            print ("{0}-to-{1}:{2:.2f}".format(numerical_cols[i], numerical_cols[j], r))

# Final column set for modelling, label first. Compared with selected_feas it
# drops CIG_BEFORE, CIG_2_TRI, CIG_3_TRI, MOTHER_DELIVERY_WEIGHT and
# MOTHER_WEIGHT_GAIN, presumably because they correlate strongly with kept columns.
features_to_keep = [
    'INFANT_ALIVE_AT_REPORT',
    'BIRTH_PLACE',
    'MOTHER_AGE_YEARS',
    'FATHER_COMBINED_AGE',
    'CIG_1_TRI',
    'MOTHER_HEIGHT_IN',
    'MOTHER_PRE_WEIGHT',
    'DIABETES_PRE',
    'DIABETES_GEST',
    'HYP_TENS_PRE',
    'HYP_TENS_GEST',
    'PREV_BIRTH_PRETERM',
]

births_transformed = births_transformed.select(*features_to_keep)

#statistical analysis
# Chi-square independence test between the label and each categorical feature.
import pyspark.mllib.linalg as ln
label_col = "INFANT_ALIVE_AT_REPORT"
for cat in categorical_cols:
    if cat == label_col:
        continue
    # Contingency table: one row per label value, one column per value of `cat`.
    table_df = births_transformed.groupby(label_col) \
        .pivot(cat) \
        .count()
    table_rows = table_df.collect()  # single Spark job per feature
    # Drop the label column; combinations that never occur come back as null -> 0.
    counts = [0 if n is None else n for row in table_rows for n in row[1:]]
    n_categories = len(table_df.columns) - 1
    # Matrices.dense is column-major, so each label row becomes one matrix
    # column. The column count is the actual number of label values, not a
    # hard-coded 2.
    contingency = ln.Matrices.dense(n_categories, len(table_rows), counts)
    test = st.Statistics.chiSqTest(contingency)
    print (cat,round(test.pValue,4))




#create dataset for model predict
#translate dataframe to LabeledPoint RDD
import pyspark.mllib.feature as ft
import pyspark.mllib.regression as reg
# BIRTH_PLACE is a categorical code; hash it into a fixed 7-slot count vector.
hashing = ft.HashingTF(7)


def _row_to_labeled_point(row):
    """Convert one ``births_transformed`` Row into a LabeledPoint.

    The first column of ``features_to_keep`` is the label. BIRTH_PLACE is
    expanded into the 7 hashed features, and every other column is copied
    as-is, in ``features_to_keep`` order. Columns are looked up by name, so
    the result does not depend on BIRTH_PLACE sitting at a fixed index.
    """
    features = []
    for name in features_to_keep[1:]:
        value = row[name]
        if name == "BIRTH_PLACE":
            # Hash the whole code as ONE term: HashingTF.transform iterates its
            # argument, so a bare string would hash each character separately.
            features.extend(hashing.transform([value]).toArray())
        else:
            features.append(value)
    return reg.LabeledPoint(row[features_to_keep[0]], ln.Vectors.dense(features))


births_hashed = births_transformed.rdd.map(_row_to_labeled_point)

#split train and test dataset
# 60/40 split. It is seeded (matching the random forest's seed=666) so that the
# reported metrics are reproducible from run to run.
births_train,births_test = births_hashed.randomSplit([0.6,0.4], seed=666)

#LR predict
from pyspark.mllib.classification import LogisticRegressionWithLBFGS as LR
# Logistic regression trained with LBFGS for 10 iterations.
lr_model = LR.train(births_train,iterations=10)

# BinaryClassificationMetrics consumes (score, label) pairs, so the prediction
# (cast to float) must come FIRST and the true label second. The original
# (label, prediction) order swapped their roles in the AUC computation.
lr_predictions = lr_model.predict(births_test.map(lambda row: row.features))
lr_results = lr_predictions.map(lambda pred: pred * 1.0) \
    .zip(births_test.map(lambda row: row.label))

#evaluate
import pyspark.mllib.evaluation as ev
# Areas under the precision-recall and ROC curves for the logistic regression.
lr_ev = ev.BinaryClassificationMetrics(lr_results)
lr_pr_area = lr_ev.areaUnderPR
print ("Area under PR:{}".format(lr_pr_area))
lr_roc_area = lr_ev.areaUnderROC
print ("Area under ROC: {}".format(lr_roc_area))

#feature selection with chi-square
# Keep the 4 features ranked highest by the chi-square selector fitted on the
# training split. NOTE(review): continuous features are treated as categorical
# by this test; confirm that is acceptable here.
selector = ft.ChiSqSelector(4).fit(births_train)


def _select_top_features(points):
    """Return ``points`` as LabeledPoints holding only the selected features."""
    labels_rdd = points.map(lambda row: row.label)
    reduced_rdd = selector.transform(points.map(lambda row: row.features))
    return labels_rdd.zip(reduced_rdd).map(lambda pair: reg.LabeledPoint(pair[0], pair[1]))


topFeatures_train = _select_top_features(births_train)
topFeatures_test = _select_top_features(births_test)


#random forest model
from pyspark.mllib.tree import RandomForest
# 6 trees over the selected features. Every feature is considered at every split
# (featureSubsetStrategy="all"), and none is declared categorical
# (categoricalFeaturesInfo={}).
rf_model = RandomForest.trainClassifier(data=topFeatures_train,
                numClasses=2,
                categoricalFeaturesInfo={},
                numTrees=6,
                featureSubsetStrategy="all",seed=666)

# BinaryClassificationMetrics expects (score, label) pairs: prediction FIRST,
# true label second. The original (label, prediction) order swapped them.
rf_predictions = rf_model.predict(topFeatures_test.map(lambda row: row.features))
rf_results = rf_predictions.zip(topFeatures_test.map(lambda row: row.label))


rf_ev = ev.BinaryClassificationMetrics(rf_results)
print ("Area under PR:{}".format(rf_ev.areaUnderPR))
print ("Area under ROC: {}".format(rf_ev.areaUnderROC))

上一篇下一篇

猜你喜欢

热点阅读