业务实战场景(十四)推荐系统
思维导图
思维导图.png系列总目录
推荐系统简介
1. 推荐系统分类
推荐系统分类.png1.1 根据实时性分类
- 离线推荐
- 实时推荐
1.2 推荐原则分类
- 基于相似度的推荐
- 基于知识的推荐
- 基于模型的推荐
1.3 是否个性化分类
- 基于统计的推荐
- 个性化推荐
1.4 数据源分类
- 基于人口统计学的推荐
- 基于内容的推荐
- 基于协同过滤的推荐
2. 推荐系统算法
2.1 基于人口学统计
-
判断用户a的年龄段跟用户c相似,而用户a喜欢物品A,那给用户c推荐物品A
基于人口学统计.png
2.2 基于内容推荐
-
电影A和电影C的类型都是爱情,动作判断这两个电影相似,那用户a喜欢电影A则给用户a推荐电影C
基于内容推荐.png
2.3 基于协同过滤
- 基于内容推荐主要是利于用户评价过的物品特征,而协同过滤还可以根据其他用户的评价。协同过滤的优势在于不受限与内容质量判断,当物品内容难以获得,协同过滤还是有用武之地
-
基于近邻的协同过滤
1.1 基于用户协同过滤(User-CF)
基于用户协同过滤.png
1.2 基于物品协同过滤(Item-CF)
基于物品协同过滤.png -
基于模型的协同过滤
2.1 奇异值分解(SVD)
2.2 潜在语义分析(LSA)
2.3 支撑向量机(SVM)
2.4 混合推荐
- 实际网站的推荐系统往往都不是单纯只采用了某一种推荐的机制和策略,往往是将多个方法混合在一起,从而达到更好的推荐效果
机器学习基础
1. 定义
- 机器学习是一门人工智能的科学,该领域的主要研究对象是人工智能,特别是如何在经验学习中改善具体算法的性能
- 机器学习是对能通过经验自动改进的计算机算法的研究
- 机器学习是用数据或以往的经验,以此优化计算机程序的性能标准
- 深度学习是机器学习的一个重要分支,推荐算法是机器学习的一个重要应用
2. 过程
过程.png3.1 分类--有监督学习
- 有监督学习是提供数据并提供相应结果的机器学习过程
3.1.1 分类算法
- 输出被限制有限的离散数值,比如根据房屋特性判断某个房屋是否出售,这里是否就是离散数组
- 分类模型包含分类学习过程,学习过程利用学习方法学习一个分类器,分类过程是对已获取的分类器对新输入的进行分类。分类器性能评估:
- 召回率: 所有实际为正类的数据中,被正确预测找出的比例
- 精确率: 所有实际为正类(关注的类)的数据中,预测正确比例
3.1.2 分类模型
- 输出被限制连续数值,比如根据房屋特性判断某个房屋出售概率
- KNN
- 决策树
- 逻辑斯蒂回归
3.1.3 回归算法
- 输出连续的数值,比如根据房屋特性判断某个房屋出售概率,这里概率就是连续数值
3.1.4 回归模型算法
- 输入到输出的映射函数
- 线性回归
- 非线性回归
- 最小二乘法:推荐算法中ALS介绍了最小二乘法,后面篇幅会有具体代码举例
3.1.5 监督学习模型评估策略
- 监督学习三要素: 选出合适的模型,选出合适的评价模型策略,选出具体算法
- 模型:总结内在规律用数学函数表示
1.1 模型评估
1.1.1 损失函数: 用来衡量模型测误差大小,评价损失就叫经验风险
1.1.2 过拟合和欠拟合: 欠拟合可能是训练集太小,没有很好的捕获到信息,比如误认为绿色的就是树叶。 过拟合就是学习的太彻底,把很多噪声都学进去了,比如误认为树叶必要是椭圆形的。 所以模型的复杂度要适中 - 算法: 选取最优模型的具体方法
- 策略:选取最优模型评价准则
- 一般步骤
- 得到一个有限的数据集合,确定包含所有学习模型集合
- 确定模型选择准则----学习策略
- 实现求解最优模型算法---学习算法
- 通过学习算法选择最优模型,得到最优模型进行预测分析
3.1.6 反馈常见处理
反馈常见处理.png3.2 分类--无监督学习
- 无监督学习是提供数据不提供相应结果的机器学习过程,核心应该是密度估计和聚类分析,比如google新闻内容会分组,按照不同主题呈现给用户
- 有监督学习使用的是有类别的数据
- 除了聚类还有降维, 需要使用降维的原因是数据压缩,数据压缩不仅可以对数据进行压缩,使得数据占用较少的内存或硬盘空间,还能对学习算法进行加速
聚类模型-k均值
- 如果能对顾客数据(消费金额及购物时间段等)进行聚类,那么输出的类别将是家庭主妇或者上班族等,顾客将被表示为不同的类别,这样就可以针对不同的类别实施不同的销售策略
- 实现步骤
-
原始数据集合
原始数据集合.png -
选择K个随机的点,称为聚类中心(cluster centroids),K就是“K-均值”中的K,表示的是样本要进行分类的数目,在本例中K=2。我们随机地选择连个聚类中心,分别用红色的叉和蓝色的叉表示
第一步.png -
对于数据集中的每一个数据,按照距离聚类中心点的距离,将其与距离最近的中心点关联起来,组成一个类。如下图所示,与红色的聚类中心距离近的点被分为红色的类,与蓝色的聚类中心距离近的点被分为蓝色的类
第二步.png
3.计算每一个类中样本的平均值,将该类的聚类中心移动到平均值的位置。如下图所示,聚类中心进行了相应的移动
第三步.png
4.重复步骤②,将样本进行重新分类,如下图所示:
第四步.png
5.重复步骤③,再次移动聚类中心
第五步.png -
重复步骤②,将样本进行重分类
第六步.png - 依次类推,重复步骤②③,一直迭代,直到聚类中心不在变化
3.3 推荐协同过滤推荐
- 基于内容推荐主要是用户评价过的物品特征,协同推荐还可以利用其它用户评价过的物品内容
- 基于协同推荐解决物品内容难以获得,可以基于其它用户推荐
- 协同推荐可以推荐差异很大的物品,但是又有内在联系
3.3.1 基于近邻协同推荐(类似分类,是否推荐)
是否推荐.png- 基于用户协同: 根据所有用户对物品偏好,找出口味相同邻居,并根据近邻推荐,可以用K-近邻算法,基于K个邻居推荐
- 基于用户推荐与人口统计差别:基于人口统计学只考虑用户本身特性,基于用户协同过滤考虑用户历史偏好
- 基于物品协同推荐,类似基于用户协同过滤,使用所有用户对物品偏好,发现物品与物品相似度,根据用户历史偏好进行推荐
- 基于物品协同推荐与基于内容推荐差别: 基于内容推荐是基于物品特征本身,协同推荐则会考虑历史
- 使用场景: 基于物品协同场景是在web站点中,物品远小于用户,所以物品会稳定些。基于用户协同: 新闻推荐中新闻数量大于用户数量,这时候用户会稳定些
- knn算法举例:
-
首先获取训练集,每行数据包含多个特征和分类标签
训练集.png -
输入没有标签但有多个特征的新数据
输入无标签.png -
将新数据的每个特征与样本中每条数据对应的特征进行比较,然后提取出样本中与新数据最相似的K条数据
对比选择相似数据.png
结果.png - 如何得到相似度:可以用欧式距离求解, 将上述训练集中的数据特征用来对应A或B的坐标,即大眼睛、高鼻梁、细腰、... 对应 a1,a2。。。,大眼睛、高鼻梁都是字符,这怎么进行计算呢? 将字符型数据转化为数值型数据以及其它对数据的预处理操作也是机器学习中的关键步骤,可以将眼睛的大小级别设为1,2,3个等级,3表示为大眼睛,1表示为小眼睛,鼻梁、身高等特征同理
具体化.png
3.3.2 基于模型协同推荐(类似回归)
- 基于样本的用户偏好,训练一个推荐模型,根据用户实时偏好,进行新物品预测,计算得分
- 与基于近邻区别: 基于近邻是使用已有用户偏好,通过近邻数据预测对新数据偏好,类似分类。基于模型是训练模型,根据模型预测,类似回归
- 训练模型可以使用LFM训练隐语义: 协同过滤非常依赖历史数据,而历史数据一般是稀疏的,这就需要降维处理,分解矩阵之后得到用户和物品的隐藏特征
-
矩阵因式分解,分解出需要训练的模型,得到LFM, 并得到隐藏特征f1 f2。。。, SPARK ML中有ALS算法解决模型的数据表达,里面加入平方损失函数,并加入正则化,防止过拟合
矩阵因式分解.png
提取f1_f2.png - 模型的求解: 最小交替二乘法ALS或者随机梯度下降算法
- ALS:由于模拟的矩阵P Q都位置那就先固定一个P0, 通过损失函数求出Q,这是典型的最小二乘法问题,然后反过来固定Q0求出P,如此交替直到达到误差满足阈值,后面篇幅会有具体代码举例
电影推荐项目
- 该项目是尚硅谷的尚硅谷机器学习和推荐系统项目实战教程(初学者零基础快速入门), 资料可以在blibli对应评论里面找打, 包含项目代码,资料等
1. 架构
1.1 大数据处理流程
- 1.1.1 实时计算
- 用户接口: 网站或者APP, 前端可以通过埋点产生数据
- 后端服务器: SpringBoot项目,通过打log形式产生数据
- 日志文件: 后端服务器集群部署,所以有可能一个后端服务有多份日志文件
- 日志采集:每收集一分钟,或者一点数据就放入文件,然后可以转移到flume中,或者直接通过定制api打入flume中,可以配置flume写入kafka中
- 数据总线: 通常由kafka等来的消息,实时数据,实时log, 写入kafka, 再由Flink等实时处理读取
- 实时计算: Flink等,可以封装大量业务,甚至进行机器学习,智能推荐等
- 数据存储:计算完存储进数据库
-
数据可视化:大屏展示等
实时计算.jpg
- 1.1.2 离线数仓
- 用户接口: 网站或者APP, 前端可以通过埋点产生数据
- 后端服务器: SpringBoot项目,通过打log形式产生数据
- 日志文件: 后端服务器集群部署,所以有可能一个后端服务有多份日志文件
- 日志采集:可以用python脚本,或者自己写java服务,利用定时任务,将当天所有数据采集起来,用文件日志转移到flume agent监控的目录,然后flume agent可以sink到HDFS, flume agent是单个jvm进程
- 日志存储: 可以存储在Hadoop上用于大数据分析
- 日志清洗: 可以用Azkaban来进行定时调度,可以用corn定时工具调度,将HDFS文件写入另外个HDFS文件
- 数据加载: 将清洗后的HDFS文件放入HIVE表中,HIVE表分区,每个分区存一天数据
- 数据仓库: 数仓处理
- 数据计算: Spark计算
- 数据存储:计算完存储进数据库
-
数据可视化:大屏展示等
离线数据仓库.jpg
1.2 系统模块设计
-
项目分为 实时推荐服务,离线推荐服务,离线统计服务,内容检索服务其中各个服务细分又分为基于内容,基于协同,基于模型的推荐
系统模块设计.png
1.3 项目系统架构
- 离线部分: Azkaban调度系统将每日的HDF定时S进行清洗加载,Spark离线统计服务进行离线统计计算,Spark Ml lib机器学习中ALS实现离线推荐
- 在线部分: 从综合业务服务,一般是spring boot服务,使用flume采集到kafka, 使用spark stream(Flink可以)进行实时推荐来补充离线推荐的信息滞后性
-
近线部分: 离线,在线都有可能直接写业务数据库,综合业务服务可以从业务数据库,ES,redis读取数据
项目系统架构.png
更详细架构.jpg
2. 统计推荐
-
Azkaban定时调度,更新电影均分,个数等数据到mongodb
统计推荐.png
3. 离线推荐
- 用ALS算法训练隐语义模型,对应机器学习有监督学习中回归模型算法,当然新注册用户可能会有冷启动问题,可以让用户自己填标签然后推荐
- 计算用户推荐矩阵
- 计算电影相似度矩阵
-
也是Azkaban定时调度
离线推荐.png
代码实战
- 由于本人是JAVA出身,实例代码是Scala语言,很多地方打了Debug才知道具体含义,这里可以多Debug下,里面的集合.collect下可以得到结果
- 具体步骤:
- Azkaban定时调度
- 创建一个SparkSession
- 从mongodb加载数据
- 从rating数据中提取所有的uid和mid,并去重
- 训练隐语义模型, 使用spark ml lib的ALS算法
- 基于用户和电影的隐特征,计算预测评分,得到用户的推荐列表,计算user和movie的笛卡尔积,得到一个空评分矩阵
- 调用model的predict方法预测评分
- 过滤出评分大于0的项
- 基于电影隐特征,计算相似度矩阵,得到电影的相似度列表
- 对所有电影两两计算它们的相似度,先做笛卡尔积
import org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.sql.SparkSession
import org.jblas.DoubleMatrix
// 基于评分数据的LFM,只需要rating数据
case class MovieRating(uid: Int, mid: Int, score: Double, timestamp: Int )
case class MongoConfig(uri:String, db:String)
// 定义一个基准推荐对象
case class Recommendation( mid: Int, score: Double )
// 定义基于预测评分的用户推荐列表
case class UserRecs( uid: Int, recs: Seq[Recommendation] )
// 定义基于LFM电影特征向量的电影相似度列表
case class MovieRecs( mid: Int, recs: Seq[Recommendation] )
// 1. Azkaban定时调度
object OfflineRecommender {
// 定义表名和常量
val MONGODB_RATING_COLLECTION = "Rating"
val USER_RECS = "UserRecs"
val MOVIE_RECS = "MovieRecs"
val USER_MAX_RECOMMENDATION = 20
def main(args: Array[String]): Unit = {
println("Start")
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://localhost:27017/recommender",
"mongo.db" -> "recommender"
)
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OfflineRecommender")
// 2. 创建一个SparkSession
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
// 3. 从mongodb加载数据
val ratingRDD = spark.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_RATING_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[MovieRating]
.rdd
.map( rating => ( rating.uid, rating.mid, rating.score ) ) // 转化成rdd,并且去掉时间戳
.cache()
// 4. 从rating数据中提取所有的uid和mid,并去重
val userRDD = ratingRDD.map(_._1).distinct()
val movieRDD = ratingRDD.map(_._2).distinct()
// 5. 训练隐语义模型, 使用spark ml lib的ALS算法
val trainData = ratingRDD.map( x => Rating(x._1, x._2, x._3) )
val (rank, iterations, lambda) = (200, 5, 0.1)
val model = ALS.train(trainData, rank, iterations, lambda)
// 6. 基于用户和电影的隐特征,计算预测评分,得到用户的推荐列表,计算user和movie的笛卡尔积,得到一个空评分矩阵
val userMovies = userRDD.cartesian(movieRDD)
// 7. 调用model的predict方法预测评分
val preRatings = model.predict(userMovies)
// 8. 过滤出评分大于0的项
val userRecs = preRatings
.filter(_.rating > 0)
.map(rating => ( rating.user, (rating.product, rating.rating) ) )
.groupByKey()
.map{
case (uid, recs) => UserRecs( uid, recs.toList.sortWith(_._2>_._2).take(USER_MAX_RECOMMENDATION).map(x=>Recommendation(x._1, x._2)) )
}
.toDF()
userRecs.write
.option("uri", mongoConfig.uri)
.option("collection", USER_RECS)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
// 9. 基于电影隐特征,计算相似度矩阵,得到电影的相似度列表
val movieFeatures = model.productFeatures.map{
case (mid, features) => (mid, new DoubleMatrix(features))
}
// 10. 对所有电影两两计算它们的相似度,先做笛卡尔积
val movieRecs = movieFeatures.cartesian(movieFeatures)
.filter{
// 把自己跟自己的配对过滤掉
case (a, b) => a._1 != b._1
}
.map{
case (a, b) => {
val simScore = this.consinSim(a._2, b._2)
( a._1, ( b._1, simScore ) )
}
}
// 过滤出相似度大于0.6的
.filter(_._2._2 > 0.6)
.groupByKey()
.map{
case (mid, items) => MovieRecs( mid, items.toList.sortWith(_._2 > _._2).map(x => Recommendation(x._1, x._2)) )
}
.toDF()
movieRecs.write
.option("uri", mongoConfig.uri)
.option("collection", MOVIE_RECS)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
spark.stop()
println("Over")
}
// 求向量余弦相似度
def consinSim(movie1: DoubleMatrix, movie2: DoubleMatrix):Double ={
movie1.dot(movie2) / ( movie1.norm2() * movie2.norm2() )
}
}
4. 实时推荐
- 计算速度要快,结果可以不是特别精确,有预先设计好的模型
-
日志触发实时更新,Flume将日志数据写到Kafka,Spark Stream或者Flink订阅Kafka的topic, 然后实时推荐从redis读取数据,并将结果写入Mongodb, 实时推荐的服务部署可以类似Flink部署
实时推荐.png - 基本原理: 用户最近的口味是相同的
- 代码实践,假如用户对某电影标记喜欢: 步骤
- 部署可以用实时计算那一套
- 创建一个SparkSession
- 拿到streaming context,当然也可以用Flink
- 从Mongodb加载电影相似度矩阵数据,把它广播出去
- 定义kafka连接参数
6.通过kafka创建一个DStream - 把原始数据UID|MID|SCORE|TIMESTAMP 转换成评分流
- 继续做流式处理,核心实时算法部分
8.1 从redis里获取当前用户最近的K次评分,保存成Array[(mid, score)]
8.2 从相似度矩阵中取出当前电影最相似的N个电影,作为备选列表,Array[mid],数据来源于离线推荐计算的相似度
8.3 对每个备选电影,计算推荐优先级,得到当前用户的实时推荐列表,Array[(mid, score)],computeMovieScores有具体说明,拿到备选电影和最近评分电影的相似度就可以过滤了,获取两个电影之间的相似度也是根据离线结果来的
8.4 把推荐数据保存到mongodb
9.开始接收和处理数据
import com.mongodb.casbah.commons.MongoDBObject
import com.mongodb.casbah.{MongoClient, MongoClientURI}
import kafka.Kafka
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
// 定义连接助手对象,序列化
object ConnHelper extends Serializable{
lazy val jedis = new Jedis("localhost")
lazy val mongoClient = MongoClient( MongoClientURI("mongodb://localhost:27017/recommender") )
}
case class MongoConfig(uri:String, db:String)
// 定义一个基准推荐对象
case class Recommendation( mid: Int, score: Double )
// 定义基于预测评分的用户推荐列表
case class UserRecs( uid: Int, recs: Seq[Recommendation] )
// 定义基于LFM电影特征向量的电影相似度列表
case class MovieRecs( mid: Int, recs: Seq[Recommendation] )
// 1. 部署可以用实时计算那一套
object StreamingRecommender {
val MAX_USER_RATINGS_NUM = 20
val MAX_SIM_MOVIES_NUM = 20
val MONGODB_STREAM_RECS_COLLECTION = "StreamRecs"
val MONGODB_RATING_COLLECTION = "Rating"
val MONGODB_MOVIE_RECS_COLLECTION = "MovieRecs"
def main(args: Array[String]): Unit = {
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://localhost:27017/recommender",
"mongo.db" -> "recommender",
"kafka.topic" -> "recommender"
)
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("StreamingRecommender")
// 2. 创建一个SparkSession
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
// 3. 拿到streaming context
val sc = spark.sparkContext
val ssc = new StreamingContext(sc, Seconds(2)) // batch duration
import spark.implicits._
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
// 4. 加载电影相似度矩阵数据,把它广播出去
val simMovieMatrix = spark.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_MOVIE_RECS_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[MovieRecs]
.rdd
.map{ movieRecs => // 为了查询相似度方便,转换成map
(movieRecs.mid, movieRecs.recs.map( x=> (x.mid, x.score) ).toMap )
}.collectAsMap()
val simMovieMatrixBroadCast = sc.broadcast(simMovieMatrix)
// 5. 定义kafka连接参数
val kafkaParam = Map(
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "recommender",
"auto.offset.reset" -> "latest"
)
// 6.通过kafka创建一个DStream
val kafkaStream = KafkaUtils.createDirectStream[String, String]( ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String]( Array(config("kafka.topic")), kafkaParam )
)
// 7. 把原始数据UID|MID|SCORE|TIMESTAMP 转换成评分流
val ratingStream = kafkaStream.map{
msg =>
val attr = msg.value().split("\\|")
( attr(0).toInt, attr(1).toInt, attr(2).toDouble, attr(3).toInt )
}
// 8. 继续做流式处理,核心实时算法部分
ratingStream.foreachRDD{
rdds => rdds.foreach{
case (uid, mid, score, timestamp) => {
println("rating data coming! >>>>>>>>>>>>>>>>")
// 8.1 从redis里获取当前用户最近的K次评分,保存成Array[(mid, score)]
val userRecentlyRatings = getUserRecentlyRating( MAX_USER_RATINGS_NUM, uid, ConnHelper.jedis )
// 8.2 从相似度矩阵中取出当前电影最相似的N个电影,作为备选列表,Array[mid]
val candidateMovies = getTopSimMovies( MAX_SIM_MOVIES_NUM, mid, uid, simMovieMatrixBroadCast.value )
// 8.3 对每个备选电影,计算推荐优先级,得到当前用户的实时推荐列表,Array[(mid, score)]
val streamRecs = computeMovieScores( candidateMovies, userRecentlyRatings, simMovieMatrixBroadCast.value )
// 8.4 把推荐数据保存到mongodb
saveDataToMongoDB( uid, streamRecs )
}
}
}
// 9.开始接收和处理数据
ssc.start()
println(">>>>>>>>>>>>>>> streaming started!")
ssc.awaitTermination()
}
// redis操作返回的是java类,为了用map操作需要引入转换类
import scala.collection.JavaConversions._
def getUserRecentlyRating(num: Int, uid: Int, jedis: Jedis): Array[(Int, Double)] = {
// 从redis读取数据,用户评分数据保存在 uid:UID 为key的队列里,value是 MID:SCORE
jedis.lrange("uid:" + uid, 0, num-1)
.map{
item => // 具体每个评分又是以冒号分隔的两个值
val attr = item.split("\\:")
( attr(0).trim.toInt, attr(1).trim.toDouble )
}
.toArray
}
/**
* 获取跟当前电影做相似的num个电影,作为备选电影
* @param num 相似电影的数量
* @param mid 当前电影ID
* @param uid 当前评分用户ID
* @param simMovies 相似度矩阵
* @return 过滤之后的备选电影列表
*/
def getTopSimMovies(num: Int, mid: Int, uid: Int, simMovies: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]])
(implicit mongoConfig: MongoConfig): Array[Int] ={
// 1. 从相似度矩阵中拿到所有相似的电影
val allSimMovies = simMovies(mid).toArray
// 2. 从mongodb中查询用户已看过的电影
val ratingExist = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION)
.find( MongoDBObject("uid" -> uid) )
.toArray
.map{
item => item.get("mid").toString.toInt
}
// 3. 把看过的过滤,得到输出列表
allSimMovies.filter( x=> ! ratingExist.contains(x._1) )
.sortWith(_._2>_._2)
.take(num)
.map(x=>x._1)
}
def computeMovieScores(candidateMovies: Array[Int],
userRecentlyRatings: Array[(Int, Double)],
simMovies: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]]): Array[(Int, Double)] ={
// 定义一个ArrayBuffer,用于保存每一个备选电影的基础得分
val scores = scala.collection.mutable.ArrayBuffer[(Int, Double)]()
// 定义一个HashMap,保存每一个备选电影的增强减弱因子
val increMap = scala.collection.mutable.HashMap[Int, Int]()
val decreMap = scala.collection.mutable.HashMap[Int, Int]()
for( candidateMovie <- candidateMovies; userRecentlyRating <- userRecentlyRatings){
// 拿到备选电影和最近评分电影的相似度
val simScore = getMoviesSimScore( candidateMovie, userRecentlyRating._1, simMovies )
if(simScore > 0.7){
// 计算备选电影的基础推荐得分
scores += ( (candidateMovie, simScore * userRecentlyRating._2) )
if( userRecentlyRating._2 > 3 ){
increMap(candidateMovie) = increMap.getOrDefault(candidateMovie, 0) + 1
} else{
decreMap(candidateMovie) = decreMap.getOrDefault(candidateMovie, 0) + 1
}
}
}
// 根据备选电影的mid做groupby,根据公式去求最后的推荐评分
scores.groupBy(_._1).map{
// groupBy之后得到的数据 Map( mid -> ArrayBuffer[(mid, score)] )
case (mid, scoreList) =>
( mid, scoreList.map(_._2).sum / scoreList.length + log(increMap.getOrDefault(mid, 1)) - log(decreMap.getOrDefault(mid, 1)) )
}.toArray.sortWith(_._2>_._2)
}
// 获取两个电影之间的相似度
def getMoviesSimScore(mid1: Int, mid2: Int, simMovies: scala.collection.Map[Int,
scala.collection.immutable.Map[Int, Double]]): Double ={
simMovies.get(mid1) match {
case Some(sims) => sims.get(mid2) match {
case Some(score) => score
case None => 0.0
}
case None => 0.0
}
}
// 求一个数的对数,利用换底公式,底数默认为10
def log(m: Int): Double ={
val N = 10
math.log(m)/ math.log(N)
}
def saveDataToMongoDB(uid: Int, streamRecs: Array[(Int, Double)])(implicit mongoConfig: MongoConfig): Unit ={
// 定义到StreamRecs表的连接
val streamRecsCollection = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_STREAM_RECS_COLLECTION)
// 如果表中已有uid对应的数据,则删除
streamRecsCollection.findAndRemove( MongoDBObject("uid" -> uid) )
// 将streamRecs数据存入表中
streamRecsCollection.insert( MongoDBObject( "uid"->uid,
"recs"-> streamRecs.map(x=>MongoDBObject( "mid"->x._1, "score"->x._2 )) ) )
}
}
5. 基于内容推荐
- 当然还有混合推荐,混合推荐就是把种推荐加权处理,基于统计推荐 + 基于离线推荐 + 基于实时推荐 + 基于内容推荐
- 电影A的相似电影:有相同标签就可以定义为相似,这个需要定义好标签,可以用户自定义画像等
- 基于UGC的特征提取: 可以用TF-IDF,加入TF-IDF是为了防止热门标签对推荐结果影响,步骤, 可以定时调度
- 适用于用户喜欢某个电影,然后找出电影相似度最高的几个,推荐给用户
- 创建一个SparkSession
- 加载数据,并作预处理
- 核心部分: 用TF-IDF从内容信息中提取电影特征向量,创建一个分词器,默认按空格分词,这里按照演员分词
- 用分词器对原始数据做转换,生成新的一列words, 现在对象里面有四个"mid", "name", "genres", "words"是按照空格分词
- 引入HashingTF工具,可以把一个词语序列转化成对应的词频
- 引入IDF工具,可以得到idf模型, 现在对象里面再加了一列叫rawFeatures,把一个词语genres序列转化成对应的词频
- 训练idf模型,得到每个词的逆文档频率
- 用模型对原数据进行处理,得到文档中每个词的tf-idf,作为新的特征向量, 再加了一列features得到逆词频
- 对所有电影两两计算它们的相似度,先做笛卡尔积
10.1 把自己跟自己的配对过滤掉, _1是mid即是电影序号, _2是特征值
import org.apache.spark.SparkConf
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.ml.linalg.SparseVector
import org.apache.spark.sql.SparkSession
import org.jblas.DoubleMatrix
// 需要的数据源是电影内容信息
case class Movie(mid: Int, name: String, descri: String, timelong: String, issue: String,
shoot: String, language: String, genres: String, actors: String, directors: String)
case class MongoConfig(uri:String, db:String)
// 定义一个基准推荐对象
case class Recommendation( mid: Int, score: Double )
// 定义电影内容信息提取出的特征向量的电影相似度列表
case class MovieRecs( mid: Int, recs: Seq[Recommendation] )
// 1. 适用于 用户喜欢某个电影,然后找出电影相似度最高的几个,推荐给用户
object ContentRecommender {
// 定义表名和常量
val MONGODB_MOVIE_COLLECTION = "Movie"
val CONTENT_MOVIE_RECS = "ContentMovieRecs"
def main(args: Array[String]): Unit = {
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://localhost:27017/recommender",
"mongo.db" -> "recommender"
)
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OfflineRecommender")
// 2. 创建一个SparkSession
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
// 3. 加载数据,并作预处理
val movieTagsDF = spark.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_MOVIE_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[Movie]
.map(
// 提取mid,name,genres三项作为原始内容特征,genres演员是按照|存储的,分词器默认按照空格做分词
x => ( x.mid, x.name, x.genres.map(c=> if(c=='|') ' ' else c) )
)
.toDF("mid", "name", "genres")
.cache()
// 4. 核心部分: 用TF-IDF从内容信息中提取电影特征向量,创建一个分词器,默认按空格分词
val tokenizer = new Tokenizer().setInputCol("genres").setOutputCol("words")
// 5. 用分词器对原始数据做转换,生成新的一列words, 现在对象里面有四个"mid", "name", "genres", "words"是按照空格分词
val wordsData = tokenizer.transform(movieTagsDF)
// 6. 引入HashingTF工具,可以把一个词语序列转化成对应的词频
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(50)
val featurizedData = hashingTF.transform(wordsData)
// 7. 引入IDF工具,可以得到idf模型, 现在对象里面再加了一列叫rawFeatures,把一个词语genres序列转化成对应的词频
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
// 8. 训练idf模型,得到每个词的逆文档频率
val idfModel = idf.fit(featurizedData)
// 9. 用模型对原数据进行处理,得到文档中每个词的tf-idf,作为新的特征向量, 再加了一列features得到逆词频
val rescaledData = idfModel.transform(featurizedData)
val movieFeatures = rescaledData.map(
row => ( row.getAs[Int]("mid"), row.getAs[SparseVector]("features").toArray )
)
.rdd
.map(
x => ( x._1, new DoubleMatrix(x._2) )
)
movieFeatures.collect().foreach(println)
// 10. 对所有电影两两计算它们的相似度,先做笛卡尔积
val movieRecs = movieFeatures.cartesian(movieFeatures)
.filter{
// 10.1 把自己跟自己的配对过滤掉, _1是mid即是电影序号, _2是特征值
case (a, b) => a._1 != b._1
}
.map{
case (a, b) => {
val simScore = this.consinSim(a._2, b._2)
( a._1, ( b._1, simScore ) )
}
}
.filter(_._2._2 > 0.6) // 过滤出相似度大于0.6的
.groupByKey()
.map{
// items是item集合( b._1, simScore )对象即 电影序号 + 相似度
case (mid, items) => MovieRecs( mid, items.toList.sortWith(_._2 > _._2).map(x => Recommendation(x._1, x._2)) )
}
.toDF()
movieRecs.write
.option("uri", mongoConfig.uri)
.option("collection", CONTENT_MOVIE_RECS)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
spark.stop()
}
// 求向量余弦相似度
def consinSim(movie1: DoubleMatrix, movie2: DoubleMatrix):Double ={
movie1.dot(movie2) / ( movie1.norm2() * movie2.norm2() )
}
}
- 基于协同物品推荐: 可以用上面举例的KNN算法
6. 部署之Azkaban离线调度
-
大数据离线调度系统, 可以按照顺序执行流程
Azkaban历史.png - 使用方式:
1.创建Azkaban-Stat.job,编辑器输入, 其中***表示包名, xxx表示mac路径,运行前得先装spark哈
-
将对于jar包和 job文件打包成zip, 然后点upload
upload.png - 这里可以直接点运行,也可以设置定时调度时间
type=command
command=/usr/local/spark/bin/spark-submit --class ***.OfflineRecommender /Users/xxx/Desktop/ideaworkspace/big_data/MovieRecommendSystem/recommender/OfflineRecommender/target/OfflineRecommender-1.0-SNAPSHOT.jar
- mac启动方式:
1. cd /Users/xxx/Desktop/azkaban/azkaban-solo-server-0.1.0-SNAPSHOT
2. bin/start-solo.sh
3. 账号azkaban密码也是
4. http://localhost:8081/index
7. 部署之Dataworks + Maxcomputer
- 主要是阿里云的大数据平台,Dataworks转移数据,Maxcomputer计算,还可以实现按照顺序类似Azkaban的按顺序调度
- 阿里云平台可以体验,阿里平台也提供人工智能集成,但是公司并没有使用,用了spark ml机器学
- 这里也有AI学习,AI学习天地
参考
- 机器学习思维导图
- 有监督和无监督学习都各有哪些有名的算法和深度学习?
- 蚂蚁金服 ZSearch 在向量检索上的探索
- milvus向量数据库文档
- 一文入门Facebook开源向量检索框架Faiss
- opencv3与tensorflow的关系
- mongodb与mysql区别
- 日志收集组件flume和logstash对比
- ELK系列-使用flume日志收集
- 如何使用flume采集日志到kafka中
- 快速学习-电影推荐系统设计(实时推荐模块)
- 项目体系架构设计——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(四)
- PyODPS 基本操作 | 学习笔记
- PAI 平台搭建企业级个性化推荐系统 最佳实践
- 机器学习2—KNN算法(原理、实现、实例)
- 入门机器学习(十五)--无监督学习(K均值)
- 无监督学习——K-均值聚类算法对未标注数据分组
- Flume数据采集工具之agent
- Flume案例二:实时监控单个追加文件(tail -f 日志)(exec source)
- macos安装spark
- azkaban详细使用教程
- Azkaban中的一些坑
- MongoDB使用场景总结
- MongoDB 在评论中台的实践