推荐系统之SparkML实现协同过滤ALS
1.Spark MLlib概述
1.是什么:spark MLLib 是基于Spark 引擎实现的机器学习算法库,spark开发应用都是基于RDD的,而MLLib是基于底层RDD实现的上层更高级的API
2.特点:
扩展性和容错性,扩展性完全是基于Spark底层结构以及Spark运行环境来实现的
扩展性:可以扩展的机器学习库,每新发布一个版本都会新增一些机器学习算法
3.算法库:它有一系列的机器学习算法和实用程序组成,目前涵盖了常见的分类、回归、聚类、协同过滤、关联规则等等算法,具体如下:
• 存储:保存和加载算法、模型及管道
• 基本统计:概括统计、相关性、分层取样、假设检验、随机数生成
• 回归包括:线性回归、逻辑回归、岭回归、保序回归。
• 分类算法:主要包括有贝叶斯分类、线性二元SVM分类、逻辑回归分类、决策树、随机森林、梯度增强树
• 协同过滤: (ALS) (交替最小二乘法(ALS) )
• 降维:(SVD) 奇异值分解、 (PCA) 主成分分析
• 聚类:K-means聚类、LDA主题模型
• 关联规则:FP-Growth。
• 特征工程:特征提取、特征转换、特征选择以及降维。
• 管道:构造、评估和调整的管道的工具。
• 存储:保存和加载算法、模型及管道
• 实用工具:线性代数,统计,数据处理等。
• 优化部分:随机梯度下降、(L-BFGS) 短时记忆的BFGS (拟牛顿法中的一种,解决非线性问题)
在这里,我们只讲推荐应用的ALS协同过滤及其实现。
2.MLlib协同过滤算法:ALS
2.1算法原理
详细讲解:
1.什么是ALS:ALS直接翻译过来就是交替最小二乘,目光更多关注在交替这两个字上
2.回顾协同过滤:有基于user-base和item-base的。现在我们希望把这两个角度同时进行考虑,相当于得到的CF不仅可以学到一些用户背后一些隐含的特征,它也可以从物品的角度学习它的隐含特征,所以呢,把这个算法也叫作混合CF,主要是考虑把User和Item两个方向同时考虑进去了。
我们平常见到的UI矩阵如下图:
这个矩阵有很多空格,是个稀疏矩阵,假设这是一个M*N的矩阵,即左边是M个User,上边是N个Item的打分矩阵。我们知道,这个矩阵的尺寸是非常大的,因为对于一个中企来说动不动用户量就是上亿了。这里有个好奇的问题,中国大概才有13亿人口,为什么每家企业都会有上亿的用户?因为可能一个人会有多个账号,有可能手机一个账号,PC一个账号,有可能手机有多个微信,有可能不需要注册账号,每个浏览器背后都有一个账号,所以一个人可能对应着多个不同的账户。
我们面对这么大的数据量呢,其实我们可以把它进行分解,分解成两个部分来组成,即两个小矩阵,一个是UXK的这么一个矩阵,还有一个是KXI这么个矩阵。这两个矩阵中,第一个矩阵的User维度不变,第二个矩阵的Item维度不变,而K可能就比较小,从几维,到几百维,甚至几千维这个尺寸就可以了,甚至再不济给几万维,对于User和Item来说也是小很多的。总之呢,是把UI这个矩阵拆分成UK和KI两个小矩阵相乘的方式。分解之后如下图:
有了两个小矩阵,对于我们的计算来说,就得到了非常的便利和快速。这里,我们不管去看上边这个矩阵,还是下边这个矩阵,都可以方便地表达每个用户和物品。比如说对于左边的UK矩阵,随便给你个User,你都能在这个矩阵上映射到某一行,而且这一行呢,是由K个维度的向量来组成的,有K个特征,而且每个特征向量都是稠密,不像上边的矩阵向量是稀疏的,即每一个User,都可以用K个向量来表示。右边矩阵同理,我们可以对一个物品通过一个K维的向量来表示。
假设说,如果user可以用一个向量来表示(当然它是稠密的),Item也可以用一个向量来表示(当然它也是稠密的),分别为Vector A和Vector B。那么这个user对这个item的打分应该如何计算?应该是AXB,相当于这两个向量做一个内积。每个维度相乘之后,应该把结果再相加。即sum(AXB)=AB
目标
我们得到了UK和KI两个矩阵,那么其实我们可以用两个小矩阵来还原最原始的UI矩阵,但是我们能完美的100%地还原吗?理论上是可以的,实际上有点难度。实际上我们得到这两个矩阵到底长什么样子,只要是维度定义好就可以,然后这两个矩阵相乘得到一个UI矩阵,比如我们真实的打分矩阵R,R'是预测得到一个矩阵,如果R和R'很相似的话,那么我们是不是可以把R'基本的替代R这个矩阵呢。所以我们的目标是尽可能让两个小矩阵相乘,变成一个大矩阵,这个大矩阵和真实的矩阵很相近就可以了。
界定方式
那么相似怎么来界定呢?这里就用误差来表示,两个矩阵的误差越小,代表他们之间越相似,相反越不相似。这个误差叫均方根误差(RMSE),RMSE相当于是两个矩阵,每一个cell之间相减的差别,得到一个距离,然后求平方开根号就可以了。
目标求解
我们的目标有了,界定方式也有了,接下来就是求A(UK)和B(KI)的两个小矩阵了。
如果我们有了A和B,我们可以做什么呢?
我们可以做物品推荐,就用到B矩阵,item-item=IKKI,同理也可以做用户推荐,就用到A矩阵,user-user=UKKU
接下来我们就看如何得到A和B,我们知道K值是远小于M和N的,这样才能达到降维的目的。每一个User都有K个特征,每个特征代表什么含义呢,这个你不需要关心,你只需要认为我这个User是可以用这个向量来表示就可以了,每一个物品也可以用一个向量来表示就可以了。至于它的维度已经是个隐含因子,无法解释了。从两个小矩阵,我们得到一个式子:
大写的R代表评分矩阵
目标,真实矩阵和结果矩阵之间尽可能逼近
将最小化平方误差作为损失函数:
表示某一个用户对物品的一个打分具体单元值,真实值减预测值,我们希望目标越小越好,另外为了让我们的目标进一步稳定,我们可以引入一些正则项,后面加了一个L2正则:
最终,得到结果两个矩阵相乘的方式:
得到矩阵和
为了使上式的损失最小,我们对损失函数求解最优值,求导=0,这里面有两个未知数:每一个x和y
对求导,得到式子如下:
另导数=0,得到式子1如下:
对求导,得到式子2如下:
从一开始就讲到ALS是交替最小二乘法,重点在于交替,在这里,有两个未知数,先固定x值,然后对y做下降,等它下降到一定程度后,再让y固定,让x下降。一直循环下去,直到满足收敛条件(损失最小的条件RMSE)。
这里的x其实是代表user的向量,y代表item的向量,一开始我们只知道他们是两个向量,不知道它的值是什么,所以一开始我们给它随机初始化一下,随机给它一个向量,即求解步骤如下:
- 随机生成X、Y(初始化矩阵)
- 固定Y,更新X(公式1)
- 固定X,更新Y(公式2)
- 第2、3步循环,直到满足收敛条件(RMSE)
2.2算法实现
1)基于ALS(alternating least squares)的协同过滤算法,涉及参数如下:
• numBlocks: 计算并行度(若为-1表示自动化配置)
• Rank:模型中隐含影响因子,默认是10
• Iterations:迭代次数,默认是10
• Lambda:ALS中正则化参数
• implicitPrefs:是否使用显式反馈变量或使用隐式反馈数据的变量
• Alpha:ALS中的一个参数,作用于隐式反馈变量,控制基本的信心度
2)构建流如下:
- 加载数据集
- 将数据集解析成ALS要求的格式
- 将数据集分割成两部分:训练集和测试集
- 运行ALS,产生并评估模型
- 将最终模型用于推荐
3)采用数据集:使用MovieLens下的ml-1m电影数据集
格式:
实现代码:
1.导入库
import sqlContext.implicits._
import org.apache.spark.sql.types._
import org.apache.spark.mllib.recommendation.{ALS,MatrixFactorizationModel,Rating}
2.定义类及转化rdd格式方法
case class Movie(movieId:Int, title:String, genres:Seq[String])
case class User(userId:Int, gender:String, age:Int, occupation:Int, zip:String)
//Define parse function
def parseMovie(str: String): Movie = {
val fields=str.split("::")
assert(fields.size==3)
Movie(fields(0).toInt, fields(1).toString, Seq(fields(2)))
}
def parseUser(str: String): User = {
val fields=str.split("::")
assert(fields.size==5)
User(fields(0).toInt, fields(1).toString, fields(2).toInt, fields(3).toInt, fields(4).toString)
}
def parseRating(str: String): Rating = {
val fields=str.split("::")
assert(fields.size==4)
Rating(fields(0).toInt, fields(1).toInt, fields(2).toInt)
}
3.Ratings 数据简单查看
val ratingText=sc.textFile("file:/root/data/ratings.dat")
ratingText.first()
val ratingRDD=ratingText.map(parseRating).cache()
println("Total number of ratings: "+ratingRDD.count())
println("Total number of movies rated: "+ratingRDD.map(_.product).distinct().count())
println("Total number of users who rated movies: "+ratingRDD.map(_.user).distinct().count())
打印结果如下图:
4.创建 DataFrames,便于使用sql语句做简单分析
val ratingDF=ratingRDD.toDF();
val movieDF=sc.textFile("file:/root/data/movies.dat").map(parseMovie).toDF()
val userDF=sc.textFile("file:/root/data/users.dat").map(parseUser).toDF()
ratingDF.printSchema()
movieDF.printSchema()
userDF.printSchema()
ratingDF.registerTempTable("ratings")
movieDF.registerTempTable("movies")
userDF.registerTempTable("users")
5.利用sql语句,从rating数据集中找出评价次数最多和最少的物品情况
val result=sqlContext.sql("""select title,rmax,rmin,ucnt
from
(select product, max(rating) as rmax, min(rating) as rmin, count(distinct user) as ucnt
from ratings
group by product) ratingsCNT
join movies on product=movieId
order by ucnt desc""")
result.show()
输出结果如下:
6.利用sql查出前10个用户评价物品的次数
val mostActiveUser=sqlContext.sql("""select user, count(*) as cnt
from ratings group by user order by cnt desc limit 10""")
mostActiveUser.show()
输出结果如下:
7.查询user=4169的用户,他所有评分里大于4分的评分物品的标题和评分多少
val result=sqlContext.sql("""select distinct title, rating
from ratings join movies on movieId=product
where user=4169 and rating>4""")
result.show()
输出结果如下:
8.ALS模型构建
val splits=ratingRDD.randomSplit(Array(0.8,0.2), 0L)
val trainingSet=splits(0).cache()
val testSet=splits(1).cache()
trainingSet.count()//结果见下方
testSet.count()//结果见下方
val model=(new ALS().setRank(20).setIterations(10).run(trainingSet))
//简单测试模型
val recomForTopUser=model.recommendProducts(4169,5)
//打印出简单测试的物品标题
val movieTitle=movieDF.map(array=>(array(0),array(1))).collectAsMap();
val recomResult=recomForTopUser.map(rating=>(movieTitle(rating.product),rating.rating)).foreach(println)
trainingSet.count()结果:799809
testSet.count()结果:200400
recomResult结果如下:
9.测试数据准备
val testUserProduct=testSet.map{
case Rating(user,product,rating) => (user,product)
}
10.测试数据代入模型进行预测
val testUserProductPredict=model.predict(testUserProduct)
testUserProductPredict.take(10).mkString("\n")
测试结果如下:
11.使用公式原理,计算平均绝对误差
val testSetPair=testSet.map{
case Rating(user,product,rating) => ((user,product),rating)
}
val predictionsPair=testUserProductPredict.map{
case Rating(user,product,rating) => ((user,product),rating)
}
val joinTestPredict=testSetPair.join(predictionsPair)
val mae=joinTestPredict.map{
case ((user,product),(ratingT,ratingP)) =>
val err=ratingT-ratingP
Math.abs(err)
}.mean()
结果如下:
12.使用公式原理,负样本量(实际为假,预测为真),便于预估准确率
//FP,ratingT<=1, ratingP>=4
val fp=joinTestPredict.filter{
case ((user,product),(ratingT,ratingP)) =>
(ratingT <=1 & ratingP >=4)
}
fp.count()
运行结果:
res17: Long = 550
在测试集为20多万的数据下,只有550个用户预测错误,模型准确率还是相对较高。
12.使用MLlib自带库,计算均方根误差
import org.apache.spark.mllib.evaluation._
val ratingTP=joinTestPredict.map{
case ((user,product),(ratingT,ratingP))=>
(ratingP,ratingT)
}
val evalutor=new RegressionMetrics(ratingTP)
evalutor.meanAbsoluteError
evalutor.rootMeanSquaredError
运行结果如下:
从结果可以看出,平均绝对误差在0.72左右,而均方根有0.94,因为其原理,会比平均绝对误差差距会更大一些。