多任务学习:MMOE网络解析和模型实战
关键字:多任务学习
,MMOE
,multi-task learning
,tensorflow
内容提要
- 多任务学习的优势
- 多任务学习场景举例
- MMOE网络结构解析
- 使用tensorflow1搭建MMOE模型
- MMOE训练过程参数跟踪
- MMOE和单任务建模树模型结果对比
多任务学习简述
-
定义
基于共享表示(shared representation),把多个相关的任务放在一起学习的一种机器学习方法 -
优势
1.充分利用多个任务之间的共性信息
:模型参数在多个任务间共享,可以学习多个任务的共性,避免单独建模形成信息割裂
2.任务细化充分学习到任务间差异信息
:模型网络可以容忍任务特征有差异甚至互斥,模型参数还可以学习任务间的差异,因提高每个任务的预测泛化能力
3.一定程度上解决样本不足学习问题
:当某任务(主任务)数据不足时,可以引入相关的任务(辅助任务)进行多任务学习,期望其他任务样本有知识可以迁移到主任务,提高主任务的预测能力
4.解决样本偏差问题
:局部独立建模样本是有偏的,多任务建模输入是全局客群,避免训练和预测样本空间不一致
5.减少算法的开发部署工作量
:往往需要预测多个业务指标进行决策优化,如果多个业务线单独建模会大大增加开发,计算,维护成本 -
和传统学习的区别
传统机器学习每个任务分别建模,多任务学习联合多个任务一起联合建模,可以在不同任务迁移共享知识,同时学到任务之间的差异 -
举例
多任务学习举例
以相亲婚恋推荐为例,将推荐成功,结婚,离婚三个任务一起进行多任务学习,三个任务之间有共性知识,三个任务也有递进关系,同时样本空间逐渐缩小
MMOE网络结构
MMOE(Multi-gate Mixture-of-Experts)是一种多任务学习的实现方式。无门控单专家多任务,单门控多专家多任务(MOE),多门控多专家多任务(MMOE)网络结构分别如下
MMOE(a)无门空单专家多任务学习:多个任务共享底层,顶层单独学习,如果多个任务不相关甚至存在特征冲突,则底层学习只能中庸,一个专家对多任务的表达能力有限
(b)单门控多专家多任务学习:多任务共享多个专家网络,每个专家的输出是每个任务的一部分,同时引入一个门控学习每个任务的众多专家组合权重。多个专家的汇总输出增强了多任务的表征能力,门相当于注意力机制考虑了专家的权重
(c)多门控多专家多任务学习:由于MOE只有一个门,因此每个任务的专家组合是基于全部任务的折衷,如果某个任务样本少或者和大部分其他任务样本形成互斥,则门会偏向于大部分主流的任务而忽略某些特殊任务。MMOE给每个任务都配备了一个门控网络,对于每个特定的任务,都能有一组个性化的专家组合去进行预测
模型代码实战
(1)业务和数据背景
需要对好坏客户进行分类,就是个风控任务,但是原始的好客户为4784个(群体A),坏客户300个(群体B),样本比例极不均衡正样本太少,考虑使用另一批坏样本1000个(群体C)进行增补,但是群体C和群体B的坏有些许定义差异,但是两者有相似的坏的共性,因此考虑将学习B作为主任务,学习C作为辅助任务,两者共同学习用共享知识来弥补B群体样本不足,同时分别打标提高模型的泛化能力。
(2)数据处理
对数据做简单统计,有12个客户既是B的坏,也是C的坏
内部坏客户: 300 外部坏客户: 1000 内部好客户: 5000 内外都在的坏客户: 12 全部客户数: 6000
总计特征数量: 291
训练label统计
total:4169,good:3257,bad:233,external_bad:687
测试label统计
total:1862,good:1453,bad:101,external_bad:312
将特征数据处理为向量格式,特征一共291维,全部都做了连续话和最大最小变换处理,将label数据处理为一个一维列表,分别记录是否是A,是否是C,另外需要对标签做mask,比如内部的坏客户不代表不是外部坏客户,而是由于两方数据范围不一致导致,因此内部坏客户应该在外部数据上被mask掉,标签和mask的数据定义如下
好客户: [0, 0], mask: [1, 1]
是A坏客户: [1, 0], mask:[1, 0]
是C坏客户: [0, 1], mask:[0, 1]
同时是A和B的坏客户: [1, 1], mask:[1, 1]
最后把数据根据7:3分别训练和测试,不做任何样本均衡处理。
(3)模型搭建
MMOE使用tensorflow1的静态图搭建如下
class MMOE_MASK:
def __init__(self, feature_size, num_task, num_export, export_dim, tower_dim, learning_rate=0.01,
l2_regularizer_scale=0.01, task_weight="1,1"):
self.input_x = tf.placeholder(tf.float32, [None, feature_size], name="input_x")
self.input_y = tf.placeholder(tf.float32, [None, num_task], name="input_y")
self.input_mask = tf.placeholder(tf.float32, [None, num_task], name="mask")
self.num_task = num_task
self.num_export = num_export
self.export_dim = export_dim
self.tower_dim = tower_dim
self.learning_rate = learning_rate
self.l2_regularizer_scale = l2_regularizer_scale
self.loss = 0
self.global_step = tf.Variable(0, name="global_step", trainable=False)
self.train_step = None
self.prob_out = None
self.task_weight = get_task_weight(task_weight)
def build(self):
print("task weight:", self.task_weight)
export_res = [] # num_export
gate_res = [] # num_task
# export
for i in range(self.num_export):
# [None, feature_size] => [None, export_dim]
one_export_out = tf.layers.dense(self.input_x, self.export_dim, activation=tf.nn.relu,
kernel_regularizer=tf.contrib.layers.l2_regularizer(
self.l2_regularizer_scale))
# 升1维,为合并再加权求和做准备
one_export_out_expand = tf.expand_dims(one_export_out, 2)
# list( [None, export_dim, 1],[None, export_dim, 1]...), size=num_export
export_res.append(one_export_out_expand)
# 预测结果合并为一个矩阵 [None, export_dim, num_export]
export_res_out = tf.concat(export_res, axis=2)
# gate
for i in range(self.num_task):
# [None, feature_size] => [None, num_export]
one_gate_out = tf.layers.dense(self.input_x, self.num_export, activation=tf.nn.softmax,
kernel_regularizer=tf.contrib.layers.l2_regularizer(
self.l2_regularizer_scale))
# 升1维,准备相乘
one_gate_out_expand = tf.expand_dims(one_gate_out, 1)
# [None, export_dim, num_export] * [None, 1, num_export] => [None, export_dim, num_export]
# [None, export_dim, num_export] => [None, export_dim]
one_task_gate_export_out = tf.reduce_sum(one_gate_out_expand * export_res_out, 2)
gate_res.append(one_task_gate_export_out)
# tower
prob_res = []
for index, gate_out in enumerate(gate_res):
# [None, export_dim] => [None, tower_dim]
one_tower_dense = tf.layers.dense(gate_out, self.tower_dim, activation=tf.nn.relu,
kernel_regularizer=tf.contrib.layers.l2_regularizer(
self.l2_regularizer_scale))
# [None, tower_dim] => [None, 1]
one_tower_out = tf.layers.dense(one_tower_dense, 1, activation=None)
# prob
prob = tf.sigmoid(one_tower_out)
prob_res.append(prob)
# label
one_task_label = tf.slice(self.input_y, [0, index], [-1, 1])
# mask [None, 1]
one_task_mask = tf.slice(self.input_mask, [0, index], [-1, 1])
# one_task_mask /= tf.reduce_mean(one_task_mask)
# loss
self.loss += tf.reduce_mean(
tf.nn.sigmoid_cross_entropy_with_logits(labels=one_task_label, logits=one_tower_out) * one_task_mask) * \
self.task_weight[index]
tf.nn.sigmoid_cross_entropy_with_logits(labels=one_task_label, logits=one_tower_out))
self.loss += tf.losses.get_regularization_loss()
# list( [None, 1], [None, 1] ) => [None, 2]
self.prob_out = tf.concat(prob_res, axis=1, name="prob")
# 优化器
optimizer = tf.train.AdamOptimizer(learning_rate=self.learning_rate)
self.train_step = optimizer.minimize(self.loss, global_step=self.global_step)
代码的流程是使用循环构建多个export层,gate层,tower层,构建的层存储到列表,再搭配tf.concat
将列表转化为tensor对象完成下面的流转工作,其中export,gate,tower都是简单的全连接。
最关键的一行代码为门输出的softmax权重和所有专家输出的隐藏层状态进行加权求和
one_task_gate_export_out = tf.reduce_sum(one_gate_out_expand * export_res_out, 2)
通过以上代码两个向量对应位置相乘再求和完成。
在loss计算环节,初始化一个loss再使用循环将每个任务的loss累加进去,同时简单的配置了任务权重,支持对主任务加权。
(4)模型训练
实例化MMOE如下
model = MMOE(feature_size=291, num_task=2, num_export=4, export_dim=128, tower_dim=8, l2_regularizer_scale=0.00,
learning_rate=0.01, task_weight="2,1")
model.build()
设置291维,2个任务,4个专家网络,每个专家网络输出的隐藏层维度128,塔层的全连接输出维度8。
训练代码如下
saver = tf.train.Saver(tf.global_variables(), max_to_keep=1)
with tf.Session() as sess:
init_op = tf.group(tf.global_variables_initializer())
sess.run(init_op)
shutil.rmtree(os.path.join(BASIC_PATH, "./summary"), ignore_errors=True)
writer = tf.summary.FileWriter(os.path.join(BASIC_PATH, "./summary"), sess.graph)
batches = get_batch(100, 1024, train_x, train_y)
# 验证
val_feed_dict = {model.input_x: test_x, model.input_y: test_y}
train_loss = []
val_loss_list = []
for batch in batches:
epoch, batch_x, batch_y = batch
feed_dict = {model.input_x: batch_x, model.input_y: batch_y}
_, step, loss_val, train_prob = sess.run([model.train_step, model.global_step, model.loss, model.prob_out],
feed_dict=feed_dict)
# writer.add_summary(merged, step)
if step % 1 == 0:
batch_metrics = get_metrics(train_prob, batch_y)
# print(batch_metrics)
print("epoch:", epoch + 1, "step:", step, "loss:", round(loss_val, 4), "task_1_auc:",
round(batch_metrics["task_1_auc"], 4), "task_1_ks:", round(batch_metrics["task_1_ks"], 4),
"task_2_auc:", round(batch_metrics["task_2_auc"], 4), "task_2_ks:",
round(batch_metrics["task_2_ks"], 4))
train_loss.append(loss_val)
if step % 3 == 0:
loss_val, val_prob = sess.run([model.loss, model.prob_out], feed_dict=val_feed_dict)
batch_metrics = get_metrics(val_prob, test_y)
print("{:-^30}".format("evaluation"))
print("[evaluation]", "loss:", loss_val, "task_1_auc:",
round(batch_metrics["task_1_auc"], 4), "task_1_ks:", round(batch_metrics["task_1_ks"], 4),
"task_2_auc:", round(batch_metrics["task_2_auc"], 4), "task_2_ks:",
round(batch_metrics["task_2_ks"], 4), "\n")
# 计算当前loss相比之前的最有loss下降多少
diff = (loss_val - min(val_loss_list)) if len(val_loss_list) else 0
val_loss_list.append(loss_val)
print("本轮loss比之前最小loss{}:{}, 当前最小loss: {}".format("上升" if diff > 0 else "下降", abs(diff),
min(val_loss_list)))
if diff <= 0:
saver.save(sess, os.path.join(BASIC_PATH, "./ckpt1/ckpt"))
print("[save checkpoint]")
print("-" * 40)
if early_stop(val_loss_list, windows=20):
print("{:-^30}".format("early stop!"))
break
设置在测试集20步早停,在每一步的测试和训练中将预测结果run出来使用sklearn.metrics得到ks和auc,允许结果如下
----------------------------------------
epoch: 23 step: 115 loss: 0.1192 task_1_auc: 0.9257 task_1_ks: 0.8261 task_2_auc: 1.0 task_2_ks: 1.0
epoch: 24 step: 116 loss: 0.1031 task_1_auc: 0.9168 task_1_ks: 0.6903 task_2_auc: 0.9855 task_2_ks: 0.9216
epoch: 24 step: 117 loss: 0.0862 task_1_auc: 0.9152 task_1_ks: 0.6923 task_2_auc: 0.9974 task_2_ks: 0.9551
----------evaluation----------
[evaluation] loss: 0.20613107 task_1_auc: 0.7482 task_1_ks: 0.4488 task_2_auc: 0.9555 task_2_ks: 0.8238
本轮loss比之前最小loss上升:0.04107639193534851, 当前最小loss: 0.16505467891693115
----------------------------------------
epoch: 24 step: 118 loss: 0.0889 task_1_auc: 0.8997 task_1_ks: 0.6846 task_2_auc: 0.9936 task_2_ks: 0.9271
epoch: 24 step: 119 loss: 0.1083 task_1_auc: 0.9004 task_1_ks: 0.6667 task_2_auc: 0.9848 task_2_ks: 0.9112
epoch: 24 step: 120 loss: 0.0658 task_1_auc: 0.954 task_1_ks: 0.8333 task_2_auc: 1.0 task_2_ks: 1.0
----------evaluation----------
[evaluation] loss: 0.21031862 task_1_auc: 0.7333 task_1_ks: 0.4354 task_2_auc: 0.9551 task_2_ks: 0.8219
本轮loss比之前最小loss上升:0.04526394605636597, 当前最小loss: 0.16505467891693115
----------------------------------------
---------early stop!----------
(5)跟踪训练过程中的loss和专家权重
对MMOE类稍加改造就可以单独将两个任务的loss和门输出的权重拿出来,在和训练的step一起作图,首先看下门输出的专家的权重
任务一:主任务的专家权重
对于一个batch比如512的输入,因为有4个专家,因此门的权重输出(512, 4),相当于一组训练模型给每个专家出了512个权重结果,对每一个专家上结果求均值得到最终这一组的权重4个值。
从图上来看对于任务一,专家4的权重最高,主要以专家4和专家3在发挥对任务一的决策作用。
再看任务2,图如下
任务二中专家2权重最大,以专家2和专家4的声音最大。
从两个任务的专家权重来看,不同任务之间专家权重出现差异,对于任务一专家4主导,对于任务二专家2主导,另外随着训练步长的深入,专家的组合权重也越来越稳定,趋势明显,没有出现任何波动的情况,说明起到了专家各领风骚的目的。
再看下两个任务的loss收敛情况
(6)MMOE和单任务建模随机森林的对比
先构建预测部分的代码,训练部分将模型保存为了pb,预测代码如下
def predict_pb(input_x, pb_file_no=None):
"""从pb导入模型"""
max_time = pb_file_no
if max_time is None:
max_time = max(os.listdir(os.path.join(BASIC_PATH, "./tfserving")))
print("读取pb版本:", max_time)
with tf.Session(graph=tf.Graph()) as sess:
tf.saved_model.loader.load(sess, [tag_constants.SERVING], os.path.join(BASIC_PATH, "./tfserving", max_time))
graph = tf.get_default_graph()
input_self = graph.get_operation_by_name("input_x").outputs[0]
probs = graph.get_tensor_by_name("prob:0")
pred = sess.run(probs, feed_dict={input_self: input_x})
return pred
同样使用循环打印出每个任务的auc和ks
test_ents, test_x, test_y = pickle.load(open(os.path.join(BASIC_PATH, get_string("test_data_path")), "rb"))
pred = predict_pb(test_x)
# metrics
for i in range(2):
one_task_pro = pred[:, i]
one_labels = [x[i] for x in test_y]
auc = roc_auc_score(one_labels, one_task_pro)
fpr, tpr, threshold = roc_curve(one_labels, one_task_pro)
ks = abs(fpr - tpr).max()
print("task_{} auc:{} ks:{}".format(i + 1, auc, ks))
模型在测试集的结果如下
task_1 auc:0.8298755769955191 ks:0.5452797409212812
task_2 auc:0.9601592224979321 ks:0.8219727047146402
我们再构建一个树模型,使用sklearn的随机森林,代码如下
train_ents, train_x, train_y = pickle.load(open(os.path.join(BASIC_PATH, get_string("train_data_path")), "rb"))
test_ents, test_x, test_y = pickle.load(open(os.path.join(BASIC_PATH, get_string("test_data_path")), "rb"))
bad_ents = list(set([line.strip() for line in open(os.path.join(BASIC_PATH, get_string("bad_ent_path")))]))
external_bad_ents = list(
set([line.strip() for line in open(os.path.join(BASIC_PATH, get_string("external_bad_ent_path")))]))
# 丢弃掉因为任务2加入的客户
train_one_task_ents = []
train_one_task_x = []
train_one_task_y = []
test_one_task_ents = []
test_one_task_x = []
test_one_task_y = []
for i, j, z in zip(train_ents, train_x, train_y):
if i in external_bad_ents and i not in bad_ents:
continue
train_one_task_ents.append(i)
train_one_task_x.append(j)
train_one_task_y.append(z[0])
print("------train-------")
print("good:{}, bad: {}".format(len(train_one_task_y) - sum(train_one_task_y), sum(train_one_task_y)))
for i, j, z in zip(test_ents, test_x, test_y):
if i in external_bad_ents and i not in bad_ents:
continue
test_one_task_ents.append(i)
test_one_task_x.append(j)
test_one_task_y.append(z[0])
print("------test-------")
print("good:{}, bad: {}".format(len(test_one_task_y) - sum(test_one_task_y), sum(test_one_task_y)))
clf = RandomForestClassifier(max_depth=15, n_estimators=300)
# 只训练一个任务
clf.fit(train_one_task_x, train_one_task_y)
probs = clf.predict_proba(test_one_task_x)[:, 1]
auc = roc_auc_score(test_one_task_y, probs)
fpr, tpr, threshold = roc_curve(test_one_task_y, probs)
ks = abs(fpr - tpr).max()
print("auc:{} ks:{}".format(auc, ks))
仅对label的第一维(任务一)和所有特征输入树模型,输出如下
------train-------
good:3257, bad: 233
------test-------
good:1453, bad: 101
auc:0.8478838592737457 ks:0.5450518899102573
稍作修改可以看随机森林对任务2单独的分类效果,汇总如下
MMOE对比单任务建模可以看到多任务学习比单任务分别用随机森林建模KS和AUC都得到了提升