高级场景面试题产品

业务实战场景(十四)推荐系统

2022-06-14  本文已影响0人  后来丶_a24d

思维导图

思维导图.png

系列总目录


推荐系统简介

1. 推荐系统分类

推荐系统分类.png
1.1 根据实时性分类
  1. 离线推荐
  2. 实时推荐
1.2 推荐原则分类
  1. 基于相似度的推荐
  2. 基于知识的推荐
  3. 基于模型的推荐
1.3 是否个性化分类
  1. 基于统计的推荐
  2. 个性化推荐
1.4 数据源分类
  1. 基于人口统计学的推荐
  2. 基于内容的推荐
  3. 基于协同过滤的推荐

2. 推荐系统算法

2.1 基于人口学统计
2.2 基于内容推荐
2.3 基于协同过滤
  1. 基于近邻的协同过滤
    1.1 基于用户协同过滤(User-CF)


    基于用户协同过滤.png

    1.2 基于物品协同过滤(Item-CF)


    基于物品协同过滤.png
  2. 基于模型的协同过滤
    2.1 奇异值分解(SVD)
    2.2 潜在语义分析(LSA)
    2.3 支撑向量机(SVM)

2.4 混合推荐

机器学习基础

1. 定义

2. 过程

过程.png

3.1 分类--有监督学习

3.1.1 分类算法
  1. 召回率: 所有实际为正类的数据中,被正确预测找出的比例
  2. 精确率: 所有实际为正类(关注的类)的数据中,预测正确比例
3.1.2 分类模型
  1. KNN
  2. 决策树
  3. 逻辑斯蒂回归
3.1.3 回归算法
3.1.4 回归模型算法
  1. 线性回归
  2. 非线性回归
  3. 最小二乘法:推荐算法中ALS介绍了最小二乘法,后面篇幅会有具体代码举例
3.1.5 监督学习模型评估策略
  1. 模型:总结内在规律用数学函数表示
    1.1 模型评估
    1.1.1 损失函数: 用来衡量模型测误差大小,评价损失就叫经验风险
    1.1.2 过拟合和欠拟合: 欠拟合可能是训练集太小,没有很好的捕获到信息,比如误认为绿色的就是树叶。 过拟合就是学习的太彻底,把很多噪声都学进去了,比如误认为树叶必要是椭圆形的。 所以模型的复杂度要适中
  2. 算法: 选取最优模型的具体方法
  3. 策略:选取最优模型评价准则
  1. 得到一个有限的数据集合,确定包含所有学习模型集合
  2. 确定模型选择准则----学习策略
  3. 实现求解最优模型算法---学习算法
  4. 通过学习算法选择最优模型,得到最优模型进行预测分析
3.1.6 反馈常见处理
反馈常见处理.png

3.2 分类--无监督学习

聚类模型-k均值
  1. 原始数据集合


    原始数据集合.png
  2. 选择K个随机的点,称为聚类中心(cluster centroids),K就是“K-均值”中的K,表示的是样本要进行分类的数目,在本例中K=2。我们随机地选择连个聚类中心,分别用红色的叉和蓝色的叉表示


    第一步.png
  3. 对于数据集中的每一个数据,按照距离聚类中心点的距离,将其与距离最近的中心点关联起来,组成一个类。如下图所示,与红色的聚类中心距离近的点被分为红色的类,与蓝色的聚类中心距离近的点被分为蓝色的类


    第二步.png

    3.计算每一个类中样本的平均值,将该类的聚类中心移动到平均值的位置。如下图所示,聚类中心进行了相应的移动


    第三步.png
    4.重复步骤②,将样本进行重新分类,如下图所示:
    第四步.png

    5.重复步骤③,再次移动聚类中心


    第五步.png
  4. 重复步骤②,将样本进行重分类


    第六步.png
  5. 依次类推,重复步骤②③,一直迭代,直到聚类中心不在变化

3.3 推荐协同过滤推荐

3.3.1 基于近邻协同推荐(类似分类,是否推荐)
是否推荐.png
  1. 首先获取训练集,每行数据包含多个特征和分类标签


    训练集.png
  2. 输入没有标签但有多个特征的新数据


    输入无标签.png
  3. 将新数据的每个特征与样本中每条数据对应的特征进行比较,然后提取出样本中与新数据最相似的K条数据


    对比选择相似数据.png
    结果.png
  4. 如何得到相似度:可以用欧式距离求解, 将上述训练集中的数据特征用来对应A或B的坐标,即大眼睛、高鼻梁、细腰、... 对应 a1,a2。。。,大眼睛、高鼻梁都是字符,这怎么进行计算呢? 将字符型数据转化为数值型数据以及其它对数据的预处理操作也是机器学习中的关键步骤,可以将眼睛的大小级别设为1,2,3个等级,3表示为大眼睛,1表示为小眼睛,鼻梁、身高等特征同理
    具体化.png
3.3.2 基于模型协同推荐(类似回归)

电影推荐项目

1. 架构

1.1 大数据处理流程
  1. 用户接口: 网站或者APP, 前端可以通过埋点产生数据
  2. 后端服务器: SpringBoot项目,通过打log形式产生数据
  3. 日志文件: 后端服务器集群部署,所以有可能一个后端服务有多份日志文件
  4. 日志采集:每收集一分钟,或者一点数据就放入文件,然后可以转移到flume中,或者直接通过定制api打入flume中,可以配置flume写入kafka中
  5. 数据总线: 通常由kafka等来的消息,实时数据,实时log, 写入kafka, 再由Flink等实时处理读取
  6. 实时计算: Flink等,可以封装大量业务,甚至进行机器学习,智能推荐等
  7. 数据存储:计算完存储进数据库
  8. 数据可视化:大屏展示等


    实时计算.jpg
  1. 用户接口: 网站或者APP, 前端可以通过埋点产生数据
  2. 后端服务器: SpringBoot项目,通过打log形式产生数据
  3. 日志文件: 后端服务器集群部署,所以有可能一个后端服务有多份日志文件
  4. 日志采集:可以用python脚本,或者自己写java服务,利用定时任务,将当天所有数据采集起来,用文件日志转移到flume agent监控的目录,然后flume agent可以sink到HDFS, flume agent是单个jvm进程
  5. 日志存储: 可以存储在Hadoop上用于大数据分析
  6. 日志清洗: 可以用Azkaban来进行定时调度,可以用corn定时工具调度,将HDFS文件写入另外个HDFS文件
  7. 数据加载: 将清洗后的HDFS文件放入HIVE表中,HIVE表分区,每个分区存一天数据
  8. 数据仓库: 数仓处理
  9. 数据计算: Spark计算
  10. 数据存储:计算完存储进数据库
  11. 数据可视化:大屏展示等


    离线数据仓库.jpg
1.2 系统模块设计
1.3 项目系统架构

2. 统计推荐

3. 离线推荐

代码实战
  1. Azkaban定时调度
  2. 创建一个SparkSession
  3. 从mongodb加载数据
  4. 从rating数据中提取所有的uid和mid,并去重
  5. 训练隐语义模型, 使用spark ml lib的ALS算法
  6. 基于用户和电影的隐特征,计算预测评分,得到用户的推荐列表,计算user和movie的笛卡尔积,得到一个空评分矩阵
  7. 调用model的predict方法预测评分
  8. 过滤出评分大于0的项
  9. 基于电影隐特征,计算相似度矩阵,得到电影的相似度列表
  10. 对所有电影两两计算它们的相似度,先做笛卡尔积
import org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.sql.SparkSession
import org.jblas.DoubleMatrix


// 基于评分数据的LFM,只需要rating数据
case class MovieRating(uid: Int, mid: Int, score: Double, timestamp: Int )

case class MongoConfig(uri:String, db:String)

// 定义一个基准推荐对象
case class Recommendation( mid: Int, score: Double )

// 定义基于预测评分的用户推荐列表
case class UserRecs( uid: Int, recs: Seq[Recommendation] )

// 定义基于LFM电影特征向量的电影相似度列表
case class MovieRecs( mid: Int, recs: Seq[Recommendation] )

// 1. Azkaban定时调度
object OfflineRecommender {

  // 定义表名和常量
  val MONGODB_RATING_COLLECTION = "Rating"

  val USER_RECS = "UserRecs"
  val MOVIE_RECS = "MovieRecs"

  val USER_MAX_RECOMMENDATION = 20

  def main(args: Array[String]): Unit = {
    println("Start")
    val config = Map(
      "spark.cores" -> "local[*]",
      "mongo.uri" -> "mongodb://localhost:27017/recommender",
      "mongo.db" -> "recommender"
    )

    val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OfflineRecommender")

    // 2. 创建一个SparkSession
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    import spark.implicits._

    implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))


    // 3. 从mongodb加载数据
    val ratingRDD = spark.read
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_RATING_COLLECTION)
      .format("com.mongodb.spark.sql")
      .load()
      .as[MovieRating]
      .rdd
      .map( rating => ( rating.uid, rating.mid, rating.score ) )    // 转化成rdd,并且去掉时间戳
      .cache()

    // 4. 从rating数据中提取所有的uid和mid,并去重
    val userRDD = ratingRDD.map(_._1).distinct()
    val movieRDD = ratingRDD.map(_._2).distinct()

    // 5. 训练隐语义模型, 使用spark ml lib的ALS算法
    val trainData = ratingRDD.map( x => Rating(x._1, x._2, x._3) )

    val (rank, iterations, lambda) = (200, 5, 0.1)
    val model = ALS.train(trainData, rank, iterations, lambda)

    // 6. 基于用户和电影的隐特征,计算预测评分,得到用户的推荐列表,计算user和movie的笛卡尔积,得到一个空评分矩阵
    val userMovies = userRDD.cartesian(movieRDD)

    // 7. 调用model的predict方法预测评分
    val preRatings = model.predict(userMovies)

    // 8.  过滤出评分大于0的项
    val userRecs = preRatings
      .filter(_.rating > 0)   
      .map(rating => ( rating.user, (rating.product, rating.rating) ) )
      .groupByKey()
      .map{
        case (uid, recs) => UserRecs( uid, recs.toList.sortWith(_._2>_._2).take(USER_MAX_RECOMMENDATION).map(x=>Recommendation(x._1, x._2)) )
      }
      .toDF()

    userRecs.write
      .option("uri", mongoConfig.uri)
      .option("collection", USER_RECS)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()

    // 9. 基于电影隐特征,计算相似度矩阵,得到电影的相似度列表
    val movieFeatures = model.productFeatures.map{
      case (mid, features) => (mid, new DoubleMatrix(features))
    }

    // 10. 对所有电影两两计算它们的相似度,先做笛卡尔积
    val movieRecs = movieFeatures.cartesian(movieFeatures)
      .filter{
        // 把自己跟自己的配对过滤掉
        case (a, b) => a._1 != b._1
      }
      .map{
        case (a, b) => {
          val simScore = this.consinSim(a._2, b._2)
          ( a._1, ( b._1, simScore ) )
        }
      }
      // 过滤出相似度大于0.6的
      .filter(_._2._2 > 0.6)   
      .groupByKey()
      .map{
        case (mid, items) => MovieRecs( mid, items.toList.sortWith(_._2 > _._2).map(x => Recommendation(x._1, x._2)) )
      }
      .toDF()
    movieRecs.write
      .option("uri", mongoConfig.uri)
      .option("collection", MOVIE_RECS)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()

    spark.stop()

    println("Over")
  }

  // 求向量余弦相似度
  def consinSim(movie1: DoubleMatrix, movie2: DoubleMatrix):Double ={
    movie1.dot(movie2) / ( movie1.norm2() * movie2.norm2() )
  }

}

4. 实时推荐

  1. 部署可以用实时计算那一套
  2. 创建一个SparkSession
  3. 拿到streaming context,当然也可以用Flink
  4. 从Mongodb加载电影相似度矩阵数据,把它广播出去
  5. 定义kafka连接参数
    6.通过kafka创建一个DStream
  6. 把原始数据UID|MID|SCORE|TIMESTAMP 转换成评分流
  7. 继续做流式处理,核心实时算法部分
    8.1 从redis里获取当前用户最近的K次评分,保存成Array[(mid, score)]
    8.2 从相似度矩阵中取出当前电影最相似的N个电影,作为备选列表,Array[mid],数据来源于离线推荐计算的相似度
    8.3 对每个备选电影,计算推荐优先级,得到当前用户的实时推荐列表,Array[(mid, score)],computeMovieScores有具体说明,拿到备选电影和最近评分电影的相似度就可以过滤了,获取两个电影之间的相似度也是根据离线结果来的
    8.4 把推荐数据保存到mongodb
    9.开始接收和处理数据

import com.mongodb.casbah.commons.MongoDBObject
import com.mongodb.casbah.{MongoClient, MongoClientURI}
import kafka.Kafka
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// 定义连接助手对象,序列化
object ConnHelper extends Serializable{
  lazy val jedis = new Jedis("localhost")
  lazy val mongoClient = MongoClient( MongoClientURI("mongodb://localhost:27017/recommender") )
}

case class MongoConfig(uri:String, db:String)

// 定义一个基准推荐对象
case class Recommendation( mid: Int, score: Double )

// 定义基于预测评分的用户推荐列表
case class UserRecs( uid: Int, recs: Seq[Recommendation] )

// 定义基于LFM电影特征向量的电影相似度列表
case class MovieRecs( mid: Int, recs: Seq[Recommendation] )

// 1. 部署可以用实时计算那一套
object StreamingRecommender {

  val MAX_USER_RATINGS_NUM = 20
  val MAX_SIM_MOVIES_NUM = 20
  val MONGODB_STREAM_RECS_COLLECTION = "StreamRecs"
  val MONGODB_RATING_COLLECTION = "Rating"
  val MONGODB_MOVIE_RECS_COLLECTION = "MovieRecs"

  def main(args: Array[String]): Unit = {
    val config = Map(
      "spark.cores" -> "local[*]",
      "mongo.uri" -> "mongodb://localhost:27017/recommender",
      "mongo.db" -> "recommender",
      "kafka.topic" -> "recommender"
    )

    val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("StreamingRecommender")

    // 2. 创建一个SparkSession
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    // 3. 拿到streaming context
    val sc = spark.sparkContext
    val ssc = new StreamingContext(sc, Seconds(2))    // batch duration

    import spark.implicits._

    implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))

    // 4. 加载电影相似度矩阵数据,把它广播出去
    val simMovieMatrix = spark.read
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_MOVIE_RECS_COLLECTION)
      .format("com.mongodb.spark.sql")
      .load()
      .as[MovieRecs]
      .rdd
      .map{ movieRecs => // 为了查询相似度方便,转换成map
        (movieRecs.mid, movieRecs.recs.map( x=> (x.mid, x.score) ).toMap )
      }.collectAsMap()

    val simMovieMatrixBroadCast = sc.broadcast(simMovieMatrix)

    // 5. 定义kafka连接参数
    val kafkaParam = Map(
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "recommender",
      "auto.offset.reset" -> "latest"
    )
    // 6.通过kafka创建一个DStream
    val kafkaStream = KafkaUtils.createDirectStream[String, String]( ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String]( Array(config("kafka.topic")), kafkaParam )
    )

    // 7. 把原始数据UID|MID|SCORE|TIMESTAMP 转换成评分流
    val ratingStream = kafkaStream.map{
      msg =>
        val attr = msg.value().split("\\|")
        ( attr(0).toInt, attr(1).toInt, attr(2).toDouble, attr(3).toInt )
    }

    // 8. 继续做流式处理,核心实时算法部分
    ratingStream.foreachRDD{
      rdds => rdds.foreach{
        case (uid, mid, score, timestamp) => {
          println("rating data coming! >>>>>>>>>>>>>>>>")

          // 8.1 从redis里获取当前用户最近的K次评分,保存成Array[(mid, score)]
          val userRecentlyRatings = getUserRecentlyRating( MAX_USER_RATINGS_NUM, uid, ConnHelper.jedis )

          // 8.2 从相似度矩阵中取出当前电影最相似的N个电影,作为备选列表,Array[mid]
          val candidateMovies = getTopSimMovies( MAX_SIM_MOVIES_NUM, mid, uid, simMovieMatrixBroadCast.value )

          // 8.3 对每个备选电影,计算推荐优先级,得到当前用户的实时推荐列表,Array[(mid, score)]
          val streamRecs = computeMovieScores( candidateMovies, userRecentlyRatings, simMovieMatrixBroadCast.value )

          // 8.4 把推荐数据保存到mongodb
          saveDataToMongoDB( uid, streamRecs )
        }
      }
    }
    // 9.开始接收和处理数据
    ssc.start()

    println(">>>>>>>>>>>>>>> streaming started!")

    ssc.awaitTermination()

  }

  // redis操作返回的是java类,为了用map操作需要引入转换类
  import scala.collection.JavaConversions._

  def getUserRecentlyRating(num: Int, uid: Int, jedis: Jedis): Array[(Int, Double)] = {
    // 从redis读取数据,用户评分数据保存在 uid:UID 为key的队列里,value是 MID:SCORE
    jedis.lrange("uid:" + uid, 0, num-1)
      .map{
        item => // 具体每个评分又是以冒号分隔的两个值
          val attr = item.split("\\:")
          ( attr(0).trim.toInt, attr(1).trim.toDouble )
      }
      .toArray
  }

  /**
    * 获取跟当前电影做相似的num个电影,作为备选电影
    * @param num       相似电影的数量
    * @param mid       当前电影ID
    * @param uid       当前评分用户ID
    * @param simMovies 相似度矩阵
    * @return          过滤之后的备选电影列表
    */
  def getTopSimMovies(num: Int, mid: Int, uid: Int, simMovies: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]])
                     (implicit mongoConfig: MongoConfig): Array[Int] ={
    // 1. 从相似度矩阵中拿到所有相似的电影
    val allSimMovies = simMovies(mid).toArray

    // 2. 从mongodb中查询用户已看过的电影
    val ratingExist = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION)
      .find( MongoDBObject("uid" -> uid) )
      .toArray
      .map{
        item => item.get("mid").toString.toInt
      }

    // 3. 把看过的过滤,得到输出列表
    allSimMovies.filter( x=> ! ratingExist.contains(x._1) )
      .sortWith(_._2>_._2)
      .take(num)
      .map(x=>x._1)
  }

  def computeMovieScores(candidateMovies: Array[Int],
                         userRecentlyRatings: Array[(Int, Double)],
                         simMovies: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]]): Array[(Int, Double)] ={
    // 定义一个ArrayBuffer,用于保存每一个备选电影的基础得分
    val scores = scala.collection.mutable.ArrayBuffer[(Int, Double)]()
    // 定义一个HashMap,保存每一个备选电影的增强减弱因子
    val increMap = scala.collection.mutable.HashMap[Int, Int]()
    val decreMap = scala.collection.mutable.HashMap[Int, Int]()

    for( candidateMovie <- candidateMovies; userRecentlyRating <- userRecentlyRatings){
      // 拿到备选电影和最近评分电影的相似度
      val simScore = getMoviesSimScore( candidateMovie, userRecentlyRating._1, simMovies )

      if(simScore > 0.7){
        // 计算备选电影的基础推荐得分
        scores += ( (candidateMovie, simScore * userRecentlyRating._2) )
        if( userRecentlyRating._2 > 3 ){
          increMap(candidateMovie) = increMap.getOrDefault(candidateMovie, 0) + 1
        } else{
          decreMap(candidateMovie) = decreMap.getOrDefault(candidateMovie, 0) + 1
        }
      }
    }
    // 根据备选电影的mid做groupby,根据公式去求最后的推荐评分
    scores.groupBy(_._1).map{
      // groupBy之后得到的数据 Map( mid -> ArrayBuffer[(mid, score)] )
      case (mid, scoreList) =>
        ( mid, scoreList.map(_._2).sum / scoreList.length + log(increMap.getOrDefault(mid, 1)) - log(decreMap.getOrDefault(mid, 1)) )
    }.toArray.sortWith(_._2>_._2)
  }

  // 获取两个电影之间的相似度
  def getMoviesSimScore(mid1: Int, mid2: Int, simMovies: scala.collection.Map[Int,
    scala.collection.immutable.Map[Int, Double]]): Double ={

    simMovies.get(mid1) match {
      case Some(sims) => sims.get(mid2) match {
        case Some(score) => score
        case None => 0.0
      }
      case None => 0.0
    }
  }

  // 求一个数的对数,利用换底公式,底数默认为10
  def log(m: Int): Double ={
    val N = 10
    math.log(m)/ math.log(N)
  }

  def saveDataToMongoDB(uid: Int, streamRecs: Array[(Int, Double)])(implicit mongoConfig: MongoConfig): Unit ={
    // 定义到StreamRecs表的连接
    val streamRecsCollection = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_STREAM_RECS_COLLECTION)

    // 如果表中已有uid对应的数据,则删除
    streamRecsCollection.findAndRemove( MongoDBObject("uid" -> uid) )
    // 将streamRecs数据存入表中
    streamRecsCollection.insert( MongoDBObject( "uid"->uid,
      "recs"-> streamRecs.map(x=>MongoDBObject( "mid"->x._1, "score"->x._2 )) ) )
  }

}

5. 基于内容推荐

  1. 适用于用户喜欢某个电影,然后找出电影相似度最高的几个,推荐给用户
  2. 创建一个SparkSession
  3. 加载数据,并作预处理
  4. 核心部分: 用TF-IDF从内容信息中提取电影特征向量,创建一个分词器,默认按空格分词,这里按照演员分词
  5. 用分词器对原始数据做转换,生成新的一列words, 现在对象里面有四个"mid", "name", "genres", "words"是按照空格分词
  6. 引入HashingTF工具,可以把一个词语序列转化成对应的词频
  7. 引入IDF工具,可以得到idf模型, 现在对象里面再加了一列叫rawFeatures,把一个词语genres序列转化成对应的词频
  8. 训练idf模型,得到每个词的逆文档频率
  9. 用模型对原数据进行处理,得到文档中每个词的tf-idf,作为新的特征向量, 再加了一列features得到逆词频
  10. 对所有电影两两计算它们的相似度,先做笛卡尔积
    10.1 把自己跟自己的配对过滤掉, _1是mid即是电影序号, _2是特征值
import org.apache.spark.SparkConf
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.ml.linalg.SparseVector
import org.apache.spark.sql.SparkSession
import org.jblas.DoubleMatrix


// 需要的数据源是电影内容信息
case class Movie(mid: Int, name: String, descri: String, timelong: String, issue: String,
                 shoot: String, language: String, genres: String, actors: String, directors: String)

case class MongoConfig(uri:String, db:String)

// 定义一个基准推荐对象
case class Recommendation( mid: Int, score: Double )

// 定义电影内容信息提取出的特征向量的电影相似度列表
case class MovieRecs( mid: Int, recs: Seq[Recommendation] )

// 1. 适用于 用户喜欢某个电影,然后找出电影相似度最高的几个,推荐给用户
object ContentRecommender {

  // 定义表名和常量
  val MONGODB_MOVIE_COLLECTION = "Movie"

  val CONTENT_MOVIE_RECS = "ContentMovieRecs"

  def main(args: Array[String]): Unit = {
    val config = Map(
      "spark.cores" -> "local[*]",
      "mongo.uri" -> "mongodb://localhost:27017/recommender",
      "mongo.db" -> "recommender"
    )

    val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OfflineRecommender")

    // 2. 创建一个SparkSession
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    import spark.implicits._

    implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))

    // 3. 加载数据,并作预处理
    val movieTagsDF = spark.read
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_MOVIE_COLLECTION)
      .format("com.mongodb.spark.sql")
      .load()
      .as[Movie]
      .map(
        // 提取mid,name,genres三项作为原始内容特征,genres演员是按照|存储的,分词器默认按照空格做分词
        x => ( x.mid, x.name, x.genres.map(c=> if(c=='|') ' ' else c) )
      )
      .toDF("mid", "name", "genres")
      .cache()



    // 4. 核心部分: 用TF-IDF从内容信息中提取电影特征向量,创建一个分词器,默认按空格分词
    val tokenizer = new Tokenizer().setInputCol("genres").setOutputCol("words")

    // 5. 用分词器对原始数据做转换,生成新的一列words, 现在对象里面有四个"mid", "name", "genres", "words"是按照空格分词
    val wordsData = tokenizer.transform(movieTagsDF)

    // 6. 引入HashingTF工具,可以把一个词语序列转化成对应的词频
    val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(50)
    val featurizedData = hashingTF.transform(wordsData)

    // 7. 引入IDF工具,可以得到idf模型, 现在对象里面再加了一列叫rawFeatures,把一个词语genres序列转化成对应的词频
    val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
    // 8. 训练idf模型,得到每个词的逆文档频率
    val idfModel = idf.fit(featurizedData)
    // 9. 用模型对原数据进行处理,得到文档中每个词的tf-idf,作为新的特征向量, 再加了一列features得到逆词频
    val rescaledData = idfModel.transform(featurizedData)


    val movieFeatures = rescaledData.map(
      row => ( row.getAs[Int]("mid"), row.getAs[SparseVector]("features").toArray )
    )
      .rdd
      .map(
        x => ( x._1, new DoubleMatrix(x._2) )
      )
    movieFeatures.collect().foreach(println)

    // 10. 对所有电影两两计算它们的相似度,先做笛卡尔积
    val movieRecs = movieFeatures.cartesian(movieFeatures)
      .filter{
        // 10.1 把自己跟自己的配对过滤掉, _1是mid即是电影序号, _2是特征值
        case (a, b) => a._1 != b._1
      }
      .map{
        case (a, b) => {
          val simScore = this.consinSim(a._2, b._2)
          ( a._1, ( b._1, simScore ) )
        }
      }
      .filter(_._2._2 > 0.6)    // 过滤出相似度大于0.6的
      .groupByKey()
      .map{
        // items是item集合( b._1, simScore )对象即 电影序号 + 相似度
        case (mid, items) => MovieRecs( mid, items.toList.sortWith(_._2 > _._2).map(x => Recommendation(x._1, x._2)) )
      }
      .toDF()
    movieRecs.write
      .option("uri", mongoConfig.uri)
      .option("collection", CONTENT_MOVIE_RECS)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()

    spark.stop()
  }

  // 求向量余弦相似度
  def consinSim(movie1: DoubleMatrix, movie2: DoubleMatrix):Double ={
    movie1.dot(movie2) / ( movie1.norm2() * movie2.norm2() )
  }
}

6. 部署之Azkaban离线调度

  1. 将对于jar包和 job文件打包成zip, 然后点upload


    upload.png
  2. 这里可以直接点运行,也可以设置定时调度时间
type=command
command=/usr/local/spark/bin/spark-submit --class ***.OfflineRecommender /Users/xxx/Desktop/ideaworkspace/big_data/MovieRecommendSystem/recommender/OfflineRecommender/target/OfflineRecommender-1.0-SNAPSHOT.jar 
  1. mac启动方式:
1. cd /Users/xxx/Desktop/azkaban/azkaban-solo-server-0.1.0-SNAPSHOT
2. bin/start-solo.sh 
3. 账号azkaban密码也是 
4. http://localhost:8081/index

7. 部署之Dataworks + Maxcomputer


参考

上一篇 下一篇

猜你喜欢

热点阅读