DeepFM Engineering Implementation in TensorFlow
2020-07-26
xiaogp
This post summarizes the code first; the underlying theory will be filled in later.
Data Description
The example is a binary classification model. All features are categorical; continuous variables have been discretized by binning. The task is to predict whether a user will buy a particular product. A preview of some of the features is shown below.
Figure: feature preview (特征预览.png)
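Since the binning step itself is not shown in this post, here is a minimal sketch of the kind of discretization assumed for a continuous column (hypothetical cut points, chosen to mirror the bucket labels that appear in the feature-index file further below; df is assumed to be the raw, un-binned dataframe):

import pandas as pd

# Hypothetical binning for a continuous column such as shop_duration;
# cut points and labels mirror the buckets seen in the feature-index preview.
bins = [0, 30, 60, 90, 120, float("inf")]
labels = ["30以下", "30-60", "60-90", "90-120", "120以上"]
df["shop_duration"] = (pd.cut(df["shop_duration"], bins=bins, labels=labels)
                       .cat.add_categories(["unknow"])
                       .fillna("unknow")     # missing values fall into the "unknow" bucket
                       .astype(str))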
Data Preprocessing
Each sample row is transformed into feature values and feature indices. For categorical variables the feature value is 1; for continuous variables it would be the standardized raw value. The feature indices are needed so that embedding_lookup can later fetch the latent vector of each feature.
First build the feature-index file; the training set, test set, and any later prediction data are all transformed against this single file.
import pandas as pd

df = pd.read_csv("churn_test.csv")
index_start = 0
with open("churn_feature_index.txt", "w", encoding="utf8") as f:
    for col in df.columns[1:-1]:
        unique_vals = df[col].unique().tolist()
        if "unknow" not in unique_vals:
            unique_vals += ["unknow"]
        for unique_val in unique_vals:
            f.write(col + ":" + str(unique_val) + " " + str(index_start) + "\n")
            index_start += 1
A preview of the feature-index file is shown below. Each line is feature:category_value index, with indices starting from 0.
shop_duration:30以下 0
shop_duration:30-60 1
shop_duration:90-120 2
shop_duration:60-90 3
shop_duration:120以上 4
shop_duration:unknow 5
recent:120以上 6
recent:30以下 7
recent:60-90 8
recent:30-60 9
recent:90-120 10
recent:unknow 11
Next, transform the training and test sets to obtain the feature values, feature indices, and labels. Since the data has no continuous variables, we only need to look up an index for each categorical variable, and all feature values are 1.
import pandas as pd
import numpy as np

label_col = ["label"]
categorical_cols = ['shop_duration', 'recent', 'monetary', 'max_amount',
                    'items_count', 'valid_points_sum', 'CHANNEL_NUM_ID', 'member_day',
                    'VIP_TYPE_NUM_ID', 'frequence', 'avg_amount', 'item_count_turn',
                    'avg_piece_amount', 'monetary3', 'max_amount3', 'items_count3',
                    'frequence3', 'shops_count', 'promote_percent', 'wxapp_diff',
                    'store_diff', 'shop_channel', 'week_percent', 'infant_group',
                    'water_product_group', 'meat_group', 'beauty_group', 'health_group',
                    'fruits_group', 'vegetables_group', 'pets_group', 'snacks_group',
                    'smoke_group', 'milk_group', 'instant_group', 'grain_group']


def get_feature_index():
    feature_index = {}
    with open("./churn_feature_index.txt", "r", encoding="utf8") as f:
        for line in f.readlines():
            items = line.strip()
            if items:
                key = items.split()[0]
                val = items.split()[1]
                feature_index[key] = val
    return feature_index


def preprocessing(df, label=True):
    data_index = []
    data_value = []
    for line in df.iloc[:, 1:-1].values:
        indexs = []
        values = []
        for col, val in zip(categorical_cols, line):
            key = col + ":" + str(val)
            value = 1
            # look up the index; category values never seen before fall back to the "unknow" bucket
            if key in feature_index:
                index = int(feature_index[key])
            else:
                index = int(feature_index[col + ":" + "unknow"])
            indexs.append(index)
            values.append(value)
        data_index.append(indexs)
        data_value.append(values)
    if label:
        data_label = np.array(df["label"].tolist()).reshape(-1, 1)
        return data_index, data_value, data_label
    return data_index, data_value


if __name__ == "__main__":
    import pickle

    feature_index = get_feature_index()
    train = pd.read_csv("./churn_train.csv")
    test = pd.read_csv("./churn_test.csv")
    train_index, train_value, train_label = preprocessing(train)
    test_index, test_value, test_label = preprocessing(test)
    with open("churn_train.pkl", "wb") as f:
        pickle.dump(train_index, f)
        pickle.dump(train_value, f)
        pickle.dump(train_label, f)
    with open("churn_test.pkl", "wb") as f:
        pickle.dump(test_index, f)
        pickle.dump(test_value, f)
        pickle.dump(test_label, f)
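As a quick sanity check on the output of this step (a small sketch using the pickle files written above), every sample should come out as 36 indices and 36 ones, one slot per categorical column:

import pickle

with open("churn_train.pkl", "rb") as f:
    train_index = pickle.load(f)
    train_value = pickle.load(f)
    train_label = pickle.load(f)

print(len(train_index), len(train_value), len(train_label))  # same number of samples in each
print(len(train_index[0]), len(train_value[0]))              # 36 and 36, one slot per categorical column
print(set(train_value[0]))                                   # {1}: all categorical feature values are 1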
Building DeepFM and Training the Model
First define a class Args that holds the DeepFM hyperparameters. The model builds the FM first-order and second-order parts plus a three-layer DNN; the three outputs are concatenated side by side and fed into one more fully connected layer, and the embedding layer is shared between the FM second-order computation and the DNN input layer. Training uses early stopping: when no new minimum test-set loss appears for n=5 consecutive evaluations, training stops. At most 5 checkpoints are kept; at the end, the checkpoint with the best loss is loaded for evaluation and then exported as a SavedModel pb file.
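Since the theory is deferred, here for reference is just the identity that the second-order FM computation in the code relies on (the "square of sum minus sum of squares" trick):

$$
\sum_{i=1}^{n}\sum_{j=i+1}^{n}\langle v_i, v_j\rangle\, x_i x_j
= \frac{1}{2}\sum_{k=1}^{K}\Big[\Big(\sum_{i=1}^{n} v_{i,k}\, x_i\Big)^{2} - \sum_{i=1}^{n} v_{i,k}^{2}\, x_i^{2}\Big]
$$

In the code, sum_second_order_square is the first term inside the brackets and square_second_order_sum the second; instead of summing over the K embedding dimensions, the per-dimension terms are kept and concatenated with the DNN output, which is a common choice in DeepFM implementations.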
# -*- coding: utf-8 -*-
import os
import pickle
import random
import shutil
import time

import numpy as np
import tensorflow as tf
from tensorflow.python.saved_model import tag_constants

from utils import get_metrics, early_stop


class Args():
    feature_sizes = 100
    field_size = 15
    embedding_size = 100
    deep_layers = [512, 256, 128]
    epoch = 1
    batch_size = 1024
    learning_rate = 1e-3
    decaylearning_rate = 0.95
    l2_reg_rate = 0.00
    checkpoint_dir = os.path.join("./churn_ckpt/model.ckpt")
    is_training = True
class model():
    def __init__(self, args):
        self.feature_sizes = args.feature_sizes
        self.field_size = args.field_size
        self.embedding_size = args.embedding_size
        self.deep_layers = args.deep_layers
        self.l2_reg_rate = args.l2_reg_rate
        self.learning_rate = args.learning_rate
        self.decaylearning_rate = args.decaylearning_rate
        self.deep_activation = tf.nn.relu
        self.weight = dict()
        self.checkpoint_dir = args.checkpoint_dir
        self.build_model()

    def build_model(self):
        self.feat_index = tf.placeholder(tf.int32, shape=[None, None], name='feature_index')
        self.feat_value = tf.placeholder(tf.float32, shape=[None, None], name='feature_value')
        self.label = tf.placeholder(tf.float32, shape=[None, None], name='label')

        # Weights between the one-hot input layer and the dense embedding layer, i.e. the DNN input embedding.
        # Note: the width of the dense embedding layer is determined by field_size and embedding_size.
        self.weight['feature_weight'] = tf.Variable(
            # shape: number of one-hot features * embedding_size
            # normal distribution, mean 0, stddev 0.01
            tf.random_normal([self.feature_sizes, self.embedding_size], 0.0, 0.01),
            name='feature_weight')

        # first-order weights of the FM part
        self.weight['feature_first'] = tf.Variable(
            tf.random_normal([self.feature_sizes, 1], 0.0, 1.0),
            name='feature_first')

        # weights of the deep (DNN) part
        num_layer = len(self.deep_layers)
        input_size = self.field_size * self.embedding_size
        # stddev correction for the initialization (Xavier-style)
        init_method = np.sqrt(2.0 / (input_size + self.deep_layers[0]))
        # shape: (field_size * embedding_size, deep_layers[0])
        self.weight['layer_0'] = tf.Variable(
            # loc: mean of the normal distribution
            # scale: stddev of the normal distribution
            np.random.normal(loc=0, scale=init_method, size=(input_size, self.deep_layers[0])), dtype=np.float32
        )
        # shape: (1, deep_layers[0])
        self.weight['bias_0'] = tf.Variable(
            np.random.normal(loc=0, scale=init_method, size=(1, self.deep_layers[0])), dtype=np.float32
        )

        # weights and biases of the remaining deep layers
        if num_layer != 1:
            for i in range(1, num_layer):
                init_method = np.sqrt(2.0 / (self.deep_layers[i - 1] + self.deep_layers[i]))
                # shape: (deep_layers[i-1], deep_layers[i])
                self.weight['layer_' + str(i)] = tf.Variable(
                    np.random.normal(loc=0, scale=init_method, size=(self.deep_layers[i - 1], self.deep_layers[i])),
                    dtype=np.float32)
                # shape: (1, deep_layers[i])
                self.weight['bias_' + str(i)] = tf.Variable(
                    np.random.normal(loc=0, scale=init_method, size=(1, self.deep_layers[i])),
                    dtype=np.float32)

        # deep output size + first-order output size + second-order output size
        # field_size (not feature_sizes): zero-valued positions have already been dropped
        last_layer_size = self.deep_layers[-1] + self.field_size + self.embedding_size
        init_method = np.sqrt(np.sqrt(2.0 / (last_layer_size + 1)))
        # final output layer
        self.weight['last_layer'] = tf.Variable(
            np.random.normal(loc=0, scale=init_method, size=(last_layer_size, 1)), dtype=np.float32)
        self.weight['last_bias'] = tf.Variable(tf.constant(0.01), dtype=np.float32)

        # embedding part, shape: (batch_size, field_size, embedding_size)
        self.embedding_index = tf.nn.embedding_lookup(self.weight['feature_weight'],
                                                      self.feat_index)  # Batch*F*K
        # multiply the feature values element-wise into the embeddings;
        # without the reshape the positions would not line up
        self.embedding_part = tf.multiply(self.embedding_index,
                                          tf.reshape(self.feat_value, [-1, self.field_size, 1]))

        """
        Forward pass
        """
        # FM part
        # first-order features
        self.embedding_first = tf.nn.embedding_lookup(self.weight['feature_first'],
                                                      self.feat_index)  # Batch*F*1
        # element-wise multiply, matching dimensions
        self.embedding_first = tf.multiply(self.embedding_first, tf.reshape(self.feat_value, [-1, self.field_size, 1]))
        # first-order result, shape: (batch_size, field_size)
        self.first_order = tf.reduce_sum(self.embedding_first, 2)

        # second-order features
        # square of the sum
        # embedding_part: (batch_size, field_size, embedding_size), feature values multiplied into the embeddings
        self.sum_second_order = tf.reduce_sum(self.embedding_part, 1)
        self.sum_second_order_square = tf.square(self.sum_second_order)  # (batch_size, embedding_size)
        # sum of the squares
        self.square_second_order = tf.square(self.embedding_part)
        self.square_second_order_sum = tf.reduce_sum(self.square_second_order, 1)  # (batch_size, embedding_size)
        # 0.5 * ((a+b)^2 - a^2 - b^2) = ab
        # shape: (batch_size, embedding_size)
        self.second_order = 0.5 * tf.subtract(self.sum_second_order_square, self.square_second_order_sum)
        self.fm_part = tf.concat([self.first_order, self.second_order], axis=1)

        # DNN part
        # flatten everything: the DNN input width is field_size * embedding_size,
        # i.e. the embeddings of all non-zero features concatenated together
        self.deep_embedding = tf.reshape(self.embedding_part, [-1, self.field_size * self.embedding_size])
        # print('deep_embedding:', self.deep_embedding)

        # fully connected layers
        for i in range(0, len(self.deep_layers)):
            self.deep_embedding = tf.add(tf.matmul(self.deep_embedding, self.weight["layer_%d" % i]),
                                         self.weight["bias_%d" % i])
            self.deep_embedding = self.deep_activation(self.deep_embedding)

        # concatenate the FM output with the DNN output
        din_all = tf.concat([self.fm_part, self.deep_embedding], axis=1)
        # final fully connected layer
        self.out = tf.add(tf.matmul(din_all, self.weight['last_layer']), self.weight['last_bias'])

        # loss
        self.out = tf.nn.sigmoid(self.out, name='logit')
        # AUC metric
        self.auc_score = tf.metrics.auc(self.label, self.out)
        self.loss = -tf.reduce_mean(
            self.label * tf.log(self.out + 1e-24) + (1 - self.label) * tf.log(1 - self.out + 1e-24))
        # L2 regularization: sum(w^2)/2 * l2_reg_rate
        self.loss += tf.nn.l2_loss(self.weight["last_layer"]) * self.l2_reg_rate
        for i in range(len(self.deep_layers)):
            self.loss += tf.nn.l2_loss(self.weight["layer_%d" % i]) * self.l2_reg_rate

        self.global_step = tf.Variable(0, trainable=False)
        self.learning_rate_decay = tf.train.exponential_decay(self.learning_rate, self.global_step, 100, self.decaylearning_rate)
        opt = tf.train.AdamOptimizer(self.learning_rate_decay)
        # opt = tf.train.AdamOptimizer(self.learning_rate)
        trainable_params = tf.trainable_variables()
        gradients = tf.gradients(self.loss, trainable_params)
        clip_gradients, _ = tf.clip_by_global_norm(gradients, 5)
        self.train_op = opt.apply_gradients(
            zip(clip_gradients, trainable_params), global_step=self.global_step)
        self.saver = tf.train.Saver(tf.global_variables(), max_to_keep=5)
    def train(self, sess, feat_index, feat_value, label):
        loss, _, step, auc_score = sess.run([self.loss, self.train_op, self.global_step, self.auc_score], feed_dict={
            self.feat_index: feat_index,
            self.feat_value: feat_value,
            self.label: label
        })
        return loss, step, auc_score

    def predict(self, sess, feat_index, feat_value):
        result = sess.run(self.out, feed_dict={
            self.feat_index: feat_index,
            self.feat_value: feat_value
        })
        return result

    def evaluate(self, sess, feat_index, feat_value, label):
        loss, auc_score = sess.run([self.loss, self.auc_score], feed_dict={
            self.feat_index: feat_index,
            self.feat_value: feat_value,
            self.label: label
        })
        return loss, auc_score

    def save(self, sess, path):
        self.saver.save(sess, save_path=path, global_step=self.global_step)

    def restore(self, sess, path):
        # restore the latest checkpoint under the given directory
        kpt = tf.train.latest_checkpoint(path)
        saver = tf.train.import_meta_graph("{}.meta".format(kpt))
        saver.restore(sess, kpt)
def get_batch(data_index, data_value, data_label, epochs=10, batch_size=256):
    data_len = len(data_index)
    for epoch in range(epochs):
        data = list(zip(data_index, data_value, data_label))
        random.shuffle(data)
        data_index, data_value, data_label = zip(*data)
        for batch in range(0, data_len, batch_size):
            if batch + batch_size < data_len:
                output_data = (data_index[batch: batch + batch_size],
                               data_value[batch: batch + batch_size],
                               data_label[batch: batch + batch_size])
            else:
                output_data = (data_index[batch: data_len],
                               data_value[batch: data_len],
                               data_label[batch: data_len])
            yield output_data
if __name__ == '__main__':
    args = Args()
    args.feature_sizes = 171
    args.field_size = 36
    args.is_training = True
    args.epoch = 20
    args.batch_size = 2048 * 8
    args.learning_rate = 1e-3 * 8
    args.decaylearning_rate = 0.95
    args.embedding_size = 8
    args.deep_layers = [128, 64, 32]

    with open("churn_train.pkl", "rb") as f:
        train_index = pickle.load(f)
        train_value = pickle.load(f)
        train_label = pickle.load(f)
    with open("churn_test.pkl", "rb") as f:
        test_index = pickle.load(f)
        test_value = pickle.load(f)
        test_label = pickle.load(f)
    data_len = len(train_index)

    tf.reset_default_graph()
    with tf.Session() as sess:
        Model = model(args)
        sess.run(tf.global_variables_initializer())
        sess.run(tf.local_variables_initializer())
        loss_list = []
        print("{:-^30}".format("training"))
        train_data = get_batch(train_index, train_value, train_label, epochs=args.epoch, batch_size=args.batch_size)
        for batch_index, batch_value, batch_label in train_data:
            loss, step, auc_score = Model.train(sess, batch_index, batch_value, batch_label)
            epoch_num = int(step * args.batch_size / data_len) + 1
            if step % 10 == 0:
                print("epoch: {} step: {} => loss: {}".format(epoch_num, step, loss))
            if step % 50 == 0:
                loss, auc_score = Model.evaluate(sess, test_index, test_value, test_label)
                print("{:-^30}".format("evaluation"))
                print("evaluation => loss: {}".format(loss))
                Model.save(sess, args.checkpoint_dir)
                # how much the current loss differs from the best (lowest) loss so far
                if len(loss_list) == 0:
                    diff = 0
                else:
                    diff = loss - min(loss_list)
                loss_list.append(loss)
                print("eval loss {} by {} vs. previous minimum, current minimum loss: {}".format(
                    "rose" if diff > 0 else "fell", abs(diff), min(loss_list)))
                print("-" * 40)
                # early stopping
                if early_stop(loss_list, windows=5):
                    break
        print("-" * 30)
    # load the best checkpoint
    tf.reset_default_graph()
    with tf.Session() as sess:
        file_list = os.listdir("./churn_ckpt")
        min_index_file = "./churn_ckpt/" + min([".".join(x.split(".")[:2]) for x in file_list if x != "checkpoint"])
        print("Loading ckpt: {}".format(min_index_file))
        saver = tf.train.import_meta_graph("{}.meta".format(min_index_file))
        saver.restore(sess, min_index_file)
        graph = tf.get_default_graph()
        input_xi = graph.get_operation_by_name("feature_index").outputs[0]
        input_xv = graph.get_operation_by_name("feature_value").outputs[0]
        probs = graph.get_tensor_by_name("logit:0")
        pred = sess.run(probs, feed_dict={input_xi: test_index, input_xv: test_value})

        # evaluate on the test set
        res = [x[0] for x in pred]
        test_label = [x[0] for x in test_label]
        get_metrics(test_label, res, 0.5)

        # export the model as a SavedModel pb
        pb_path = os.path.join("./churn_pb", str(int(time.time())))
        shutil.rmtree(pb_path, ignore_errors=True)
        builder = tf.saved_model.builder.SavedModelBuilder(pb_path)
        inputs = {'feature_index': tf.saved_model.utils.build_tensor_info(Model.feat_index),
                  'feature_value': tf.saved_model.utils.build_tensor_info(Model.feat_value)
                  }
        outputs = {'output': tf.saved_model.utils.build_tensor_info(Model.out)}
        signature = tf.saved_model.signature_def_utils.build_signature_def(
            inputs=inputs,
            outputs=outputs,
            method_name=tf.saved_model.signature_constants.PREDICT_METHOD_NAME)
        builder.add_meta_graph_and_variables(sess, [tag_constants.SERVING], {'my_signature': signature})
        builder.save()
The early_stop and get_metrics functions in utils.py:
from sklearn.metrics import accuracy_score, precision_score, recall_score, roc_auc_score, f1_score


def early_stop(loss_list, windows=5):
    if len(loss_list) <= windows:
        return False
    latest_loss = loss_list[-windows:]
    previous_loss = loss_list[:-windows]
    min_previous_loss = min(previous_loss)
    min_latest_loss = min(latest_loss)
    if min_latest_loss > min_previous_loss:
        return True
    return False


def get_metrics(labels, predictions, thres):
    predictions_label = [1 if x >= thres else 0 for x in predictions]
    print("accuracy:", accuracy_score(labels, predictions_label))
    print("precision:", precision_score(labels, predictions_label))
    print("recall:", recall_score(labels, predictions_label))
    print("f1:", f1_score(labels, predictions_label))
    print("auc:", roc_auc_score(labels, predictions))
The final training output is shown below. The maximum number of epochs was set to 20, and early stopping triggered at epoch 15.
epoch: 14 step: 660 => loss: 0.49427324533462524
epoch: 14 step: 670 => loss: 0.49196428060531616
epoch: 15 step: 680 => loss: 0.4925547242164612
epoch: 15 step: 690 => loss: 0.4977375268936157
epoch: 15 step: 700 => loss: 0.49650928378105164
----------evaluation----------
evaluation => loss: 0.4999653100967407
eval loss rose by 0.0009462833404541016 vs. previous minimum, current minimum loss: 0.4990190267562866
----------------------------------------
------------------------------
Loading ckpt: ./churn_ckpt/model.ckpt-500
INFO:tensorflow:Restoring parameters from ./churn_ckpt/model.ckpt-500
accuracy: 0.7584251187687513
precision: 0.7706821837759759
recall: 0.8258112742942573
f1: 0.7972948877964289
auc: 0.8262372477851344
INFO:tensorflow:No assets to save.
INFO:tensorflow:No assets to write.
INFO:tensorflow:SavedModel written to: ./churn_pb\1595775832\saved_model.pb
Five checkpoint files are written (the one with the smallest step index holds the lowest-loss parameters), along with the pb files.
Figure: ckpt and pb files (ckpt和pb文件.png)
Prediction with the Trained Model
Now load the model from the ckpt and from the pb respectively to predict on new data; the results match those obtained during training.
import os
import pickle

import tensorflow as tf
from tensorflow.python.saved_model import tag_constants

from utils import get_metrics

with open("churn_test.pkl", "rb") as f:
    test_index = pickle.load(f)
    test_value = pickle.load(f)
    test_label = pickle.load(f)


def predict_ckpt():
    """Load the model from a checkpoint"""
    with tf.Session() as sess:
        # checkpoint_file = tf.train.latest_checkpoint("./ckpt")
        # saver = tf.train.import_meta_graph("{}.meta".format(checkpoint_file))
        # saver.restore(sess, checkpoint_file)
        file_list = os.listdir("./churn_ckpt")
        min_index_file = "./churn_ckpt/" + min([".".join(x.split(".")[:2]) for x in file_list if x != "checkpoint"])
        saver = tf.train.import_meta_graph("{}.meta".format(min_index_file))
        saver.restore(sess, min_index_file)
        graph = tf.get_default_graph()
        input_xi = graph.get_operation_by_name("feature_index").outputs[0]
        input_xv = graph.get_operation_by_name("feature_value").outputs[0]
        probs = graph.get_tensor_by_name("logit:0")
        pred = sess.run(probs, feed_dict={input_xi: test_index, input_xv: test_value})
        return pred


def predict_pb():
    """Load the model from the SavedModel pb"""
    max_time = max(os.listdir("./churn_pb"))
    with tf.Session(graph=tf.Graph()) as sess:
        tf.saved_model.loader.load(sess, [tag_constants.SERVING], os.path.join("./churn_pb", max_time))
        graph = tf.get_default_graph()
        input_xi = graph.get_operation_by_name("feature_index").outputs[0]
        input_xv = graph.get_operation_by_name("feature_value").outputs[0]
        probs = graph.get_tensor_by_name("logit:0")
        pred = sess.run(probs, feed_dict={input_xi: test_index, input_xv: test_value})
        return pred


if __name__ == "__main__":
    res = predict_ckpt()
    res = [x[0] for x in res]
    test_label = [x[0] for x in test_label]
    get_metrics(test_label, res, 0.5)

    res2 = predict_pb()
    res2 = [x[0] for x in res2]
    get_metrics(test_label, res2, 0.5)
The output is as follows:
INFO:tensorflow:Restoring parameters from ./churn_ckpt/model.ckpt-500
accuracy: 0.7584251187687513
precision: 0.7706821837759759
recall: 0.8258112742942573
f1: 0.7972948877964289
auc: 0.8262372477851344
WARNING:tensorflow:From C:/Users/14165/Desktop/DeepFM/churn_predict.py:44: load (from tensorflow.python.saved_model.loader_impl) is deprecated and will be removed in a future version.
Instructions for updating:
This function will only be available through the v1 compatibility library as tf.compat.v1.saved_model.loader.load or tf.compat.v1.saved_model.load. There will be a new function for importing SavedModels in Tensorflow 2.0.
INFO:tensorflow:Restoring parameters from ./churn_pb\1595775832\variables\variables
accuracy: 0.7584251187687513
precision: 0.7706821837759759
recall: 0.8258112742942573
f1: 0.7972948877964289
auc: 0.8262372477851344
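One more note: predict_pb above hardcodes the tensor names. A sketch of an alternative (assuming the 'my_signature' key used at export time) that resolves the input and output tensors from the SavedModel's signature instead:

import os

import tensorflow as tf
from tensorflow.python.saved_model import tag_constants


def predict_pb_by_signature(feat_index, feat_value, pb_dir="./churn_pb"):
    """Sketch: look up tensors via the exported signature instead of hardcoding names."""
    max_time = max(os.listdir(pb_dir))
    with tf.Session(graph=tf.Graph()) as sess:
        # loader.load returns the MetaGraphDef, which carries the signature built at export time
        meta_graph_def = tf.saved_model.loader.load(
            sess, [tag_constants.SERVING], os.path.join(pb_dir, max_time))
        signature = meta_graph_def.signature_def["my_signature"]
        graph = tf.get_default_graph()
        input_xi = graph.get_tensor_by_name(signature.inputs["feature_index"].name)
        input_xv = graph.get_tensor_by_name(signature.inputs["feature_value"].name)
        probs = graph.get_tensor_by_name(signature.outputs["output"].name)
        return sess.run(probs, feed_dict={input_xi: feat_index, input_xv: feat_value})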