神经协同过滤NCF(二)之代码实战篇

2020-12-02  本文已影响0人  HaloZhang

简介

本文将使用PyTorch框架来实现神经协同过滤(NCF)算法的三个模型,分别是GMF、MLP、NeuMF。关于NCF相关内容可以参考神经协同过滤NCF(一)之模型介绍篇

数据预处理

这里采用的是MovieLens中的ml-1m数据集,并且只使用了其中的ratings.dat文件,即用户对电影的评分数据。关于这个数据集可以参考推荐系统数据集之MovieLens
数据预处理主要包含以下几项功能:

完整代码如下:

import random
import pandas as pd
import numpy as np
from copy import deepcopy

random.seed(0)

class DataProcess(object):
    def __init__(self, filename):
        self._filename = filename
        self._loadData()
        self._preProcess()
        self._binarize(self._originalRatings)
        # 对'userId'这一列的数据,先去重,然后构成一个用户列表
        self._userPool = set(self._originalRatings['userId'].unique())
        self._itemPool = set(self._originalRatings['itemId'].unique())
        print("user_pool size: ", len(self._userPool))
        print("item_pool size: ", len(self._itemPool))

        self._select_Negatives(self._originalRatings)
        self._split_pool(self._preprocessRatings)

    def _loadData(self):
        self._originalRatings = pd.read_csv(self._filename, sep='::', header=None, names=['uid', 'mid', 'rating', 'timestamp'],
                                            engine='python')
        return self._originalRatings

    def _preProcess(self):
        """
        对user和item都重新编号,这里这么做的原因是因为,模型的输入是one-hot向量,需要把user和item都限制在Embedding的长度之内,
        模型的两个输入的长度分别是user和item的数量,所以要重新从0编号。
        """
        # 1. 新建名为"userId"的列,这列对用户从0开始编号
        user_id = self._originalRatings[['uid']].drop_duplicates().reindex()
        user_id['userId'] = np.arange(len(user_id)) #根据user的长度创建一个数组
        # 将原先的DataFrame与user_id按照"uid"这一列进行合并
        self._originalRatings = pd.merge(self._originalRatings, user_id, on=['uid'], how='left')

        # 2. 对物品进行重新排列
        item_id = self._originalRatings[['mid']].drop_duplicates()
        item_id['itemId'] = np.arange(len(item_id))
        self._originalRatings = pd.merge(self._originalRatings, item_id, on=['mid'], how='left')

        # 按照['userId', 'itemId', 'rating', 'timestamp']的顺序重新排列
        self._originalRatings = self._originalRatings[['userId', 'itemId', 'rating', 'timestamp']]
        print(self._originalRatings)
        print('Range of userId is [{}, {}]'.format(self._originalRatings.userId.min(), self._originalRatings.userId.max()))
        print('Range of itemId is [{}, {}]'.format(self._originalRatings.itemId.min(), self._originalRatings.itemId.max()))

    def _binarize(self, ratings):
        """
        binarize data into 0 or 1 for implicit feedback
        """
        ratings = deepcopy(ratings)
        ratings['rating'][ratings['rating'] > 0] = 1.0
        self._preprocessRatings = ratings
        # print("binary: \n", self._preprocessRatings)

    def _select_Negatives(self, ratings):
        """
        Select al;l negative samples and 100 sampled negative items for each user.
        """
        # 构造user-item表
        interact_status = ratings.groupby('userId')['itemId'].apply(set).reset_index().rename(
            columns={'itemId': 'interacted_items'})
        print("interact_status: \n", interact_status)

        # 把与用户没有产生过交互的样本都当做是负样本
        interact_status['negative_items'] = interact_status['interacted_items'].apply(lambda x: self._itemPool - x)

        # 从上面的全部负样本中随机选99个出来
        interact_status['negative_samples'] = interact_status['negative_items'].apply(lambda x: random.sample(x, 99))
        print("after sampling interact_status: \n", interact_status)

        print("select and rearrange columns")
        self._negatives = interact_status[['userId', 'negative_items', 'negative_samples']]

    def _split_pool(self, ratings):
        """leave one out train/test split """
        print("sort by timestamp descend")
        # 先按照'userID'进行分组,然后根据时间戳降序排列
        ratings['rank_latest'] = ratings.groupby(['userId'])['timestamp'].rank(method='first', ascending=False)
        print(ratings)

        # 选取排名第一的数据作为测试集,也就是最新的那个数据
        test = ratings[ratings['rank_latest'] == 1]
        # 选取所有排名靠后的,也就是历史数据当做训练集
        train = ratings[ratings['rank_latest'] > 1]
        # print("test: \n", test)
        # print("train: \n", train)

        print("size of test {0}, size of train {1}".format(len(test), len(train)))

        # 确保训练集和测试集的userId是一样的
        assert train['userId'].nunique() == test['userId'].nunique()

        self.train_ratings = train[['userId', 'itemId', 'rating']]
        self.test_ratings = test[['userId', 'itemId', 'rating']]

    def sample_generator(self, num_negatives):
        # 合并之后的train_ratings的列包括['userId','itemId','rating','negative_items']
        train_ratings = pd.merge(self.train_ratings, self._negatives[['userId', 'negative_items']], on='userId')
        # 从用户的全部负样本集合中随机选择num_negatives个样本当做负样本,并产生一个新的名为"negatives"的列
        train_ratings['negatives'] = train_ratings['negative_items'].apply(lambda x: random.sample(x, num_negatives))
        print(train_ratings)

        # 构造模型所需要的数据,分别是输入user、items以及目标分值ratings。
        users, items, ratings = [], [], []
        for row in train_ratings.itertuples():
            # 构造正样本,分别是userId, itemId以及目标分值1
            users.append(int(row.userId))
            items.append(int(row.itemId))
            ratings.append(float(row.rating))
            # 为每个用户构造num_negatives个负样本,分别是userId, itemId以及目标分值0
            for i in range(num_negatives):
                users.append(int(row.userId))
                items.append(int(row.negatives[i]))
                ratings.append(float(0)) # 负样本的ratings为0,直接强行设置为0

        return users, items, ratings

NCF框架

NCF框架是本文要实现的3个模型的主体结构,因此在代码设计的时候,可以将3个模型公共的部分放到这里,因此我将它定义为一个基类。它主要定义了Embedding层,全连接层,激活函数等。由于每一个模型的线性模型的输入是不一致的,故将其下放到子类中去定义。 NCF框架

代码如下:

class NCF(object):
    def __init__(self, config, latent_dim_gmf=8, latent_dim_mlp=8):
        self._config = config
        self._num_users = config['num_users']
        self._num_items = config['num_items']
        self._latent_dim_gmf = latent_dim_gmf
        self._latent_dim_mlp = latent_dim_mlp

        # 建立MLP模型的user Embedding层和item Embedding层,输入的向量长度分别为用户的数量,item的数量,输出都是隐式空间的维度latent dim
        self._embedding_user_mlp = torch.nn.Embedding(num_embeddings=self._num_users, embedding_dim=self._latent_dim_mlp)
        self._embedding_item_mlp = torch.nn.Embedding(num_embeddings=self._num_users, embedding_dim=self._latent_dim_mlp)
        # 建立GMP模型的user Embedding层和item Embedding层,输入的向量长度分别为用户的数量,item的数量,输出都是隐式空间的维度latent dim
        self._embedding_user_gmf = torch.nn.Embedding(num_embeddings=self._num_users, embedding_dim=self._latent_dim_gmf)
        self._embedding_item_gmf = torch.nn.Embedding(num_embeddings=self._num_users, embedding_dim=self._latent_dim_gmf)

        # 全连接层
        self._fc_layers = torch.nn.ModuleList()
        for idx, (in_size, out_size) in enumerate(zip(config['layers'][:-1], config['layers'][1:])):
            self._fc_layers.append(torch.nn.Linear(in_size, out_size))

        # 激活函数
        self._logistic = nn.Sigmoid()

    @property
    def fc_layers(self):
        return self._fc_layers

    @property
    def embedding_user_gmf(self):
        return self._embedding_user_gmf

    @property
    def embedding_item_gmf(self):
        return self._embedding_item_gmf

    @property
    def embedding_user_mlp(self):
        return self._embedding_user_mlp

    @property
    def embedding_item_mlp(self):
        return self._embedding_item_mlp

    def saveModel(self):
        torch.save(self.state_dict(), self._config['model_name'])

    @abstractmethod
    def load_preTrained_weights(self):
        pass

GMF模型

GMF模型的结构如下图所示:

GMF模型
可以看到它主要包含user和item两个embedding层,一个线性模型外加一个Sigmoid激活函数,PyTorch中提供了现成的Embedding模块线性模块,以及Sigmoid模块。可以直接使用。它分别继承自NCF基类和torch.nn.Module。
代码如下:
class GMF(NCF, nn.Module):
    def __init__(self, config, latent_dim_gmf):
        nn.Module.__init__(self)
        NCF.__init__(self, config=config, latent_dim_gmf=latent_dim_gmf)
        # 创建一个线性模型,输入为潜在特征向量,输出向量长度为1
        self._affine_output = nn.Linear(in_features=self._latent_dim_gmf, out_features=1)

    @property
    def affine_output(self):
        return self._affine_output

    def forward(self, user_indices, item_indices):
        """
        前向传播
        :param user_indices: user Tensor
        :param item_indices: item Tensor
        :return: predicted rating
        """
        # 先将user和item转换为对应的Embedding表示,注意这个支持Tensor操作,即传入的是一个user列表,对其中每一个user都会执行Embedding操作,即都会使用Embedding表示
        user_embedding = self._embedding_user_gmf(user_indices)
        item_embedding = self._embedding_item_gmf(item_indices)
        # 对user_embedding和user_embedding进行逐元素相乘, 这一步其实就是MF算法的实现
        element_product = torch.mul(user_embedding, item_embedding)
        # 将逐元素的乘积的结果通过一个S型神经元
        logits = self._affine_output(element_product)
        rating = self._logistic(logits)
        return rating

    def load_preTrained_weights(self):
        pass

MLP模型

MLP与GMP一样,只需要定义自己的线性模型即可。 MLP模型

代码如下:


class MLP(NCF, nn.Module):
    def __init__(self, config, latent_dim_mlp):
        nn.Module.__init__(self)
        NCF.__init__(self, config=config, latent_dim_mlp=latent_dim_mlp)
        # 创建一个线性模型,输入为潜在特征向量,输出向量长度为1
        self._affine_output = torch.nn.Linear(in_features=config['layers'][-1], out_features=1)

    @property
    def affine_output(self):
        return self._affine_output

    def forward(self, user_indices, item_indices):
        """
        :param user_indices: user Tensor
        :param item_indices: item Tensor
        """
        # 先将user和item转换为对应的Embedding表示,注意这个支持Tensor操作,即传入的是一个user列表,
        # 对其中每一个user都会执行Embedding操作,即都会使用Embedding表示
        user_embedding = self._embedding_user_mlp(user_indices)
        item_embedding = self._embedding_item_mlp(item_indices)
        vector = torch.cat([user_embedding, item_embedding], dim=-1) # concat latent vector
        for idx, _ in enumerate(range(len(self._fc_layers))):
            vector = self._fc_layers[idx](vector)
            vector = torch.nn.ReLU()(vector)
            ##  Batch normalization
            # vector = torch.nn.BatchNorm1d()(vector)
            ## DroupOut layer
            # vector = torch.nn.Dropout(p=0.5)(vector)
        logits = self._affine_output(vector)
        rating = self._logistic(logits)
        return rating

    def load_preTrained_weights(self):
        config = self._config
        gmf_model = GMF(config, config['latent_dim_gmf'])
        if config['use_cuda'] is True:
            gmf_model.cuda()
        # 加载GMF模型参数到指定的GPU上
        state_dict = torch.load(self._config['pretrain_gmf'])
                                #map_location=lambda storage, loc: storage.cuda(device=self._config['device_id']))
                                #map_location = {'cuda:0': 'cpu'})
        gmf_model.load_state_dict(state_dict, strict=False)

        self._embedding_item_mlp.weight.data = gmf_model.embedding_item_gmf.weight.data
        self._embedding_user_mlp.weight.data = gmf_model.embedding_user_gmf.weight.data

NeuMF模型

NeuMF模型是集成了前两个模型,因此前向传播部分会稍微复杂点,需要先将GMF模型和MLP模型的输出连接起来之后再输入到线性模型中去。 NeuMF模型

代码如下:

class NeuMF(NCF, nn.Module):
    def __init__(self, config, latent_dim_gmf, latent_dim_mlp):
        nn.Module.__init__(self)
        NCF.__init__(self, config, latent_dim_gmf, latent_dim_mlp)

        # 创建一个线性模型,输入为GMF模型和MLP模型的潜在特征向量长度之和,输出向量长度为1
        self._affine_output = torch.nn.Linear(in_features=config['layers'][-1] + config['latent_dim_gmf'], out_features=1)

    @property
    def affine_output(self):
        return self._affine_output

    def forward(self, user_indices, item_indices):
        user_embedding_mlp = self._embedding_user_mlp(user_indices)
        item_embedding_mlp = self._embedding_item_mlp(item_indices)
        user_embedding_gmf = self._embedding_user_gmf(user_indices)
        item_embedding_gmf = self._embedding_item_gmf(item_indices)

        # concat the two latent vector
        mlp_vector = torch.cat([user_embedding_mlp, item_embedding_mlp], dim=-1)
        # multiply the two latent vector
        gmf_vector = torch.mul(user_embedding_gmf, item_embedding_gmf)

        for idx, _ in enumerate(range(len(self._fc_layers))):
            mlp_vector = self._fc_layers[idx](mlp_vector)
            mlp_vector = torch.nn.ReLU()(mlp_vector)

        vector = torch.cat([mlp_vector, gmf_vector], dim=-1)
        logits = self._affine_output(vector)
        rating = self._logistic(logits)
        return rating

    def load_preTrained_weights(self):
        # 加载MLP模型参数
        mlp_model = MLP(self._config['mlp_config'], self._config['mlp_config']['latent_dim_mlp'])
        if self._config['use_cuda'] is True:
            mlp_model.cuda()
        state_dict = torch.load(self._config['pretrain_mlp'])
                                # map_location=lambda storage, loc: storage.cuda(device=self._config['device_id']))
                                # map_location = {'cuda:0': 'cpu'})
        mlp_model.load_state_dict(state_dict, strict=False)

        self._embedding_item_mlp.weight.data = mlp_model.embedding_item_mlp.weight.data
        self._embedding_user_mlp.weight.data = mlp_model.embedding_user_mlp.weight.data
        for idx in range(len(self._fc_layers)):
            self._fc_layers[idx].weight.data = mlp_model.fc_layers[idx].weight.data

        # 加载GMF模型参数
        gmf_model = GMF(self._config['gmf_config'], self._config['gmf_config']['latent_dim_gmf'])
        if self._config['use_cuda'] is True:
            gmf_model.cuda()
        state_dict = torch.load(self._config['pretrain_gmf'])
                                # map_location=lambda storage, loc: storage.cuda(device=self._config['device_id']))
                                # map_location = {'cuda:0': 'cpu'})
        mlp_model.load_state_dict(state_dict, strict=False)

        self._embedding_item_gmf.weight.data = gmf_model.embedding_item_gmf.weight.data
        self._embedding_user_gmf.weight.data = gmf_model.embedding_user_gmf.weight.data

        self._affine_output.weight.data = self._config['alpha'] * torch.cat([mlp_model.affine_output.weight.data, gmf_model.affine_output.weight.data], dim=-1)
        self._affine_output.bias.data = self._config['alpha'] * (mlp_model.affine_output.bias.data + gmf_model.affine_output.bias.data)

训练器

单独定义了一个Trainer的类,用来训练不同的模型,这个类里面包含了优化器,损失函数的定义,以及前向传播、反向传播、参数更新、模型保存等操作。
代码如下:

class Trainer(object):
    def __init__(self, model, config):
        self._config = config
        self._model = model
        # 选择优化器
        self._optimizer = pick_optimizer(self._model, self._config)
        # 定义损失函数,对于隐反馈数据,这里使用交叉熵损失函数
        self._crit = torch.nn.BCELoss()

    def _train_single_batch(self, users, items, ratings):
        """
        对单个小批量数据进行训练
        :param users: user Tensor
        :param items: item Tensor
        :param ratings: rating Tensor
        :return:
        """
        if self._config['use_cuda'] is True:
            # 将这些数据由CPU迁移到GPU
            users, items, ratings = users.cuda(), items.cuda(), ratings.cuda()

        # 先将梯度清零,如果不清零,那么这个梯度就和上一个mini-batch有关
        self._optimizer.zero_grad()
        # 模型的输入users, items,调用forward进行前向传播
        ratings_pred = self._model(users, items)
        # 通过交叉熵损失函数来计算损失, ratings_pred.view(-1)代表将预测结果摊平,变成一维的结构。
        loss = self._crit(ratings_pred.view(-1), ratings)
        # 反向传播计算梯度
        loss.backward()
        # 梯度下降等优化器 更新参数
        self._optimizer.step()
        # 将loss的值提取成python的float类型
        loss = loss.item()
        return loss

    def _train_an_epoch(self, train_loader, epoch_id):
        """
        训练一个Epoch,即将训练集中的所有样本全部都过一遍
        :param train_loader: Torch的DataLoader
        :param epoch_id: 训练轮次Id
        :return:
        """
        # 告诉模型目前处于训练模式,启用dropout以及batch normalization
        self._model.train()
        total_loss = 0
        # 从DataLoader中获取小批量的id以及数据
        for batch_id, batch in enumerate(train_loader):
            assert isinstance(batch[0], torch.LongTensor)
            # 这里的user, item, rating大小变成了1024维了,因为batch_size是1024,即每次选取1024个样本数据进行训练
            user, item, rating = batch[0], batch[1], batch[2]
            rating = rating.float()
            loss = self._train_single_batch(user, item, rating)
            print('[Training Epoch {}] Batch {}, Loss {}'.format(epoch_id, batch_id, loss))
            total_loss += loss
        print('Training Epoch: {}, TotalLoss: {}'.format(epoch_id, total_loss))

    def train(self, sampleGenerator):
        # 是否使用GPU加速
        self.use_cuda()
        # 是否使用预先训练好的参数
        self.load_preTrained_weights()

        for epoch in range(self._config['num_epoch']):
            print('-' * 20 + ' Epoch {} starts '.format(epoch) + '-' * 20)
            # 每个轮次都重新随机产生样本数据集
            users, items, ratings = sampleGenerator(num_negatives=self._config['num_negative'])
            # 构造一个DataLoader
            data_loader = Construct_DataLoader(users=users, items=items, ratings=ratings,
                                               batchsize=self._config['batch_size'])
            # 训练一个轮次
            self._train_an_epoch(data_loader, epoch_id=epoch)

    def use_cuda(self):
        if self._config['use_cuda'] is True:
            assert torch.cuda.is_available(), 'CUDA is not available'
            torch.cuda.set_device(self._config['device_id'])
            self._model.cuda()

    def load_preTrained_weights(self):
        if self._config['pretrain'] is True:
            self._model.load_preTrained_weights()

    def save(self):
        self._model.saveModel()

测试代码

主要是分为了训练和测试两个阶段,先训练模型,然后进行一些简单的测试。
代码如下:


if __name__ == "__main__":
    ####################################################################################
    # NCF 神经协同过滤算法
    ####################################################################################

    # 加载和预处理数据
    dp = DataProcess("../Data/ml-1m/ratings.dat")

    # 初始化GMP模型
    # config = gmf_config
    # model = GMF(config, config['latent_dim_gmf'])

    # # 初始化MLP模型
    # config = mlp_config
    # model = MLP(config, config['latent_dim_mlp'])

    # 初始化NeuMF模型
    config = neumf_config
    model = NeuMF(config, config['latent_dim_gmf'], config['latent_dim_mlp'])

    # ###############################################################
    # 模型训练阶段
    # ###############################################################
    trainer = Trainer(model=model, config=config)
    trainer.train(dp.sample_generator)
    trainer.save()

    # ###############################################################
    # 模型测试阶段
    # ###############################################################

    # 加载数据集
    dp = DataProcess("../Data/ml-1m/ratings.dat")

    config = neumf_config
    neumf = NeuMF(config, config['latent_dim_gmf'], config['latent_dim_mlp'])
    state_dict = torch.load("../Models/NCF_NeuMF.model", map_location=torch.device('cpu'))
    neumf.load_state_dict(state_dict, strict=False)

    print(neumf.forward(torch.LongTensor([1]), torch.LongTensor([1193])))
    print(neumf.forward(torch.LongTensor([1]),torch.LongTensor([661])))
    print(neumf.forward(torch.LongTensor([1]),torch.LongTensor([914])))
    print(neumf.forward(torch.LongTensor([1]),torch.LongTensor([3408])))

    print(neumf.forward(torch.LongTensor([1]),torch.LongTensor([1245])))
    print(neumf.forward(torch.LongTensor([1]),torch.LongTensor([32])))
    print(neumf.forward(torch.LongTensor([1]),torch.LongTensor([4])))
    print(neumf.forward(torch.LongTensor([1]),torch.LongTensor([62])))

完整代码见https://github.com/HeartbreakSurvivor/RsAlgorithms

参考

上一篇 下一篇

猜你喜欢

热点阅读