Recommendation System
本文结构安排
- Item-Based Collaboration Filtering
- Slope One
- Matrix Factorization
Item-based Collaboration Filtering
推荐系统本质是在用户需求不明确的情况下,解决信息过载的问题,联系用户和信息,一方面帮助用户发现对自己有价值的信息,另一方面让信息能够展现在对它感兴趣的用户面前,从而实现信息消费者和信息生产者的双赢(这里的信息的含义可以非常广泛,比如咨询、电影和商品等,统称为item)
协同过滤主要分为基于邻域以及基于隐语义模型。基于邻域的算法中,Item-based CF应用广泛,其主要思想为“喜欢item A的用户大都喜欢用户 item B”,通过挖掘用户历史的操作日志,利用群体智慧,生成item的候选推荐列表。
原理是通过将用户和其他用户的数据进行比对来实现推荐的。比对的具体方法就是通过计算两个用户数据之间的相似性,通过相似性的计算来说明两个用户数据之间的相似程度。相似度函数的设计必须满足度量空间的三点要求,即非负性,对称性和三角不等性。常用的相似度的计算方法有:欧式距离法、皮尔逊相关系数法和夹角余弦相似度法。
User-based的基本思想是如果用户A喜欢物品a,用户B喜欢物品a、b、c,用户C喜欢a和c,那么认为用户A与用户B和C相似,因为他们都喜欢a,而喜欢a的用户同时也喜欢c,所以把c推荐给用户A。该算法用最近邻居(nearest-neighbor)算法找出一个用户的邻居集合,该集合的用户和该用户有相似的喜好,算法根据邻居的偏好对该用户进行预测。
User-based算法存在两个重大问题:1. 数据稀疏性。一个大型的电子商务推荐系统一般有非常多的物品,用户可能买的其中不到1%的物品,不同用户之间买的物品重叠性较低,导致算法无法找到一个用户的邻居,即偏好相似的用户。2. 算法扩展性。最近邻居算法的计算量随着用户和物品数量的增加而增加,不适合数据量大的情况使用。
Iterm-based的基本思想是预先根据所有用户的历史偏好数据计算物品之间的相似性,然后把与用户喜欢的物品相类似的物品推荐给用户。还是以之前的例子为例,可以知道物品a和c非常相似,因为喜欢a的用户同时也喜欢c,而用户A喜欢a,所以把c推荐给用户A。
因为物品直接的相似性相对比较固定,所以可以预先在线下计算好不同物品之间的相似度,把结果存在表中,当推荐时进行查表,计算用户可能的打分值,可以同时解决上面两个问题。
Item-based算法详细过程:
1、相似度计算:Item-based算法首选计算物品之间的相似度,计算相似度的方法有以下几种:
(1). 基于余弦(Cosine-based)的相似度计算,通过计算两个向量之间的夹角余弦值来计算物品之间的相似性
(2). 基于关联(Correlation-based)的相似度计算,计算两个向量之间的Pearson-r关联度
2、\textbf{预测值计算}:加权求和. 用过对用户u已打分的物品的分数进行加权求和,权值为各个物品与物品i的相似度,然后对所有物品相似度的和求平均,计算得到用户u对物品i打分
Slope One
简单高效的协同过滤算法。Slope One 和其它类似算法相比, 它的最大优点在于算法很简单, 易于实现, 执行效率高, 同时推荐的准确性相对很高。
Slope One算法是基于不同物品之间的评分差的线性算法,预测用户对物品评分的个性化算法。主要两步:
Step1:计算物品之间的评分差的均值,记为物品间的评分偏差(两物品同时被评分);
其中,是用户u对物品i的评分,是用户u对物品j的评分,是对物品i评分过的用户,是对物品i和物品j都评分过的用户,是对物品i和物品j都评分过的用户数量。
Step2:根据物品间的评分偏差和用户的历史评分,预测用户对未评分的物品的评分。
其中是用户u购买过的物品,为集合中的元素个数
Step3:将预测评分排序,取top-K对应的物品推荐给用户。
Matrix factorization
评分预测过程通常包括用户项目矩阵,
其中用户users对应于行和项目items到列。矩阵条目指定user对item的评分情况。我们研究的问题是如何通过使用可用的值来准确预测用户项目矩阵中的missing values。
矩阵分解是指一组算法,其中矩阵被分解成两个矩阵的乘积。
当应用矩阵分解来解决我们的研究问题时,前提是影响用户观察到的Web服务失败概率的因素很少,
用户观察到的Web服务失败概率取决于每个因素如何适用于用户,
Web服务上的用户失败概率值对应于这些因素与用户特定系数的线性组合。
为m行n列的user-item矩阵,l-factor模型旨在使W和H矩阵的乘积能够近似P
为m行l列矩阵,为l行n列矩阵,
是因素的个数,矩阵分解产生的W的每一行都是用户特定的用户系数,
矩阵分解产生的H的每一列都是包含Web服务的l因子值的因子向量。
\textbf{Gradient Descent}
P和WH之间差异的最常见量度是总和误差,其可以通过以下公式计算:
在用户项目矩阵 P中(表示Web服务j先前已由用户调用),如果中有值,则等于1,否则等于0,
是矩阵W的第i行(代表特定的用户i的系数),而是矩阵H的第j列(表示itemj的向量因子)。
使用梯度下降法近似矩阵P。目标函数是:
正则化项可以这样求:
采用误差反向传播的方法迭代更新矩阵W和H。
是学习率,直到更新到收敛为止。
Key Code on Spark
Item-based Collaboration Filtering Spark实现
Spark采用Scala语言编写,建立在统一抽象的RDD(分布式内存抽象)之上,使得它可以以基本一致的方式应对不同的大数据处理场景。Spark提供广泛的数据集操作类型(20+种),不像Hadoop只提供了Map和Reduce两种操作。Spark可以与Hadoop无缝连接,使用YARN作为他的集群管理器。
本次实验灵活spark操作算子,在设计推荐算法时,我的宗旨时坚决不使用for-loop,while-loop,并且借鉴数据库select的思想完成本次实验,Spark提交任务至集群运行,使得实验中提供的较大规模的数据集也能高效的跑完。
\subsection{Item-based Collaboration Filtering}
数据预处理阶段:配置spark,设置为集群模式,导入数据源(训练集和测试集),并对数据进行分割操作,其中movielens数据集以"::"分割,并且我只需要提取user,item和preference,不需要timestamp,所以在预处理时我直接过滤了时间戳的数据,并设置userid,itemid的数据为Long,设置评分preference为Double,至此TrainSetRDD和TestSetRDD的RDD构成都为
(user,item,preference),实现如下:
val conf = new SparkConf().setAppName("IBCF")
val sc = new SparkContext(conf)
val datasource1 = "file:///usr/local/data/test.txt"
val Mini2TestFile = sc.textFile(datasource1,3)
val datasource2 = "file:///usr/local/data/train.txt"
val Mini2TrainFile = sc.textFile(datasource2,3)
val TrainSetRDD = Mini2TrainFile.map(line => {
val fields = line.split("::")
(fields(0).toLong, fields(1).toLong, fields(2).toDouble)
})
val TestSetRDD = Mini2TestFile.map(line => {
val fields = line.split("::")
(fields(0).toLong, fields(1).toLong, fields(2).toDouble)
})
\end{lstlisting}
对训练集做处理,按照用户id分组,并且组内按照用户评分的大小进行排序,实现如下:
\begin{lstlisting}
var ratings = TrainSetRDD.groupBy(k=>k._1).flatMap(x=>(x._2.toList.sortWith((x,y)=>x._3>y._3)))
首先,以item为key,将训练集以item进行分组。item2manyUser中包含该item对应曾经购买过这个item的userid和对该item的评分信息的集合
然后,以item为key,统计购买过该item
的用户数量。numRatersPerItem中包含itemid和购买过该item的user数量。
最后,对item2manyUser和numRatersPerItem
进行连接操作,目的是为了得到这样结构的RDD:
(user,item,preference,size)其中size对应的是对item评过分的user数量,计算这个值是为了之后求相似度做准备。实现如下:
//(item,((user,item,prefs),(user,item,prefs),...))
val item2manyUser = ratings.groupBy(tup => tup._2)
//(item,ratingsize)
val numRatersPerItem = item2manyUser.map(grouped => (grouped._1, grouped._2.size))
//(user,item,prefs,ratingsize)
val ratingsWithSize = item2manyUser.join(numRatersPerItem).flatMap(
joined => {
joined._2._1.map(f => (f._1, f._2, f._3, joined._2._2))
})
因为在IBCF算法中,最重要的是要求得item之间的相似度,
想要得到成对的,
可以借用数据库的表连接的思想,
使ratingsWithSize以userid为key进行自连接操作,效果类似笛卡儿积,我们可以得到任意.
选取的rdd,因为相似度计算满足对称性,所以为了减少计算,我们计算一半的值即可。
ratingPairs这个RDD所蕴含的意义是user历史曾经对和都有进行评分。实现如下:
//(user,(user,item,prefs,size))
val ratings2 = ratingsWithSize.keyBy(tup => tup._1)
//(user,((user,item,prefs,size),(user,item,prefs,size)))
val ratingPairs =ratings2.join(ratings2).filter(f => f._2._1._2 < f._2._2._2)
以余弦相似度为例,
计算两个item相似度时,是使用对这个itemPair都进行过评分的用户评分数据决定的。
我设计的是先计算itemPair相似度公式中的某些中间变量,然后再进行求和操作。
从余弦计算公式中可以观察到,可以计算的中间变量有:分子部分的点乘(dotProduct)和分母部分的平方量。
对ratingPairs进行map操作,可以对每一行数据都进行相同的操作,提取出
作为key,中间变量作为values,得到一个新的RDD。
值得注意的是,ratingPairs的key是userid,而tempVectorCalcs的key是,
从ratingPairs中提取出来,并且没有选取userid的信息,这意味着的key不唯一,因为多个user可以对都进行评分。
// ((item1,item2), (tempValues))
val tempVectorCalcs = ratingPairs.map(data => {
val key = (data._2._1._2, data._2._2._2) //(item1,item2)
val values =
(data._2._1._3 * data._2._2._3, // prefs 1 * prefs 2
data._2._1._3, // item 1 prefs
data._2._2._3, // item 2 prefs
math.pow(data._2._1._3, 2), // square of item 1 prefs
math.pow(data._2._2._3, 2), // square of item 2 prefs
data._2._1._4, // item 1 ratingsize
data._2._2._4) // item 2 ratingsize
(key, values)
})
使用groupByKey 将(item,item)key相同的rdd分组,并且对中间变量进行处理,然后定义余弦相似度的函数,形如dotProduct(A, B)/(norm(A) * norm(B)),最终得到相似度结果
item之间的相似度保存在一个新的RDD中,(itemi,(itemj,Sim(i,j))),至此,Item-basedCF的第一步——计算物品间的相似性已经完成。实现如下:
//((item1,item2), (size, dotProduct, ratingSum, rating2Sum, ratingSq, rating2Sq, numRaters, numRaters2))
val vectorCalcs = tempVectorCalcs.groupByKey().map(data => {
val key = data._1 //(item1,item2)
val vals = data._2 //stats
val size = vals.size // the number of users rating(item1,item2)
val dotProduct = vals.map(f => f._1).sum // sum of prefs 1 * prefs 2 = dotProduct
val ratingSum = vals.map(f => f._2).sum // sum of prefs 1
val rating2Sum = vals.map(f => f._3).sum // sum of prefs 2
val ratingSq = vals.map(f => f._4).sum // sum of square prefs 1
val rating2Sq = vals.map(f => f._5).sum // sum of square prefs 2
val numRaters = vals.map(f => f._6).max
val numRaters2 = vals.map(f => f._7).max
(key, (size, dotProduct, ratingSum, rating2Sum, ratingSq, rating2Sq, numRaters, numRaters2))
})
//(itemi,(itemj,Sim(i,j)))
val tempSimilarities = vectorCalcsTotal.map(fields => {
val key = fields._1
val (size, dotProduct, ratingSum, rating2Sum, ratingNormSq, rating2NormSq, numRaters, numRaters2) = fields._2
val cosSim = cosineSimilarity(dotProduct, scala.math.sqrt(ratingNormSq), scala.math.sqrt(rating2NormSq))*size/(numRaters*math.log10(numRaters2+10))
(key._1,(key._2, cosSim))
})
val similarities = tempSimilarities.groupByKey().flatMap(x => { x._2.map(temp => (x._1,(temp._1,temp._2))).toList.sortWith((a,b)=>a._2._2>b._2._2).take(50)
})
其中cosineSimilarity可以定义函数进行计算,这样也比较方便替换不同相似性度量的方法:
// *************************
// * SIMILARITY MEASURES
// *************************
// [n * dotProduct(A, B) - sum(A) * sum(B)] / sqrt{ [n * norm(A)^2 - sum(A)^2] [n * norm(B)^2 - sum(B)^2] }
def correlation(size : Double, dotProduct : Double, ratingSum : Double, rating2Sum : Double, ratingNormSq : Double, rating2NormSq : Double) = {
val numerator = size * dotProduct - ratingSum * rating2Sum
val denominator = scala.math.sqrt(size * ratingNormSq - ratingSum * ratingSum) * scala.math.sqrt(size * rating2NormSq - rating2Sum * rating2Sum)
numerator / denominator
}
//The cosine similarity between two vectors A, B is
//dotProduct(A, B) / (norm(A) * norm(B))
def cosineSimilarity(dotProduct : Double, ratingNorm : Double, rating2Norm : Double) = {
dotProduct / (ratingNorm * rating2Norm)
}
//The Jaccard Similarity between two sets A, B is
//|Intersection(A, B)| / |Union(A, B)|
def jaccardSimilarity(usersInCommon : Double, totalUsers1 : Double, totalUsers2 : Double) = {
val union = totalUsers1 + totalUsers2 - usersInCommon
usersInCommon / union
}
接下来是预测评分的部分,根据预测公式
需要预测的是用户u对商品i的评分(itemi用户没有购买过),也可以先得到某些中间变量如:sim(i,j)*ratings然后求和再相除。
//(item,(user,prefs))
val ratingsInverse = ratings.map(x => (x._2,(x._1,x._3)))
//join : (item1,(user,prefs)) <- (item1,(item2,sim)) ==>> ((user,item2),(sim,sim*prefs))
val statistics = ratingsInverse.join(similarities).map( x=> ((x._2._1._1,x._2._2._1),(x._2._2._2,x._2._1._2*x._2._2._2)))
val predictResult = statistics.reduceByKey((x,y) => ((x._1+y._1),(x._2+y._2))).map(x=>(x._1,x._2._2/x._2._1))
val filterItem = TrainSetRDD.map(x=>((x._1,x._2),Double.NaN))
val totalScore = predictResult ++ filterItem
val finalResult = totalScore.reduceByKey(_+_).filter(x=> !(x._2 equals(Double.NaN))).map( x =>
(x._1._1,x._1._2,x._2)).groupBy(x=>x._1).flatMap(x=>(x._2.toList.sortWith((a,b)=>a._3>b._3)))
最后计算测试集中真实评分和预测评分的RMSE,评估算法的效果。
val joinTestSetRDD = TestSetRDD.map(x => ((x._1,x._2),(x._3)))
val joinFinalResult = finalResult.map(x => ((x._1,x._2),(x._3)))
val measure = joinTestSetRDD.join(joinFinalResult)
val rmseSize = measure.count()
val rmse = measure.map(x => x._2._1-x._2._2).map(y => y*y).reduce(_+_)/rmseSize
Slope One Spark实现
数据预处理部分和IBCF一样,本部分省略。
Step1-1:计算物品之间的评分差的均值,记为物品间的评分偏差(两物品同时被评分);
其中,是用户u对物品i的评分,是用户u对物品j的评分,是对物品i评分过的用户,是对物品i和物品j都评分过的用户,是对物品i和物品j都评分过的用户数量。
也是利用自连接的思想,找出所有的itemPairs,和它们之间的评分差(prefsi-prefsj)。
因为一对商品(itemi,itemj)可以被多个用户进行评分,所以这就提供了计算物品间平均偏差的机会。当以(itemi,itemj)进行Group操作时,
同时购买过(itemi,itemj)的用户会被聚合在同一行数据中,也就是。
DevOfItemsPairs-all就是(itemi,itemj)的偏差结果,rdd形如 ((itemi,itemj),dev(i,j)))
var ratings = TrainSetRDD.groupBy(k=>k._1).flatMap(x=>(x._2.toList.sortWith((x,y)=>x._3>y._3)))
val ratings2 = ratings.keyBy(tup => tup._1)
// itemi<itemj
val ratingPairs =ratings2.join(ratings2).filter(f => f._2._1._2 < f._2._2._2)
//(usertest,itemj)
val TestUser = TestSetRDD.map(line => (line._1,line._2))
//Pairs(itemi,itemj)
val ItemsPairs = ratingPairs.map(data => {
val key = (data._2._1._2, data._2._2._2) //(item1,item2)
val stats = (data._2._1._3-data._2._2._3) //(prefs1-prefs2)
(key, stats)
})
//((item1,item2),dev))
val DevOfItemsPairs = ItemsPairs.groupByKey().map(data => {
val key = data._1
val values = data._2
val Card = values.size
val Ruj_Rui = values.sum
var R_Card = Ruj_Rui/Card
(key,R_Card)
})
val DevOfItemsPairs_inverse = DevOfItemsPairs.map(x => ((x._1._2,x._1._1),x._2))
//((item1,item2),dev))
val DevOfItemsPairs_all = DevOfItemsPairs ++ DevOfItemsPairs_inverse
//(user,Sertof(Items))
val GroupByUser_ItemSets = TrainSetRDD.map(line => (line._1,line._2)).groupByKey()
// (usertest,itemj) join (user,Sertof(Items)) => (user,(itemj,Setof(Items))) => ((itemj,items),user)
val TestUser_ItemSets = TestUser.join(GroupByUser_ItemSets).flatMap(data => data._2._2.map(x => ((data._2._1,x),data._1)))
//((itemj,item2)(usertest,dev_j2)) => ((usertest,itemj),(dev_j2)) =groupByKey=> Sum/Card
val DevOfTestItemPairs = TestUser_ItemSets.join(DevOfItemsPairs_all).map(x => ((x._2._1,x._1._1),x._2._2)).groupByKey().map(data => {
val key = data._1
val values = data._2
val Size = values.size
val Sum = values.sum
val avg_dev = Sum/Size
(key,avg_dev)
})
Step1-2:计算用户历史评分的均值
以user为key进行聚合操作,得到的集合是用户对所有购买过的item的评分情况。对该评分集合求均值即是用户的历史评分
//(user,Setof(prefs))
val RatingPrefs = ratings.map(data => (data._1,data._3)).groupByKey()
//(usertest,itemj)
val TestUser = TestSetRDD.map(line => (line._1,line._2))
//after join:(user,(item,Setof(prefs)))
val JoinedByUser = TestUser.join(RatingPrefs).map(data => {
val key = (data._1,data._2._1) //(user,item)
val value = data._2._2 //Setof(prefs)
(key,value)
})
//key = (user,item) value = the average rating of user
val RatingByUser = JoinedByUser.map(data => {
val key = data._1
val values = data._2
val Card = values.size
val SumRatings = values.sum
val avg_user_rating = SumRatings/Card
(key,avg_user_rating)
})
Step2:根据物品间的评分偏差和用户的历史评分,预测用户对未评分的物品的评分。
其中是用户u购买过的物品,为集合中的元素个数
将Step1里的物品间的评分偏差(DevOfTestItemPairs)和用户的历史评分(RatingByUser)结合进行评分的预测并评估算法。
val predict = DevOfTestItemPairs.join(RatingByUser).map(data => (data._1,data._2._1 + data._2._2))
val joinTestSetRDD = TestSetRDD.map(data => ((data._1,data._2),data._3))
val measure = predict.join(joinTestSetRDD)
val rmseSize = measure.count()
val rmse = measure.map(x => x._2._1-x._2._2).map(y => y*y).reduce(_+_)/rmseSize
MF - Matlab实现
首先矩阵分解的目的是为了填补缺失值,然后应用在推荐系统中的作用便是填补用户对未购买过的商品进行评分值,从而根据预测值向用户推荐item。
数据预处理与前两个算法不同,需要构造user-item评分矩阵,并且原来有评分的位置保持不变,没有评分的位置置零。在计算LossFunction时,只需要比较原矩阵有值的rmse,原矩阵没有值的地方不需要计算误差。
function [rating,prerating]=MF(data,K,alpha)
[rating,num]=translate_line_to_matrix(data);
[m,n]=size(rating);
u=rand(m,K);
v=rand(K,n);
e=zeros(m,n);
distance=100000000;
while(1)
for i=1:m
for j=1:n
if(rating(i,j)>0)
error=0;
for k=1:K
error=error+u(i,k)*v(k,j);
end
e(i,j)=rating(i,j)-error;
for k=1:K
u(i,k)=u(i,k)+2*alpha*e(i,j)*v(k,j);
v(k,j)=v(k,j)+2*alpha*e(i,j)*u(i,k);
end
end
end
end
MFLossFunc=sum(sum(e));
if(distance-MFLossFunc<0.0000000001)
break;
else
distance=MFLossFunc;
end
end
prerating=u*v;
end
Algorithm Comparison and Hyperparameter
Item-based算法的预测结果比User-based算法的质量要高一点。由于Item-based算法可以预先计算好物品的相似度,所以在线的预测性能要比User-based算法的高。
基于用户之间评级相似性的早期协作过滤系统(称user-user协同过滤)存在以下几个问题:(1)当他们有很多项目但收视率相对较低时,系统表现不佳
(2)计算所有用户对之间的相似性非常昂贵
(3)用户配置文件变化很快,整个系统模型必须重新计算
item-based解决了用户数量多于项目的系统中的这些问题。 item-item模型使用每个项目的评级分布,而不是每个用户。 如果用户数量多于商品数量,则每个商品的评分往往比每位用户都多,因此商品的平均评分通常不会很快发生变化。 这导致模型中的评级分布更加稳定,所以模型不需要经常重建。 当用户使用并评估某个项目时,该项目的相似项目将从现有系统模型中挑选出来并添加到用户的个性化推荐中。
Slope One是项目协同过滤算法家族中的一员,旨在减少模型过度拟合问题。 可以说,它是基于评分的非平凡基于项目的协同过滤(non-trivial item-based collaborative filtering based on ratings)的最简单形式。 它们的简单性使其特别易于有效地实现它们,而其精度通常与更复杂且计算量更大的算法相当, 它也被用作改进其他算法的构建块。
SlopeOne易于实现和维护,可以轻松解释所有的聚合数据,并且算法易于实现和测试。具有实时性, 运行时可更新,新增一个评分项,应该对预测结果即时产生影响。有高效率的查询响应,快速的执行查询,可能需要付出更多的空间占用作为代价,对初次访问者要求少:对于一个评分项目很少的用户,也应该可以获得有效的推荐。与最准确的方法相比,SlopeOne应该是有竞争力的,不仅算法简单高效,效果也不赖。
矩阵分解在推荐系统中的应用我觉得非常神奇。隐语义模型(Latent Factor Model,LFM)在推荐系统中的应用越来越广泛,矩阵分解方法也是基于这个隐语义模型。设置K为因子factor的数量。矩阵分解的思想简单来说就是每一个用户和每一个物品都会有自己的一些特性
(feature/factor),用矩阵分解的方法可以从评分矩阵中分解出user-factor,factor-item矩阵,这样做的好处是得到了用户的偏好和每件物品的特性。用户对电影来举例子就是:每个用户看电影的时候都有偏好,这些偏好可以直观理解成:恐怖,喜剧,动作,爱情等。用户——特性矩阵表示的就是用户对这些因素的喜欢程度。同样,每一部电影也可以用这些因素描述,因此特性——物品矩阵表示的就是每一部电影这些因素的含量,也就是电影的类型。这样子两个矩阵相乘就会得到用户对这个电影的喜欢程度。