使用Spark创建搜索引擎
1、背景介绍
1.1 项目背景
假设有一个在线电影网站,会员可以付费在线观赏电影,公司希望能偶运用大数据分析推荐引擎,增加会员观看影片的次数以增加营业收入,主要有以下两个目的:
针对用户推荐感兴趣的电影:可以针对每一个会员,定期发送短信或者E-mail或会员登录时,推荐给他/她可能会感兴趣的电影。
针对电影推荐给感兴趣的用户:当想要促销某些电影时,也可以找出可能会对这些电影感兴趣的会员。
2、数据背景
我们使用https://grouplens.org/datasets/movielens/网站提供的电影评分数据,如下图所示,我们选择ml-100k数据。
2、ALS算法介绍
该算法介绍可以参考博客:http://blog.csdn.net/oucpowerman/article/details/49847979,这里就不再详细介绍。
3、数据查看
我们使用spark2.1和scala2.11版本来实现我们的推荐系统,我们使用spark-shell命令来进入我们Spark的交互环境,创建一个SparkSession:
接下来,我们导入我们的评分数据,并查看数据的一些特征:
可以看懂,我们的数据一共分为4列,分别为用户id,电影id,评分以及日期时间,我们可以看一下每个字段的统计特征:
4、使用ALS训练模型
我们查看了原始的训练数据,原始的数据有四列,但是我们真正需要的只是其中的前三列,所以这里需要进行一个转换,要想使用ALS进行模型的训练,我们需要进一步将数据转换为RDD[Rating]类型,最后,我们可以进行训练,得到我们的模型。
引入相关类库
提取数据的前三列
我们这里使用一个map的方法,原始数据的一行是一个字符串,我们需要使用split进行分割,分割之后我们提取前三个字段即可:
可以看到,这里我们使用了匿名函数和匿名参数的方式,_就代表我们传入的每一行数据。
转换为RDD[Rating]
接下来,我们仍然使用map方法,将RDD中的每一行进行一个转换:
模型训练
接下来我们就可以使用ALS.train方法进行模型的训练了,返回的模型的类型是org.apache.spark.mllib.recommendation.MatrixFactorizationModel
train方法接收的参数如下表:
参数 | 说明 |
---|---|
Ratings:RDD[Rating] | 输入的训练数据,格式为:Rating(UserID,productID,rating)的RDD |
rank:Int | rank指当进行矩阵分解时,将原本矩阵A(mn)分解为X(mrank) 和 Y(rank*n)中的rank值 |
Iterations:Int | ALS算法迭代的次数 |
lambda:Double | 建议值0.01 |
针对用户推荐电影
我们已经完成了数据训练,接下来我们就可以对用户推荐电影,比如我们要对id为196的用户推荐5部电影:
可见,该方法返回的是一个数组,我们进一步使用foreach方法打印每一行,返回的一条是Rating类型的,第一个代表用户id,第二个代表电影id,第三个代表推荐评分,评分最高说明越值得推荐。
查看针对用户推荐产品的评分
我们可以查询系统对用户推荐产品的评分,使用predict方法:
针对电影推荐给用户
当我们想要促销某些电影时,可以找出可能会对这些电影感兴趣的会员,我们可以使用model.recommendUsers方法针对某一部电影推荐相关的会员:
可以看到,返回数据跟给用户推荐电影的格式是相同的。
将电影名称和id进行绑定
首先我们读取数据:
读取后的每一条数据仍然是一个字符串,我们需要使用split进行分割,由于|是scala正则表达式中的一个特殊字符,所以我们需要进行转义,最后使用collectAsMap方法,将RDD转换成一个类似Map的数据,这样我们就可以通过电影id来查找电影的名字。
最后,我们只需要做一个简单的map运算,用电影的名称替换电影的id即可。
5、Demo完整代码
package mllib.ALSrecommend
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.sql.SparkSession
object RecommendSystem {
def main(args:Array[String]) = {
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
// .enableHiveSupport()
//.config("spark.some.config.option", "some-value")
.getOrCreate()
val rawUserData = spark.sparkContext.textFile("data/ml-100k/u.data")
println(rawUserData)
println(rawUserData.first())
rawUserData.take(5).foreach(println)
println(rawUserData.map(_.split("\t")(0).toDouble).stats())
println(rawUserData.map(_.split("\t")(1).toDouble).stats())
println(rawUserData.map(_.split("\t")(2).toDouble).stats())
println(rawUserData.map(_.split("\t")(3).toDouble).stats())
val rawRatings = rawUserData.map(_.split("\t").take(3))
val ratingsRDD = rawRatings.map{case Array(user,movie,rating)=>Rating(user.toInt,movie.toInt,rating.toDouble)}
val model = ALS.train(ratingsRDD,10,10,0.01)
println(model)
model.recommendProducts(196,5).foreach(println)
println(model.predict(196,1643))
model.recommendUsers(464,5).foreach(println)
val itemRDD = spark.sparkContext.textFile("data/ml-100k/u.item")
val movieTitle = itemRDD.map(line=>line.split("\\|").take(2)).map(array=>(array(0).toInt,array(1))).collectAsMap()
movieTitle.take(5).foreach(println)
println(movieTitle(146))
model.recommendProducts(196,5).map(ratings=>(ratings.product,movieTitle(ratings.product),ratings.rating)).foreach(println)
}
}
6、创建Recommend项目
上面几部分从整体上介绍了如何通过Spark构造一个推荐引擎,但复用性不是很好,所以,我们将上面的代码进行整理,构造一个推荐系统的类,类中几个重要的函数如下:
读取数据
读取数据函数我们返回两部分,首先是RDD[Rating]类型的训练数据,其次是Map类型的电影id和电影名称的对应数据表。
def PrepareData():(RDD[Rating],Map[Int,String])={
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
// .enableHiveSupport()
//.config("spark.some.config.option", "some-value")
.getOrCreate()
print("开始读取用户评分数据")
val rawUserData = spark.sparkContext.textFile("data/ml-100k/u.data")
val ratings = rawUserData.map(_.split("\t").take(3))
val ratingsRDD = ratings.map{case Array(user,movie,rating)=>Rating(user.toInt,movie.toInt,rating.toDouble)}
println("共计"+ratingsRDD.count.toString()+"条ratings")
print("开始读取电影数据")
val itemRDD = spark.sparkContext.textFile("data/ml-100k/u.item")
val movieTitle = itemRDD.map(line=>line.split("\\|").take(2)).map(array=>(array(0).toInt,array(1))).collect().toMap
//显示数据记录数
val numRatings = ratingsRDD.count()
val numUsers = ratingsRDD.map(_.user).distinct().count()
val numMovies = ratingsRDD.map(_.product).distinct().count()
println("共计:ratings:"+numRatings+" User:"+numUsers+" Movies:"+numMovies)
return (ratingsRDD,movieTitle)
}
电影推荐和用户推荐
接下来是电影推荐和用户推荐的函数:
def RecommendMovies(model:MatrixFactorizationModel,movieTitle:Map[Int,String],inputUserID:Int):Unit= {
val RecommendMovie = model.recommendProducts(inputUserID,10)
var i = 1
println("针对用户"+inputUserID+" 推荐下列电影:")
RecommendMovie.foreach{r => println(i.toString()+"."+movieTitle(r.product)+" 评分:"+r.rating.toString());i+=1}
}
def RecommendUsers(model:MatrixFactorizationModel,movieTitle:Map[Int,String],inputMovieID:Int):Unit= {
val RecommendUser = model.recommendUsers(inputMovieID,10)
var i = 1
println("针对电影"+inputMovieID+" 推荐下列用户:")
RecommendUser.foreach{r => println(i.toString()+"."+movieTitle(r.user)+" 评分:"+r.rating.toString());i+=1}
}
推荐入口
def recommend(model:MatrixFactorizationModel,movieTitle:Map[Int,String]):Unit={
var choose=""
while(choose!="3"){
print("请选择推荐类型,1:针对用户推荐电影,2:针对电影推荐给感兴趣的用户,3:离开")
choose = scala.io.StdIn.readLine()
if (choose=="1"){
print("请输入用户id?")
val inputUserID = scala.io.StdIn.readLine()
RecommendMovies(model,movieTitle,inputUserID.toInt)
}
else if(choose=="2") {
print("请输入电影的id?")
val inputMovieID = scala.io.StdIn.readLine()
RecommendUsers(model, movieTitle, inputMovieID.toInt)
}
}
}
主函数
def main(args:Array[String]):Unit={
setLogger
val (ratings,movieTitle) = PrepareData()
val model = ALS.train(ratings,5,20,0.1)
recommend(model,movieTitle)
}
完整代码
package mllib.ALSrecommend
import org.apache.spark.mllib.recommendation.{ALS, Rating,MatrixFactorizationModel}
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD
import scala.io.StdIn
import org.apache.log4j.Logger
import org.apache.log4j.Level
object Recommend {
def setLogger ={
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("com").setLevel(Level.OFF)
System.setProperty("spark.ui.showConsoleProgress","false")
Logger.getRootLogger().setLevel(Level.OFF)
}
def PrepareData():(RDD[Rating],Map[Int,String])={
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
// .enableHiveSupport()
//.config("spark.some.config.option", "some-value")
.getOrCreate()
print("开始读取用户评分数据")
val rawUserData = spark.sparkContext.textFile("data/ml-100k/u.data")
val ratings = rawUserData.map(_.split("\t").take(3))
val ratingsRDD = ratings.map{case Array(user,movie,rating)=>Rating(user.toInt,movie.toInt,rating.toDouble)}
println("共计"+ratingsRDD.count.toString()+"条ratings")
print("开始读取电影数据")
val itemRDD = spark.sparkContext.textFile("data/ml-100k/u.item")
val movieTitle = itemRDD.map(line=>line.split("\\|").take(2)).map(array=>(array(0).toInt,array(1))).collect().toMap
//显示数据记录数
val numRatings = ratingsRDD.count()
val numUsers = ratingsRDD.map(_.user).distinct().count()
val numMovies = ratingsRDD.map(_.product).distinct().count()
println("共计:ratings:"+numRatings+" User:"+numUsers+" Movies:"+numMovies)
return (ratingsRDD,movieTitle)
}
def RecommendMovies(model:MatrixFactorizationModel,movieTitle:Map[Int,String],inputUserID:Int):Unit= {
val RecommendMovie = model.recommendProducts(inputUserID,10)
var i = 1
println("针对用户"+inputUserID+" 推荐下列电影:")
RecommendMovie.foreach{r => println(i.toString()+"."+movieTitle(r.product)+" 评分:"+r.rating.toString());i+=1}
}
def RecommendUsers(model:MatrixFactorizationModel,movieTitle:Map[Int,String],inputMovieID:Int):Unit= {
val RecommendUser = model.recommendUsers(inputMovieID,10)
var i = 1
println("针对电影"+inputMovieID+" 推荐下列用户:")
RecommendUser.foreach{r => println(i.toString()+"."+movieTitle(r.user)+" 评分:"+r.rating.toString());i+=1}
}
def recommend(model:MatrixFactorizationModel,movieTitle:Map[Int,String]):Unit={
var choose=""
while(choose!="3"){
print("请选择推荐类型,1:针对用户推荐电影,2:针对电影推荐给感兴趣的用户,3:离开")
choose = scala.io.StdIn.readLine()
if (choose=="1"){
print("请输入用户id?")
val inputUserID = scala.io.StdIn.readLine()
RecommendMovies(model,movieTitle,inputUserID.toInt)
}
else if(choose=="2") {
print("请输入电影的id?")
val inputMovieID = scala.io.StdIn.readLine()
RecommendUsers(model, movieTitle, inputMovieID.toInt)
}
}
}
def main(args:Array[String]):Unit={
setLogger
val (ratings,movieTitle) = PrepareData()
val model = ALS.train(ratings,5,20,0.1)
recommend(model,movieTitle)
}
}
7、ALS.train方法参数调优
机器学习中很重要的一个部分就是对代码参数进行调优,调优的两个重要衡量指标是代码的运行时间以及运行的结果好坏,前面我们已经介绍了,ALS.train方法主要涉及了三个参数,分别为:rank,iterations和lambda,那么这三个参数分别如何影响代码的性能,以及如何选择三者的组合来达到最好的效率呢,这是本节主要讨论的目的。
准备数据
和上面有所不同的是,这里我们要准备数据,将数据分为训练集,验证集和测试集,返回三个RDD[Rating]类型的数据:
def PrepareData():(RDD[Rating],RDD[Rating],RDD[Rating])={
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
// .enableHiveSupport()
//.config("spark.some.config.option", "some-value")
.getOrCreate()
print("开始读取用户评分数据")
val rawUserData = spark.sparkContext.textFile("data/ml-100k/u.data")
val ratings = rawUserData.map(_.split("\t").take(3))
val ratingsRDD = ratings.map{case Array(user,movie,rating)=>Rating(user.toInt,movie.toInt,rating.toDouble)}
println("共计"+ratingsRDD.count.toString()+"条ratings")
println("开始读取电影数据")
val itemRDD = spark.sparkContext.textFile("data/ml-100k/u.item")
val movieTitle = itemRDD.map(line=>line.split("\\|").take(2)).map(array=>(array(0).toInt,array(1))).collect().toMap
//显示数据记录数
val numRatings = ratingsRDD.count()
val numUsers = ratingsRDD.map(_.user).distinct().count()
val numMovies = ratingsRDD.map(_.product).distinct().count()
println("共计:ratings:"+numRatings+" User:"+numUsers+" Movies:"+numMovies)
println("以随机数的方式将数据分为3个部分并返回")
val Array(trainData,validationData,testData) = ratingsRDD.randomSplit(Array(0.8,0.8,0.1))
println("trainData:"+trainData.count()+",validationData:"+validationData.count()+",testData:"+testData.count())
return (trainData,validationData,testData)
}
训练评估
def trainValidation(trainData:RDD[Rating],validationData:RDD[Rating]):MatrixFactorizationModel={
println("------评估rank参数使用------")
evaluateParameter(trainData,validationData,"rank",Array(5,10,15,20,50,100),Array(10),Array(0.1))
println("------评估numIterations参数使用------")
evaluateParameter(trainData,validationData,"numIterations",Array(10),Array(5,10,15,20,25),Array(0.1))
println("------评估lambda参数使用------")
evaluateParameter(trainData,validationData,"lambda",Array(10),Array(10),Array(0.05,0.1,1,5,10))
println("------所有参数交叉评估找出最好的参数组合------")
val bestModel = evaluateAllParameter(trainData,validationData,Array(5,10,15,20,50,100),Array(5,10,15,20,25),Array(0.05,0.1,1,5,10))
return bestModel
可以看到,训练评估函数中涉及了两个方法,分别用于对单个参数进行测试和三个参数整体进行调优,两个方法分别如下:
def evaluateParameter(trainData:RDD[Rating],validationData:RDD[Rating],evaluateParameter:String,rankArray:Array[Int],numIterationsArray:Array[Int],lambdaArray:Array[Double])={
val dataBarChart = new DefaultCategoryDataset()
val dataLineChart = new DefaultCategoryDataset()
for(rank <- rankArray;numIterations <- numIterationsArray;lambda <- lambdaArray){
val (rmse,time) = trainModel(trainData,validationData,rank,numIterations,lambda)
val parameterData = evaluateParameter match{
case "rank" => rank;
case "numIterations" => numIterations;
case "lambda" => lambda
}
dataBarChart.addValue(rmse,evaluateParameter,parameterData.toString())
dataLineChart.addValue(time,"Time",parameterData.toString())
}
Chart.plotBarLineChart("ALS evaluations " + evaluateParameter,evaluateParameter,"RMSE",0.58,5,"Time",dataBarChart,dataLineChart)
}
def evaluateAllParameter(trainData: RDD[Rating], validationData: RDD[Rating], rankArray: Array[Int], numIterationsArray: Array[Int], lambdaArray: Array[Double]):MatrixFactorizationModel={
val evaluations = for(rank <- rankArray;numIterations <- numIterationsArray;lambda <- lambdaArray) yield {
val (rmse,time) = trainModel(trainData,validationData,rank,numIterations,lambda)
(rank,numIterations,lambda,rmse)
}
val Eval = (evaluations.sortBy(_._4))
val BestEval = Eval(0)
println("最佳model参数:rank:"+BestEval._1+",iterations:"+BestEval._2+"lambda:"+BestEval._3+"结果RMSE="+BestEval._4)
val bestModel = ALS.train(trainData,BestEval._1,BestEval._2,BestEval._3)
(bestModel)
}
其中,训练模型和计算RMSE的函数分别如下:
def computeRmse(model:MatrixFactorizationModel,ratingRDD:RDD[Rating]):Double={
val num = ratingRDD.count()
val predictedRDD = model.predict(ratingRDD.map(line=>(line.user,line.product)))
val kvRatingRDD = ratingRDD.map(r=>((r.user,r.product),r.rating))
val predictedAndRatings = predictedRDD.map(p=>((p.user,p.product),p.rating)).join(kvRatingRDD).values
math.sqrt(predictedAndRatings.map(x=>(x._1-x._2) * (x._1-x._2)).reduce((_+_))/num)
}
def trainModel(trainData:RDD[Rating],validationData:RDD[Rating],rank:Int,numIterations:Int,lambda:Double):(Double,Double) ={
val startTime = new DateTime()
val model = ALS.train(trainData,rank,numIterations,lambda)
val endTime = new DateTime()
val Rmse = computeRmse(model,validationData)
val duration = new Duration(startTime,endTime)
println(f"训练参数:rank:$rank%3d,iterations;$numIterations%.2f;lambda:$lambda%.2f 结果 RMSE=$Rmse%.2f"+"训练需要时间为:"+duration.getMillis+"毫秒")
return (Rmse,duration.getStandardSeconds)
}
画图类
在测试单个参数的性能时,我们引入了画图类,画图类的代码如下所示:
package mllib.ALSrecommend
import org.jfree.chart._
import org.jfree.data.xy._
import org.jfree.data.category.DefaultCategoryDataset
import org.jfree.chart.axis.NumberAxis
import org.jfree.chart.axis._
import java.awt.Color
import org.jfree.chart.renderer.category.LineAndShapeRenderer;
import org.jfree.chart.plot.DatasetRenderingOrder;
import org.jfree.chart.labels.StandardCategoryToolTipGenerator;
import java.awt.BasicStroke
object Chart {
def plotBarLineChart(Title: String, xLabel: String, yBarLabel: String, yBarMin: Double, yBarMax: Double, yLineLabel: String, dataBarChart : DefaultCategoryDataset, dataLineChart: DefaultCategoryDataset): Unit = {
//画出Bar Chart
val chart = ChartFactory
.createBarChart(
"", // Bar Chart 标题
xLabel, // X轴标题
yBarLabel, // Bar Chart 标题 y轴标题l
dataBarChart , // Bar Chart数据
org.jfree.chart.plot.PlotOrientation.VERTICAL,//画图方向垂直
true, // 包含 legend
true, // 显示tooltips
false // 不要URL generator
);
//取得plot
val plot = chart.getCategoryPlot();
plot.setBackgroundPaint(new Color(0xEE, 0xEE, 0xFF));
plot.setDomainAxisLocation(AxisLocation.BOTTOM_OR_RIGHT);
plot.setDataset(1, dataLineChart); plot.mapDatasetToRangeAxis(1, 1)
//画直方图y轴
val vn = plot.getRangeAxis(); vn.setRange(yBarMin, yBarMax); vn.setAutoTickUnitSelection(true)
//画折线图y轴
val axis2 = new NumberAxis(yLineLabel); plot.setRangeAxis(1, axis2);
val renderer2 = new LineAndShapeRenderer()
renderer2.setToolTipGenerator(new StandardCategoryToolTipGenerator());
//设置先画直方图,再画折线图以免折线图被盖掉
plot.setRenderer(1, renderer2);plot.setDatasetRenderingOrder(DatasetRenderingOrder.FORWARD);
//创建画框
val frame = new ChartFrame(Title,chart); frame.setSize(500, 500);
frame.pack(); frame.setVisible(true)
}
}
主函数
def main(args:Array[String]) = {
setLogger
println("========数据准备阶段========")
val (trainData,validationData,testData) = PrepareData()
trainData.persist()
validationData.persist()
testData.persist()
println("========训练验证阶段========")
val bestModel = trainValidation(trainData,validationData)
println("========测试阶段========")
val testRmse = computeRmse(bestModel,testData)
println("使用testData测试bestModel的结果RMSE="+testRmse)
//trainData.unpersist()
// validationData.unpersist()
// testData.unpersist()
}
完整代码
package mllib.ALSrecommend
import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.recommendation.{ALS, Rating,MatrixFactorizationModel}
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD
import org.jfree.data.category.DefaultCategoryDataset
import org.joda.time.format._
import org.joda.time._
import org.joda.time.Duration
object AlsEvaluation {
def setLogger ={
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("com").setLevel(Level.OFF)
System.setProperty("spark.ui.showConsoleProgress","false")
Logger.getRootLogger().setLevel(Level.OFF)
}
def PrepareData():(RDD[Rating],RDD[Rating],RDD[Rating])={
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
// .enableHiveSupport()
//.config("spark.some.config.option", "some-value")
.getOrCreate()
print("开始读取用户评分数据")
val rawUserData = spark.sparkContext.textFile("data/ml-100k/u.data")
val ratings = rawUserData.map(_.split("\t").take(3))
val ratingsRDD = ratings.map{case Array(user,movie,rating)=>Rating(user.toInt,movie.toInt,rating.toDouble)}
println("共计"+ratingsRDD.count.toString()+"条ratings")
println("开始读取电影数据")
val itemRDD = spark.sparkContext.textFile("data/ml-100k/u.item")
val movieTitle = itemRDD.map(line=>line.split("\\|").take(2)).map(array=>(array(0).toInt,array(1))).collect().toMap
//显示数据记录数
val numRatings = ratingsRDD.count()
val numUsers = ratingsRDD.map(_.user).distinct().count()
val numMovies = ratingsRDD.map(_.product).distinct().count()
println("共计:ratings:"+numRatings+" User:"+numUsers+" Movies:"+numMovies)
println("以随机数的方式将数据分为3个部分并返回")
val Array(trainData,validationData,testData) = ratingsRDD.randomSplit(Array(0.8,0.8,0.1))
println("trainData:"+trainData.count()+",validationData:"+validationData.count()+",testData:"+testData.count())
return (trainData,validationData,testData)
}
def computeRmse(model:MatrixFactorizationModel,ratingRDD:RDD[Rating]):Double={
val num = ratingRDD.count()
val predictedRDD = model.predict(ratingRDD.map(line=>(line.user,line.product)))
val kvRatingRDD = ratingRDD.map(r=>((r.user,r.product),r.rating))
val predictedAndRatings = predictedRDD.map(p=>((p.user,p.product),p.rating)).join(kvRatingRDD).values
math.sqrt(predictedAndRatings.map(x=>(x._1-x._2) * (x._1-x._2)).reduce((_+_))/num)
}
def trainModel(trainData:RDD[Rating],validationData:RDD[Rating],rank:Int,numIterations:Int,lambda:Double):(Double,Double) ={
val startTime = new DateTime()
val model = ALS.train(trainData,rank,numIterations,lambda)
val endTime = new DateTime()
val Rmse = computeRmse(model,validationData)
val duration = new Duration(startTime,endTime)
println(f"训练参数:rank:$rank%3d,iterations;$numIterations%.2f;lambda:$lambda%.2f 结果 RMSE=$Rmse%.2f"+"训练需要时间为:"+duration.getMillis+"毫秒")
return (Rmse,duration.getStandardSeconds)
}
def evaluateParameter(trainData:RDD[Rating],validationData:RDD[Rating],evaluateParameter:String,rankArray:Array[Int],numIterationsArray:Array[Int],lambdaArray:Array[Double])={
val dataBarChart = new DefaultCategoryDataset()
val dataLineChart = new DefaultCategoryDataset()
for(rank <- rankArray;numIterations <- numIterationsArray;lambda <- lambdaArray){
val (rmse,time) = trainModel(trainData,validationData,rank,numIterations,lambda)
val parameterData = evaluateParameter match{
case "rank" => rank;
case "numIterations" => numIterations;
case "lambda" => lambda
}
dataBarChart.addValue(rmse,evaluateParameter,parameterData.toString())
dataLineChart.addValue(time,"Time",parameterData.toString())
}
Chart.plotBarLineChart("ALS evaluations " + evaluateParameter,evaluateParameter,"RMSE",0.58,5,"Time",dataBarChart,dataLineChart)
}
def evaluateAllParameter(trainData: RDD[Rating], validationData: RDD[Rating], rankArray: Array[Int], numIterationsArray: Array[Int], lambdaArray: Array[Double]):MatrixFactorizationModel={
val evaluations = for(rank <- rankArray;numIterations <- numIterationsArray;lambda <- lambdaArray) yield {
val (rmse,time) = trainModel(trainData,validationData,rank,numIterations,lambda)
(rank,numIterations,lambda,rmse)
}
val Eval = (evaluations.sortBy(_._4))
val BestEval = Eval(0)
println("最佳model参数:rank:"+BestEval._1+",iterations:"+BestEval._2+"lambda:"+BestEval._3+"结果RMSE="+BestEval._4)
val bestModel = ALS.train(trainData,BestEval._1,BestEval._2,BestEval._3)
(bestModel)
}
def trainValidation(trainData:RDD[Rating],validationData:RDD[Rating]):MatrixFactorizationModel={
println("------评估rank参数使用------")
evaluateParameter(trainData,validationData,"rank",Array(5,10,15,20,50,100),Array(10),Array(0.1))
println("------评估numIterations参数使用------")
evaluateParameter(trainData,validationData,"numIterations",Array(10),Array(5,10,15,20,25),Array(0.1))
println("------评估lambda参数使用------")
evaluateParameter(trainData,validationData,"lambda",Array(10),Array(10),Array(0.05,0.1,1,5,10))
println("------所有参数交叉评估找出最好的参数组合------")
val bestModel = evaluateAllParameter(trainData,validationData,Array(5,10,15,20,50,100),Array(5,10,15,20,25),Array(0.05,0.1,1,5,10))
return bestModel
}
def main(args:Array[String]) = {
setLogger
println("========数据准备阶段========")
val (trainData,validationData,testData) = PrepareData()
trainData.persist()
validationData.persist()
testData.persist()
println("========训练验证阶段========")
val bestModel = trainValidation(trainData,validationData)
println("========测试阶段========")
val testRmse = computeRmse(bestModel,testData)
println("使用testData测试bestModel的结果RMSE="+testRmse)
//trainData.unpersist()
// validationData.unpersist()
// testData.unpersist()
}
}
结果展示
我们首先来看rank对模型训练的影响,可以看到,rank对模型的误差影响不大,但随着rank 的增加,模型训练时间会有所增加
接下来我们看numIterations,该参数对模型的误差影响也不大,但随着参数数值的增大,模型运行时间变长。
最后是lambda参数,lambda参数对模型的运行时间没有影响,但是随着lambda的增大,模型的训练误差逐渐增大。
那么使用什么样的参数组合能够带来最佳的模型训练效果呢,答案如下:
这里答案不是唯一的,每次训练可能得到不同的结果。
8、代码知识点整理
根据上面的两段完整的代码,我整理了一些我第一次接触到的,不是太熟悉的地方:
设置打印输出
spark运行时会显示很多log信息,信息太多会和正常信息混合在一起,所以可以根据如下代码设置不显示log信息:
def setLogger ={
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("com").setLevel(Level.OFF)
System.setProperty("spark.ui.showConsoleProgress","false")
Logger.getRootLogger().setLevel(Level.OFF)
}
randomSplit函数切分RDD
使用randomSplit函数对RDD进行切分:
val Array(trainData,validationData,testData) = ratingsRDD.randomSplit(Array(0.8,0.8,0.1))
打印输出的花样
println(f"训练参数:rank:$rank%3d,iterations;$numIterations%.2f;lambda:$lambda%.2f 结果 RMSE=$Rmse%.2f"+"训练需要时间为:"+duration.getMillis+"毫秒")
scala中使用yield
Scala中的yield的主要作用是记住每次迭代中的有关值,并逐一存入到一个数组中。用法如下:for {子句} yield {变量或表达式}
val evaluations = for(rank <- rankArray;numIterations <- numIterationsArray;lambda <- lambdaArray) yield {
val (rmse,time) = trainModel(trainData,validationData,rank,numIterations,lambda)
(rank,numIterations,lambda,rmse)
}