Spark MLlib ALS 推荐系统

2018-06-06  本文已影响0人  博弈史密斯

Spark 机器学习库从 1.2 版本以后被分为两个包:

从Spark2.0开始,基于RDD的API进入维护模式(即不增加任何新的特性),并预期于3.0版本的时候被移除出MLLib。因此,我们将以ml包为主进行介绍。

一个典型的机器学习过程从数据收集开始,要经历多个步骤,才能得到需要的输出。这非常类似于流水线式工作,即通常会包含源数据ETL(抽取、转化、加载),数据预处理,指标提取,模型训练与交叉验证,新数据预测等步骤。

在介绍工作流之前,我们先来了解几个重要概念:

工作流如何工作

要构建一个 Pipeline工作流,首先需要定义 Pipeline 中的各个工作流阶段PipelineStage,(包括转换器和评估器),比如指标提取和转换模型训练等。有了这些处理特定问题的转换器和 评估器,就可以按照具体的处理逻辑有序的组织PipelineStages 并创建一个Pipeline。比如:

val pipeline = new Pipeline().setStages(Array(stage1,stage2,stage3,…))

然后就可以把训练数据集作为输入参数,调用 Pipeline 实例的 fit 方法来开始以流的方式来处理源训练数据。这个调用会返回一个 PipelineModel 类实例,进而被用来预测测试数据的标签。更具体的说,工作流的各个阶段按顺序运行,输入的DataFrame在它通过每个阶段时被转换。 对于Transformer阶段,在DataFrame上调用transform()方法。 对于估计器阶段,调用fit()方法来生成一个转换器(它成为PipelineModel的一部分或拟合的Pipeline),并且在DataFrame上调用该转换器的transform()方法。


ml-pipeline

上面,顶行表示具有三个阶段的流水线。 前两个(Tokenizer和HashingTF)是Transformers(蓝色),第三个(LogisticRegression)是Estimator(红色)。 底行表示流经管线的数据,其中圆柱表示DataFrames。 在原始DataFrame上调用Pipeline.fit()方法,它具有原始文本文档和标签。 Tokenizer.transform()方法将原始文本文档拆分为单词,向DataFrame添加一个带有单词的新列。 HashingTF.transform()方法将字列转换为特征向量,向这些向量添加一个新列到DataFrame。 现在,由于LogisticRegression是一个Estimator,Pipeline首先调用LogisticRegression.fit()产生一个LogisticRegressionModel。 如果流水线有更多的阶段,则在将DataFrame传递到下一个阶段之前,将在DataFrame上调用LogisticRegressionModel的transform()方法。

SparkSession是spark2.0的全新切入点,用以替代 sparkcontext ,StreamingContext,sqlContext,HiveContext。

直接隐式反馈。

al Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))

val alsImplicit = new ALS().setMaxIter(5).setRegParam(0.01).setImplicitPrefs(true). 
    setUserCol("userId").setItemCol("movieId").setRatingCol("rating")

​ 在 ML 中的实现有如下的参数:

可以调整这些参数,不断优化结果,使均方差变小。比如:maxIter越大,regParam越 小,均方差会越小,推荐结果较优。

接下来,把推荐模型放在训练数据上训练:
val modelImplicit = alsImplicit.fit(training)

使用训练好的推荐模型对测试集中的用户商品进行预测评分,得到预测评分的数据集:
val predictionsImplicit = modelImplicit.transform(test)

结果:

predictionsImplicit.show()
 
+------+-------+------+----------+-----------+
|userId|movieId|rating| timestamp| prediction|
+------+-------+------+----------+-----------+
|    13|     31|   1.0|1424380312| 0.33150947|
|     5|     31|   1.0|1424380312|-0.24669354|
|    24|     31|   1.0|1424380312|-0.22434244|
|    29|     31|   1.0|1424380312| 0.15776125|
|     0|     31|   1.0|1424380312| 0.51940984|
|    28|     85|   1.0|1424380312| 0.88610375|
|    13|     85|   1.0|1424380312| 0.15872183|
|    20|     85|   2.0|1424380312| 0.64086926|
|     4|     85|   1.0|1424380312|-0.06314563|
|     8|     85|   5.0|1424380312|  0.2783457|
|     7|     85|   4.0|1424380312|  0.1618208|
|    29|     85|   1.0|1424380312|-0.19970453|
|    19|     65|   1.0|1424380312| 0.11606887|
|     4|     65|   1.0|1424380312|0.068018675|
|     2|     65|   1.0|1424380312| 0.28533924|
|    12|     53|   1.0|1424380312| 0.42327875|
|    20|     53|   3.0|1424380312| 0.17345423|
|    19|     53|   2.0|1424380312| 0.33321634|
|     8|     53|   5.0|1424380312| 0.10090684|
|    23|     53|   1.0|1424380312| 0.06724724|
+------+-------+------+----------+-----------+
only showing top 20 rows                   
模型评估

​ 通过计算模型的均方根误差来对模型进行评估,均方根误差越小,模型越准确:

val evaluator = new RegressionEvaluator().setMetricName("rmse").setLabelCol("rating"). 
    setPredictionCol("prediction")

val rmseImplicit = evaluator.evaluate(predictionsImplicit)
//  rmseImplicit: Double = 1.8011620822359165

可以看到打分的均方差值为1.69和1.80左右。由于本例的数据量很少,预测的结果和实际相比有一定的差距。

使用推荐系统

ALS fit 方法返回 ALSModel,其有 recommendForAllUsers(numItems: Int) 和 recommendForAllItems(numUsers: Int) 方法,用于推荐。

例子
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS

case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)
def parseRating(str: String): Rating = {
  val fields = str.split("::")
  assert(fields.size == 4)
  Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)
}

val ratings = spark.read.textFile("data/mllib/als/sample_movielens_ratings.txt")
  .map(parseRating)
  .toDF()
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))

// Build the recommendation model using ALS on the training data
val als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")
val model = als.fit(training)

// Evaluate the model by computing the RMSE on the test data
// Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
model.setColdStartStrategy("drop")
val predictions = model.transform(test)

val evaluator = new RegressionEvaluator()
  .setMetricName("rmse")
  .setLabelCol("rating")
  .setPredictionCol("prediction")
val rmse = evaluator.evaluate(predictions)
println(s"Root-mean-square error = $rmse")

// Generate top 10 movie recommendations for each user
val userRecs = model.recommendForAllUsers(10)
// Generate top 10 user recommendations for each movie
val movieRecs = model.recommendForAllItems(10)

// Generate top 10 movie recommendations for a specified set of users
val users = ratings.select(als.getUserCol).distinct().limit(3)
val userSubsetRecs = model.recommendForUserSubset(users, 10)
// Generate top 10 user recommendations for a specified set of movies
val movies = ratings.select(als.getItemCol).distinct().limit(3)
val movieSubSetRecs = model.recommendForItemSubset(movies, 10)
上一篇下一篇

猜你喜欢

热点阅读