
Multi-Task Learning: Dissecting the MMOE Network, with a Hands-On Model

2023-04-20  xiaogp

Keywords: multi-task learning, MMOE, TensorFlow

Contents


A brief overview of multi-task learning


The MMOE network structure

MMOE (Multi-gate Mixture-of-Experts) is one way to implement multi-task learning. The network structures of (a) gateless single-expert multi-task learning, (b) single-gate multi-expert multi-task learning (MOE), and (c) multi-gate multi-expert multi-task learning (MMOE) are shown below:

[Figure: (a) shared-bottom, (b) MOE, (c) MMOE network structures]

(a) Gateless, single expert: the tasks share the bottom network and only the top layers are learned per task. If the tasks are unrelated, or their useful features even conflict, the shared bottom can only settle for a compromise; one expert has limited capacity to represent several tasks.
(b) Single gate, multiple experts (MOE): the tasks share several expert networks, each expert's output feeding into every task, while a single gate learns the combination weights over the experts. Pooling several experts strengthens the multi-task representation, and the gate acts like an attention mechanism weighing the experts.
(c) Multiple gates, multiple experts (MMOE): since MOE has only one gate, each task's expert mixture is a compromise over all tasks; if one task has few samples, or clashes with most of the others, the gate drifts toward the mainstream tasks and neglects the special one. MMOE equips every task with its own gating network, so each task gets a personalized mixture of experts for its prediction.
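Formally (following the original MMoE paper, Ma et al., KDD 2018), task $k$'s output is its tower applied to a gate-weighted mixture of the $n$ experts:

$$y_k = h_k\!\Big(\sum_{i=1}^{n} g^k(x)_i \, f_i(x)\Big), \qquad g^k(x) = \mathrm{softmax}(W_g^k x)$$

where the $f_i$ are the expert networks, $g^k(x)_i$ is the weight task $k$'s gate assigns to expert $i$, and $h_k$ is task $k$'s tower.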


Hands-on model code

(1) Business and data background

The task is to classify good vs. bad customers, a standard risk-control problem. The raw data holds 4,784 good customers (group A) and 300 bad customers (group B), so the classes are badly imbalanced with far too few positives. To compensate, another batch of 1,000 bad samples (group C) is brought in. Groups B and C define "bad" slightly differently, but they share the same underlying bad traits, so learning B is treated as the main task and learning C as an auxiliary task: joint training lets the shared knowledge make up for the shortage of group-B samples, while keeping the labels separate improves the model's generalization.

(2) Data processing

A quick count of the data shows 12 customers are bad in both B and C:

Internal bad customers: 300  External bad customers: 1000  Internal good customers: 5000  Bad in both: 12  Total customers: 6000
Total feature count: 291
Training label stats
total:4169,good:3257,bad:233,external_bad:687
Test label stats
total:1862,good:1453,bad:101,external_bad:312

The features are packed into 291-dim vectors; every feature is made continuous and min-max scaled. The labels become a two-element list recording whether the customer is a group-B bad and whether it is a group-C bad. The labels also need a mask: a customer flagged bad internally but absent from the external list is not necessarily a non-external bad, it may simply fall outside the external data's coverage, so its external label should be masked out (and symmetrically for external bads). Labels and masks are defined as follows:

Good customer: [0, 0], mask: [1, 1]
Bad in group B: [1, 0], mask: [1, 0]
Bad in group C: [0, 1], mask: [0, 1]
Bad in both B and C: [1, 1], mask: [1, 1]

Finally the data is split 7:3 into training and test sets, with no class-balancing of any kind.
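As a concrete illustration of the scheme above, here is a minimal sketch of the label/mask construction, assuming bad_ents and external_bad_ents hold the group-B and group-C customer ids (the same names appear in the comparison code later in the post):

def make_label_and_mask(ent, bad_ents, external_bad_ents):
    """Build the 2-dim label and mask for one customer id.

    Task 1 = internal bad (group B), task 2 = external bad (group C).
    A label dimension is masked out (0) when that source cannot observe
    the customer, so its loss contribution is dropped.
    """
    is_b = ent in bad_ents
    is_c = ent in external_bad_ents
    label = [int(is_b), int(is_c)]
    if is_b and not is_c:
        mask = [1, 0]   # B-only bad: external label unknown
    elif is_c and not is_b:
        mask = [0, 1]   # C-only bad: internal label unknown
    else:
        mask = [1, 1]   # good, or bad in both sources
    return label, mask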


(3) Building the model

The MMOE is built as a TensorFlow 1 static graph:

import tensorflow as tf


def get_task_weight(task_weight):
    """Parse a comma-separated weight string, e.g. "2,1" -> [2.0, 1.0] (assumed helper)."""
    return [float(w) for w in task_weight.split(",")]


class MMOE_MASK:
    def __init__(self, feature_size, num_task, num_expert, expert_dim, tower_dim, learning_rate=0.01,
                 l2_regularizer_scale=0.01, task_weight="1,1"):
        self.input_x = tf.placeholder(tf.float32, [None, feature_size], name="input_x")
        self.input_y = tf.placeholder(tf.float32, [None, num_task], name="input_y")
        self.input_mask = tf.placeholder(tf.float32, [None, num_task], name="mask")
        self.num_task = num_task
        self.num_expert = num_expert
        self.expert_dim = expert_dim
        self.tower_dim = tower_dim
        self.learning_rate = learning_rate
        self.l2_regularizer_scale = l2_regularizer_scale
        self.loss = 0
        self.global_step = tf.Variable(0, name="global_step", trainable=False)
        self.train_step = None
        self.prob_out = None
        self.task_weight = get_task_weight(task_weight)

    def build(self):
        print("task weight:", self.task_weight)
        expert_res = []  # length num_expert
        gate_res = []  # length num_task
        # experts
        for i in range(self.num_expert):
            # [None, feature_size] => [None, expert_dim]
            one_expert_out = tf.layers.dense(self.input_x, self.expert_dim, activation=tf.nn.relu,
                                             kernel_regularizer=tf.contrib.layers.l2_regularizer(
                                                 self.l2_regularizer_scale))
            # add a trailing dim so the experts can be concatenated and weight-summed
            one_expert_out_expand = tf.expand_dims(one_expert_out, 2)
            # list( [None, expert_dim, 1], [None, expert_dim, 1], ... ), size=num_expert
            expert_res.append(one_expert_out_expand)
        # stack the expert outputs into one tensor [None, expert_dim, num_expert]
        expert_res_out = tf.concat(expert_res, axis=2)

        # gates, one per task
        for i in range(self.num_task):
            # [None, feature_size] => [None, num_expert]
            one_gate_out = tf.layers.dense(self.input_x, self.num_expert, activation=tf.nn.softmax,
                                           kernel_regularizer=tf.contrib.layers.l2_regularizer(
                                               self.l2_regularizer_scale))
            # add a middle dim to broadcast against the stacked experts
            one_gate_out_expand = tf.expand_dims(one_gate_out, 1)
            # [None, expert_dim, num_expert] * [None, 1, num_expert] => [None, expert_dim, num_expert]
            # then sum over the expert axis: => [None, expert_dim]
            one_task_gate_expert_out = tf.reduce_sum(one_gate_out_expand * expert_res_out, 2)
            gate_res.append(one_task_gate_expert_out)

        # towers, one per task
        prob_res = []
        for index, gate_out in enumerate(gate_res):
            # [None, expert_dim] => [None, tower_dim]
            one_tower_dense = tf.layers.dense(gate_out, self.tower_dim, activation=tf.nn.relu,
                                              kernel_regularizer=tf.contrib.layers.l2_regularizer(
                                                  self.l2_regularizer_scale))
            # [None, tower_dim] => [None, 1]
            one_tower_out = tf.layers.dense(one_tower_dense, 1, activation=None)
            # probability for this task
            prob = tf.sigmoid(one_tower_out)
            prob_res.append(prob)
            # this task's label column [None, 1]
            one_task_label = tf.slice(self.input_y, [0, index], [-1, 1])
            # this task's mask column [None, 1]
            one_task_mask = tf.slice(self.input_mask, [0, index], [-1, 1])
            # one_task_mask /= tf.reduce_mean(one_task_mask)
            # masked, task-weighted cross entropy accumulated into the total loss
            self.loss += tf.reduce_mean(
                tf.nn.sigmoid_cross_entropy_with_logits(labels=one_task_label, logits=one_tower_out) * one_task_mask) * \
                         self.task_weight[index]

        self.loss += tf.losses.get_regularization_loss()
        # list( [None, 1], [None, 1] ) => [None, 2]
        self.prob_out = tf.concat(prob_res, axis=1, name="prob")

        # optimizer
        optimizer = tf.train.AdamOptimizer(learning_rate=self.learning_rate)
        self.train_step = optimizer.minimize(self.loss, global_step=self.global_step)

The code builds the expert, gate, and tower layers in loops, storing each layer's output in a list; tf.concat then turns the lists into single tensors for the downstream flow. Experts, gates, and towers are all plain dense layers.
The single most important line is where a gate's softmax weights take a weighted sum over all the experts' hidden states:

one_task_gate_expert_out = tf.reduce_sum(one_gate_out_expand * expert_res_out, 2)

The broadcasted element-wise product followed by a sum over the expert axis completes the weighting.
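A quick NumPy sketch with toy shapes makes the broadcast-and-sum concrete (the names here are illustrative):

import numpy as np

batch, expert_dim, num_expert = 2, 3, 4
# stacked expert outputs, as after tf.concat: (batch, expert_dim, num_expert)
expert_out = np.random.rand(batch, expert_dim, num_expert)
# gate weights, expanded to (batch, 1, num_expert)
gate_out = np.random.rand(batch, 1, num_expert)
gate_out /= gate_out.sum(axis=2, keepdims=True)  # rows sum to 1, like a softmax

# the broadcast multiply scales every expert's column by its weight;
# summing over the last axis mixes them into one vector per sample
mixed = (gate_out * expert_out).sum(axis=2)      # (batch, expert_dim)
assert mixed.shape == (batch, expert_dim)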
For the loss, a scalar loss is initialized to zero and each task's loss is accumulated into it inside the loop, with simple task weights configured so that the main task can be up-weighted.
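And a small sketch of how the mask knocks unobservable labels out of a task's loss, in plain NumPy using the same formula as tf.nn.sigmoid_cross_entropy_with_logits:

import numpy as np

def sigmoid_ce(labels, logits):
    # numerically stable sigmoid cross entropy:
    # max(x, 0) - x*z + log(1 + exp(-|x|))
    return np.maximum(logits, 0) - logits * labels + np.log1p(np.exp(-np.abs(logits)))

labels = np.array([[1.0], [0.0], [1.0]])
logits = np.array([[2.0], [-1.0], [0.5]])
mask   = np.array([[1.0], [1.0], [0.0]])  # third sample: label unobserved for this task

task_loss = (sigmoid_ce(labels, logits) * mask).mean()  # masked row contributes 0

Note that the mean still divides by the full batch size, masked rows included; the commented-out line one_task_mask /= tf.reduce_mean(one_task_mask) in the class would rescale the mask so the divisor effectively becomes the count of unmasked rows.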


(4) Training the model

Instantiate the MMOE as follows:

model = MMOE_MASK(feature_size=291, num_task=2, num_expert=4, expert_dim=128, tower_dim=8, l2_regularizer_scale=0.00,
                  learning_rate=0.01, task_weight="2,1")
model.build()

This configures 291 input features, 2 tasks, 4 expert networks each with a 128-dim hidden output, and an 8-dim dense layer in each tower.
The training code is as follows:

import os
import shutil

saver = tf.train.Saver(tf.global_variables(), max_to_keep=1)
with tf.Session() as sess:
    init_op = tf.group(tf.global_variables_initializer())
    sess.run(init_op)
    shutil.rmtree(os.path.join(BASIC_PATH, "./summary"), ignore_errors=True)
    writer = tf.summary.FileWriter(os.path.join(BASIC_PATH, "./summary"), sess.graph)
    # get_batch is assumed to also yield the per-batch label masks
    batches = get_batch(100, 1024, train_x, train_y, train_mask)
    # validation feed; the mask placeholder must be fed along with features and labels
    val_feed_dict = {model.input_x: test_x, model.input_y: test_y, model.input_mask: test_mask}

    train_loss = []
    val_loss_list = []
    for batch in batches:
        epoch, batch_x, batch_y, batch_mask = batch
        feed_dict = {model.input_x: batch_x, model.input_y: batch_y, model.input_mask: batch_mask}
        _, step, loss_val, train_prob = sess.run([model.train_step, model.global_step, model.loss, model.prob_out],
                                                 feed_dict=feed_dict)

        # writer.add_summary(merged, step)
        if step % 1 == 0:
            batch_metrics = get_metrics(train_prob, batch_y)
            # print(batch_metrics)
            print("epoch:", epoch + 1, "step:", step, "loss:", round(loss_val, 4), "task_1_auc:",
                  round(batch_metrics["task_1_auc"], 4), "task_1_ks:", round(batch_metrics["task_1_ks"], 4),
                  "task_2_auc:", round(batch_metrics["task_2_auc"], 4), "task_2_ks:",
                  round(batch_metrics["task_2_ks"], 4))
            train_loss.append(loss_val)

        if step % 3 == 0:
            loss_val, val_prob = sess.run([model.loss, model.prob_out], feed_dict=val_feed_dict)
            batch_metrics = get_metrics(val_prob, test_y)
            print("{:-^30}".format("evaluation"))
            print("[evaluation]", "loss:", loss_val, "task_1_auc:",
                  round(batch_metrics["task_1_auc"], 4), "task_1_ks:", round(batch_metrics["task_1_ks"], 4),
                  "task_2_auc:", round(batch_metrics["task_2_auc"], 4), "task_2_ks:",
                  round(batch_metrics["task_2_ks"], 4), "\n")

            # how far the current val loss moved relative to the best seen so far
            diff = (loss_val - min(val_loss_list)) if len(val_loss_list) else 0
            val_loss_list.append(loss_val)
            print("val loss {} vs previous best by {}, current best: {}".format(
                "up" if diff > 0 else "down", abs(diff), min(val_loss_list)))
            if diff <= 0:
                saver.save(sess, os.path.join(BASIC_PATH, "./ckpt1/ckpt"))
                print("[save checkpoint]")
            print("-" * 40)
            if early_stop(val_loss_list, windows=20):
                print("{:-^30}".format("early stop!"))
                break

Early stopping watches a 20-evaluation window on the test set; at each training and evaluation step the predictions are fetched and sklearn.metrics is used to compute KS and AUC. The output looks like this:

----------------------------------------
epoch: 23 step: 115 loss: 0.1192 task_1_auc: 0.9257 task_1_ks: 0.8261 task_2_auc: 1.0 task_2_ks: 1.0
epoch: 24 step: 116 loss: 0.1031 task_1_auc: 0.9168 task_1_ks: 0.6903 task_2_auc: 0.9855 task_2_ks: 0.9216
epoch: 24 step: 117 loss: 0.0862 task_1_auc: 0.9152 task_1_ks: 0.6923 task_2_auc: 0.9974 task_2_ks: 0.9551
----------evaluation----------
[evaluation] loss: 0.20613107 task_1_auc: 0.7482 task_1_ks: 0.4488 task_2_auc: 0.9555 task_2_ks: 0.8238 

val loss up vs previous best by 0.04107639193534851, current best: 0.16505467891693115
----------------------------------------
epoch: 24 step: 118 loss: 0.0889 task_1_auc: 0.8997 task_1_ks: 0.6846 task_2_auc: 0.9936 task_2_ks: 0.9271
epoch: 24 step: 119 loss: 0.1083 task_1_auc: 0.9004 task_1_ks: 0.6667 task_2_auc: 0.9848 task_2_ks: 0.9112
epoch: 24 step: 120 loss: 0.0658 task_1_auc: 0.954 task_1_ks: 0.8333 task_2_auc: 1.0 task_2_ks: 1.0
----------evaluation----------
[evaluation] loss: 0.21031862 task_1_auc: 0.7333 task_1_ks: 0.4354 task_2_auc: 0.9551 task_2_ks: 0.8219 

val loss up vs previous best by 0.04526394605636597, current best: 0.16505467891693115
----------------------------------------
---------early stop!----------
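The get_metrics and early_stop helpers used above are not shown in the post; here are minimal sketches consistent with how they are called (the actual implementations may differ):

import numpy as np
from sklearn.metrics import roc_auc_score, roc_curve

def get_metrics(probs, labels):
    """Per-task AUC and KS; probs and labels are (batch, num_task)."""
    labels = np.asarray(labels)
    metrics = {}
    for i in range(probs.shape[1]):
        auc = roc_auc_score(labels[:, i], probs[:, i])
        fpr, tpr, _ = roc_curve(labels[:, i], probs[:, i])
        metrics["task_{}_auc".format(i + 1)] = auc
        metrics["task_{}_ks".format(i + 1)] = abs(tpr - fpr).max()
    return metrics

def early_stop(val_loss_list, windows=20):
    """Stop when the best validation loss hasn't improved within the last `windows` evals."""
    if len(val_loss_list) <= windows:
        return False
    return min(val_loss_list) not in val_loss_list[-windows:]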
(5) Tracking the loss and expert weights during training

With a small modification to the MMOE_MASK class, the per-task losses and the gates' output weights can be fetched separately and plotted against the training step. First, look at the expert weights produced by the gates.


[Figure: expert weights for task 1 (the main task)]

For a batch of, say, 512 inputs and 4 experts, a gate outputs weights of shape (512, 4): in effect the model produces 512 weight values for each expert, and averaging over the batch axis yields the final 4 weights for that step.
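A minimal sketch of that modification, with illustrative names: keep the raw gate outputs on the instance during build(), fetch them with sess.run, and average over the batch axis:

import numpy as np

# in build(), inside the gate loop, additionally keep the raw softmax outputs:
#     self.gate_outputs.append(one_gate_out)   # each entry: [None, num_expert]

# during training, at each logging step:
gate_vals = sess.run(model.gate_outputs, feed_dict=feed_dict)
for task_id, g in enumerate(gate_vals):
    # g: (batch, num_expert), e.g. (512, 4); batch mean -> one weight per expert
    expert_weights = g.mean(axis=0)
    print("task", task_id + 1, "expert weights:", np.round(expert_weights, 4))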
Reading the plot, expert 4 carries the highest weight for task 1; experts 4 and 3 do most of the deciding for this task.
Now look at task 2:

[Figure: expert weights for task 2 (the auxiliary task)]

For task 2, expert 2 has the largest weight, with experts 2 and 4 speaking loudest.
Comparing the two tasks, the expert weights clearly diverge: expert 4 dominates task 1 while expert 2 dominates task 2. Moreover, as training progresses the expert combinations grow steadily more stable, with a clear trend and no visible fluctuation, showing that the experts have indeed specialized across tasks.
Finally, look at how the two tasks' losses converge:

[Figure: loss convergence of the two tasks]
(6) Comparing MMOE against single-task random forest models

First build the prediction code. Training saves the model as a pb (SavedModel); the prediction code is:

import os
import tensorflow as tf
from tensorflow.python.saved_model import tag_constants

def predict_pb(input_x, pb_file_no=None):
    """Load the model from a SavedModel pb and run prediction."""
    max_time = pb_file_no
    if max_time is None:
        # default to the newest (lexicographically largest) version directory
        max_time = max(os.listdir(os.path.join(BASIC_PATH, "./tfserving")))
    print("loading pb version:", max_time)
    with tf.Session(graph=tf.Graph()) as sess:
        tf.saved_model.loader.load(sess, [tag_constants.SERVING], os.path.join(BASIC_PATH, "./tfserving", max_time))
        graph = tf.get_default_graph()
        input_self = graph.get_operation_by_name("input_x").outputs[0]
        probs = graph.get_tensor_by_name("prob:0")
        pred = sess.run(probs, feed_dict={input_self: input_x})

    return pred
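For completeness, the matching export step could look like this sketch, assuming tf.saved_model.simple_save and a timestamped version directory; the tensor names input_x and prob come from the placeholder and concat names in the class:

import os
import time
import tensorflow as tf

# write a TF-Serving style versioned SavedModel directory
export_dir = os.path.join(BASIC_PATH, "./tfserving", str(int(time.time())))
tf.saved_model.simple_save(
    sess,                                # the training session, after fitting
    export_dir,
    inputs={"input_x": model.input_x},
    outputs={"prob": model.prob_out},
)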

The per-task AUC and KS are again printed in a loop:

import pickle
from sklearn.metrics import roc_auc_score, roc_curve

test_ents, test_x, test_y = pickle.load(open(os.path.join(BASIC_PATH, get_string("test_data_path")), "rb"))
pred = predict_pb(test_x)

# metrics
for i in range(2):
    one_task_pro = pred[:, i]
    one_labels = [x[i] for x in test_y]
    auc = roc_auc_score(one_labels, one_task_pro)
    fpr, tpr, threshold = roc_curve(one_labels, one_task_pro)
    ks = abs(fpr - tpr).max()
    print("task_{} auc:{} ks:{}".format(i + 1, auc, ks))

The model's results on the test set:

task_1 auc:0.8298755769955191 ks:0.5452797409212812
task_2 auc:0.9601592224979321 ks:0.8219727047146402

Next, build a tree model with sklearn's random forest:

import pickle
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import roc_auc_score, roc_curve

train_ents, train_x, train_y = pickle.load(open(os.path.join(BASIC_PATH, get_string("train_data_path")), "rb"))
test_ents, test_x, test_y = pickle.load(open(os.path.join(BASIC_PATH, get_string("test_data_path")), "rb"))
bad_ents = list(set([line.strip() for line in open(os.path.join(BASIC_PATH, get_string("bad_ent_path")))]))
external_bad_ents = list(
    set([line.strip() for line in open(os.path.join(BASIC_PATH, get_string("external_bad_ent_path")))]))

# drop the customers that were only added for task 2
train_one_task_ents = []
train_one_task_x = []
train_one_task_y = []
test_one_task_ents = []
test_one_task_x = []
test_one_task_y = []
for i, j, z in zip(train_ents, train_x, train_y):
    if i in external_bad_ents and i not in bad_ents:
        continue
    train_one_task_ents.append(i)
    train_one_task_x.append(j)
    train_one_task_y.append(z[0])
print("------train-------")
print("good:{}, bad: {}".format(len(train_one_task_y) - sum(train_one_task_y), sum(train_one_task_y)))
for i, j, z in zip(test_ents, test_x, test_y):
    if i in external_bad_ents and i not in bad_ents:
        continue
    test_one_task_ents.append(i)
    test_one_task_x.append(j)
    test_one_task_y.append(z[0])
print("------test-------")
print("good:{}, bad: {}".format(len(test_one_task_y) - sum(test_one_task_y), sum(test_one_task_y)))

clf = RandomForestClassifier(max_depth=15, n_estimators=300)
# train on task 1 only
clf.fit(train_one_task_x, train_one_task_y)
probs = clf.predict_proba(test_one_task_x)[:, 1]
auc = roc_auc_score(test_one_task_y, probs)
fpr, tpr, threshold = roc_curve(test_one_task_y, probs)
ks = abs(fpr - tpr).max()
print("auc:{} ks:{}".format(auc, ks))

The tree model gets all the features but only the first label dimension (task 1); its output:

------train-------
good:3257, bad: 233
------test-------
good:1453, bad: 101
auc:0.8478838592737457 ks:0.5450518899102573

With a small change, the random forest's performance on task 2 alone can be measured the same way; the results are summarized below.
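The task-2 variant simply mirrors the filtering above: drop the customers only observable to task 1 and take the second label dimension. A sketch, reusing the names from the block above:

# drop internal-only bads (their task-2 label is unobservable)
# and use the second label dimension
train_task2_x, train_task2_y = [], []
for i, j, z in zip(train_ents, train_x, train_y):
    if i in bad_ents and i not in external_bad_ents:
        continue
    train_task2_x.append(j)
    train_task2_y.append(z[1])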

[Figure: summary table, MMOE vs. single-task models]

As the summary shows, multi-task learning lifts KS and AUC compared with building a separate single-task random forest for each task.
