spark||flink||scala程序员半栈工程师

使用Spark创建搜索引擎

2017-09-01  本文已影响952人  文哥的学习日记

1、背景介绍

1.1 项目背景

假设有一个在线电影网站,会员可以付费在线观赏电影,公司希望能偶运用大数据分析推荐引擎,增加会员观看影片的次数以增加营业收入,主要有以下两个目的:
针对用户推荐感兴趣的电影:可以针对每一个会员,定期发送短信或者E-mail或会员登录时,推荐给他/她可能会感兴趣的电影。
针对电影推荐给感兴趣的用户:当想要促销某些电影时,也可以找出可能会对这些电影感兴趣的会员。

2、数据背景

我们使用https://grouplens.org/datasets/movielens/网站提供的电影评分数据,如下图所示,我们选择ml-100k数据。

2、ALS算法介绍

该算法介绍可以参考博客:http://blog.csdn.net/oucpowerman/article/details/49847979,这里就不再详细介绍。

3、数据查看

我们使用spark2.1和scala2.11版本来实现我们的推荐系统,我们使用spark-shell命令来进入我们Spark的交互环境,创建一个SparkSession:

接下来,我们导入我们的评分数据,并查看数据的一些特征:

可以看懂,我们的数据一共分为4列,分别为用户id,电影id,评分以及日期时间,我们可以看一下每个字段的统计特征:

4、使用ALS训练模型

我们查看了原始的训练数据,原始的数据有四列,但是我们真正需要的只是其中的前三列,所以这里需要进行一个转换,要想使用ALS进行模型的训练,我们需要进一步将数据转换为RDD[Rating]类型,最后,我们可以进行训练,得到我们的模型。

引入相关类库

提取数据的前三列
我们这里使用一个map的方法,原始数据的一行是一个字符串,我们需要使用split进行分割,分割之后我们提取前三个字段即可:

可以看到,这里我们使用了匿名函数和匿名参数的方式,_就代表我们传入的每一行数据。

转换为RDD[Rating]
接下来,我们仍然使用map方法,将RDD中的每一行进行一个转换:

模型训练
接下来我们就可以使用ALS.train方法进行模型的训练了,返回的模型的类型是org.apache.spark.mllib.recommendation.MatrixFactorizationModel

train方法接收的参数如下表:

参数 说明
Ratings:RDD[Rating] 输入的训练数据,格式为:Rating(UserID,productID,rating)的RDD
rank:Int rank指当进行矩阵分解时,将原本矩阵A(mn)分解为X(mrank) 和 Y(rank*n)中的rank值
Iterations:Int ALS算法迭代的次数
lambda:Double 建议值0.01

针对用户推荐电影
我们已经完成了数据训练,接下来我们就可以对用户推荐电影,比如我们要对id为196的用户推荐5部电影:

可见,该方法返回的是一个数组,我们进一步使用foreach方法打印每一行,返回的一条是Rating类型的,第一个代表用户id,第二个代表电影id,第三个代表推荐评分,评分最高说明越值得推荐。

查看针对用户推荐产品的评分
我们可以查询系统对用户推荐产品的评分,使用predict方法:

针对电影推荐给用户
当我们想要促销某些电影时,可以找出可能会对这些电影感兴趣的会员,我们可以使用model.recommendUsers方法针对某一部电影推荐相关的会员:

可以看到,返回数据跟给用户推荐电影的格式是相同的。

将电影名称和id进行绑定
首先我们读取数据:

读取后的每一条数据仍然是一个字符串,我们需要使用split进行分割,由于|是scala正则表达式中的一个特殊字符,所以我们需要进行转义,最后使用collectAsMap方法,将RDD转换成一个类似Map的数据,这样我们就可以通过电影id来查找电影的名字。

最后,我们只需要做一个简单的map运算,用电影的名称替换电影的id即可。

5、Demo完整代码

package mllib.ALSrecommend

import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.sql.SparkSession


object RecommendSystem {

  def main(args:Array[String]) = {
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
     // .enableHiveSupport()
      //.config("spark.some.config.option", "some-value")
      .getOrCreate()
    val rawUserData = spark.sparkContext.textFile("data/ml-100k/u.data")
    println(rawUserData)
    println(rawUserData.first())
    rawUserData.take(5).foreach(println)
    println(rawUserData.map(_.split("\t")(0).toDouble).stats())
    println(rawUserData.map(_.split("\t")(1).toDouble).stats())
    println(rawUserData.map(_.split("\t")(2).toDouble).stats())
    println(rawUserData.map(_.split("\t")(3).toDouble).stats())

    val rawRatings = rawUserData.map(_.split("\t").take(3))
    val ratingsRDD = rawRatings.map{case Array(user,movie,rating)=>Rating(user.toInt,movie.toInt,rating.toDouble)}

    val model = ALS.train(ratingsRDD,10,10,0.01)
    println(model)

    model.recommendProducts(196,5).foreach(println)
    println(model.predict(196,1643))

    model.recommendUsers(464,5).foreach(println)

    val itemRDD = spark.sparkContext.textFile("data/ml-100k/u.item")
    val movieTitle = itemRDD.map(line=>line.split("\\|").take(2)).map(array=>(array(0).toInt,array(1))).collectAsMap()
    movieTitle.take(5).foreach(println)

    println(movieTitle(146))

    model.recommendProducts(196,5).map(ratings=>(ratings.product,movieTitle(ratings.product),ratings.rating)).foreach(println)

  }
}

6、创建Recommend项目

上面几部分从整体上介绍了如何通过Spark构造一个推荐引擎,但复用性不是很好,所以,我们将上面的代码进行整理,构造一个推荐系统的类,类中几个重要的函数如下:

读取数据
读取数据函数我们返回两部分,首先是RDD[Rating]类型的训练数据,其次是Map类型的电影id和电影名称的对应数据表。

 def PrepareData():(RDD[Rating],Map[Int,String])={
      val spark = SparkSession
        .builder()
        .appName("Spark SQL basic example")
        // .enableHiveSupport()
        //.config("spark.some.config.option", "some-value")
        .getOrCreate()
      print("开始读取用户评分数据")
      val rawUserData = spark.sparkContext.textFile("data/ml-100k/u.data")
      val ratings = rawUserData.map(_.split("\t").take(3))
      val ratingsRDD = ratings.map{case Array(user,movie,rating)=>Rating(user.toInt,movie.toInt,rating.toDouble)}
      println("共计"+ratingsRDD.count.toString()+"条ratings")

      print("开始读取电影数据")
      val itemRDD = spark.sparkContext.textFile("data/ml-100k/u.item")
      val movieTitle = itemRDD.map(line=>line.split("\\|").take(2)).map(array=>(array(0).toInt,array(1))).collect().toMap
      //显示数据记录数
      val numRatings = ratingsRDD.count()
      val numUsers = ratingsRDD.map(_.user).distinct().count()
      val numMovies = ratingsRDD.map(_.product).distinct().count()
      println("共计:ratings:"+numRatings+" User:"+numUsers+" Movies:"+numMovies)
      return (ratingsRDD,movieTitle)
    }

电影推荐和用户推荐
接下来是电影推荐和用户推荐的函数:

def RecommendMovies(model:MatrixFactorizationModel,movieTitle:Map[Int,String],inputUserID:Int):Unit= {
      val RecommendMovie = model.recommendProducts(inputUserID,10)
      var i = 1
      println("针对用户"+inputUserID+" 推荐下列电影:")
      RecommendMovie.foreach{r => println(i.toString()+"."+movieTitle(r.product)+" 评分:"+r.rating.toString());i+=1}
    }

def RecommendUsers(model:MatrixFactorizationModel,movieTitle:Map[Int,String],inputMovieID:Int):Unit= {
      val RecommendUser = model.recommendUsers(inputMovieID,10)
      var i = 1
      println("针对电影"+inputMovieID+" 推荐下列用户:")
      RecommendUser.foreach{r => println(i.toString()+"."+movieTitle(r.user)+" 评分:"+r.rating.toString());i+=1}
    }

推荐入口

def recommend(model:MatrixFactorizationModel,movieTitle:Map[Int,String]):Unit={
      var choose=""
      while(choose!="3"){
        print("请选择推荐类型,1:针对用户推荐电影,2:针对电影推荐给感兴趣的用户,3:离开")
        choose = scala.io.StdIn.readLine()
        if (choose=="1"){
          print("请输入用户id?")
          val inputUserID = scala.io.StdIn.readLine()
          RecommendMovies(model,movieTitle,inputUserID.toInt)
        }
        else if(choose=="2") {
          print("请输入电影的id?")
          val inputMovieID = scala.io.StdIn.readLine()
          RecommendUsers(model, movieTitle, inputMovieID.toInt)
        }
      }

    }

主函数

def main(args:Array[String]):Unit={
      setLogger
      val (ratings,movieTitle) = PrepareData()
      val model = ALS.train(ratings,5,20,0.1)
      recommend(model,movieTitle)
    }

完整代码

package mllib.ALSrecommend
import org.apache.spark.mllib.recommendation.{ALS, Rating,MatrixFactorizationModel}
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD
import scala.io.StdIn
import org.apache.log4j.Logger
import org.apache.log4j.Level

object Recommend {

  def setLogger ={
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("com").setLevel(Level.OFF)
    System.setProperty("spark.ui.showConsoleProgress","false")
    Logger.getRootLogger().setLevel(Level.OFF)
  }

    def PrepareData():(RDD[Rating],Map[Int,String])={
      val spark = SparkSession
        .builder()
        .appName("Spark SQL basic example")
        // .enableHiveSupport()
        //.config("spark.some.config.option", "some-value")
        .getOrCreate()
      print("开始读取用户评分数据")
      val rawUserData = spark.sparkContext.textFile("data/ml-100k/u.data")
      val ratings = rawUserData.map(_.split("\t").take(3))
      val ratingsRDD = ratings.map{case Array(user,movie,rating)=>Rating(user.toInt,movie.toInt,rating.toDouble)}
      println("共计"+ratingsRDD.count.toString()+"条ratings")

      print("开始读取电影数据")
      val itemRDD = spark.sparkContext.textFile("data/ml-100k/u.item")
      val movieTitle = itemRDD.map(line=>line.split("\\|").take(2)).map(array=>(array(0).toInt,array(1))).collect().toMap
      //显示数据记录数
      val numRatings = ratingsRDD.count()
      val numUsers = ratingsRDD.map(_.user).distinct().count()
      val numMovies = ratingsRDD.map(_.product).distinct().count()
      println("共计:ratings:"+numRatings+" User:"+numUsers+" Movies:"+numMovies)
      return (ratingsRDD,movieTitle)
    }

    def RecommendMovies(model:MatrixFactorizationModel,movieTitle:Map[Int,String],inputUserID:Int):Unit= {
      val RecommendMovie = model.recommendProducts(inputUserID,10)
      var i = 1
      println("针对用户"+inputUserID+" 推荐下列电影:")
      RecommendMovie.foreach{r => println(i.toString()+"."+movieTitle(r.product)+" 评分:"+r.rating.toString());i+=1}
    }

    def RecommendUsers(model:MatrixFactorizationModel,movieTitle:Map[Int,String],inputMovieID:Int):Unit= {
      val RecommendUser = model.recommendUsers(inputMovieID,10)
      var i = 1
      println("针对电影"+inputMovieID+" 推荐下列用户:")
      RecommendUser.foreach{r => println(i.toString()+"."+movieTitle(r.user)+" 评分:"+r.rating.toString());i+=1}
    }

    def recommend(model:MatrixFactorizationModel,movieTitle:Map[Int,String]):Unit={
      var choose=""
      while(choose!="3"){
        print("请选择推荐类型,1:针对用户推荐电影,2:针对电影推荐给感兴趣的用户,3:离开")
        choose = scala.io.StdIn.readLine()
        if (choose=="1"){
          print("请输入用户id?")
          val inputUserID = scala.io.StdIn.readLine()
          RecommendMovies(model,movieTitle,inputUserID.toInt)
        }
        else if(choose=="2") {
          print("请输入电影的id?")
          val inputMovieID = scala.io.StdIn.readLine()
          RecommendUsers(model, movieTitle, inputMovieID.toInt)
        }
      }

    }
    def main(args:Array[String]):Unit={
      setLogger
      val (ratings,movieTitle) = PrepareData()
      val model = ALS.train(ratings,5,20,0.1)
      recommend(model,movieTitle)
    }

}

7、ALS.train方法参数调优

机器学习中很重要的一个部分就是对代码参数进行调优,调优的两个重要衡量指标是代码的运行时间以及运行的结果好坏,前面我们已经介绍了,ALS.train方法主要涉及了三个参数,分别为:rank,iterations和lambda,那么这三个参数分别如何影响代码的性能,以及如何选择三者的组合来达到最好的效率呢,这是本节主要讨论的目的。

准备数据
和上面有所不同的是,这里我们要准备数据,将数据分为训练集,验证集和测试集,返回三个RDD[Rating]类型的数据:

def PrepareData():(RDD[Rating],RDD[Rating],RDD[Rating])={
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      // .enableHiveSupport()
      //.config("spark.some.config.option", "some-value")
      .getOrCreate()
    print("开始读取用户评分数据")
    val rawUserData = spark.sparkContext.textFile("data/ml-100k/u.data")
    val ratings = rawUserData.map(_.split("\t").take(3))
    val ratingsRDD = ratings.map{case Array(user,movie,rating)=>Rating(user.toInt,movie.toInt,rating.toDouble)}
    println("共计"+ratingsRDD.count.toString()+"条ratings")

    println("开始读取电影数据")
    val itemRDD = spark.sparkContext.textFile("data/ml-100k/u.item")
    val movieTitle = itemRDD.map(line=>line.split("\\|").take(2)).map(array=>(array(0).toInt,array(1))).collect().toMap
    //显示数据记录数
    val numRatings = ratingsRDD.count()
    val numUsers = ratingsRDD.map(_.user).distinct().count()
    val numMovies = ratingsRDD.map(_.product).distinct().count()
    println("共计:ratings:"+numRatings+" User:"+numUsers+" Movies:"+numMovies)

    println("以随机数的方式将数据分为3个部分并返回")
    val Array(trainData,validationData,testData) = ratingsRDD.randomSplit(Array(0.8,0.8,0.1))
    println("trainData:"+trainData.count()+",validationData:"+validationData.count()+",testData:"+testData.count())
    return (trainData,validationData,testData)
  }

训练评估

def trainValidation(trainData:RDD[Rating],validationData:RDD[Rating]):MatrixFactorizationModel={
    println("------评估rank参数使用------")
    evaluateParameter(trainData,validationData,"rank",Array(5,10,15,20,50,100),Array(10),Array(0.1))
    println("------评估numIterations参数使用------")
    evaluateParameter(trainData,validationData,"numIterations",Array(10),Array(5,10,15,20,25),Array(0.1))
    println("------评估lambda参数使用------")
    evaluateParameter(trainData,validationData,"lambda",Array(10),Array(10),Array(0.05,0.1,1,5,10))
    println("------所有参数交叉评估找出最好的参数组合------")
    val bestModel = evaluateAllParameter(trainData,validationData,Array(5,10,15,20,50,100),Array(5,10,15,20,25),Array(0.05,0.1,1,5,10))
    return bestModel

可以看到,训练评估函数中涉及了两个方法,分别用于对单个参数进行测试和三个参数整体进行调优,两个方法分别如下:

def evaluateParameter(trainData:RDD[Rating],validationData:RDD[Rating],evaluateParameter:String,rankArray:Array[Int],numIterationsArray:Array[Int],lambdaArray:Array[Double])={
    val dataBarChart = new DefaultCategoryDataset()
    val dataLineChart = new DefaultCategoryDataset()
    for(rank <- rankArray;numIterations <- numIterationsArray;lambda <- lambdaArray){
      val (rmse,time) = trainModel(trainData,validationData,rank,numIterations,lambda)
      val parameterData = evaluateParameter match{
        case "rank" => rank;
        case "numIterations" => numIterations;
        case "lambda" => lambda
      }
      dataBarChart.addValue(rmse,evaluateParameter,parameterData.toString())
      dataLineChart.addValue(time,"Time",parameterData.toString())

    }
    Chart.plotBarLineChart("ALS evaluations " + evaluateParameter,evaluateParameter,"RMSE",0.58,5,"Time",dataBarChart,dataLineChart)

  }
def evaluateAllParameter(trainData: RDD[Rating], validationData: RDD[Rating], rankArray: Array[Int], numIterationsArray: Array[Int], lambdaArray: Array[Double]):MatrixFactorizationModel={
    val evaluations = for(rank <- rankArray;numIterations <- numIterationsArray;lambda <- lambdaArray) yield {
      val (rmse,time) = trainModel(trainData,validationData,rank,numIterations,lambda)
      (rank,numIterations,lambda,rmse)
    }
    val Eval = (evaluations.sortBy(_._4))
    val BestEval = Eval(0)
    println("最佳model参数:rank:"+BestEval._1+",iterations:"+BestEval._2+"lambda:"+BestEval._3+"结果RMSE="+BestEval._4)
    val bestModel = ALS.train(trainData,BestEval._1,BestEval._2,BestEval._3)
    (bestModel)
  }

其中,训练模型和计算RMSE的函数分别如下:

 def computeRmse(model:MatrixFactorizationModel,ratingRDD:RDD[Rating]):Double={
    val num = ratingRDD.count()
    val predictedRDD = model.predict(ratingRDD.map(line=>(line.user,line.product)))
    val kvRatingRDD = ratingRDD.map(r=>((r.user,r.product),r.rating))
    val predictedAndRatings = predictedRDD.map(p=>((p.user,p.product),p.rating)).join(kvRatingRDD).values

    math.sqrt(predictedAndRatings.map(x=>(x._1-x._2) * (x._1-x._2)).reduce((_+_))/num)

  }
def trainModel(trainData:RDD[Rating],validationData:RDD[Rating],rank:Int,numIterations:Int,lambda:Double):(Double,Double) ={
    val startTime = new DateTime()
    val model = ALS.train(trainData,rank,numIterations,lambda)
    val endTime = new DateTime()
    val Rmse = computeRmse(model,validationData)
    val duration = new Duration(startTime,endTime)
    println(f"训练参数:rank:$rank%3d,iterations;$numIterations%.2f;lambda:$lambda%.2f 结果 RMSE=$Rmse%.2f"+"训练需要时间为:"+duration.getMillis+"毫秒")
    return (Rmse,duration.getStandardSeconds)

  }

画图类
在测试单个参数的性能时,我们引入了画图类,画图类的代码如下所示:

package mllib.ALSrecommend

import org.jfree.chart._
import org.jfree.data.xy._
import org.jfree.data.category.DefaultCategoryDataset
import org.jfree.chart.axis.NumberAxis
import org.jfree.chart.axis._
import java.awt.Color
import org.jfree.chart.renderer.category.LineAndShapeRenderer;
import org.jfree.chart.plot.DatasetRenderingOrder;
import org.jfree.chart.labels.StandardCategoryToolTipGenerator;
import java.awt.BasicStroke

object Chart {
  def plotBarLineChart(Title: String, xLabel: String, yBarLabel: String, yBarMin: Double, yBarMax: Double, yLineLabel: String, dataBarChart : DefaultCategoryDataset, dataLineChart: DefaultCategoryDataset): Unit = {

    //画出Bar Chart
    val chart = ChartFactory
      .createBarChart(
        "", // Bar Chart 标题
        xLabel, // X轴标题
        yBarLabel, // Bar Chart 标题 y轴标题l
        dataBarChart , // Bar Chart数据
        org.jfree.chart.plot.PlotOrientation.VERTICAL,//画图方向垂直
        true, // 包含 legend
        true, // 显示tooltips
        false // 不要URL generator
      );
    //取得plot
    val plot = chart.getCategoryPlot();
    plot.setBackgroundPaint(new Color(0xEE, 0xEE, 0xFF));
    plot.setDomainAxisLocation(AxisLocation.BOTTOM_OR_RIGHT);
    plot.setDataset(1, dataLineChart); plot.mapDatasetToRangeAxis(1, 1)
    //画直方图y轴
    val vn = plot.getRangeAxis(); vn.setRange(yBarMin, yBarMax);  vn.setAutoTickUnitSelection(true)
    //画折线图y轴
    val axis2 = new NumberAxis(yLineLabel); plot.setRangeAxis(1, axis2);
    val renderer2 = new LineAndShapeRenderer()
    renderer2.setToolTipGenerator(new StandardCategoryToolTipGenerator());
    //设置先画直方图,再画折线图以免折线图被盖掉
    plot.setRenderer(1, renderer2);plot.setDatasetRenderingOrder(DatasetRenderingOrder.FORWARD);
    //创建画框
    val frame = new ChartFrame(Title,chart); frame.setSize(500, 500);
    frame.pack(); frame.setVisible(true)
  }
}

主函数

def main(args:Array[String]) = {
    setLogger
    println("========数据准备阶段========")
    val (trainData,validationData,testData) = PrepareData()
    trainData.persist()
    validationData.persist()
    testData.persist()
    println("========训练验证阶段========")
    val bestModel = trainValidation(trainData,validationData)
    println("========测试阶段========")
    val testRmse = computeRmse(bestModel,testData)
    println("使用testData测试bestModel的结果RMSE="+testRmse)
   //trainData.unpersist()
//    validationData.unpersist()
   // testData.unpersist()
  }

完整代码

package mllib.ALSrecommend

import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.recommendation.{ALS, Rating,MatrixFactorizationModel}
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD
import org.jfree.data.category.DefaultCategoryDataset
import org.joda.time.format._
import org.joda.time._
import org.joda.time.Duration

object AlsEvaluation {
  def setLogger ={
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("com").setLevel(Level.OFF)
    System.setProperty("spark.ui.showConsoleProgress","false")
    Logger.getRootLogger().setLevel(Level.OFF)
  }

  def PrepareData():(RDD[Rating],RDD[Rating],RDD[Rating])={
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      // .enableHiveSupport()
      //.config("spark.some.config.option", "some-value")
      .getOrCreate()
    print("开始读取用户评分数据")
    val rawUserData = spark.sparkContext.textFile("data/ml-100k/u.data")
    val ratings = rawUserData.map(_.split("\t").take(3))
    val ratingsRDD = ratings.map{case Array(user,movie,rating)=>Rating(user.toInt,movie.toInt,rating.toDouble)}
    println("共计"+ratingsRDD.count.toString()+"条ratings")

    println("开始读取电影数据")
    val itemRDD = spark.sparkContext.textFile("data/ml-100k/u.item")
    val movieTitle = itemRDD.map(line=>line.split("\\|").take(2)).map(array=>(array(0).toInt,array(1))).collect().toMap
    //显示数据记录数
    val numRatings = ratingsRDD.count()
    val numUsers = ratingsRDD.map(_.user).distinct().count()
    val numMovies = ratingsRDD.map(_.product).distinct().count()
    println("共计:ratings:"+numRatings+" User:"+numUsers+" Movies:"+numMovies)

    println("以随机数的方式将数据分为3个部分并返回")
    val Array(trainData,validationData,testData) = ratingsRDD.randomSplit(Array(0.8,0.8,0.1))
    println("trainData:"+trainData.count()+",validationData:"+validationData.count()+",testData:"+testData.count())
    return (trainData,validationData,testData)

  }

  def computeRmse(model:MatrixFactorizationModel,ratingRDD:RDD[Rating]):Double={
    val num = ratingRDD.count()
    val predictedRDD = model.predict(ratingRDD.map(line=>(line.user,line.product)))
    val kvRatingRDD = ratingRDD.map(r=>((r.user,r.product),r.rating))
    val predictedAndRatings = predictedRDD.map(p=>((p.user,p.product),p.rating)).join(kvRatingRDD).values

    math.sqrt(predictedAndRatings.map(x=>(x._1-x._2) * (x._1-x._2)).reduce((_+_))/num)

  }
  def trainModel(trainData:RDD[Rating],validationData:RDD[Rating],rank:Int,numIterations:Int,lambda:Double):(Double,Double) ={
    val startTime = new DateTime()
    val model = ALS.train(trainData,rank,numIterations,lambda)
    val endTime = new DateTime()
    val Rmse = computeRmse(model,validationData)
    val duration = new Duration(startTime,endTime)
    println(f"训练参数:rank:$rank%3d,iterations;$numIterations%.2f;lambda:$lambda%.2f 结果 RMSE=$Rmse%.2f"+"训练需要时间为:"+duration.getMillis+"毫秒")
    return (Rmse,duration.getStandardSeconds)

  }

  def evaluateParameter(trainData:RDD[Rating],validationData:RDD[Rating],evaluateParameter:String,rankArray:Array[Int],numIterationsArray:Array[Int],lambdaArray:Array[Double])={
    val dataBarChart = new DefaultCategoryDataset()
    val dataLineChart = new DefaultCategoryDataset()
    for(rank <- rankArray;numIterations <- numIterationsArray;lambda <- lambdaArray){
      val (rmse,time) = trainModel(trainData,validationData,rank,numIterations,lambda)
      val parameterData = evaluateParameter match{
        case "rank" => rank;
        case "numIterations" => numIterations;
        case "lambda" => lambda
      }
      dataBarChart.addValue(rmse,evaluateParameter,parameterData.toString())
      dataLineChart.addValue(time,"Time",parameterData.toString())

    }
    Chart.plotBarLineChart("ALS evaluations " + evaluateParameter,evaluateParameter,"RMSE",0.58,5,"Time",dataBarChart,dataLineChart)

  }

  def evaluateAllParameter(trainData: RDD[Rating], validationData: RDD[Rating], rankArray: Array[Int], numIterationsArray: Array[Int], lambdaArray: Array[Double]):MatrixFactorizationModel={
    val evaluations = for(rank <- rankArray;numIterations <- numIterationsArray;lambda <- lambdaArray) yield {
      val (rmse,time) = trainModel(trainData,validationData,rank,numIterations,lambda)
      (rank,numIterations,lambda,rmse)
    }
    val Eval = (evaluations.sortBy(_._4))
    val BestEval = Eval(0)
    println("最佳model参数:rank:"+BestEval._1+",iterations:"+BestEval._2+"lambda:"+BestEval._3+"结果RMSE="+BestEval._4)
    val bestModel = ALS.train(trainData,BestEval._1,BestEval._2,BestEval._3)
    (bestModel)
  }


  def trainValidation(trainData:RDD[Rating],validationData:RDD[Rating]):MatrixFactorizationModel={
    println("------评估rank参数使用------")
    evaluateParameter(trainData,validationData,"rank",Array(5,10,15,20,50,100),Array(10),Array(0.1))
    println("------评估numIterations参数使用------")
    evaluateParameter(trainData,validationData,"numIterations",Array(10),Array(5,10,15,20,25),Array(0.1))
    println("------评估lambda参数使用------")
    evaluateParameter(trainData,validationData,"lambda",Array(10),Array(10),Array(0.05,0.1,1,5,10))
    println("------所有参数交叉评估找出最好的参数组合------")
    val bestModel = evaluateAllParameter(trainData,validationData,Array(5,10,15,20,50,100),Array(5,10,15,20,25),Array(0.05,0.1,1,5,10))
    return bestModel

  }
  def main(args:Array[String]) = {
    setLogger
    println("========数据准备阶段========")
    val (trainData,validationData,testData) = PrepareData()
    trainData.persist()
    validationData.persist()
    testData.persist()
    println("========训练验证阶段========")
    val bestModel = trainValidation(trainData,validationData)
    println("========测试阶段========")
    val testRmse = computeRmse(bestModel,testData)
    println("使用testData测试bestModel的结果RMSE="+testRmse)
   //trainData.unpersist()
//    validationData.unpersist()
   // testData.unpersist()
  }
}

结果展示

我们首先来看rank对模型训练的影响,可以看到,rank对模型的误差影响不大,但随着rank 的增加,模型训练时间会有所增加

接下来我们看numIterations,该参数对模型的误差影响也不大,但随着参数数值的增大,模型运行时间变长。

最后是lambda参数,lambda参数对模型的运行时间没有影响,但是随着lambda的增大,模型的训练误差逐渐增大。

那么使用什么样的参数组合能够带来最佳的模型训练效果呢,答案如下:



这里答案不是唯一的,每次训练可能得到不同的结果。

8、代码知识点整理

根据上面的两段完整的代码,我整理了一些我第一次接触到的,不是太熟悉的地方:

设置打印输出
spark运行时会显示很多log信息,信息太多会和正常信息混合在一起,所以可以根据如下代码设置不显示log信息:

def setLogger ={
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("com").setLevel(Level.OFF)
    System.setProperty("spark.ui.showConsoleProgress","false")
    Logger.getRootLogger().setLevel(Level.OFF)
  }

randomSplit函数切分RDD
使用randomSplit函数对RDD进行切分:

 val Array(trainData,validationData,testData) = ratingsRDD.randomSplit(Array(0.8,0.8,0.1))

打印输出的花样

println(f"训练参数:rank:$rank%3d,iterations;$numIterations%.2f;lambda:$lambda%.2f 结果 RMSE=$Rmse%.2f"+"训练需要时间为:"+duration.getMillis+"毫秒")

scala中使用yield
Scala中的yield的主要作用是记住每次迭代中的有关值,并逐一存入到一个数组中。用法如下:for {子句} yield {变量或表达式}

val evaluations = for(rank <- rankArray;numIterations <- numIterationsArray;lambda <- lambdaArray) yield {
      val (rmse,time) = trainModel(trainData,validationData,rank,numIterations,lambda)
      (rank,numIterations,lambda,rmse)
    }
上一篇下一篇

猜你喜欢

热点阅读