PySpark初体验!
1、引言
突发奇想,想做这样一件事,有一堆句子,每个句子都已经表示成了特征向量。当有一个新句子到来的时候,如何计算这个新句子与这一堆句子的相似性?
嗨呀,这还不简单么,循环计算,再排个序不就行了么。对,这样的想法当然没错,不过当句子的数量特别多时,效率就会下降。这不,想起来之前在公司电脑上安装的spark环境,何不利用一下,正好也可以学一下spark的基本使用,所以,准备用spark来做这件事!
spark的安装过程参照我之前写的文章:http://www.jianshu.com/p/c248b8c966e2
spark提供了许多语言的接口,这里我准备使用Python,也就是PySpark。
2、pyspark环境安装
在之前安装的基础上,我们需要把spark下的pyspark包移动到python安装路径下的site-packages下(ubuntu是dist-packages),首先,我们要找到这个路径,使用命令:
python -c "from distutils.sysconfig import get_python_lib; print get_python_lib()"
可以发现,路径在/usr/lib/python2.7/dist-packages下,
随后使用mv命令,将包移动过去:
3、Spark RDD
spark的数据类型只有一种,那就是RDD(Resilient Distributed Dataset),即弹性分布式数据集,属于一种分布式的内存系统的数据集应用。
有关Spark RDD 的应用,我后面应该会单独整理一篇帖子,这里就不写太多了。
4、代码实现
首先,我们要导入PySpark库:
from pyspark import SparkConf, SparkContext
接下来创建spark的上下文环境:
sc = SparkContext()
为实现我们想要的效果,我们首先创造一些测试数据集:我们的一堆句子定义为(id,特征向量)的二元组,并使用parallelize转换为RDD类型的数据:
data1 = sc.parallelize([(1,[1,2,4,6,8]),(2,[2,1,4,6,8]),(3,[1,1,1,1,1]),(4,[2,3,4,5,6]),(5,[3,2,5,6,4])])
data2 = sc.parallelize([[2,3,5,6,7]])
翻了很多网上的资料(可能不是最优的方法),在两个RDD上的操作有取并集(union),取交集(intersection),取差集(subtract)和取笛卡尔积(cartesian)
这里我想到了用笛卡尔积的方法,并将数据转换回python中的array数据类型,查看输出:
data_rdd = data1.cartesian(data2).collect()
取笛卡尔积
哇,可以看到,它已经变成了我们可以计算的形式了,我们可以编写一个函数,对每一条数据的两个特征向量计算余弦相似度,并返回(id,相似度)的二元组:
def cosine(t):
print (t)
x = t[0][1]
y = t[1]
convxy = 0
sumx=0
sumy=0
for i in range(len(x)):
convxy += x[i] * y[i]
sumx += x[i] ** 2
sumy += y[i] ** 2
return (t[0][0],convxy /(math.sqrt(sumx) * math.sqrt(sumy)))
最后我们将这个函数作用于data_rdd的每一条数据上就好了,类似于python的map函数:
output = data_rdd.map(cosine).collect()
最后的输出就是我们想要的形式啦,偷了个小懒,没有排序,哈哈哈,无伤大雅!
结果输出5、完整代码
完整代码如下:
#-*-coding:utf-8-*-#:
from __future__ import print_function
import sys
from operator import add
import math
from pyspark import SparkConf, SparkContext
def cosine(t):
print (t)
x = t[0][1]
y = t[1]
convxy = 0
sumx=0
sumy=0
for i in range(len(x)):
convxy += x[i] * y[i]
sumx += x[i] ** 2
sumy += y[i] ** 2
return (t[0][0],convxy /(math.sqrt(sumx) * math.sqrt(sumy)))
if __name__ == "__main__":
sc = SparkContext()
data1 = sc.parallelize([(1,[1,2,4,6,8]),(2,[2,1,4,6,8]),(3,[1,1,1,1,1]),(4,[2,3,4,5,6]),(5,[3,2,5,6,4])])
data2 = sc.parallelize([[2,3,5,6,7]])
data_rdd = data1.cartesian(data2)
output = data_rdd.map(cosine).collect()
print (output)
如果你喜欢我写的文章,可以帮忙给小编点个赞或者加个关注,我一定会互粉的!
如果大家对leetcode感兴趣,欢迎跟小编进行交流,小编微信为sxw2251,加我要写好备注哟!: