召回算法:基于spark技术完成协同过滤算法(ItemCF)
写在前面:随着自己已经开始接触spark,hive等大数据技术,现在来写一篇如何用spark来完成ItemCF的文章。
一、 ItemCF的一些概念
在进入代码环节之前,我们先再来熟悉下ItemCF的一些定义和方法。
基于物品的协同过滤算法的核心思想是:给用户推荐和其过去感兴趣的物品相似的物品,比如要给A做个性化推荐,那就给他推荐跟他以前喜欢的物品的相似物品。
算法主要包括两个步骤:
(1)计算物品之间的相似度
ItemCF算法并不利用物品的内容属性计算物品之间的相似度,它主要通过分析用户的行为记录计算物品之间的相似度。该算法认为,物品A和物品B具有很大的相似度是因为喜欢物品A的用户大都也喜欢物品 B。
每个用户的兴趣都局限在某几个方面,如果两个物品属于一个用户的兴趣列表,那么两个物品可能就属于有限的几个领域,而如果两个物品同属于很多用户的兴趣列表,那么他们就可能属于同一个领域,因而有很大的相似度。因此ItemUser利用下面公式计算物品之间的相似度:
这里同样会出现一个用户购买了很多商品的问题,也就是活跃用户的问题,比如一个人是做口红批发生意的,他从淘宝网上以批发价买了所有的口红,从前面对ItemCF的讨论可以看到,这意味着因为存在这么一个用户,所有口红的两两之间就产生了相似度。这个用户虽然活跃,但是买这些口红并非都是出于自身的兴趣,而且这些口红覆盖了淘宝网口红品牌的很多领域,所以这个用户对于他所购买口红的两两相似度的贡献应该远远小于一个只买了几十个口红的女生。也就是让这些活跃用户对相似度计算造成的影响降低,计算如下:
(2)计算用户的兴趣
在得到物品j相似的k个物品S(j,K)后,通过下面的公式计算用户u对物品j的兴趣
其中P(u,j)表示用户u对物品j的兴趣,S(j,k)表示和物品j最相似的k个物品,N(u)表示用户产生过行为的物品集合,
表示用户u对物品i是否产生过行为(对于评分系统,可以表示u对i的评分。
不进行惩罚的例子:给用户U1推荐物品,K取3,用户U1没有对e,f产生过行为:
S(e,3) = {a,f,b(c)}, S(f,3) = {e,b,c}
如果要给用户U1只推荐一个物品,由于P(U1,e)<P(U1,f),推荐物品f。
精度不和k成线性关系,选择合适的k比较重要。
二、 代码实战环节
我们采用pysaprk来完成这个代码。
数据集采用的是movielens,这个数据集可以自己在网上搜一下下载,下载下来后找到rating.rat这个文件,这个文件是用户对电影的评分,长这样:
其中第一列是user_id,第二列是item_id,第三列是评分,评分总共分为5个等级。
首先,我们倒入一下几个包:
然后准备一下几个函数,用于启动pypark:
其中sparkConf用于定义我们的AppName,这个可以随便取,因为这里都默认是在本地的环境跑,不需要跟公司里面的集群环境一下,最好有一个独特的name。
第二个函数用于设置pyspark的日志
第三个函数用于设置这个任务是在集群上跑还是本地跑,这里我设置的是local,当然有集群环境的也可以在这里修改。
接下来就是ItemCF的计算过程了:
1.取user-item的RDD文件
这里我们默认只要用户对电影有过评分就是user click过这个item,也就是他们有过交互行为,当然这是一种简单的方式,你也可以把评分小于3的剔除掉,当作用户不喜欢这个物品。
具体的操作就是: 对每一行都只选取user item
2.计算user:[item...]的rdd
得到user点击过那些item。
这个rdd操作的意思是通过groupByKey这个操作把用户所有点击过的物品聚合成一个list
3.计算item:[user...]的rdd
得到item被那些user点击过。
操作跟2中的操作一样。
4.计算item的相似度矩阵
首先我们用item_user这个rdd跟自己作笛卡尔积,这么做的原因是可以计算不同item之间的相似度。
笛卡尔积的定义:
所以这里item_user的结果通过filter过滤掉了自己跟自己笛卡尔的部分。
接下来就是计算item-item相似度矩阵了
其中calculate函数是:
这里跟我们前面说的对热门item降权不一样,主要是为了方便讲解,如果想在这里把log降权加进去的话可以自己加
最好通过一个map跟一个groupby操作,最终得到这样的w,w中的一行长这样:
user_id, [(item_0, score0),(item_1, score1),...]
5.推荐
计算完item的相似度矩阵后,就可以进行推荐了
、其中recommend的函数是这样:
给定一个user_id,相似度矩阵W,用户的交互矩阵user_item
find函数长这样:
get函数长这样:
推荐的逻辑跟我们在前面itemcf里面的方法是一样的,可以看一下这些rdd的操作:
(1)user_see_item计算的是这个用户交互过的item
(2)sim_item 计算的是user_see_item这些item在w中所在的行
(3) recommed_item计算的是:
a.找到user_see_item在w所在的行
b.取前k个最相近的
c.求总得分
(4) 排序取前k个物品
这里我是直接采用了相同的k,事实上在(3)b中的k跟最后的topk中的k是可以不同的,一般可以这么取
第一个k取500,第二个k取50这样,当然也可以取100,50,都是可以的。
三、 总结
上面的代码只是简单的实现了下Itemcf,其实还是有几个需要修改的地方,下面列举一下:
1. 最后的结果没有过滤用户点击过的物品
2.只是对一个user进行推荐,如果需要在真实的工业界进行协同过滤的话,需要对所有的user_id同时进行计算,也就是要优化rdd的计算时间,想一下,如果一个个算的话那么spark的并行计算就失去了优势,因此需要改成所有的user一起并行计算,然后将离线计算的任务例行化,每天都定时执行,将离线计算的结果存储到hdfs或者hive中。