数据挖掘

wide_deep原理和tensorflow实现和tfservi

2020-06-12  本文已影响0人  xiaogp

wide_deep模型理解

分类问题模型一方面需要适配绝大部分样本,学习推演规律, 另一方面对于少数特例也需要有记忆能力. 神经网络和树模型对于连续型特征具有极强的拟合能力作为泛化部分, 线性模型LR和FM对离散特征交叉项进行学习作为记忆部分.也可以直接对预测的两端进行交叉组合输入给模型记忆,比如以用户的已购买商品序列做mutihot再和推荐的商品进行cross, 交给LR或者FM学习


Memorization记忆
面对拥有大规模离散sparse特征的CTR预估问题时,将特征进行非线性转换,然后再使用线性模型是在业界非常普遍的做法,最流行的即「LR+特征叉乘」。Memorization 通过一系列人工的特征叉乘(cross-product)来构造这些非线性特征,捕捉sparse特征之间的高阶相关性,即“记忆” 历史数据中曾共同出现过的特征对。
典型代表是LR模型,使用大量的原始sparse特征和叉乘特征作为输入,很多原始的dense特征通常也会被分桶离散化构造为sparse特征。>Memorization的缺点是:

1.需要更多的人工设计;
2.可能出现过拟合。可以这样理解:如果将所有特征叉乘起来,那么几乎相当于纯粹记住每个训练样本,这个极端情况是最细粒度的叉乘,我们可以通过构造更粗粒度的特征叉乘来增强泛化性

  1. 无法捕捉训练数据中未曾出现过的特征对,一方面交叉作用学习在训练数据,应一个原因是交叉作用采用FTRL是稀疏解大多为0

Generalization泛化
Generalization 为sparse特征学习低维的dense embeddings,将离散特征通过embedding连续化, 另外相比于其他连续化手法,embeddings 本身带有一定的语义信息,等于说模型学习到了离散特征自身的信息,同时考虑到了离散特征的相关性.
Generalization的优点是更少的人工参与,对历史上没有出现的特征组合有更好的泛化性 。此时Memorization就展示了优势,它可以“记住”这些特殊的特征组合。
Memorization根据历史行为数据,产生的推荐通常和用户已有行为的物品直接相关的物品。而Generalization会提高推荐物品的多样性。


模型结构
Wide & Deep模型结合了LR和DNN,其框架图如下所示。

image.png
Wide 该部分是广义线性模型,就是LR
image.png
表示权重和一阶特征和特征交叉组合相乘求和之后加入偏置
其中会对连续特征进行分箱,全部离散化
Deep 该部分是前馈神经网络,网络会对一些sparse特征(如ID类特征)学习一个低维的dense embeddings,然后和一些原始dense特征一起作为网络的输入。实际上就是离散特征embedding之后再和连续特征横向拼接
image.png
wide and deep
image.png
模型的输出是wide端LR和deep段神经网络的加权求和带偏置,套一个sigmoid激活函数实现二分类

联合训练
联合训练(Joint Training)和集成(Ensemble)是不同的,集成是每个模型单独训练,再将模型的结果汇合, 模型的融合发生在最后的预测阶段,集成的每个独立模型都得学得足够好才有利于随后的汇合,因此每个模型的model size也相对更大。联合训练所有模型都是同时训练的,彼此共享误差,模型的融合发生在训练阶段。联合训练的wide部分只需要作一小部分的特征叉乘来弥补deep部分的不足,不需要 一个full-size 的wide 模型,实际上就是重deep轻wide,wide部分是辅助作用,增加模型的记忆能力

优化算法

wide端采用带有L1正则化的FTRL, 由于wide部分使用了商品ID的mutihot再交叉组合, 所以维度爆炸期望L1并且FTRL获得稀疏解, 降低参数复杂度
deep端采用AdaGrad, 也可以直接adam


一个wide deep案例

wide deep案例.png

deep层将所有连续特征和离散特征的embedding拼接, wide端只使用了两个离散特征的交叉组合,最后甲醛求套log loss,联合训练
训练样本约5000亿
Categorical 特征(sparse)会有一个过滤阈值,即至少在训练集中出现m次才会被加入
Continuous 特征(dense)归一化到 [0,1] 之间
Categorical 特征映射到32维embeddings,和原始Continuous特征共1200维作为NN输入
Wide部分只用了一组特征叉乘,即被推荐的app ☓ 用户下载的app, 相当于端到端直接建模, 学习到已经购买的商品序列和推荐商品的关联关系.
线上模型更新时,通过“热启动”重训练,即使用上次的embeddings和模型参数初始化

tf.estimator 下wide and deep实现

以客户营销响应二分类预测为例, 预测用户是否会购买一款新上市的奶粉, 特征如下, 另外label是0不购买, 1购买

data = {'CardNo': 99235390,   ## 用户ID
        'Gender': 1,   # 性别
        'House': 1,   # 是否留下住址
        'actual_price': 682.6999999999998,  # 实际总消费金额 
        'day_diff': 8,   # R值
        'freq': 5,   # F值
        'if_milk': 0,   # 是否买过奶粉
        'max_price': 260.0,   # 最大一次客单价
        'milk_count': 0,   # 奶粉购买次数
        'milk_diff': 999,   # 上一次购买奶粉距今
        'milk_percent': 0.0,   # 奶粉金额占比购买总金额
        'mami': 0,   # 是否妈咪用品客群
        'niaobu': 0,   # 是否尿布客群
        'qingjie': 0,   # 是否婴儿清洁用品客群
        'yongpin': 0,   # 是否婴儿用品客群
        'shipin': 0,   # 是否婴儿食品客群
        'yizhi': 0}  # 是否婴儿益智客群

定义模型配置参数默认值, 包括epochs, batch_size, 检查点路径, 模型类型

import os
import argparse
import shutil

import pandas as pd
from sklearn.model_selection import train_test_split
import tensorflow as tf


def args_parse():
    parse = argparse.ArgumentParser()
    parse.add_argument(
            "--train_epochs", "-te", type=int, default=10, 
            help="[default %(default)s] the train epochs of model training.", 
            metavar="<TE>")
    parse.add_argument(
            "--batch_size", "-bs", type=int, default=32,
            help="[default: %(default)s] Batch size for training and evaluation.",
            metavar="<BS>")
    parse.add_argument(
            "--model_dir", "-mr", type=str, default="tmp",
            help="[default: %(default)s] The location of the model checkpoint files",
            metavar="<MD>")
    parse.add_argument(
            "--model_type", "-mt", type=str, default="wide_deep", 
            choices=['wide', 'deep', 'wide_deep'], 
            help='[default %(default)s] Valid model types: wide, deep, wide_deep.', 
            metavar="<MT>")
    
    parse.set_defaults(
            train_epochs=5, 
            batch_size=128,
            model_dir="widedeep_pandas_model", 
            model_type="wide_deep")
    
    flags = parse.parse_args()
    
    return flags

使用tf.feature_column定义特征类型和做特征工程

所有连续特征定义为连续变量,并且做最大最小归一化
离散特征做词表或者整数映射为离散特征
对总购买金额和最大一次购买金额分箱, 和是否买过奶粉进行离散特征二维交叉
所有连续特征和离散特征的onehot拼接, 构成deep部分
所有离散特征和离散特征交叉拼接, 构成wide部分

def build_model_columns():
    # 定义连续值列
    actual_price = tf.feature_column.numeric_column('actual_price', normalizer_fn=lambda x: (x - 0) / 150000, dtype=tf.float32)
    day_diff = tf.feature_column.numeric_column('day_diff', normalizer_fn=lambda x: (x - 1) / 90, dtype=tf.int64)
    freq = tf.feature_column.numeric_column('freq', normalizer_fn=lambda x: (x - 1) / 700, dtype=tf.int64)
    max_price = tf.feature_column.numeric_column('max_price', normalizer_fn=lambda x: (x - 0) / 45000, dtype=tf.float32)
    milk_count = tf.feature_column.numeric_column('milk_count', normalizer_fn=lambda x: (x - 0) / 70, dtype=tf.int64)
    milk_diff = tf.feature_column.numeric_column('milk_diff', normalizer_fn=lambda x: (x - 1) / 999, dtype=tf.int64)
    milk_percent = tf.feature_column.numeric_column('milk_percent', dtype=tf.float32)
    mami = tf.feature_column.numeric_column('mami', normalizer_fn=lambda x: (x - 0) / 5, dtype=tf.int64)
    niaobu = tf.feature_column.numeric_column('niaobu', normalizer_fn=lambda x: (x - 0) / 25, dtype=tf.int64)
    qingjie = tf.feature_column.numeric_column('qingjie', normalizer_fn=lambda x: (x - 0) / 20, dtype=tf.int64)
    yongpin = tf.feature_column.numeric_column('yongpin', normalizer_fn=lambda x: (x - 0) / 15, dtype=tf.int64)
    shipin = tf.feature_column.numeric_column('shipin', normalizer_fn=lambda x: (x - 0) / 100, dtype=tf.int64)
    yizhi = tf.feature_column.numeric_column('yizhi', normalizer_fn=lambda x: (x - 0) / 2, dtype=tf.int64)
    
    actual_price_raw = tf.feature_column.numeric_column('actual_price')
    max_price_raw = tf.feature_column.numeric_column('max_price')
    
    # 定义离散值列
    gender = tf.feature_column.categorical_column_with_vocabulary_list(
        'Gender', [1, -1, 0], dtype=tf.int64)
    house = tf.feature_column.categorical_column_with_identity(
        'House', 2)
    if_milk = tf.feature_column.categorical_column_with_identity(
        'if_milk', 2)
    
    # 对购买总金额和最大一次购买inx进行分箱
    actual_price_bin = tf.feature_column.bucketized_column(
            actual_price_raw, boundaries=[100, 250, 550, 1300])
    max_price_bin = tf.feature_column.bucketized_column(
            max_price_raw, boundaries=[30, 60, 120, 240])
    
    # 定义基础离散特征
    base_columns = [gender, house, if_milk]
    
    # 定义交叉组合特征
    cross_columns = [
            tf.feature_column.crossed_column([if_milk, actual_price_bin], hash_bucket_size=10),
            tf.feature_column.crossed_column([if_milk, max_price_bin], hash_bucket_size=10), 
            tf.feature_column.crossed_column([actual_price_bin, max_price_bin], hash_bucket_size=25)
    ]
    
    # wide部分的特征是0 1稀疏向量, 走LR, 采用全部离散特征和某些离散特征的交叉
    wide_columns = base_columns + cross_columns
    
    # 所有特征都走deep部分, 连续特征+离散特征onehot或者embedding
    deep_columns = [
            actual_price, day_diff, freq, max_price, milk_count, milk_diff, 
            milk_percent, mami, niaobu, qingjie, yongpin, shipin, yizhi, 
            tf.feature_column.indicator_column(gender), 
            tf.feature_column.indicator_column(house),
            tf.feature_column.indicator_column(if_milk)
            ]
            
    return wide_columns, deep_columns

构建wide and deep模型, 采用tf.estimator.DNNLinearCombinedClassifier现成接口

def build_estimator(model_dir, model_type, warm_start_from=None):
    """按照指定的模型生成估算器对象."""
    # 特征工程后的列对象组成的list
    wide_columns, deep_columns = build_model_columns()
    # deep 每一层全连接隐藏层单元个数, 4层每一层的激活函数是relu
    hidden_units = [100, 75, 50, 25]

    run_config = tf.estimator.RunConfig().replace(  # 将GPU个数设为0,关闭GPU运算。因为该模型在CPU上速度更快
        session_config=tf.ConfigProto(device_count={'GPU': 0}),
        save_checkpoints_steps=100, 
        keep_checkpoint_max=2)

    if model_type == 'wide':  # 生成带有wide模型的估算器对象
        return tf.estimator.LinearClassifier(
            model_dir=model_dir,
            feature_columns=wide_columns,
            config=run_config)
    elif model_type == 'deep':  # 生成带有deep模型的估算器对象
        return tf.estimator.DNNClassifier(
            model_dir=model_dir,
            feature_columns=deep_columns,
            hidden_units=hidden_units,
            config=run_config)
    else:
        return tf.estimator.DNNLinearCombinedClassifier(  # 生成带有wide和deep模型的估算器对象
            model_dir=model_dir,
            linear_feature_columns=wide_columns,
            dnn_feature_columns=deep_columns,
            dnn_hidden_units=hidden_units,
            config=run_config,
            warm_start_from=warm_start_from)

采用tf.data.Dataset读取和处理数据, 将pandas数据转化为list, tf.data.Dataset.from_tensor_slices处理内存中的数据进行shuffle和batch_size

def read_pandas(data_file):
    """pandas将数据读取内存"""
    assert os.path.exists(data_file), ("%s not found." % data_file)
    df = pd.read_csv(data_file).dropna()
    train, test = train_test_split(df, test_size=0.15, random_state=1)
    y_train = train.pop("label")
    y_test = test.pop("label")
    
    return train, test, y_train, y_test


def input_fn(X, y, shuffle, batch_size, predict=False):  # 定义估算器输入函数
    """估算器的输入函数."""
    if predict == True:
        # from_tensor_slices 从内存引入数据
        dataset = tf.data.Dataset.from_tensor_slices(X.to_dict(orient='list')) # 创建dataset数据集
    else:
        dataset = tf.data.Dataset.from_tensor_slices((X.to_dict(orient='list'), y)) # 创建dataset数据集

    if shuffle:  # 对数据进行乱序操作
        dataset = dataset.shuffle(buffer_size=64)  # 越大shuffle程度越大

    dataset = dataset.batch(batch_size)  # 将数据集按照batch_size划分
    dataset = dataset.prefetch(1)  # 预取数据,buffer_size=1 在多数情况下就足够了
    return dataset

训练模型和使用模型预测, 分别采用模型下的train, evaluate, predict接口, 输入指定input_fn函数

def trainmain(train, y_train, test, y_test):
    flags = args_parse()
    
    shutil.rmtree(flags.model_dir, ignore_errors=True)
    model = build_estimator(flags.model_dir, flags.model_type)  # 生成估算器对象

    def train_input_fn():  
        return input_fn(train, y_train, True, flags.batch_size, predict=False)

    def eval_input_fn(): 
        return input_fn(test, y_test, False, flags.batch_size, predict=False)
    
    # 在外部指定repeat 不在dataset中
    for n in range(flags.train_epochs):
        model.train(input_fn=train_input_fn)
        results = model.evaluate(input_fn=eval_input_fn)
        
        print('{0:-^30}'.format('evaluate at epoch %d' % ((n + 1))))
        # results 是一个字典
        print(pd.Series(results).to_frame('values'))
    
    # 导出模型
    export_model(model, "tfserving")
            
    
def premain(predict_data):
    flags = args_parse()

    def predict_input_fn():  # 定义预测集样本输入函数
        return input_fn(predict_data, None, False, flags.batch_size, predict=True)  # 该输入函数按照batch_size批次,不使用乱序处理

    model2 = build_estimator(flags.model_dir, flags.model_type) # 从检查点载入模型

    predictions = model2.predict(input_fn=predict_input_fn)

    # 数据下载
    predict_proba = list(map(lambda x: x['logistic'][0], predictions))
    predict_data['predict_proba'] = predict_proba
    predict_data.to_csv("milk_widedeep_res.csv", index=False)

模型导出为冻结图, 主要接口是tf.estimator.export.build_raw_serving_input_receiver_fn

def export_model(model, export_dir):
    features = {
        "CardNo": tf.placeholder(dtype=tf.int64, shape=1, name='CardNo'),
        "Gender": tf.placeholder(dtype=tf.int64, shape=1, name='Gender'),
        "House": tf.placeholder(dtype=tf.int64, shape=1, name='CardNo'),
        "actual_price": tf.placeholder(dtype=tf.float32, shape=1, name='Gender'),
        "day_diff": tf.placeholder(dtype=tf.int64, shape=1, name='CardNo'),
        "freq": tf.placeholder(dtype=tf.int64, shape=1, name='Gender'),
        "if_milk": tf.placeholder(dtype=tf.int64, shape=1, name='CardNo'),
        "max_price": tf.placeholder(dtype=tf.float32, shape=1, name='Gender'),
        "milk_count": tf.placeholder(dtype=tf.int64, shape=1, name='CardNo'),
        "milk_diff": tf.placeholder(dtype=tf.int64, shape=1, name='Gender'),
        "milk_percent": tf.placeholder(dtype=tf.float32, shape=1, name='CardNo'),
        "mami": tf.placeholder(dtype=tf.int64, shape=1, name='Gender'),
        "niaobu": tf.placeholder(dtype=tf.int64, shape=1, name='Gender'),
        "qingjie": tf.placeholder(dtype=tf.int64, shape=1, name='CardNo'),
        "yongpin": tf.placeholder(dtype=tf.int64, shape=1, name='Gender'),
        "shipin": tf.placeholder(dtype=tf.int64, shape=1, name='CardNo'),
        "yizhi": tf.placeholder(dtype=tf.int64, shape=1, name='Gender')                
    }
    
    example_input_fn = tf.estimator.export.build_raw_serving_input_receiver_fn(features)

    model.export_savedmodel(export_dir, example_input_fn, as_text=True)

模型训练过程, 5轮epochs后的验证集评价

-----evaluate at epoch 5------
                          values
accuracy                0.750373
accuracy_baseline       0.502239
auc                     0.818089
auc_precision_recall    0.850911
average_loss            0.489565
label/mean              0.497761
loss                   62.477852
precision               0.869034
prediction/mean         0.509669
recall                  0.586957
global_step           595.000000

模型部署

模型导出冻结图后的tfserving目录下的结构

.
└── 1592275239
    ├── saved_model.pbtxt
    └── variables
        ├── variables.data-00000-of-00002
        ├── variables.data-00001-of-00002
        └── variables.index

使用saved_model_cli工具查看pb文件的参数内容, 可以看到pb的前面是predict, 以及每个inputs的key, 类型, shape

saved_model_cli show --all --dir 1592275239/
signature_def['predict']:
  The given SavedModel SignatureDef contains the following input(s):
    inputs['CardNo'] tensor_info:
        dtype: DT_INT64
        shape: (-1)
        name: CardNo_16:0
    inputs['Gender'] tensor_info:
        dtype: DT_INT64
        shape: (-1)
        name: Gender_19:0
    inputs['House'] tensor_info:
        dtype: DT_INT64
        shape: (-1)
        name: CardNo_17:0
    inputs['actual_price'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1)
        name: Gender_20:0
    inputs['day_diff'] tensor_info:
        dtype: DT_INT64
        shape: (-1)
        name: CardNo_18:0
    inputs['freq'] tensor_info:
        dtype: DT_INT64
        shape: (-1)
        name: Gender_21:0
    inputs['if_milk'] tensor_info:
        dtype: DT_INT64
        shape: (-1)
        name: CardNo_19:0
    inputs['mami'] tensor_info:
        dtype: DT_INT64
        shape: (-1)
        name: Gender_24:0
    inputs['max_price'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1)
        name: Gender_22:0
    inputs['milk_count'] tensor_info:
        dtype: DT_INT64
        shape: (-1)
        name: CardNo_20:0
    inputs['milk_diff'] tensor_info:
        dtype: DT_INT64
        shape: (-1)
        name: Gender_23:0
    inputs['milk_percent'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1)
        name: CardNo_21:0
    inputs['niaobu'] tensor_info:
        dtype: DT_INT64
        shape: (-1)
        name: Gender_25:0
    inputs['qingjie'] tensor_info:
        dtype: DT_INT64
        shape: (-1)
        name: CardNo_22:0
    inputs['shipin'] tensor_info:
        dtype: DT_INT64
        shape: (-1)
        name: CardNo_23:0
    inputs['yizhi'] tensor_info:
        dtype: DT_INT64
        shape: (-1)
        name: Gender_27:0
    inputs['yongpin'] tensor_info:
        dtype: DT_INT64
        shape: (-1)
        name: Gender_26:0
  The given SavedModel SignatureDef contains the following output(s):
    outputs['class_ids'] tensor_info:
        dtype: DT_INT64
        shape: (-1, 1)
        name: head/predictions/ExpandDims:0
    outputs['classes'] tensor_info:
        dtype: DT_STRING
        shape: (-1, 1)
        name: head/predictions/str_classes:0
    outputs['logistic'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1, 1)
        name: head/predictions/logistic:0
    outputs['logits'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1, 1)
        name: add:0
    outputs['probabilities'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1, 2)
        name: head/predictions/probabilities:0
  Method name is: tensorflow/serving/predict

启动tfserving http服务

sudo docker run -t --rm -p 8502:8501 -v "/home/wide_deep/tfserving/:/models/wide_deep/" -e MODEL_NAME=wide_deep tensorflow/serving

接口测试

import requests
data = {'CardNo': 99235390, 
        'Gender': 1, 
        'House': 1, 
        'actual_price': 682.6999999999998, 
        'day_diff': 8, 
        'freq': 5, 
        'if_milk': 0, 
        'max_price': 260.0, 
        'milk_count': 0, 
        'milk_diff': 999, 
        'milk_percent': 0.0, 
        'mami': 0, 
        'niaobu': 0, 
        'qingjie': 0, 
        'yongpin': 0, 
        'shipin': 0, 
        'yizhi': 0}
res = requests.post("http://127.0.0.1:8502/v1/models/wide_deep:predict", 
                    json={"instances": [data], "signature_name": "predict"})
print(res.text)

接口返回如下, logistic就是预测概率

{
    "predictions": [
        {
            "logistic": [0.542829514],
            "probabilities": [0.457170457, 0.542829514],
            "class_ids": [1],
            "logits": [0.171739012],
            "classes": ["1"]
        }
    ]
}
上一篇 下一篇

猜你喜欢

热点阅读