SVD Code Practice
2019-05-25
张虾米试错
This article walks through TensorFlow and PySpark implementations of SVD; for the underlying theory, see the previous post: SVD在协同过滤推荐系统中的应用.
Outline
- SVD code practice with TensorFlow
- SVD code practice with PySpark
This article does not go into the theory, but as a refresher, a standard form of the regularized matrix-factorization objective (with $K$ the set of observed ratings) is:

$$\min_{U,P} \sum_{(u,i)\in K} \left(r_{ui} - U_u^{\top} P_i\right)^2 + \lambda\left(\lVert U\rVert^2 + \lVert P\rVert^2\right)$$
The dataset used in this article is the classic MovieLens movie-rating dataset.
1. SVD Code Practice with TensorFlow
First, simplify the objective function a bit. The code below minimizes the absolute error with an L1 penalty on the factors:

$$L = \sum_{(u,i)\in K} \left|r_{ui} - U_u^{\top} P_i\right| + \lambda\left(\lVert U\rVert_1 + \lVert P\rVert_1\right)$$
1.1 Reading the Dataset
The data is separated by "\t" and has 4 columns ('user', 'item', 'rate', 'time'), but only the first 3 are actually needed.
import pandas as pd
import numpy as np
import tensorflow as tf

# read data: tab-separated MovieLens ratings
df = pd.read_csv('u.data', sep='\t', names=['user', 'item', 'rate', 'time'])
df["rate"] = df["rate"].astype("float")
print(df.dtypes)
msk = np.random.rand(len(df)) < 0.7  # boolean mask, e.g. [True, False, False, True]; ~70% goes to train
df_train = df[msk]
user_indecies = [x - 1 for x in df_train.user.values]  # 0-based user indices
user_num = max(df_train.user.values)
item_indecies = [x - 1 for x in df_train.item.values]  # 0-based item indices
item_num = max(df_train.item.values)
rates = df_train.rate.values
print("user_num: %d, item_num: %d" % (user_num, item_num))
Note: the rate field in the original dataset is of int type; experiments show that converting it to float also works, but it leads to an oddity at prediction time, described in 1.3 below.
1.2 Computing the Objective and Building the Model
# variables
feature_len = 10
U = tf.Variable(initial_value=tf.truncated_normal([user_num, feature_len]), name='users')
P = tf.Variable(initial_value=tf.truncated_normal([feature_len, item_num]), name='items')
result = tf.matmul(U, P)
result_flatten = tf.reshape(result, [-1])
# rating: pick out the predicted rating for each (user, item) pair in the training set
R = tf.gather(result_flatten, user_indecies * tf.shape(result)[1] + item_indecies, name='extracting_user_rate')
# cost function: sum of absolute differences between predictions and true ratings
diff_op = tf.subtract(R, rates, name='training_diff')
diff_op_abs = tf.abs(diff_op, name="absolute_difference")
base_cost = tf.reduce_sum(diff_op_abs, name="sum_absolute_error")
# regularization: L1 penalty on both factor matrices
lda = tf.constant(.001, name='lambda')
norm_sums = tf.add(tf.reduce_sum(tf.abs(U, name='user_abs'), name='user_norm'),
                   tf.reduce_sum(tf.abs(P, name='item_abs'), name='item_norm'))
regularizer = tf.multiply(norm_sums, lda, 'regularizer')
cost = tf.add(base_cost, regularizer, name='cost')
# optimizer with an exponentially decaying learning rate
lr = tf.constant(.001, name='learning_rate')
global_step = tf.Variable(0, trainable=False)
learning_rate = tf.train.exponential_decay(lr, global_step, 10000, 0.96, staircase=True)
optimizer = tf.train.GradientDescentOptimizer(learning_rate)
training_step = optimizer.minimize(cost, global_step=global_step)
tf.gather gathers slices of a tensor at the given indices, so gathering the flattened matrix at `user_index * item_num + item_index` solves the problem of extracting each predicted rating $\hat{r}_{ui} = (UP)_{ui}$.
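To make the flattened-index trick concrete, here is a minimal standalone sketch with a made-up 2×3 matrix, showing that gathering at `u * num_items + i` returns element `[u, i]`:

import tensorflow as tf

# a toy 2x3 "rating matrix"
M = tf.constant([[1., 2., 3.],
                 [4., 5., 6.]])
M_flat = tf.reshape(M, [-1])        # [1, 2, 3, 4, 5, 6]

users = tf.constant([0, 1, 1])      # row indices
items = tf.constant([2, 0, 2])      # column indices
num_items = tf.shape(M)[1]          # 3

# flattened index of element (u, i) is u * num_items + i
picked = tf.gather(M_flat, users * num_items + items)

with tf.Session() as sess:
    print(sess.run(picked))         # [3. 4. 6.]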
1.3 Training and Testing
sess = tf.Session()
init = tf.global_variables_initializer()
sess.run(init)
print("training...")
print(sess.run(R))  # predictions before training
for i in range(2):  # a couple of gradient steps for demonstration; increase for real training
    sess.run(training_step)

# predict the rating of the first record
print(df.dtypes)
u, p, r = df[['user', 'item', 'rate']].values[0]
# u and p come back as float here; see the note below
u, p = int(u), int(p)
print(u, p, r)
rhat = tf.gather(tf.gather(result, u - 1), p - 1)
print("rating for user " + str(u) + " for item " + str(p) + " is " + str(r)
      + " and our prediction is: " + str(sess.run(rhat)))

# calculate accuracy on the held-out 30%
df_test = df[~msk]  # test set
user_indecies_test = [x - 1 for x in df_test.user.values]
item_indecies_test = [x - 1 for x in df_test.item.values]
rates_test = df_test.rate.values

# mean squared error on the test set
R_test = tf.gather(result_flatten, user_indecies_test * tf.shape(result)[1] + item_indecies_test, name='extracting_user_rate_test')
diff_op_test = tf.subtract(R_test, rates_test, name='test_diff')
cost_test = tf.div(tf.reduce_sum(tf.square(diff_op_test, name="squared_difference_test"), name="sum_squared_error_test"),
                   df_test.shape[0], name="average_error")
print(sess.run(cost_test))
When the rate column has been converted to float, the line `u, p, r = df[['user', 'item', 'rate']].values[0]` returns u and p as floats too. This is because `.values` packs the selected columns into a single NumPy array, which gets upcast to their common dtype (float64); hence the explicit `int()` casts above.
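The upcasting can be seen in isolation with a tiny made-up DataFrame:

import pandas as pd

demo = pd.DataFrame({'user': [1], 'item': [5], 'rate': [3.0]})  # int, int, float columns
row = demo[['user', 'item', 'rate']].values[0]
print(row.dtype)        # float64 -- everything upcast to the common dtype

# accessing a column individually keeps its original dtype
u = demo['user'].iloc[0]
print(type(u))          # <class 'numpy.int64'>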
2. SVD Code Practice with PySpark
Because Spark ships with the mllib library, we can simply call its API; Spark solves the factorization with ALS (Alternating Least Squares).
from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

sc = SparkContext(appName="svd-als")   # or reuse an existing SparkContext
data_path = "u.data"                   # the same MovieLens file as above

rdd = sc.textFile(data_path).map(lambda x: x.strip().split("\t"))
ratings = rdd.map(lambda x: Rating(int(x[0]), int(x[1]), float(x[2])))
rank = 10              # number of latent factors
num_iterations = 10    # ALS iterations
model = ALS.train(ratings, rank, num_iterations)
Spark's ALS model offers several prediction methods:
- predict
- predictAll
- recommendUsers
- recommendProducts
- recommendProductsForUsers
- recommendUsersForProducts
For more details, see the blog post Pyspark ALS and Recommendation Outputs.
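As a minimal usage sketch of a few of these methods, reusing the `model` and `sc` from above (user 196 and item 242 are taken from the first line of u.data):

# predict a single (user, product) rating
print(model.predict(196, 242))

# predict many pairs at once: predictAll takes an RDD of (user, product) tuples
pairs = sc.parallelize([(196, 242), (186, 302)])
print(model.predictAll(pairs).collect())   # a list of Rating(user, product, rating)

# top-5 product recommendations for one user, and top-5 users for one product
print(model.recommendProducts(196, 5))
print(model.recommendUsers(242, 5))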
Since it is all API calls, this is simpler to work with than the TensorFlow version.