
Spark LSH Approximate Nearest Neighbor Vector Search: LinkedInAttic ScANNS

2021-03-31  xiaogp

Abstract: Spark locality-sensitive hashing (LSH) vector retrieval for a recommender system

Background

Recently I had a requirement to build related-item recommendations for millions of entities, computed offline, written to a database, and served through an API. The data warehouse and compute engine are Hive and Spark, so my first idea was the usual routine of embedding + LSH (locality-sensitive hashing in Spark ML). It failed every time I tested it and was painful to use; the specific shortcomings are summarized in the ScANNS overview below.

So I gave up on it, searched GitHub, and found LinkedIn ScANNS, a project from LinkedIn's machine learning team. After testing it, it turned out to work remarkably well.


Overview of the LSH Principle

In a nutshell, LSH applies locality-sensitive hash functions to high-dimensional data, mapping each vector into buckets across several hash tables. The defining property is that vectors which are similar in the original high-dimensional space remain similar after hashing into the low-dimensional space, i.e. they tend to land in the same bucket, while dissimilar vectors are kept out of the same bucket as much as possible. Because the hash family depends on how similarity is defined, there is no universal hash function (the hash function here plays a role similar to a convolution kernel in a CNN: it is a way of extracting features; for Euclidean distance, for example, it takes the form wx + b). Hashing therefore acts as a cheap recall step, after which a linear scan within each bucket computes exact distances. For the underlying theory, see my other article: https://www.jianshu.com/p/bbf142f8ec84
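
To make the wx + b intuition concrete, here is a minimal, self-contained sketch (plain Scala, not Spark; all the names are made up for illustration) of a random-projection hash family for Euclidean distance: each hash table draws a random direction w and an offset b, the bucket index is floor((w·x + b) / width), and nearby vectors collide in at least one table far more often than distant ones.

import scala.util.Random

object LshSketch {
  // One hash table: bucket(x) = floor((w . x + b) / width)
  case class EuclideanHash(w: Array[Double], b: Double, width: Double) {
    def bucket(x: Array[Double]): Long = {
      val proj = x.zip(w).map { case (xi, wi) => xi * wi }.sum
      math.floor((proj + b) / width).toLong
    }
  }

  def randomHash(dim: Int, width: Double, rng: Random): EuclideanHash =
    EuclideanHash(Array.fill(dim)(rng.nextGaussian()), rng.nextDouble() * width, width)

  def main(args: Array[String]): Unit = {
    val rng    = new Random(42)
    val tables = Seq.fill(10)(randomHash(3, 4.0, rng)) // 10 hash tables, dim 3, bucket length 4

    val a = Array(1.0, 0.0, 2.0)  // close to c
    val c = Array(2.0, -1.0, 2.0)
    val d = Array(3.0, 3.0, -1.0) // far from a and c

    // How many tables put a pair of vectors into the same bucket?
    def collisions(x: Array[Double], y: Array[Double]): Int =
      tables.count(h => h.bucket(x) == h.bucket(y))

    println(s"a~c collisions: ${collisions(a, c)}") // typically high: a and c are close
    println(s"a~d collisions: ${collisions(a, d)}") // typically lower: a and d are far apart
  }
}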


How Spark ML Implements LSH

Referring to https://blog.csdn.net/shenyanasop/article/details/110454273, the short version is that Spark's LSH first transforms the Vector column of the DataFrame into bucket values for N hash tables, then explodes each hash table into its own row for the join; two records match as long as at least one hash table value is equal. Finally it computes pairwise distances on the matched pairs and keeps only those within the radius threshold. A rough hand-rolled sketch of this explode-and-join idea follows, and after that we walk through the real API.
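
The sketch below is my own simplification for illustration, not Spark's actual source; it assumes the columns are named id, vec and hashes, and the Euclidean distance is computed by an inline UDF.

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, posexplode, udf}

// hashDF is assumed to have columns id, vec (Vector) and hashes (the LSH output column)
def naiveSimilarityJoin(hashDF: DataFrame, threshold: Double): DataFrame = {
  // 1. Explode: one row per (id, hash table index, bucket value)
  val exploded = hashDF.select(col("id"), col("vec"), posexplode(col("hashes")).as(Seq("tableIdx", "bucket")))

  // 2. Candidate pairs: one matching (table index, bucket value) entry is enough to join
  val candidates = exploded.as("l")
    .join(exploded.as("r"), Seq("tableIdx", "bucket"))
    .where(col("l.id") < col("r.id"))  // drop self-pairs and mirrored duplicates
    .select(col("l.id").as("idA"), col("l.vec").as("vecA"), col("r.id").as("idB"), col("r.vec").as("vecB"))
    .distinct()                        // the same pair may collide in several tables

  // 3. Exact distance filter on the surviving candidates
  val dist = udf { (x: Vector, y: Vector) =>
    math.sqrt(x.toArray.zip(y.toArray).map { case (a, b) => (a - b) * (a - b) }.sum)
  }
  candidates.withColumn("EuclideanDistance", dist(col("vecA"), col("vecB"))).where(col("EuclideanDistance") <= threshold)
}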
First, import the packages and create a DataFrame containing a vector column.

scala> import org.apache.spark.ml.linalg.Vectors.dense
import org.apache.spark.ml.linalg.Vectors.dense

scala> import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
import org.apache.spark.ml.feature.BucketedRandomProjectionLSH

scala> val df = Seq(("a", dense(1, 0, 2)), ("b", dense(-1, 0, 5)), ("c", dense(2, -1, 2)), ("d", dense(3, 3, -1))).toDF("a", "vec")
df: org.apache.spark.sql.DataFrame = [a: string, vec: vector]

Next, initialize an LSH estimator with 10 hash tables and a bucket length of 4, outputting a new column hashes, then fit and transform.

scala> :paste
// Entering paste mode (ctrl-D to finish)

val brp = new BucketedRandomProjectionLSH()
      .setBucketLength(4)
      .setNumHashTables(10)
      .setInputCol("vec")
      .setOutputCol("hashes")

// Exiting paste mode, now interpreting.

brp: org.apache.spark.ml.feature.BucketedRandomProjectionLSH = brp-lsh_ea56cf7270a2
scala> val brpModel = brp.fit(df)
brpModel: org.apache.spark.ml.feature.BucketedRandomProjectionLSHModel = brp-lsh_ea56cf7270a2

scala> val hashDF = brpModel.transform(df)
hashDF: org.apache.spark.sql.DataFrame = [a: string, vec: vector ... 1 more field]

Now let's see what the hashed features look like. The new hashes column contains 10 hash tables, each recording the bucket value the vector was mapped to by its locality-sensitive hash.

scala> hashDF.show(false)
+---+--------------+------------------------------------------------------------------------------+
|a  |vec           |hashes                                                                        |
+---+--------------+------------------------------------------------------------------------------+
|a  |[1.0,0.0,2.0] |[[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]]  |
|b  |[-1.0,0.0,5.0]|[[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]] |
|c  |[2.0,-1.0,2.0]|[[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]]|
|d  |[3.0,3.0,-1.0]|[[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]]     |
+---+--------------+------------------------------------------------------------------------------+

Then we apply the model's join method, joining the dataset with itself to find each element's neighbors.

scala> val brpDf = brpModel.approxSimilarityJoin(hashDF, hashDF, 2.0, "EuclideanDistance")
brpDf: org.apache.spark.sql.Dataset[_] = [datasetA: struct<a: string, vec: vector ... 1 more field>, datasetB: struct<a: string, hashes: array<vector> ... 1 more field> ... 1 more field]

Check the result:

scala> brpDf.show(false)
+---------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+------------------+
|datasetA                                                                                           |datasetB                                                                                           |EuclideanDistance |
+---------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+------------------+
|[c, [2.0,-1.0,2.0], [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]]]|[a, [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]], [1.0,0.0,2.0]]   |1.4142135623730951|
|[d, [3.0,3.0,-1.0], [[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]]]     |[d, [[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]], [3.0,3.0,-1.0]]     |0.0               |
|[a, [1.0,0.0,2.0], [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]]]   |[a, [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]], [1.0,0.0,2.0]]   |0.0               |
|[b, [-1.0,0.0,5.0], [[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]]] |[b, [[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]], [-1.0,0.0,5.0]] |0.0               |
|[a, [1.0,0.0,2.0], [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]]]   |[c, [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]], [2.0,-1.0,2.0]]|1.4142135623730951|
|[c, [2.0,-1.0,2.0], [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]]]|[c, [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]], [2.0,-1.0,2.0]]|0.0               |
+---------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+------------------+

You can see that only a and c joined with each other; everything else joined only with itself, since the distance to oneself is 0 and the hashes are identical. The likely reason is that the radius threshold is too small, so let's raise it to 10.0 and show again.

scala> val brpDf = brpModel.approxSimilarityJoin(hashDF, hashDF, 10.0, "EuclideanDistance")
brpDf: org.apache.spark.sql.Dataset[_] = [datasetA: struct<a: string, vec: vector ... 1 more field>, datasetB: struct<a: string, hashes: array<vector> ... 1 more field> ... 1 more field]

scala> brpDf.show(false)
+---------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+------------------+
|datasetA                                                                                           |datasetB                                                                                           |EuclideanDistance |
+---------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+------------------+
|[d, [3.0,3.0,-1.0], [[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]]]     |[a, [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]], [1.0,0.0,2.0]]   |4.69041575982343  |
|[d, [3.0,3.0,-1.0], [[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]]]     |[c, [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]], [2.0,-1.0,2.0]]|5.0990195135927845|
|[d, [3.0,3.0,-1.0], [[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]]]     |[b, [[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]], [-1.0,0.0,5.0]] |7.810249675906654 |
|[c, [2.0,-1.0,2.0], [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]]]|[a, [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]], [1.0,0.0,2.0]]   |1.4142135623730951|
|[d, [3.0,3.0,-1.0], [[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]]]     |[d, [[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]], [3.0,3.0,-1.0]]     |0.0               |
|[a, [1.0,0.0,2.0], [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]]]   |[a, [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]], [1.0,0.0,2.0]]   |0.0               |
|[c, [2.0,-1.0,2.0], [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]]]|[b, [[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]], [-1.0,0.0,5.0]] |4.358898943540674 |
|[a, [1.0,0.0,2.0], [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]]]   |[b, [[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]], [-1.0,0.0,5.0]] |3.605551275463989 |
|[a, [1.0,0.0,2.0], [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]]]   |[d, [[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]], [3.0,3.0,-1.0]]     |4.69041575982343  |
|[b, [-1.0,0.0,5.0], [[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]]] |[c, [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]], [2.0,-1.0,2.0]]|4.358898943540674 |
|[b, [-1.0,0.0,5.0], [[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]]] |[b, [[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]], [-1.0,0.0,5.0]] |0.0               |
|[a, [1.0,0.0,2.0], [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]]]   |[c, [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]], [2.0,-1.0,2.0]]|1.4142135623730951|
|[c, [2.0,-1.0,2.0], [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]]]|[c, [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]], [2.0,-1.0,2.0]]|0.0               |
|[b, [-1.0,0.0,5.0], [[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]]] |[d, [[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]], [3.0,3.0,-1.0]]     |7.810249675906654 |
|[c, [2.0,-1.0,2.0], [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]]]|[d, [[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]], [3.0,3.0,-1.0]]     |5.0990195135927845|
|[b, [-1.0,0.0,5.0], [[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]]] |[a, [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]], [1.0,0.0,2.0]]   |3.605551275463989 |
+---------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+------------------+

With the larger radius, the full Cartesian product shows up. The conclusion is that Spark LSH joins first and then filters by the radius threshold: the final result contains only pairs that both joined and fall within the radius. In some cases, though, even raising the radius threshold still loses pairs, because they never satisfy the join condition in the first place: a pair matches only if at least one hash table produces the same bucket value for both records. For example, if we reduce the number of hash tables, each vector has fewer buckets and is less likely to collide with any other vector. Let's set the number of hash tables to 2.

scala> :paste
// Entering paste mode (ctrl-D to finish)

val brp = new BucketedRandomProjectionLSH()
      .setBucketLength(4)
      .setNumHashTables(2)
      .setInputCol("vec")
      .setOutputCol("hashes")

// Exiting paste mode, now interpreting.

brp: org.apache.spark.ml.feature.BucketedRandomProjectionLSH = brp-lsh_8d2fef7fc1f5

Re-run the whole pipeline (fit and transform again) with the Euclidean distance threshold still at 10.0; the result is as follows.

scala> val brpDf = brpModel.approxSimilarityJoin(hashDF, hashDF, 10.0, "EuclideanDistance")
brpDf: org.apache.spark.sql.Dataset[_] = [datasetA: struct<a: string, vec: vector ... 1 more field>, datasetB: struct<a: string, hashes: array<vector> ... 1 more field> ... 1 more field]

scala> brpDf.show(false)
+-------------------------------------+-------------------------------------+------------------+
|datasetA                             |datasetB                             |EuclideanDistance |
+-------------------------------------+-------------------------------------+------------------+
|[a, [1.0,0.0,2.0], [[-1.0], [0.0]]]  |[a, [[-1.0], [0.0]], [1.0,0.0,2.0]]  |0.0               |
|[c, [2.0,-1.0,2.0], [[-1.0], [0.0]]] |[d, [[0.0], [0.0]], [3.0,3.0,-1.0]]  |5.0990195135927845|
|[d, [3.0,3.0,-1.0], [[0.0], [0.0]]]  |[c, [[-1.0], [0.0]], [2.0,-1.0,2.0]] |5.0990195135927845|
|[d, [3.0,3.0,-1.0], [[0.0], [0.0]]]  |[d, [[0.0], [0.0]], [3.0,3.0,-1.0]]  |0.0               |
|[c, [2.0,-1.0,2.0], [[-1.0], [0.0]]] |[c, [[-1.0], [0.0]], [2.0,-1.0,2.0]] |0.0               |
|[b, [-1.0,0.0,5.0], [[-2.0], [-1.0]]]|[b, [[-2.0], [-1.0]], [-1.0,0.0,5.0]]|0.0               |
|[a, [1.0,0.0,2.0], [[-1.0], [0.0]]]  |[d, [[0.0], [0.0]], [3.0,3.0,-1.0]]  |4.69041575982343  |
|[c, [2.0,-1.0,2.0], [[-1.0], [0.0]]] |[a, [[-1.0], [0.0]], [1.0,0.0,2.0]]  |1.4142135623730951|
|[d, [3.0,3.0,-1.0], [[0.0], [0.0]]]  |[a, [[-1.0], [0.0]], [1.0,0.0,2.0]]  |4.69041575982343  |
|[a, [1.0,0.0,2.0], [[-1.0], [0.0]]]  |[c, [[-1.0], [0.0]], [2.0,-1.0,2.0]] |1.4142135623730951|
+-------------------------------------+-------------------------------------+------------------+

Clearly some pairs no longer match. Ignoring self-matches, b disappears completely while a, c and d still match each other. Looking more closely at why b drops out: its two hash values are -2 and -1, and neither equals any other vector's value at the same hash table position (the others are all -1 or 0), so the join never happens, let alone the radius filter that follows.
Next let's test the other important parameter, BucketLength: the larger it is, the less spread out the computed hash values are and the more likely collisions become. Now let's make it smaller.

scala> :paste
// Entering paste mode (ctrl-D to finish)

val brp = new BucketedRandomProjectionLSH()
      .setBucketLength(1)
      .setNumHashTables(10)
      .setInputCol("vec")
      .setOutputCol("hashes")

// Exiting paste mode, now interpreting.

brp: org.apache.spark.ml.feature.BucketedRandomProjectionLSH = brp-lsh_70cba14ac181

Looking straight at the result, the hash values are much more spread out, especially b's, with values like -5, -4 and -3 showing up.

scala> brpDf.show(false)
+---------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+------------------+
|datasetA                                                                                           |datasetB                                                                                           |EuclideanDistance |
+---------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+------------------+
|[d, [3.0,3.0,-1.0], [[2.0], [2.0], [-4.0], [1.0], [0.0], [3.0], [3.0], [-4.0], [0.0], [-3.0]]]     |[a, [[-2.0], [0.0], [0.0], [-2.0], [-2.0], [-1.0], [0.0], [-1.0], [0.0], [-2.0]], [1.0,0.0,2.0]]   |4.69041575982343  |
|[c, [2.0,-1.0,2.0], [[-2.0], [1.0], [-1.0], [-3.0], [-2.0], [-2.0], [1.0], [-2.0], [-1.0], [-2.0]]]|[b, [[-5.0], [-2.0], [3.0], [-5.0], [-5.0], [-2.0], [-2.0], [1.0], [2.0], [-4.0]], [-1.0,0.0,5.0]] |4.358898943540674 |
|[a, [1.0,0.0,2.0], [[-2.0], [0.0], [0.0], [-2.0], [-2.0], [-1.0], [0.0], [-1.0], [0.0], [-2.0]]]   |[c, [[-2.0], [1.0], [-1.0], [-3.0], [-2.0], [-2.0], [1.0], [-2.0], [-1.0], [-2.0]], [2.0,-1.0,2.0]]|1.4142135623730951|
|[a, [1.0,0.0,2.0], [[-2.0], [0.0], [0.0], [-2.0], [-2.0], [-1.0], [0.0], [-1.0], [0.0], [-2.0]]]   |[d, [[2.0], [2.0], [-4.0], [1.0], [0.0], [3.0], [3.0], [-4.0], [0.0], [-3.0]], [3.0,3.0,-1.0]]     |4.69041575982343  |
|[b, [-1.0,0.0,5.0], [[-5.0], [-2.0], [3.0], [-5.0], [-5.0], [-2.0], [-2.0], [1.0], [2.0], [-4.0]]] |[c, [[-2.0], [1.0], [-1.0], [-3.0], [-2.0], [-2.0], [1.0], [-2.0], [-1.0], [-2.0]], [2.0,-1.0,2.0]]|4.358898943540674 |
|[c, [2.0,-1.0,2.0], [[-2.0], [1.0], [-1.0], [-3.0], [-2.0], [-2.0], [1.0], [-2.0], [-1.0], [-2.0]]]|[a, [[-2.0], [0.0], [0.0], [-2.0], [-2.0], [-1.0], [0.0], [-1.0], [0.0], [-2.0]], [1.0,0.0,2.0]]   |1.4142135623730951|
|[c, [2.0,-1.0,2.0], [[-2.0], [1.0], [-1.0], [-3.0], [-2.0], [-2.0], [1.0], [-2.0], [-1.0], [-2.0]]]|[c, [[-2.0], [1.0], [-1.0], [-3.0], [-2.0], [-2.0], [1.0], [-2.0], [-1.0], [-2.0]], [2.0,-1.0,2.0]]|0.0               |
|[b, [-1.0,0.0,5.0], [[-5.0], [-2.0], [3.0], [-5.0], [-5.0], [-2.0], [-2.0], [1.0], [2.0], [-4.0]]] |[b, [[-5.0], [-2.0], [3.0], [-5.0], [-5.0], [-2.0], [-2.0], [1.0], [2.0], [-4.0]], [-1.0,0.0,5.0]] |0.0               |
|[a, [1.0,0.0,2.0], [[-2.0], [0.0], [0.0], [-2.0], [-2.0], [-1.0], [0.0], [-1.0], [0.0], [-2.0]]]   |[a, [[-2.0], [0.0], [0.0], [-2.0], [-2.0], [-1.0], [0.0], [-1.0], [0.0], [-2.0]], [1.0,0.0,2.0]]   |0.0               |
|[d, [3.0,3.0,-1.0], [[2.0], [2.0], [-4.0], [1.0], [0.0], [3.0], [3.0], [-4.0], [0.0], [-3.0]]]     |[d, [[2.0], [2.0], [-4.0], [1.0], [0.0], [3.0], [3.0], [-4.0], [0.0], [-3.0]], [3.0,3.0,-1.0]]     |0.0               |
+---------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+------------------+

Judging from the join result, a, b, c and d no longer join with all of each other; a and b, for instance, share no hash table value at all. The takeaway is that a smaller bucket length spreads the hash values further apart, reduces collisions, and therefore leaves fewer candidate pairs after the join.
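
A quick numeric illustration of the bucket length effect (plain Scala; the projection values w·x + b are made up): projections that all share bucket 0 at width 4 spread over four different buckets at width 1, so collisions become much rarer.

// Hypothetical projection values w.x + b for four vectors under a single hash function
val projections = Map("a" -> 1.2, "b" -> 3.8, "c" -> 0.9, "d" -> 2.5)

def bucket(proj: Double, bucketLength: Double): Long = math.floor(proj / bucketLength).toLong

println(projections.mapValues(p => bucket(p, 4.0))) // bucketLength 4: a, b, c, d all land in bucket 0
println(projections.mapValues(p => bucket(p, 1.0))) // bucketLength 1: buckets 1, 3, 0, 2 -- no collisions at all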


Overview of the ScANNS Project

ScANNS is a nearest-neighbor search library for Spark. It supports nearest-neighbor search over large offline batches of data in cosine-distance, Jaccard-distance and Euclidean-distance spaces. The project mainly addresses the shortcomings of the existing Spark ML LSH:
(1) No support for cosine similarity
(2) With large data volumes it simply fails (OOM, job failure), and even with sufficient resources the existing algorithm suffers from data skew in individual buckets
(3) It adds all-pairs top-K search over the full dataset

ScANNS Algorithm Optimizations

The project's GitHub page describes its optimizations compared with Spark ML.



The first stage maps the raw vectors into multiple hash tables, each recording bucket values, and then uses explode to turn every row into (hash table index, hash value group) tuples; the join key here is a group of hash values rather than the single hash value used before. Joining on these tuples is conceptually the same as classic Spark ML, but the join stage is where the LinkedIn engineers added their optimizations, notably the bucket size limit and bucket sampling used to contain skew (see the parameters below). A rough sketch of the banding step follows.
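
The sketch below is my own illustration of that banding step, not ScANNS source code: the pool of hash values is cut into bands of signatureLength values each, and the join key is (band index, whole band signature) rather than a single hash value, so two items only become candidates when an entire band matches.

// Hypothetical: 6 hash values per item, signatureLength = 3 -> 2 bands (hash tables)
val hashes = Map(
  1L -> Array(0, 1, 1, 0, 2, 1),
  2L -> Array(0, 1, 1, 5, 4, 3)
)
val signatureLength = 3

// "Explode" each item into (band index, full band signature) join keys
val exploded: Seq[((Int, Seq[Int]), Long)] = hashes.toSeq.flatMap { case (id, h) =>
  h.grouped(signatureLength).zipWithIndex.map { case (band, bandIdx) => ((bandIdx, band.toSeq), id) }
}

// Two ids become candidates only if they share a complete band, not just one hash value
val candidates = exploded.groupBy(_._1).values.map(_.map(_._2).distinct).filter(_.size > 1)
println(candidates.toList) // List(List(1, 2)): items 1 and 2 share band 0 = (0, 1, 1) but differ in band 1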


Downloading and Building ScANNS

To download and build the project, see its GitHub page: https://github.com/LinkedInAttic/scanns#understanding-the-model-parameters

git clone git@github.com:linkedin/scanns.git
cd scanns
./gradlew build # jar will be built in build/scanns_2.1*/libs

You may hit a permission error during the clone; set up an SSH key under your GitHub account settings and on your local machine. If other errors come up along the way, just search for them. When the build finishes it produces the jar scanns_2.11-1.0.0.jar:

root@ubuntu:/home/scanns/build/scanns_2.11/libs# ls
scanns_2.11-1.0.0.jar

Add the jar to the Maven pom.xml as a system-scoped dependency:

<dependency>
    <groupId>com.linkedin.nn</groupId>
    <artifactId>scanns_2.11</artifactId>
    <version>2.11</version>
    <systemPath>/home/scanns/build/scanns_2.11/libs/scanns_2.11-1.0.0.jar</systemPath>
    <scope>system</scope>
</dependency>

Verify in IDEA that the import resolves:

import com.linkedin.nn.algorithm.CosineSignRandomProjectionNNS

ScANNS Input Requirements

The algorithm's input is an RDD[(Long, org.apache.spark.ml.linalg.Vector)]. An RDD is used instead of a DataFrame/Dataset because it keeps some lower-level APIs available, allowing more control over the joins performed inside the algorithm.
A simple example that satisfies the input requirements:

scala> val a  = sc.parallelize(Array((1L, dense(1, 2, 3)), (2L, dense(1, 0, 3))))
a: org.apache.spark.rdd.RDD[(Long, org.apache.spark.ml.linalg.Vector)] = ParallelCollectionRDD[5] at parallelize at <console>:28

You can also convert a DataFrame into the format the algorithm needs:

scala> val df = Seq((1L, dense(1, 2, 3)), (2L, dense(2, 3, 4))).toDF("a", "b")
df: org.apache.spark.sql.DataFrame = [a: bigint, b: vector]

scala> val df2 = df.rdd.map(x => (x.getLong(0), x.get(1).asInstanceOf[org.apache.spark.ml.linalg.Vector]))
df2: org.apache.spark.rdd.RDD[(Long, org.apache.spark.ml.linalg.Vector)] = MapPartitionsRDD[4] at map at <console>:26

To sum up, the algorithm is strict about the input format: exactly two columns must be passed, in that order; the first column, the entity identifier, must be of type Long (String will not work); and the data must be an RDD.


Implementation, Deployment and Testing

Let's write a simple example that recommends similar company entities based on their embeddings. First, a look at the input data.



The input data consists of an id (the Long identifier required by the algorithm), the entity name (used for the recommendation output) and a 16-dimensional embedding vector (computed offline in advance), stored as txt on HDFS. The first step is to read the data with Spark and convert it into the format the algorithm needs.

import java.util.Properties

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.linalg.Vectors.dense
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{split, udf}

def loadEntVector(spark: SparkSession, config: Broadcast[Properties]): DataFrame = {
    /* Load the entity vectors trained offline */
    import spark.implicits._
    val df = spark.read.format("csv").option("sep", " ").option("header", false).load(config.value.getProperty("entVectorPath"))
      .toDF("id", "ent_name", "vector")
      .select($"id".cast("Long"), $"ent_name", $"vector")
      .withColumn("vector", split($"vector", ","))
      .withColumn("vector", udf((vec: scala.collection.mutable.WrappedArray[String]) => dense(vec.map(_.toDouble).toArray)).apply($"vector"))
    df
  }

The code above mainly turns the vector string into an Array and then into an org.apache.spark.ml.linalg dense vector.
Prepare two tables: one as the id-to-name mapping, the other as the algorithm input.

val vtScore = loadEntVector(spark, configProperties)
val idMapDF = vtScore.select($"id", $"ent_name")
val modelDF = vtScore.select($"id", $"vector").rdd.map(x => (x.getLong(0), x.get(1).asInstanceOf[org.apache.spark.ml.linalg.Vector]))

Next, define the model.

import com.linkedin.nn.algorithm.CosineSignRandomProjectionNNS

val model = new CosineSignRandomProjectionNNS()
      .setNumHashes(300)
      .setSignatureLength(15)
      .setJoinParallelism(200)
      .setBucketLimit(1000)
      .setShouldSampleBuckets(true)
      .setNumOutputPartitions(100)
      .createModel(16)

The main parameters are explained in the "Understanding the model parameters" section of the project README linked above; an annotated version of the definition is given right below.
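
For reference, here is the same definition again with comments giving my reading of each parameter, based on that README section; treat the comments as interpretation rather than official documentation.

import com.linkedin.nn.algorithm.CosineSignRandomProjectionNNS

val model = new CosineSignRandomProjectionNNS()
  .setNumHashes(300)            // total pool of sign-random-projection hash functions
  .setSignatureLength(15)       // hashes concatenated per band, so 300 / 15 = 20 hash tables
  .setJoinParallelism(200)      // parallelism of the candidate-pair join stage
  .setBucketLimit(1000)         // cap on bucket size, to keep skewed buckets under control
  .setShouldSampleBuckets(true) // sample oversized buckets down to the limit instead of discarding them
  .setNumOutputPartitions(100)  // number of partitions of the output RDD
  .createModel(16)              // dimensionality of the input vectors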

Next, run the model. Calling getSelfAllNearestNeighbors searches, for every entity in the input, the whole collection itself for its top-K most similar entities; the numCandidates parameter is K. The joins afterwards replace the Long ids with entity names.

val numCandidates = 50
val nbrs = model.getSelfAllNearestNeighbors(modelDF, numCandidates).toDF("id_a", "id_b", "score")
val res = nbrs
      .join(idMapDF.select($"id".as("id_a"), $"ent_name"), Seq("id_a"), "left_outer")
      .join(idMapDF.select($"id".as("id_b"), $"ent_name".as("ent_recommend")), Seq("id_b"), "left_outer")

The last step groups the computed top-K with Spark's collect_list operator, combining each entity's recommendations into a single array (intended as a JSON array) for loading into the database.

val res2 = res.withColumn("recommend", udf((x: String, y: Double) => "[" + x + y.toString + "]").apply($"ent_recommend", $"score"))
      .groupBy($"ent_name").agg(collect_list($"recommend").as("recommend"))
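
The concatenation above glues the entity name and the score together with no delimiter, so the stored strings are not strictly JSON. If valid JSON elements are wanted, one option (a sketch, assuming org.apache.spark.sql.functions.to_json is available, i.e. Spark 2.1+) is to serialize each pair as a JSON object before collecting:

import org.apache.spark.sql.functions.{collect_list, struct, to_json}

val res2 = res
  .withColumn("recommend", to_json(struct($"ent_recommend", $"score"))) // e.g. {"ent_recommend":"...","score":0.97}
  .groupBy($"ent_name")
  .agg(collect_list($"recommend").as("recommend"))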

When submitting the job, attach the jar to spark2-submit with --jars, as follows:

sudo -u hdfs spark2-submit \
--class ... \
--master ... \
--conf ... \
--jars scanns_2.11-1.0.0.jar \
myproject.jar

The job runs remarkably fast: finding the top 50 neighbors for 2 million entities with 16-dimensional vectors took about 3 minutes.
Finally, check the results in the database.


Parameter Tuning

To close, a note on the algorithm's parameters: they differ from Spark ML's and are more involved; the "Understanding the model parameters" section of the project README walks through each one.
