xgboost+LR
2019-11-21 本文已影响0人
一个菜鸟的自我修养
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
import sys
import subprocess
from sklearn.model_selection import train_test_split
import numpy as np
import pandas as pd
from pyspark import SparkConf
from pyspark.sql import SparkSession
from sklearn.model_selection import train_test_split
# Spark session configuration. All settings are collected on one SparkConf and
# handed to the SparkSession builder; Hive support is enabled because the
# input table is read through spark.sql.
conf1 = (
    SparkConf()
    .setAppName("101")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.sql.shuffle.partitions", "100")
    .set("spark.sql.autoBroadcastJoinThreshold", "100485760")
    .set("spark.sql.inMemoryColumnarStorage.compressed", "true")
    .set("spark.shuffle.file.buffer", "128k")
    # The documented Spark property carries the "spark." prefix; the key was
    # previously written as "reducer.maxSizeInFlight".
    .set("spark.reducer.maxSizeInFlight", "96m")
    .set("hive.exec.dynamic.partition.mode", "nonstrict")
)
spark = SparkSession.builder.config(conf=conf1).enableHiveSupport().getOrCreate()
spark.sparkContext.setLogLevel('error')
从 Hive 表中读取数据,按正负样本分别划分训练集和测试集,再进行数据的拼接
# Load the rows whose shop_id and label are both different from -1.
data = spark.sql("select * from tmp.tmp_shop_feature_tag where shop_id <>-1 and label<>-1")
df = data.toPandas()  # convert the Spark DataFrame into a pandas DataFrame
all_feature = list(df.columns.values)  # names of all columns of the table

# Positional column layout shared by the train/test split AND the full-data
# matrix. The full-data matrix used to be sliced with 2:123 / 124:125, which
# disagrees with the 2:101 / 101:102 slices the model is trained on, so the
# trained booster was later asked to score a matrix of a different width.
# NOTE(review): assumes columns 2..100 are the features and column 101 is the
# label -- confirm against the table schema.
FEATURE_COLS = slice(2, 101)
LABEL_COLS = slice(101, 102)

# Split positives and negatives separately so that both the training set and
# the test set keep the original class ratio.
print(df[:1])
data_pos = df[df['label'] == 1]
data_neg = df[df['label'] == 0]
print(data_pos[:1])
X_data_pos = data_pos.iloc[:, FEATURE_COLS].values
y_data_pos = data_pos.iloc[:, LABEL_COLS].values
X_data_neg = data_neg.iloc[:, FEATURE_COLS].values
y_data_neg = data_neg.iloc[:, LABEL_COLS].values
X_train_pos, X_test_pos, y_train_pos, y_test_pos = train_test_split(
    X_data_pos, y_data_pos, test_size=0.3, random_state=10)
X_train_neg, X_test_neg, y_train_neg, y_test_neg = train_test_split(
    X_data_neg, y_data_neg, test_size=0.3, random_state=10)
print("打印X_train_pos的数据类型--------")
print(X_train_pos.shape)
print("打印X_train_neg的数据类型--------")
print(X_train_neg.shape)

# Stack the per-class splits back together (positives first, then negatives).
X_train = np.vstack((X_train_pos, X_train_neg))
X_test = np.vstack((X_test_pos, X_test_neg))
y_train = np.vstack((y_train_pos, y_train_neg)).astype(np.float64)
y_test = np.vstack((y_test_pos, y_test_neg)).astype(np.float64)

# Print a summary of the assembled sets.
print("打印X_train和X_test及y_train和y_test的数据类型-------")
print(X_train.shape)
print(X_test.shape)
print(y_train[:3])
print(y_test[:3])

# Full data set, cut with the same columns as the training data.
X = df.iloc[:, FEATURE_COLS].values
y = df.iloc[:, LABEL_COLS].values.astype(np.float64)

import xgboost as xgb

dall = xgb.DMatrix(X, y)
print(dall)

# DMatrix wrappers consumed by the parameter search and the evaluation below.
dtrain = xgb.DMatrix(X_train, y_train)
dtest = xgb.DMatrix(X_test, y_test)
# 自定义xgboost 参数搜索函数
def model_fit(params, dtrain, max_round=500, cv_folds=5, n_stop_round=50):
    """Cross-validate a single xgboost parameter set.

    Args:
        params: dict of xgboost parameters (see ``xgb_grid_search_cv``).
        dtrain: training data passed straight to ``xgb.cv``.
        max_round: upper bound on the number of boosting rounds.
        cv_folds: number of cross-validation folds.
        n_stop_round: forwarded to ``xgb.cv`` as ``early_stopping_rounds``.

    Returns:
        ``(n_round, mean_auc)``: the number of rows in the history returned by
        ``xgb.cv`` and the ``test-auc-mean`` value of its last row.
    """
    history = xgb.cv(
        params,
        dtrain,
        max_round,
        nfold=cv_folds,
        metrics='auc',
        early_stopping_rounds=n_stop_round,
        show_stdv=False,
    )
    # The row count is used as the number of rounds and the final row's mean
    # test AUC as the score of this parameter set.
    test_auc_means = history['test-auc-mean'].values
    return history.shape[0], test_auc_means[-1]
def xgb_grid_search_cv(params, key, search_params, dtrain, max_round=500, cv_folds=5,
                       n_stop_round=10, return_best_model=True, verbose=True):
    """Search one xgboost parameter over a list of candidate values.

    Each candidate is evaluated with ``model_fit`` (cross-validated AUC) and
    the candidate with the highest mean AUC wins; ties go to the first one.

    Args:
        params: dict of base xgboost parameters. It is not modified; every
            candidate is evaluated on a copy.
        key: name of the parameter being searched.
        search_params: candidate values for ``key``.
        dtrain: training data.
        max_round: upper bound on the number of boosting rounds.
        cv_folds: number of cross-validation folds.
        n_stop_round: early-stopping patience forwarded to ``model_fit``.
        return_best_model: if True, train a model on ``dtrain`` with the best
            parameters and the best number of rounds.
        verbose: if True, print per-candidate progress and the final summary.

    Returns:
        ``(cv_result, best_model)`` where ``cv_result`` is a dict with

        - ``mean_aucs``: score of every candidate,
        - ``n_rounds``: number of rounds of every candidate,
        - ``list_params``: the full parameter dict of every candidate,
        - ``best_mean_auc`` / ``best_round`` / ``best_params``: the winner,

        and ``best_model`` is the result of ``xgb.train`` or ``None`` when
        ``return_best_model`` is False.

    Raises:
        ValueError: if ``search_params`` is empty.
    """
    import time

    search_params = list(search_params)
    if not search_params:
        raise ValueError('search_params must contain at least one candidate value')

    mean_aucs = []
    n_rounds = []
    list_params = []
    # Report the candidates that were actually passed in, not a module global.
    print('Searching parameters: %s %s' % (key, str(search_params)))
    tic = time.time()
    for search_param in search_params:
        # Work on a copy so the caller's dict is left untouched.
        trial_params = dict(params)
        trial_params[key] = search_param
        list_params.append(trial_params.copy())
        n_round, mean_auc = model_fit(trial_params, dtrain, max_round, cv_folds, n_stop_round)
        if verbose:
            print('%s=%s: n_round=%d, mean_auc=%g. Time cost %gs'
                  % (key, str(search_param), n_round, mean_auc, time.time() - tic))
        mean_aucs.append(mean_auc)
        n_rounds.append(n_round)

    best_mean_auc = max(mean_aucs)
    best_index = mean_aucs.index(best_mean_auc)  # first candidate reaching the best score
    best_round = n_rounds[best_index]
    best_params = list_params[best_index]
    cv_result = {
        'mean_aucs': mean_aucs,
        'n_rounds': n_rounds,
        'list_params': list_params,
        'best_mean_auc': best_mean_auc,
        'best_round': best_round,
        'best_params': best_params,
    }

    best_model = None
    if return_best_model:
        best_model = xgb.train(best_params, dtrain, num_boost_round=best_round)

    if verbose:
        print('best_mean_auc = %g' % best_mean_auc)
        print('best_round = %d' % best_round)
        print('best_params = %s' % str(best_params))
    return cv_result, best_model
# Base xgboost parameters used as the starting point of the parameter search.
params = {
    'booster': 'gbtree',
    'objective': 'binary:logistic',
    'subsample': 1,
    # 'colsample_bytree': 1,
    'eta': 0.1,
    'max_depth': 8,
    'min_child_weight': 1,
    'gamma': 0.0,
    # NOTE(review): newer xgboost releases replace 'silent' with 'verbosity'
    # -- confirm against the installed version.
    'silent': 1,
    'seed': 0,
    'eval_metric': 'auc',
    # xgboost's thread-count parameter is 'nthread'; the previous key 'njob'
    # is not a parameter name xgboost recognises.
    'nthread': 8,
}
首先寻找最佳的步长和轮数
# Step 1: search the learning rate ('eta') and report, for every candidate,
# the number of rounds and the mean AUC found by cross-validation.
key, values = 'eta', [0.1, 0.2, 0.3]
cv_result, best_model = xgb_grid_search_cv(params, key, values, dtrain)
print('%s: %s' % (key, values))
print('n_round = %s' % (cv_result['n_rounds'],))
print('mean_aucs = %s' % (cv_result['mean_aucs'],))
在上一个参数最优的基础上,寻找其他参数的最优值.虽然这样并没有完全进行 grid search,但是一般来说,结果不会太差,更重要的是节省时间。
# Step 2: keep the best parameters of the previous search and tune
# 'max_depth' on top of them (one parameter at a time, not a full grid).
params = cv_result['best_params']
key, values = 'max_depth', list(range(4, 9))
cv_result, best_model = xgb_grid_search_cv(params, key, values, dtrain)
print('%s: %s' % (key, values))
print('n_round = %s' % (cv_result['n_rounds'],))
使用 best_model.predict() 返回每个样本属于正类(1)的概率
# Score the test set twice and print the result shape and the first row each
# time: first the default prediction, then, with pred_leaf=True, the id of the
# leaf each sample falls into in every tree.
for _pred_leaf in (False, True):
    y_pred_prob = best_model.predict(dtest, pred_leaf=_pred_leaf)
    print(X_test.shape, y_pred_prob.shape)
    print(y_pred_prob[0])
将每棵树的叶子节点编号转为 one-hot 编码,并与原始特征进行拼接
from sklearn.preprocessing import OneHotEncoder
from sklearn.linear_model import LogisticRegression

grd_enc = OneHotEncoder()      # one-hot encoder for the leaf ids
grd_lm = LogisticRegression()  # LR classifier

# Leaf id of every sample in every tree, for the full data set and both splits.
dall = xgb.DMatrix(X, y)
all_leafs = best_model.predict(dall, pred_leaf=True)
# The encoding is fitted on the leaves of the full data set.
grd_enc.fit(all_leafs)

train_leafs = best_model.predict(dtrain, pred_leaf=True)
test_leafs = best_model.predict(dtest, pred_leaf=True)

# One-hot encode the leaves of each split as dense arrays.
oh_train = grd_enc.transform(train_leafs).toarray()
oh_test = grd_enc.transform(test_leafs).toarray()

# Final feature matrices: one-hot leaf columns followed by the original features.
X_train_oh = np.concatenate((oh_train, X_train), axis=1)
X_test_oh = np.concatenate((oh_test, X_test), axis=1)
print('X_train_oh.shape=%s' % (X_train_oh.shape,))
print('X_test_oh.shape=%s' % (X_test_oh.shape,))
使用LR进行训练
import time
from sklearn.model_selection import cross_val_score

# Train LR on the stacked features, searching the inverse regularisation
# strength C over 10 evenly spaced values in [0.001, 0.05].
# (Author's note from the original comment: 0.015 worked best.)
C_params = np.linspace(0.001, 0.05, 10)
LR_aucs = []  # mean cross-validated AUC of every C
print(C_params)

# The L1 penalty needs a solver that supports it; naming 'liblinear' explicitly
# keeps the script independent of scikit-learn's default solver.
LR_SOLVER = 'liblinear'
# Same iteration budget for the search and the final fit, so the final model
# is trained under the conditions that were validated.
LR_MAX_ITER = 300
# scikit-learn expects a 1-D target; y_train is built as a column vector.
y_train_flat = y_train.ravel()

tic = time.time()
for C_param in C_params:
    model = LogisticRegression(C=C_param, penalty='l1', solver=LR_SOLVER, max_iter=LR_MAX_ITER)
    scores = cross_val_score(model, X_train_oh, y_train_flat, cv=5, scoring='roc_auc')
    LR_aucs.append(scores.mean())
    print('C=%g, mean_auc = %g. Time passed %gs' % (C_param, scores.mean(), time.time() - tic))
print("mean_aucs,", LR_aucs)
# NOTE(review): this prints the xgboost parameter dict `params`, not C_params
# -- confirm which one was intended.
print("参数,", params)
best_index = LR_aucs.index(max(LR_aucs))
print("最好的参数:", C_params[best_index])
print("best_auc = %g" % max(LR_aucs))

# Fit the final model with the best C.
LR = LogisticRegression(C=C_params[best_index], penalty='l1', solver=LR_SOLVER, max_iter=LR_MAX_ITER)
LR.fit(X_train_oh, y_train_flat)