Sparkspark技术干货

PySpark初体验!

2017-06-23  本文已影响283人  文哥的学习日记

1、引言

突发奇想,想做这样一件事,有一堆句子,每个句子都已经表示成了特征向量。当有一个新句子到来的时候,如何计算这个新句子与这一堆句子的相似性?
嗨呀,这还不简单么,循环计算,再排个序不就行了么。对,这样的想法当然没错,不过当句子的数量特别多时,效率就会下降。这不,想起来之前在公司电脑上安装的spark环境,何不利用一下,正好也可以学一下spark的基本使用,所以,准备用spark来做这件事!

spark的安装过程参照我之前写的文章:http://www.jianshu.com/p/c248b8c966e2
spark提供了许多语言的接口,这里我准备使用Python,也就是PySpark。

2、pyspark环境安装

在之前安装的基础上,我们需要把spark下的pyspark包移动到python安装路径下的site-packages下(ubuntu是dist-packages),首先,我们要找到这个路径,使用命令:
python -c "from distutils.sysconfig import get_python_lib; print get_python_lib()"

查看python包位置

可以发现,路径在/usr/lib/python2.7/dist-packages下,
随后使用mv命令,将包移动过去:

移动pyspark库

3、Spark RDD

spark的数据类型只有一种,那就是RDD(Resilient Distributed Dataset),即弹性分布式数据集,属于一种分布式的内存系统的数据集应用。
有关Spark RDD 的应用,我后面应该会单独整理一篇帖子,这里就不写太多了。

4、代码实现

首先,我们要导入PySpark库:

from pyspark import SparkConf, SparkContext

接下来创建spark的上下文环境:

sc = SparkContext()

为实现我们想要的效果,我们首先创造一些测试数据集:我们的一堆句子定义为(id,特征向量)的二元组,并使用parallelize转换为RDD类型的数据:

data1 = sc.parallelize([(1,[1,2,4,6,8]),(2,[2,1,4,6,8]),(3,[1,1,1,1,1]),(4,[2,3,4,5,6]),(5,[3,2,5,6,4])])
data2 = sc.parallelize([[2,3,5,6,7]])

翻了很多网上的资料(可能不是最优的方法),在两个RDD上的操作有取并集(union),取交集(intersection),取差集(subtract)和取笛卡尔积(cartesian)
这里我想到了用笛卡尔积的方法,并将数据转换回python中的array数据类型,查看输出:

data_rdd = data1.cartesian(data2).collect()
取笛卡尔积

哇,可以看到,它已经变成了我们可以计算的形式了,我们可以编写一个函数,对每一条数据的两个特征向量计算余弦相似度,并返回(id,相似度)的二元组:

def cosine(t):
    print (t)
    x = t[0][1]
    y = t[1]
    convxy = 0
    sumx=0
    sumy=0
    for i in range(len(x)):
        convxy += x[i] * y[i]
        sumx += x[i] ** 2
        sumy += y[i] ** 2
    return (t[0][0],convxy /(math.sqrt(sumx) * math.sqrt(sumy)))

最后我们将这个函数作用于data_rdd的每一条数据上就好了,类似于python的map函数:

output = data_rdd.map(cosine).collect()

最后的输出就是我们想要的形式啦,偷了个小懒,没有排序,哈哈哈,无伤大雅!

结果输出

5、完整代码

完整代码如下:

#-*-coding:utf-8-*-#:
from __future__ import print_function

import sys
from operator import add
import math

from pyspark import SparkConf, SparkContext

def cosine(t):
   print (t)
   x = t[0][1]
   y = t[1]
   convxy = 0
   sumx=0
   sumy=0
   for i in range(len(x)):
       convxy += x[i] * y[i]
       sumx += x[i] ** 2
       sumy += y[i] ** 2
   return (t[0][0],convxy /(math.sqrt(sumx) * math.sqrt(sumy)))


if __name__ == "__main__":

   sc = SparkContext()

  
   data1 = sc.parallelize([(1,[1,2,4,6,8]),(2,[2,1,4,6,8]),(3,[1,1,1,1,1]),(4,[2,3,4,5,6]),(5,[3,2,5,6,4])])
   data2 = sc.parallelize([[2,3,5,6,7]])
   data_rdd = data1.cartesian(data2)
   output = data_rdd.map(cosine).collect()
   
   print (output)

如果你喜欢我写的文章,可以帮忙给小编点个赞或者加个关注,我一定会互粉的!
如果大家对leetcode感兴趣,欢迎跟小编进行交流,小编微信为sxw2251,加我要写好备注哟!:

我的微信
上一篇下一篇

猜你喜欢

热点阅读