Spark mllib
2019-04-26 本文已影响0人
枫隐_5f5f
mllib 包括三个核心机器学习功能
1 数据准备
特征提取 变换 选择 分类特征的散列 自然语言处理
2 机器学习算法
回归 分类 聚类
3 实用程序
统计方法 、如描述性统计、卡方检验 、 线性代数和模型评估方法
MLLIB的数据结构是labeledPoint RDD 用于训练模型 由两个数据组成 标签和特征
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import sys
spark = SparkSession.builder \
.appName("TestName") \
.getOrCreate()
labels = [
('INFANT_ALIVE_AT_REPORT', StringType()),
('BIRTH_YEAR', IntegerType()),
('BIRTH_MONTH', IntegerType()),
('BIRTH_PLACE', StringType()),
('MOTHER_AGE_YEARS', IntegerType()),
('MOTHER_RACE_6CODE', StringType()),
('MOTHER_EDUCATION', StringType()),
('FATHER_COMBINED_AGE', IntegerType()),
('FATHER_EDUCATION', StringType()),
('MONTH_PRECARE_RECODE', StringType()),
('CIG_BEFORE', IntegerType()),
('CIG_1_TRI', IntegerType()),
('CIG_2_TRI', IntegerType()),
('CIG_3_TRI', IntegerType()),
('MOTHER_HEIGHT_IN', IntegerType()),
('MOTHER_BMI_RECODE', IntegerType()),
('MOTHER_PRE_WEIGHT', IntegerType()),
('MOTHER_DELIVERY_WEIGHT', IntegerType()),
('MOTHER_WEIGHT_GAIN', IntegerType()),
('DIABETES_PRE', StringType()),
('DIABETES_GEST', StringType()),
('HYP_TENS_PRE', StringType()),
('HYP_TENS_GEST', StringType()),
('PREV_BIRTH_PRETERM', StringType()),
('NO_RISK', StringType()),
('NO_INFECTIONS_REPORTED', StringType()),
('LABOR_IND', StringType()),
('LABOR_AUGM', StringType()),
('STEROIDS', StringType()),
('ANTIBIOTICS', StringType()),
('ANESTHESIA', StringType()),
('DELIV_METHOD_RECODE_COMB', StringType()),
('ATTENDANT_BIRTH', StringType()),
('APGAR_5', IntegerType()),
('APGAR_5_RECODE', StringType()),
('APGAR_10', IntegerType()),
('APGAR_10_RECODE', StringType()),
('INFANT_SEX', StringType()),
('OBSTETRIC_GESTATION_WEEKS', IntegerType()),
('INFANT_WEIGHT_GRAMS', IntegerType()),
('INFANT_ASSIST_VENTI', StringType()),
('INFANT_ASSIST_VENTI_6HRS', StringType()),
('INFANT_NICU_ADMISSION', StringType()),
('INFANT_SURFACANT', StringType()),
('INFANT_ANTIBIOTICS', StringType()),
('INFANT_SEIZURES', StringType()),
('INFANT_NO_ABNORMALITIES', StringType()),
('INFANT_ANCEPHALY', StringType()),
('INFANT_MENINGOMYELOCELE', StringType()),
('INFANT_LIMB_REDUCTION', StringType()),
('INFANT_DOWN_SYNDROME', StringType()),
('INFANT_SUSPECTED_CHROMOSOMAL_DISORDER', StringType()),
('INFANT_NO_CONGENITAL_ANOMALIES_CHECKED', StringType()),
('INFANT_BREASTFED', StringType())
]
schema = StructType([StructField(e[0],e[1],False) for e in labels])
births = spark.read.csv("file:///home/njliu/prc/pyspark/05/births_train.csv.gz",header=True,schema=schema)
selected_feas = ["INFANT_ALIVE_AT_REPORT",
"BIRTH_PLACE",
"MOTHER_AGE_YEARS",
"FATHER_COMBINED_AGE",
"CIG_BEFORE",
"CIG_1_TRI",
"CIG_2_TRI",
"CIG_3_TRI",
"MOTHER_HEIGHT_IN",
"MOTHER_PRE_WEIGHT",
"MOTHER_DELIVERY_WEIGHT",
"MOTHER_WEIGHT_GAIN",
"DIABETES_PRE",
"DIABETES_GEST",
"HYP_TENS_PRE",
"HYP_TENS_GEST",
"PREV_BIRTH_PRETERM",
]
births_trim = births.select(selected_feas)
recode_dictionary = {"YNU":{"Y":1,"N":0,"U":0}}
import pyspark.sql.functions as func
def recode(col,key):
return recode_dictionary[key][col]
def correct_cig(feat):
return func.when(func.col(feat) != 99, func.col(feat)).otherwise(0)
#transform the function recode into UDF which could be used in spark
rec_integer = func.udf(recode,IntegerType())
births_transformed = births_trim.withColumn("CIG_BEFORE",correct_cig("CIG_BEFORE")) \
.withColumn("CIG_1_TRI",correct_cig("CIG_1_TRI")) \
.withColumn("CIG_2_TRI",correct_cig("CIG_2_TRI")) \
.withColumn("CIG_3_TRI",correct_cig("CIG_3_TRI"))
cols = [(col.name, col.dataType) for col in births_trim.schema]
YNU_cols = []
for i,s in enumerate(cols):
if s[1] == StringType():
dis = births.select(s[0]).distinct().rdd.map(lambda x:x[0]).collect()
if "Y" in dis:
YNU_cols.append(s[0])
exprs_YNU = [rec_integer(x,func.lit("YNU")).alias(x) if x in YNU_cols else x for x in births_transformed.columns]
births_transformed = births_transformed.select(exprs_YNU)
#births_transformed.select(YNU_cols[-5:]).show(5)
#summary feas colStats
import pyspark.mllib.stat as st
import numpy as np
numerical_cols = ["MOTHER_AGE_YEARS","FATHER_COMBINED_AGE","CIG_BEFORE","CIG_1_TRI","CIG_2_TRI","CIG_3_TRI","MOTHER_HEIGHT_IN","MOTHER_PRE_WEIGHT","MOTHER_DELIVERY_WEIGHT","MOTHER_WEIGHT_GAIN"]
numeric_rdd = births_transformed.select(numerical_cols).rdd.map(lambda row:[e for e in row])
mllib_stats = st.Statistics.colStats(numeric_rdd)
for col,m,v in zip(numerical_cols,mllib_stats.mean(),mllib_stats.variance()):
print ("{0}:\t{1:.2f}\t{2:.2f}".format(col,m,np.sqrt(v)))
#calc categorical variables
categorical_cols = [e for e in births_transformed.columns if e not in numerical_cols]
categorical_rdd = births_transformed.select(categorical_cols).rdd.map(lambda row:[e for e in row])
#for i,col in enumerate(categorical_cols):
# agg = categorical_rdd.groupBy(lambda row:row[i]).map(lambda row:(row[0],len(row[1])))
# print (col,sorted(agg.collect(),key=lambda el:el[1],reverse=True))
#numerical feas correlation
corrs = st.Statistics.corr(numeric_rdd)
print (corrs)
for i,e in enumerate(corrs > 0.5):
correlated = [(numerical_cols[j],corrs[i][j]) for j,e in enumerate(e) if e == 1.0 and j != i]
if len(correlated) > 0:
for e in correlated:
print ("{0}-to-{1}:{2:.2f}".format(numerical_cols[i],e[0],e[1]))
features_to_keep = [
'INFANT_ALIVE_AT_REPORT',
'BIRTH_PLACE',
'MOTHER_AGE_YEARS',
'FATHER_COMBINED_AGE',
'CIG_1_TRI',
'MOTHER_HEIGHT_IN',
'MOTHER_PRE_WEIGHT',
'DIABETES_PRE',
'DIABETES_GEST',
'HYP_TENS_PRE',
'HYP_TENS_GEST',
'PREV_BIRTH_PRETERM'
]
births_transformed = births_transformed.select([e for e in features_to_keep])
#statistical analysis
#chiSqTest feas and feas correlation for categorical variables
import pyspark.mllib.linalg as ln
for cat in categorical_cols[1:]:
agg = births_transformed.groupby("INFANT_ALIVE_AT_REPORT") \
.pivot(cat) \
.count()
agg_rdd = agg.rdd.map(lambda row:(row[1:])).flatMap(lambda row:[0 if e == None else e for e in row]).collect()
row_length = len(agg.collect()[0]) - 1
agg = ln.Matrices.dense(row_length,2,agg_rdd)
test = st.Statistics.chiSqTest(agg)
print (cat,round(test.pValue,4))
#create dataset for model predict
#translate dataframe to LabeledPoint RDD
import pyspark.mllib.feature as ft
import pyspark.mllib.regression as reg
hashing = ft.HashingTF(7)
births_hashed = births_transformed.rdd.map(lambda row:[list(hashing.transform(row[1]).toArray()) if col == "BIRTH_PLACE" else row[i] for i,col in enumerate(features_to_keep)]) \
.map(lambda row:[[e] if type(e) == int else e for e in row]) \
.map(lambda row:[item for sublist in row for item in sublist]) \
.map(lambda row:reg.LabeledPoint(row[0],ln.Vectors.dense(row[1:])))
#split train and test dataset
births_train,births_test = births_hashed.randomSplit([0.6,0.4])
#LR predict
from pyspark.mllib.classification import LogisticRegressionWithLBFGS as LR
lr_model = LR.train(births_train,iterations=10)
lr_results = (births_test.map(lambda row:row.label) \
.zip(lr_model.predict(births_test.map(lambda row:row.features)))).map(lambda row:(row[0],row[1] * 1.0))
#evaluate
import pyspark.mllib.evaluation as ev
lr_ev = ev.BinaryClassificationMetrics(lr_results)
print ("Area under PR:{}".format(lr_ev.areaUnderPR))
print ("Area under ROC: {}".format(lr_ev.areaUnderROC))
#feature selection with chi-square
selector = ft.ChiSqSelector(4).fit(births_train)
topFeatures_train = (
births_train.map(lambda row:row.label) \
.zip(selector.transform(births_train.map(lambda row:row.features)))
).map(lambda row:reg.LabeledPoint(row[0],row[1]))
topFeatures_test = (
births_test.map(lambda row:row.label) \
.zip(selector.transform(births_test.map(lambda row:row.features)))
).map(lambda row:reg.LabeledPoint(row[0],row[1]))
#random forest model
from pyspark.mllib.tree import RandomForest
rf_model = RandomForest.trainClassifier(data=topFeatures_train,
numClasses=2,
categoricalFeaturesInfo={},
numTrees=6,
featureSubsetStrategy="all",seed=666)
rf_results = (topFeatures_test.map(lambda row:row.label) \
.zip(rf_model.predict(topFeatures_test.map(lambda row:row.features))))
rf_ev = ev.BinaryClassificationMetrics(rf_results)
print ("Area under PR:{}".format(rf_ev.areaUnderPR))
print ("Area under ROC: {}".format(rf_ev.areaUnderROC))