数据分析

pyspark-推荐系统-RecommenderSystem

2019-08-02  本文已影响212人  NEO_X

更多信息https://blue-shadow.top/

更多信息在Github上:https://github.com/Shadow-Hunter-X

推荐系统

自动推荐内容或产品以个性化的方式向适当的用户提供,以增强整体体验。推荐系统在术语上非常强大使用海量的数据,学会理解偏好。

对于PySpark中的“推荐系统”模块 pyspark.ml.recommendation module
官方文档链接:api/python/pyspark.ml.html#module-pyspark.ml.recommendation

spark 推荐系统的ALS 算法

使用最小二乘法的,不同拟合曲线:

不同函数的拟合曲线

推荐系统的分类

基于内容推荐
基于内容的推荐(Content-based Recommendation)是信息过滤技术的延续与发展,它是建立在项目的内容信息上作出推荐的,而不需要依据用户对项目的评价意见,更多地需要用机 器学习的方法从关于内容的特征描述的事例中得到用户的兴趣资料

协同过滤推荐
协同过滤推荐(Collaborative Filtering Recommendation)技术是推荐系统中应用最早和最为成功的技术之一。它一般采用最近邻技术,利用用户的历史喜好信息计算用户之间的距离,然后 利用目标用户的最近邻居用户对商品评价的加权评价值来预测目标用户对特定商品的喜好程度,系统从而根据这一喜好程度来对目标用户进行推荐

基于关联规则推荐
基于关联规则的推荐(Association Rule-based Recommendation)是以关联规则为基础,把已购商品作为规则头,规则体为推荐对象。关联规则挖掘可以发现不同商品在销售过程中的相关性,在零 售业中已经得到了成功的应用

基于知识推荐
基于知识的推荐(Knowledge-based Recommendation)在某种程度是可以看成是一种推理(Inference)技术,它不是建立在用户需要和偏好基础上推荐的。基于知识的方法因 它们所用的功能知识不同而有明显区别

组合推荐
由于各种推荐方法都有优缺点,所以在实际中,组合推荐(Hybrid Recommendation)经常被采用。研究和应用最多的是内容推荐和协同过滤推荐的组合。最简单的做法就是分别用基于内容的方法和协同过滤推荐方法 去产生一个推荐预测结果,然后用某方法组合其结果

基于效用推荐
基于效用的推荐(Utility-based Recommendation)是建立在对用户使用项目的效用情况上计算的,其核心问题是怎么样为每一个用户去创建一个效用函数,因此,用户资料模型很大 程度上是由系统所采用的效用函数决定的。基于效用推荐的好处是它能把非产品的属性,如提供商的可靠性(Vendor Reliability)和产品的可得性(Product Availability)等考虑到效用计算中

wiki上关于推荐系统

示例代码

MovieLens https://grouplens.org/datasets/movielens/

完整数据下载
数据样例下载

在成功获取数据,对文件内容进行说明介绍:

from pyspark.sql import SparkSession 
spark=SparkSession.builder.appName('rs').getOrCreate()

from pyspark.sql.functions import *

print('-------------查看数据情况,检测数据质量和相关的特征。即相对数据有一定的认识,对后续进行训练做准备--------------------')

df_ratings=spark.read.csv('ml-latest-small/ratings.csv',inferSchema=True,header=True)   # 读取电影评分数据
df_ratings.createOrReplaceTempView("ratings")       # 构建临时表评分表
df_movie=spark.read.csv('ml-latest-small/movies.csv',inferSchema=True,header=True)      # 读取电影数据
df_movie.createOrReplaceTempView("movies")          # 构建临时电影表,这两张表通过sql关联,得到具体电影的评分信息

df_details = spark.sql("SELECT ratings.userId , ratings.movieId , movies.title , movies.genres , ratings.rating  FROM ratings   \
          LEFT JOIN movies ON ratings.movieId = movies.movieId ")       # 两表关联,获取具体的信息
          
df_details.select('userId','title','rating').where('rating=4').show(10)

print((df_details.count(),len(df_details.columns)))                     # 查看数据规模

df_details.printSchema()                                                # 数据列信息

df_details.orderBy(rand()).show(10,False)                               # 查看数据

print('-------------- 进行数据转换,主要将类别数据,转换为可通过数值来度量------------------')

from pyspark.ml.feature import StringIndexer,IndexToString              # StringIndexer可以把字符串的列按照出现频率进行排序,将字符串转化为可度量的

stringIndexer = StringIndexer(inputCol="title", outputCol="title_new")  # 构建StringIndexer对象,设定输入列和输出列

model = stringIndexer.fit(df_details)                                   # 构建model模型

indexed = model.transform(df_details)                                   # 使用模型转换数据,讲电影名转换为数值,可以进行度量

indexed.show(10)

indexed.groupBy('title_new').count().orderBy('count',ascending=False).show(10,False)    # 查看分类的数据样式

train,test=indexed.randomSplit([0.75,0.25])                                             # 划分训练数据和测试数据

print('--------------- 使用推荐模型ALS算计 ------------------------')

from pyspark.ml.recommendation import ALS

'''
关于 ALS 的参数:maxIter 最大迭代次数  ;  regParam 表示最小二乘法中lambda值的大小 ; userCol ,itemCol 用于表征对象的标识,通过指出这两列后可以,通过它们构建起关系,通过ratingCol表示它们间的关系。构建成评分矩阵
再本例子中:useCol 是 用户ID ,itemCol 是电影名 ,ratingCol 是用户对电影的评分。
'''
rec=ALS(maxIter=10,regParam=0.01,userCol='userId',itemCol='title_new',ratingCol='rating',nonnegative=True,coldStartStrategy="drop")   

rec_model=rec.fit(train)                     # 使用模型训练数据

predicted_ratings=rec_model.transform(test)  # 应用于测试数据

predicted_ratings.printSchema()

predicted_ratings.orderBy(rand()).show(10)   # 参看应用模型预测的数据

print('------------- 引入回归评估器来度量 推荐系统 --------------')

from pyspark.ml.evaluation import RegressionEvaluator        # RegressionEvaluator 回归评估器,它期望两个输入列:预测和标签。

evaluator=RegressionEvaluator(metricName='rmse',predictionCol='prediction',labelCol='rating')   # 构建回归评估器,评估准确性

rmse=evaluator.evaluate(predicted_ratings)

print('{}{}'.format("标准误差:",rmse))                      # 查看使用推荐系统后的预测的标准误差,若标准误差不是很大的话,可以进行下一步操作。

unique_movies=indexed.select('title_new').distinct()        # 筛选出所有电影,使用distinct
unique_movies.count()

all = unique_movies.alias('all')                            # 所有电影df,重命名为 all

watched_movies=indexed.filter(indexed['userId'] == 46).select('title_new').distinct()   # 查看85号用户,看过的所有电影

watched_movies.count()

no_46=watched_movies.alias('no_46')     # 46号用户看过的电影df,重命名为no_46

total_movies = all.join(no_46, all.title_new == no_46.title_new,how='left')     # 关联得出用户46没有观看评分的电影。

total_movies.show(10,False)     

remaining_movies=total_movies.where(col("no_46.title_new").isNull()).select(all.title_new).distinct()   # 46号用户,没看过电影的df

remaining_movies=remaining_movies.withColumn("userId",lit(46))          # 添加一列      

recommendations=rec_model.transform(remaining_movies).orderBy('prediction',ascending=False) 

recommendations.show(5,False)   

movie_title = IndexToString(inputCol="title_new", outputCol="title",labels=model.labels)

final_recommendations=movie_title.transform(recommendations)

final_recommendations.show(10,False)            # 但是最后推荐给用户的预估评分都超过了5分,这是个问题

part1.png part2.png part3.png

上一篇:PySpark-ml-随机深林
下一篇:PySpark-ml-聚类

上一篇下一篇

猜你喜欢

热点阅读