数据挖掘

sklearn(0.22)实用特征工程总结,整合预处理和模型,F

2020-03-20  本文已影响0人  xiaogp

最近又开始接触sklearn(0.22.1),发现sklearn已经不是以前的那个sklearn了,增加了很多数据预处理的转化器,目的就是所有数据预处理和评估器/模型整合在同一个pipeline,序列化pickle后直接读取新数据为dataframe预测,方便部署。实际上只需要pandas读取数据成dataframe格式,其他全部由sklearn搞定。


(一)sklearn特征工程接口整理

缺失值填充

from sklearn.impute import SimpleImputer

(1)简单填充,支持均值,中位数,众数填充
(2)默认填充np.nan,可以指定missing_values
(3)已经存在np.nan的情况下,无法先填充其他特定缺失值,比如?,unk等
(4)如果一列或多列有多种形式的缺失值,需要封装多个SimpleImputer到Pipeline

from sklearn.impute import KNNImputer

(1)k近邻填充,其他非空特征计算欧式距离,用空特征的那列的均值作为填充值
(2)需要把多个列(不管哪一列有缺失值)全部传入转化器中,不能有字符型变量,因为无法计算欧式距离

特征标准化

from sklearn.preprocessing import MinMaxScaler

(1)把数据映射在标准区间

特征分箱

from sklearn.preprocessing import KBinsDiscretizer

(1)分箱方式可以指定等频分箱(默认)和等距分箱,和聚类分箱
(2)分箱的结果可以是直接每个桶onehot,也可以标注自然数

离散特征编码

from sklearn.preprocessing import OneHotEncoder

(1)可以直接对字符串变量做OneHotEncoder
(2)可以指定ignore忽略出现新类别的样本(全部是0,没有1)
(3)可以手动写死每个变量的所有类别

自定义函数转化器

from sklearn.preprocessing import FunctionTransformer

(1)先定义函数,传入FunctionTransformer,fit_transform传入数据的每一列

列选择器转化

from sklearn.compose import ColumnTransformer

(1)可以将单个转化器,组合流水线指定列对象,最常见的是分为离散变量和连续变量,也可以每一列都单独指定
(2)格式是["自定义操作名", 转化器/流水线, 操作的列]

pipeline

from sklearn.pipeline import Pipeline

(1)将多个转化器/评估器组合在一起
(2)格式是["自定义操作名", 转化器/流水线]
(3)可以通过steps方法通过索引拿到Pipeline内部的对象


(二)sklearn特征工程接口代码测试

分箱:KBinsDiscretizer
连续变量离散化,增加非线性能力,更有利于特征交叉,同时增加特征的稳定性,KBinsDiscretizer参数:

n_bins:int,分箱桶数,
encode:str,分箱结果输出,"onehot"输出稀疏的onehot矩阵(默认),"ordinal"输出桶的自然数编码,"onehot-dense"输出稠密的onehot矩阵
strategy:str,分箱策略,"quantile"等频分箱,每个桶的样本数量相等(默认),"uniform"等距分箱,"kmeans"聚类分箱

from sklearn.preprocessing import KBinsDiscretizer
df = pd.read_json("ershou_suz_20200319.txt", lines=True)  # 读取数据

# 分箱之后全部onehot,采用等频分箱,输出稀疏矩阵
kbin = KBinsDiscretizer(n_bins=5, encode="onehot", strategy="quantile")
kbin.fit_transform(df["area"].values.reshape(-1 ,1)).toarray()  # 输入要是2D array
#array([[0., 1., 0., 0., 0.],
#       [0., 0., 0., 1., 0.],
#       [1., 0., 0., 0., 0.],
#       ...,
#       [0., 0., 0., 0., 1.],
#       [0., 0., 1., 0., 0.],
#       [0., 1., 0., 0., 0.]])

# 分箱之后从到大数值编码,等距分箱
kbin = KBinsDiscretizer(n_bins=5, encode="ordinal", strategy="uniform")
df["area2"] = kbin.fit_transform(df["area"].values.reshape(-1 ,1))
#array([[0.],
#       [1.],
#       [0.],
#       ...,
#       [1.],
#       [0.],
#       [0.]])

# 分箱之后全部onehot,采用等频分箱,和encode="onehot"相比不需要toarray了
kbin = KBinsDiscretizer(n_bins=5, encode="onehot-dense", strategy="quantile")
kbin.fit_transform(df["area"].values.reshape(-1 ,1))
#array([[0., 1., 0., 0., 0.],
#       [0., 0., 0., 1., 0.],
#       [1., 0., 0., 0., 0.],
#       ...,
#       [0., 0., 0., 0., 1.],
#       [0., 0., 1., 0., 0.],
#       [0., 1., 0., 0., 0.]])

# 测试新数据
new = pd.DataFrame([[50], [80], [120], [240], [500]], columns=["area"])
kbin.transform(new["area"].values.reshape(-1 ,1))
#array([[0.],
#       [0.],
#       [1.],
#       [2.],
#       [4.]])

缺失值填充:sklearn.impute
主要支持简单填充(均值,众数,中位数)和k近邻填充。

简单填充 SimpleImputer

strategy:str,填充策略,"constant"常数,此时需要指定fill_value,"mean","median","most_frequent"分别对应均值,中位数,众数,此时不需要指定fill_value
fill_value:str,int,float,当策略是"constant",需要指定
missing_values:str,int,float,np.nan,指定缺失值,默认是np.nan,也可以指定特定的缺失值,此时替换此特定值

from sklearn.impute import SimpleImputer
df = pd.DataFrame(
        {"a": [1, 0, 0, 1, 1], 
         "b": [np.nan, 0, 0, 1, 2], 
         "c": [1, 0, 1, 0, 3],
         "d": [0, 1, 0, 1, 4],
         "label": ["a", "b", "c", "c", "?"]
                })
#   a    b  c  d label
#0  1  NaN  1  0     a
#1  0  0.0  0  1     b
#2  0  0.0  1  0     c
#3  1  1.0  0  1     c
#4  1  2.0  3  4     ?

# 使用常数填充,此时需要指定fill_value
fillna = SimpleImputer(strategy='constant', fill_value=0)
df2 = fillna.fit_transform(df)
#array([[1, 0, 1, 0, 'a'],
#       [0, 0.0, 0, 1, 'b'],
#       [0, 0.0, 1, 0, 'c'],
#       [1, 1.0, 0, 1, 'c'],
#       [1, 2.0, 3, 4, '?']], dtype=object)

# 使用统计量填充,此时不需要fill_value,支持mean(默认), median, most_frequent
fillna = SimpleImputer(strategy='mean')
df2 = fillna.fit_transform(df["b"].values.reshape(-1, 1))
#array([[0.75],
#       [0.  ],
#       [0.  ],
#       [1.  ],
#       [2.  ]])

# 可以指定缺失值missing_values,默认missing_values=np.nan,可以指定是数字,字符串,np.nan,None
# 使用众数
fillna = SimpleImputer(missing_values="?", strategy='most_frequent')
df2 = fillna.fit_transform(df["label"].values.reshape(-1, 1))
#array([['a'],
#       ['b'],
#       ['c'],
#       ['c'],
#       ['c']], dtype=object)

# 替换多个缺失值
df = pd.DataFrame(
        {"a": [1, 0, 0, 1, 1], 
         "b": [np.nan, 0, 0, 1, 2], 
         "c": [1, 0, 1, 0, 3],
         "d": [0, 1, 0, 1, 4],
         "label": ["a", "b", np.nan, "c", "?"]
                })
fillna1 = SimpleImputer(strategy='constant', fill_value="unknow")
fillna2 = SimpleImputer(missing_values="?", strategy='constant', fill_value="unknow")
fillna_pipeline = Pipeline([("fillna1", fillna1), ("fillna2", fillna2)])

df2 = fillna_pipeline.fit_transform(df["label"].values.reshape(-1, 1))
#array([['a'],
#       ['b'],
#       ['unknow'],
#       ['c'],
#       ['unknow']], dtype=object)

# 配合ColumnTransform指定填充列,和不同的填充的方式
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
cat_cols = ['a', 'b']
cin_cols = ['c', 'd']
df = pd.DataFrame(
        {"a": [1, 0, 0, 1, 0], 
         "b": [np.nan, "a", "v", "a", "c"], 
         "c": [1, 0, np.nan, 0, 1],
         "d": [0, 1, 0, 1, 0],
         "label": [1, 0, 1, 0, 0]
                }
        )
fillna1 = SimpleImputer(strategy='most_frequent')
fillna2 = SimpleImputer(strategy='mean')
preprocessor = ColumnTransformer(
    transformers=[
        ('cat', fillna1, cat_cols),
        ('num', fillna2, cin_cols)])   # 这个流水线可以继续封装模型
df3 = preprocessor.fit_transform(df)
#array([[1, 'a', 1.0, 0.0],
#       [0, 'a', 0.0, 1.0],
#       [0, 'v', 0.5, 0.0],
#       [1, 'a', 0.0, 1.0],
#       [0, 'c', 1.0, 0.0]], dtype=object)

k近邻缺失值填充:KNNImputer
k近邻策略会对缺失特征的这条样本进行近邻搜索,通过其他存在的特征计算样本间欧式距离,取最近的topN条样本的该缺失特征的均值作为填充值。

n_neighbors:int,k近邻样本数
weights:str,距离权重,"uniform"权重相等(默认),"distance"距离越近的样本,权重越大

# KNNImputer k近邻填充
# 找到最相似的n条样本,,用这些样本的这个非空特征的均值填充
# 只能填充数值特征
from sklearn.impute import KNNImputer
df = pd.DataFrame(
        {"a": [1, 2, 0, 1, 0, 2, 5, 9, 103, np.nan, 102, 999, 5, 2], 
         "b": ["a", "v", "a", "c", "a", "v", "a", "c", "a", "c", "a", "v", "a", "c"], 
         "c": [10, 12, 98, 25, 103, 102, 89, 55, 43, -1, 67, 89, 133, 252]
                })
#        a  b    c
#0     1.0  a   10
#1     2.0  v   12
#2     0.0  a   98
#3     1.0  c   25
#4     0.0  a  103
#5     2.0  v  102
#6     5.0  a   89
#7     9.0  c   55
#8   103.0  a   43
#9     NaN  c   -1
#10  102.0  a   67
#11  999.0  v   89
#12    5.0  a  133
#13    2.0  c  252

knnfillna = KNNImputer(n_neighbors=2, weights="uniform")
# 会比较除了a特征外的其他特征的相似度,这个例子中比较
# np.nan对应的C特征是-1,-1的两个近邻是10,12,均值是(1 + 2) / 2 = 1.5
knnfillna.fit_transform(df[["a", "c"]])
#array([[  1. ,  10. ],
#       [  2. ,  12. ],
#       [  0. ,  98. ],
#       [  1. ,  25. ],
#       [  0. , 103. ],
#       [  2. , 102. ],
#       [  5. ,  89. ],
#       [  9. ,  55. ],
#       [103. ,  43. ],
#       [  1.5,  -1. ],
#       [102. ,  67. ],
#       [999. ,  89. ],
#       [  5. , 133. ],
#       [  2. , 252. ]])

knnfillna.fit_transform(df[["a", "b"]])
# could not convert string to float: 'a'
# 报错,所以进入fit的特征必须全部是数值,否则无法计算欧式距离

列选择器:ColumnTransformer
ColumnTransformer用于指定不同的转换器或者pipeline,分别作用于同一个数据集的不同列,实现对同一个数据机集所有特征的分而治之。

transformers:("自定义操作名", 转化器/流水线, 操作的列)
注意:ColumnTransformer中列不能重复指定,常规的是对连续变量统一一个操作,对离散变量同意一个操作,并且都只操作一次,如果离散变量或者连续变量不能通过一次变换就满足要求,则使用pipeline打包为一个完整操作,再装入ColumnTransformer中。

# knn缺失值填充之后,整合离散变量onehot和连续变量分箱onehot
from sklearn.impute import KNNImputer
from sklearn.preprocessing import OneHotEncoder, KBinsDiscretizer, MinMaxScaler
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer

df = pd.DataFrame(
        {"a": [1, 2, 0, 1, 0, 2, 5, 9, 103, np.nan, 102, 999, 5, 2], 
         "b": ["a", "v", "a", "c", "a", "v", "a", "c", "a", "c", "a", "v", "a", "c"], 
         "c": [10, 12, 98, 25, np.nan, 102, 89, 55, 43, -1, 67, 89, 133, 252]
                })
continus_cols = ["a", "c"]
category_cols = ["b"]

# 连续变量处理单独是一个pipeline
num_pipeline = Pipeline([
        ('continus_cols_knnfillna', KNNImputer(n_neighbors=2, weights="uniform")),
        ('continus_cols_cut', KBinsDiscretizer(n_bins=3, encode="onehot", strategy="quantile"))
    ])
                
# ColumnTransformer 中的列不能重复指定,比如指定两次continus_cols,这样会重新调用continus_cols生成新的特征,操作断开了 
preprocessor = ColumnTransformer(
    transformers=[
        ('num_pipeline', num_pipeline, continus_cols),
        ('category_cols_onehot', OneHotEncoder(handle_unknown='ignore'), category_cols)
        ])
                
preprocessor.fit_transform(df)
# 结果是先用knn填充缺失值,再各自分箱3段,加上onehot的3段,一个9个特征
#array([[1., 0., 0., 1., 0., 0., 1., 0., 0.],
#       [0., 1., 0., 1., 0., 0., 0., 0., 1.],
#       [1., 0., 0., 0., 0., 1., 1., 0., 0.],
#       [1., 0., 0., 1., 0., 0., 0., 1., 0.],
#       [1., 0., 0., 0., 1., 0., 1., 0., 0.],
#       [0., 1., 0., 0., 0., 1., 0., 0., 1.],
#       [0., 0., 1., 0., 0., 1., 1., 0., 0.],
#       [0., 0., 1., 0., 1., 0., 0., 1., 0.],
#       [0., 0., 1., 1., 0., 0., 1., 0., 0.],
#       [1., 0., 0., 1., 0., 0., 0., 1., 0.],
#       [0., 0., 1., 0., 1., 0., 1., 0., 0.],
#       [0., 0., 1., 0., 0., 1., 0., 0., 1.],
#       [0., 0., 1., 0., 0., 1., 1., 0., 0.],
#       [0., 1., 0., 0., 0., 1., 0., 1., 0.]])

# 再一个例子
# 连续变量用均值填充+MinMaxScaler
# 离散变量onehot
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import MinMaxScaler
from sklearn.pipeline import Pipeline

num_pipeline = Pipeline([
        ('continus_cols_meanfillna', SimpleImputer(strategy='mean')),
        ('continus_cols_minmax', MinMaxScaler())
    ])
                
# ColumnTransformer 中的类不能重复指定,比如指定两次continus_cols,这样会重新调用continus_cols生成新的特征,操作断开了 
preprocessor = ColumnTransformer(
    transformers=[
        ('num_pipeline', num_pipeline, continus_cols),
        ('category_cols_onehot', OneHotEncoder(handle_unknown='ignore'), category_cols)
        ])

#array([[0.001001  , 0.04347826, 1.        , 0.        , 0.        ],
#       [0.002002  , 0.0513834 , 0.        , 0.        , 1.        ],
#       [0.        , 0.39130435, 1.        , 0.        , 0.        ],
#       [0.001001  , 0.1027668 , 0.        , 1.        , 0.        ],
#       [0.        , 0.30009121, 1.        , 0.        , 0.        ],
#       [0.002002  , 0.40711462, 0.        , 0.        , 1.        ],
#       [0.00500501, 0.35573123, 1.        , 0.        , 0.        ],
#       [0.00900901, 0.22134387, 0.        , 1.        , 0.        ],
#       [0.1031031 , 0.17391304, 1.        , 0.        , 0.        ],
#       [0.09478709, 0.        , 0.        , 1.        , 0.        ],
#       [0.1021021 , 0.2687747 , 1.        , 0.        , 0.        ],
#       [1.        , 0.35573123, 0.        , 0.        , 1.        ],
#       [0.00500501, 0.52964427, 1.        , 0.        , 0.        ],
#       [0.002002  , 1.        , 0.        , 1.        , 0.        ]])
                
preprocessor.fit_transform(df)

ColumnTransformer中使用Pipeline,对离散变量先填充缺失值再onehot为一个整体pipeline,在使用ColumnTransformer和连续变量一起包装起来。

from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder

df = pd.DataFrame(
        {"a": [1, 2, 0, 1, 0, 2, 5, 9, 103, 22, 102, 999, 5, 2], 
         "b": ["a", "v", "a", "c", "a", np.nan, "a", "c", "a", "c", "a", "v", "a", "c"]
         })
continus_cols = ["a"]
category_cols = ["b"]

cat_pipeline = Pipeline([
        ('cat_cols_fillna', SimpleImputer(strategy='constant', fill_value="unknown")), 
        ('cat_cols_onehot', OneHotEncoder(handle_unknown='ignore'))
        ])

preprocessor = ColumnTransformer(
    transformers=[
        ('cat_pipeline', cat_pipeline, category_cols),
        ('category_cols_onehot', SimpleImputer(strategy='constant', fill_value=0), continus_cols)
        ])

res = preprocessor.fit_transform(df)

ColumnTransformer只会对指定的列进行操作, 没有指定的列直接忽略不进入结果,如果有列不进行任何操作,可以使用自定义转换器返回自己

preprocessor = ColumnTransformer(
            transformers=[
                    ("none", FunctionTransformer(), continus_cols), 
                    ("onehot", OneHotEncoder(handle_unknown='ignore'), category_cols)
                    ])
train = preprocessor.fit_transform(df[continus_cols + category_cols])

自定义函数转化器:FunctionTransformer
自定义一个函数,作用于数据集的一列或者多列

func:自定义函数

# 自定义函数转化器 FunctionTransformer
from sklearn.preprocessing import FunctionTransformer

df = pd.DataFrame([[1, 2], [3, 2], [9, 2]], columns=["a", "b"])
#   a  b
#0  1  2
#1  3  2
#2  9  2

def myfunc(x):
    return x + 1

FunctionTransformer(myfunc).fit_transform(df)
#    a  b
#0   2  3
#1   4  3
#2  10  3

# 对Y做log(1+)变换
FunctionTransformer(log1p).fit_transform(iris.data)

流水线Pipeline:Pipeline
所有对数据的转化,评估操作最终被包在一个Pipeline中,通过steps=[ ]的来指定顺序,同样可以调用steps和索引拿到流水线的某个数据处理部分,比如将已经训练好的Pipeline拿到评估器XGBClassifier进行绘图

clf = Pipeline(steps=[("preprocessor", preprocessor), ("classifier", XGBClassifier())])
clf.fit(train, train["label"])
# 拿到xgb
model = clf.steps[1][1]
xgboost.plot_importance(model, max_num_features=10)

(三)整体模型部署demo

一个整体pipeline在线部署预测模型的demo,传入的数据必须还是dataframe格式,使用pandas的read_json得到数据源,模型pickle文件使用flask部署web server,采用restful接口部署模型,POST请求得到预测结果

训练用户流失模型,特征工程采用sklearn,模型采用xgboost的sklearn接口,ColumnTransformer完成特征预处理,Pipeline完成预处理和模型的整合,整体输出pickle文件。

import pickle

import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder
from xgboost.sklearn import XGBClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, recall_score, precision_score, roc_auc_score


continus_cols = ["shop_duration", "recent", "monetary", "max_amount", "items_count", 
                 "valid_points_sum", "member_day", "frequence", "avg_amount", "item_count_turn", 
                 "avg_piece_amount", "monetary3","max_amount3", "items_count3", 
                 "frequence3", "shops_count", "promote_percent", "wxapp_diff", "store_diff", 
                 "week_percent"]

category_cols = ["CHANNEL_NUM_ID", "shop_channel", "infant_group", "water_product_group", 
                 "meat_group", "beauty_group", "health_group", "fruits_group", "vegetables_group", 
                 "pets_group", "snacks_group", "smoke_group", "milk_group", "instant_group", 
                 "grain_group"]


if __name__ == "__main__":
    df = pd.read_csv("./data/churn.csv")
    train, test = train_test_split(df, test_size=0.2)
    
    # 整体组装
    preprocessor = ColumnTransformer(
            transformers=[
                    ("0_fillna", SimpleImputer(strategy='constant', fill_value=0), continus_cols), 
                    ("onehot", OneHotEncoder(handle_unknown='ignore'), category_cols)
                    ])
    clf = Pipeline(steps=[("preprocessor", preprocessor), 
                          ("classifier", XGBClassifier(max_depth=8, n_estimators=100, n_jods=3))])
    
    # 训练
    clf.fit(train[continus_cols + category_cols], train["label"])
    pickle.dump(clf, open("./churn_xgb.model", "wb"))
    
    # 预测
    model = pickle.load(open("./churn_xgb.model", "rb"))
    predictions = model.predict(test[continus_cols + category_cols])
    predict_proba = model.predict_proba(test[continus_cols + category_cols])[:, 1]
    
    # 模型评价
    print("acc:", accuracy_score(test["label"], predictions))
    print("pri:", precision_score(test["label"], predictions))
    print("rec:", recall_score(test["label"], predictions))
    print("auc:", roc_auc_score(test["label"], predict_proba))

flask框架编写churn_xgb_server.py,读取模型模型pickle文件部署restful接口,特征入参json格式,通过pd.read_json转化为dataframe。

from flask import Flask, jsonify, request
import pickle
import pandas as pd
import json

app = Flask(__name__)


@app.route('/predict', methods=['POST'])
def apicall():
    try:
        test_json = request.get_json(force=True)
        test = pd.read_json(json.dumps(test_json), orient='records')  # reconds格式,转化为dataframe数据格式
        loan_ids = test['USR_NUM_ID']

    except Exception as e:
        raise e

    clf = 'churn_xgb.model'

    if test.empty:
        return (bad_request())
    else:
        print("Loading the model...")
        loaded_model = None
        with open('/Users/gengpeng/' + clf, 'rb') as f:
            loaded_model = pickle.load(f)

        print("The model has been loaded...doing predictions now...")
        predictions = loaded_model.predict_proba(test)[:, 1]
        prediction_series = list(pd.Series(predictions))
        res = dict(zip(loan_ids, prediction_series))
        responses = jsonify(predictions=res)
        responses.status_code = 200

        return (responses)


@app.errorhandler(400)
def bad_request(error=None):
    message = {
        'status': 400,
        'message': 'Bad Request: ' + request.url + '--> Please check your data payload...',
    }
    resp = jsonify(message)
    resp.status_code = 400

    return resp


if __name__ == '__main__':
    app.run()

启动flask web server

python churn_xgb_server.py

postman接口测试,以json格式传入特征,保证pd.read_json读进来的dataframe和训练数据结构一致。

postman测试sklearn模型接口.png
代码整理在 https://github.com/xiaogp/customer_churn_prediction/tree/master/GBDT/python
上一篇下一篇

猜你喜欢

热点阅读