Spark LSH 近似最近邻矢量检索:LinkedInAtti
摘要:Spark
,局部敏感哈希
,矢量检索
,推荐系统
使用背景
最近有个需求做百万级别实体的相关推荐,离线场景算完入库接口调用,数仓和计算引擎基于Hive和Spark,一开始设想直接老套路embedding+LSH(Spark ml下局部敏感哈希),测了几次都GG了,十分不好用,原因有以下:
- 计算不稳定:Spark的LSH动不动卡着不动或者慢或者OOM,主要原因是join步骤相当消耗资源和桶内数据倾斜导致,然而在倾斜的桶内暴力搜索可能是不值得的,因为相似度数据对可能也在另一个不倾斜的桶内出现了
-
数据丢失:调用
approxSimilarityJoin
会莫名其妙的丢失实体,比如输入1000个实体做最近邻50个检索,最后只输出了200个实体的top50,这个问题不是半径太小导致的,而是哈希之后没有任何一条(hash_table,哈希值)一样可以join上的数据对,这个问题是参数设置导致的,LSH调参比较蛋疼 -
不能对所有实体输出TopK:Spark的LSH的
approxNearestNeighbors
是输出TopK,和需求完全切合,但是这个API不支持在全表操作,只能输出一个实体进行推荐,所以只能使用join方法再对join到的数据对进行排序取topK,相当浪费计算资源 -
不支持余弦相似度:Spark的
BucketedRandomProjectionLSH
不支持余弦相似度,这个影响不大,可以先做一步归一化然后用欧氏距离,但是不是很方便
不谈了,去Github搜了一个项目LinkedIn ScANNS,LinkedIn机器学习团队出品,测试了一下相当好用
LSH原理概述
简单而言,LSH就是对高维数据使用局部敏感哈希函数进行转换,映射到多个HashTable中的桶中,局部敏感哈希的特性是使得原本在高维相似的向量进行哈希映射到低维度也相似,及映射到同样的桶中,不相似的数据可能尽量避免在同一个桶,由于相似度的定义不同,局部敏感哈希函数也不同,没有通用的哈希函数(这个地方哈希函数和CNN中的卷积核函数作用类似,是一种提取特征的方式,比如欧氏距离的哈希函数就是wx+b的形式),因此先用哈希降维作为召回,然后在同一个桶下进行线性遍历求解距离即可,具体原理参考我的另一篇文章https://www.jianshu.com/p/bbf142f8ec84
Spark ml是如何实现LSH的
参考https://blog.csdn.net/shenyanasop/article/details/110454273这篇文章,简单而言Spark使用LSH先对DataFrame中的Vector列做转化生成N和hash table中的桶值,然后炸开每个hash table作为一行进行join匹配,只要有一个hash table值一样就匹配上,最后再对匹配上的数据对做两两距离计算,留下在半径阈值内的数据对,结合代码调用案例深入理解一下
首先我们导入包,创建一个含有向量的DataFrame
scala> import org.apache.spark.ml.linalg.Vectors.dense
import org.apache.spark.ml.linalg.Vectors.dense
scala> import org.apache.spark.ml.linalg.Vectors.dense
import org.apache.spark.ml.linalg.Vectors.dense
scala> import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
scala> val df = Seq(("a", dense(1, 0, 2)), ("b", dense(-1, 0, 5)), ("c", dense(2, -1, 2)), ("d", dense(3, 3, -1))).toDF("a", "vec")
df: org.apache.spark.sql.DataFrame = [a: string, vec: vector]
然后我们初始化一个LSH模型,设置hash_table 10个,桶长4,输出一个新的列hashes,并且fit,transform一波
scala> :paste
// Entering paste mode (ctrl-D to finish)
val brp = new BucketedRandomProjectionLSH()
.setBucketLength(4)
.setNumHashTables(10)
.setInputCol("vec")
.setOutputCol("hashes")
// Exiting paste mode, now interpreting.
brp: org.apache.spark.ml.feature.BucketedRandomProjectionLSH = brp-lsh_ea56cf7270a2
scala> val brpModel = brp.fit(df)
brpModel: org.apache.spark.ml.feature.BucketedRandomProjectionLSHModel = brp-lsh_ea56cf7270a2
scala> val hashDF = brpModel.transform(df)
hashDF: org.apache.spark.sql.DataFrame = [a: string, vec: vector ... 1 more field]
最终看一下进行hash转化之后的向量特征长什么样子,可见新生成的hashes列中生成了10个hash_table,每个hash_table中记录了向量进行局部敏感哈希之后的桶值
scala> hashDF.show(false)
+---+--------------+------------------------------------------------------------------------------+
|a |vec |hashes |
+---+--------------+------------------------------------------------------------------------------+
|a |[1.0,0.0,2.0] |[[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]] |
|b |[-1.0,0.0,5.0]|[[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]] |
|c |[2.0,-1.0,2.0]|[[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]]|
|d |[3.0,3.0,-1.0]|[[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]] |
+---+--------------+------------------------------------------------------------------------------+
然后我们应用模型的join方法,自身和自身join,找到每个元素的近邻
scala> val brpDf = brpModel.approxSimilarityJoin(hashDF, hashDF, 2.0, "EuclideanDistance")
brpDf: org.apache.spark.sql.Dataset[_] = [datasetA: struct<a: string, vec: vector ... 1 more field>, datasetB: struct<a: string, hashes: array<vector> ... 1 more field> ... 1 more field]
查看计算结果
scala> brpDf.show(false)
+---------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+------------------+
|datasetA |datasetB |EuclideanDistance |
+---------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+------------------+
|[c, [2.0,-1.0,2.0], [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]]]|[a, [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]], [1.0,0.0,2.0]] |1.4142135623730951|
|[d, [3.0,3.0,-1.0], [[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]]] |[d, [[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]], [3.0,3.0,-1.0]] |0.0 |
|[a, [1.0,0.0,2.0], [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]]] |[a, [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]], [1.0,0.0,2.0]] |0.0 |
|[b, [-1.0,0.0,5.0], [[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]]] |[b, [[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]], [-1.0,0.0,5.0]] |0.0 |
|[a, [1.0,0.0,2.0], [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]]] |[c, [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]], [2.0,-1.0,2.0]]|1.4142135623730951|
|[c, [2.0,-1.0,2.0], [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]]]|[c, [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]], [2.0,-1.0,2.0]]|0.0 |
+---------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+------------------+
可以看到只有a和c join上了,其他都是只有join上了自己,因为自己和自己的距离是0,hash值也一模一样,原因可能是半径阈值调小了,我们将半径调大到10.0,重新show一波
scala> val brpDf = brpModel.approxSimilarityJoin(hashDF, hashDF, 10.0, "EuclideanDistance")
brpDf: org.apache.spark.sql.Dataset[_] = [datasetA: struct<a: string, vec: vector ... 1 more field>, datasetB: struct<a: string, hashes: array<vector> ... 1 more field> ... 1 more field]
scala> brpDf.show(false)
+---------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+------------------+
|datasetA |datasetB |EuclideanDistance |
+---------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+------------------+
|[d, [3.0,3.0,-1.0], [[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]]] |[a, [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]], [1.0,0.0,2.0]] |4.69041575982343 |
|[d, [3.0,3.0,-1.0], [[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]]] |[c, [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]], [2.0,-1.0,2.0]]|5.0990195135927845|
|[d, [3.0,3.0,-1.0], [[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]]] |[b, [[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]], [-1.0,0.0,5.0]] |7.810249675906654 |
|[c, [2.0,-1.0,2.0], [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]]]|[a, [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]], [1.0,0.0,2.0]] |1.4142135623730951|
|[d, [3.0,3.0,-1.0], [[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]]] |[d, [[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]], [3.0,3.0,-1.0]] |0.0 |
|[a, [1.0,0.0,2.0], [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]]] |[a, [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]], [1.0,0.0,2.0]] |0.0 |
|[c, [2.0,-1.0,2.0], [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]]]|[b, [[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]], [-1.0,0.0,5.0]] |4.358898943540674 |
|[a, [1.0,0.0,2.0], [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]]] |[b, [[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]], [-1.0,0.0,5.0]] |3.605551275463989 |
|[a, [1.0,0.0,2.0], [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]]] |[d, [[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]], [3.0,3.0,-1.0]] |4.69041575982343 |
|[b, [-1.0,0.0,5.0], [[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]]] |[c, [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]], [2.0,-1.0,2.0]]|4.358898943540674 |
|[b, [-1.0,0.0,5.0], [[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]]] |[b, [[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]], [-1.0,0.0,5.0]] |0.0 |
|[a, [1.0,0.0,2.0], [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]]] |[c, [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]], [2.0,-1.0,2.0]]|1.4142135623730951|
|[c, [2.0,-1.0,2.0], [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]]]|[c, [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]], [2.0,-1.0,2.0]]|0.0 |
|[b, [-1.0,0.0,5.0], [[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]]] |[d, [[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]], [3.0,3.0,-1.0]] |7.810249675906654 |
|[c, [2.0,-1.0,2.0], [[-1.0], [0.0], [-1.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [-1.0], [-1.0]]]|[d, [[0.0], [0.0], [-1.0], [0.0], [0.0], [0.0], [0.0], [-1.0], [0.0], [-1.0]], [3.0,3.0,-1.0]] |5.0990195135927845|
|[b, [-1.0,0.0,5.0], [[-2.0], [-1.0], [0.0], [-2.0], [-2.0], [-1.0], [-1.0], [0.0], [0.0], [-1.0]]] |[a, [[-1.0], [0.0], [0.0], [-1.0], [-1.0], [-1.0], [0.0], [-1.0], [0.0], [-1.0]], [1.0,0.0,2.0]] |3.605551275463989 |
+---------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+------------------+
可以半径设置大了之后,笛卡尔积完全展示出来了,结论就是Spark LSH在先join上之后,再做半径阈值筛选,最终计算结果显示join且在半径范围内的,但是某些情况下就算是调高半径阈值也会丢失原有数据,原因是不满足join条件,join条件是所有hash 桶值必须有至少一个相同的一对才能匹配,比如我们调小hash_table的数量,这样一条向量的桶的数量就会减少,和其他向量碰撞在一个桶的概率降低,我们将hash_table数量设置为2
scala> :paste
// Entering paste mode (ctrl-D to finish)
val brp = new BucketedRandomProjectionLSH()
.setBucketLength(4)
.setNumHashTables(2)
.setInputCol("vec")
.setOutputCol("hashes")
// Exiting paste mode, now interpreting.
brp: org.apache.spark.ml.feature.BucketedRandomProjectionLSH = brp-lsh_8d2fef7fc1f5
重新走一遍流程,最终欧式距离阈值依旧是10.0,结果如下
scala> val brpDf = brpModel.approxSimilarityJoin(hashDF, hashDF, 10.0, "EuclideanDistance")
brpDf: org.apache.spark.sql.Dataset[_] = [datasetA: struct<a: string, vec: vector ... 1 more field>, datasetB: struct<a: string, hashes: array<vector> ... 1 more field> ... 1 more field]
scala> brpDf.show(false)
+-------------------------------------+-------------------------------------+------------------+
|datasetA |datasetB |EuclideanDistance |
+-------------------------------------+-------------------------------------+------------------+
|[a, [1.0,0.0,2.0], [[-1.0], [0.0]]] |[a, [[-1.0], [0.0]], [1.0,0.0,2.0]] |0.0 |
|[c, [2.0,-1.0,2.0], [[-1.0], [0.0]]] |[d, [[0.0], [0.0]], [3.0,3.0,-1.0]] |5.0990195135927845|
|[d, [3.0,3.0,-1.0], [[0.0], [0.0]]] |[c, [[-1.0], [0.0]], [2.0,-1.0,2.0]] |5.0990195135927845|
|[d, [3.0,3.0,-1.0], [[0.0], [0.0]]] |[d, [[0.0], [0.0]], [3.0,3.0,-1.0]] |0.0 |
|[c, [2.0,-1.0,2.0], [[-1.0], [0.0]]] |[c, [[-1.0], [0.0]], [2.0,-1.0,2.0]] |0.0 |
|[b, [-1.0,0.0,5.0], [[-2.0], [-1.0]]]|[b, [[-2.0], [-1.0]], [-1.0,0.0,5.0]]|0.0 |
|[a, [1.0,0.0,2.0], [[-1.0], [0.0]]] |[d, [[0.0], [0.0]], [3.0,3.0,-1.0]] |4.69041575982343 |
|[c, [2.0,-1.0,2.0], [[-1.0], [0.0]]] |[a, [[-1.0], [0.0]], [1.0,0.0,2.0]] |1.4142135623730951|
|[d, [3.0,3.0,-1.0], [[0.0], [0.0]]] |[a, [[-1.0], [0.0]], [1.0,0.0,2.0]] |4.69041575982343 |
|[a, [1.0,0.0,2.0], [[-1.0], [0.0]]] |[c, [[-1.0], [0.0]], [2.0,-1.0,2.0]] |1.4142135623730951|
+-------------------------------------+-------------------------------------+------------------+
可见明细有数据没有匹配上,如果不算自身和自身匹配,b彻底消失,a,b,c互相匹配到了,再仔细看一下b为啥没匹配上,他的两个hash值是-2,,1,两个hash值在同一个位次的hash_table上绝无仅有(其他都是-1,,0)因此join不上,更不要说走下一步半径过滤
接下来我们测试一下另一个重要的参数BucketLength
,这个数值越大,计算的hash值离散程度越低,碰撞的概率越大,现在我们把这个值调小
scala> :paste
// Entering paste mode (ctrl-D to finish)
val brp = new BucketedRandomProjectionLSH()
.setBucketLength(1)
.setNumHashTables(10)
.setInputCol("vec")
.setOutputCol("hashes")
// Exiting paste mode, now interpreting.
brp: org.apache.spark.ml.feature.BucketedRandomProjectionLSH = brp-lsh_70cba14ac181
直接看结果,hash值里面尤其是b离散程度非常大,出现了-5,-4,-3
scala> brpDf.show(false)
+---------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+------------------+
|datasetA |datasetB |EuclideanDistance |
+---------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+------------------+
|[d, [3.0,3.0,-1.0], [[2.0], [2.0], [-4.0], [1.0], [0.0], [3.0], [3.0], [-4.0], [0.0], [-3.0]]] |[a, [[-2.0], [0.0], [0.0], [-2.0], [-2.0], [-1.0], [0.0], [-1.0], [0.0], [-2.0]], [1.0,0.0,2.0]] |4.69041575982343 |
|[c, [2.0,-1.0,2.0], [[-2.0], [1.0], [-1.0], [-3.0], [-2.0], [-2.0], [1.0], [-2.0], [-1.0], [-2.0]]]|[b, [[-5.0], [-2.0], [3.0], [-5.0], [-5.0], [-2.0], [-2.0], [1.0], [2.0], [-4.0]], [-1.0,0.0,5.0]] |4.358898943540674 |
|[a, [1.0,0.0,2.0], [[-2.0], [0.0], [0.0], [-2.0], [-2.0], [-1.0], [0.0], [-1.0], [0.0], [-2.0]]] |[c, [[-2.0], [1.0], [-1.0], [-3.0], [-2.0], [-2.0], [1.0], [-2.0], [-1.0], [-2.0]], [2.0,-1.0,2.0]]|1.4142135623730951|
|[a, [1.0,0.0,2.0], [[-2.0], [0.0], [0.0], [-2.0], [-2.0], [-1.0], [0.0], [-1.0], [0.0], [-2.0]]] |[d, [[2.0], [2.0], [-4.0], [1.0], [0.0], [3.0], [3.0], [-4.0], [0.0], [-3.0]], [3.0,3.0,-1.0]] |4.69041575982343 |
|[b, [-1.0,0.0,5.0], [[-5.0], [-2.0], [3.0], [-5.0], [-5.0], [-2.0], [-2.0], [1.0], [2.0], [-4.0]]] |[c, [[-2.0], [1.0], [-1.0], [-3.0], [-2.0], [-2.0], [1.0], [-2.0], [-1.0], [-2.0]], [2.0,-1.0,2.0]]|4.358898943540674 |
|[c, [2.0,-1.0,2.0], [[-2.0], [1.0], [-1.0], [-3.0], [-2.0], [-2.0], [1.0], [-2.0], [-1.0], [-2.0]]]|[a, [[-2.0], [0.0], [0.0], [-2.0], [-2.0], [-1.0], [0.0], [-1.0], [0.0], [-2.0]], [1.0,0.0,2.0]] |1.4142135623730951|
|[c, [2.0,-1.0,2.0], [[-2.0], [1.0], [-1.0], [-3.0], [-2.0], [-2.0], [1.0], [-2.0], [-1.0], [-2.0]]]|[c, [[-2.0], [1.0], [-1.0], [-3.0], [-2.0], [-2.0], [1.0], [-2.0], [-1.0], [-2.0]], [2.0,-1.0,2.0]]|0.0 |
|[b, [-1.0,0.0,5.0], [[-5.0], [-2.0], [3.0], [-5.0], [-5.0], [-2.0], [-2.0], [1.0], [2.0], [-4.0]]] |[b, [[-5.0], [-2.0], [3.0], [-5.0], [-5.0], [-2.0], [-2.0], [1.0], [2.0], [-4.0]], [-1.0,0.0,5.0]] |0.0 |
|[a, [1.0,0.0,2.0], [[-2.0], [0.0], [0.0], [-2.0], [-2.0], [-1.0], [0.0], [-1.0], [0.0], [-2.0]]] |[a, [[-2.0], [0.0], [0.0], [-2.0], [-2.0], [-1.0], [0.0], [-1.0], [0.0], [-2.0]], [1.0,0.0,2.0]] |0.0 |
|[d, [3.0,3.0,-1.0], [[2.0], [2.0], [-4.0], [1.0], [0.0], [3.0], [3.0], [-4.0], [0.0], [-3.0]]] |[d, [[2.0], [2.0], [-4.0], [1.0], [0.0], [3.0], [3.0], [-4.0], [0.0], [-3.0]], [3.0,3.0,-1.0]] |0.0 |
+---------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+------------------+
从join结果来看abcd都没有完全join全,比如a和b没有一个hash table的值一样,所以结论是
-
BucketLength
越大,向量映射在同一个桶的概率越大,召回地越多,计算量大,可以降低假阴,但是也提高了假阳 -
NumHashTables
越大,向量映射到的桶选择就多,如果只要求有一个桶一样就召回,则这个值越大召回就越多,计算量就越大,可以降低假阴,但是也提高了假阳 - 召回(准确率)和计算量的取舍:如果需要计算的结果不漏,且准确率高,那么必然要越接近全表扫描,即创造更多的召回,可以调大桶长和hash_table的个数,但是计算量会变大消耗资源且容易桶内倾斜OOM,如果为了降低计算量调低参数,又容易召回不出算不出近邻,索引引入下面的主角ScANNS
ScANNS项目概述
ScANNS是Spark的近邻搜索库,它支持在余弦距离
、jaccard距离
和欧几里德距离
空间内的离线大批量数据进行最近邻搜索。这个项目主要解决现有的Spark ML下LSH的不足:
(1)不支持余弦相似度
(2)数据量大了跑不起来OOM,程序失败,就算资源足够,现有算法存在各别桶数据倾斜的问题
(3)增加全量数据各自TopK搜索的功能
ScANNS的算法优化
github项目官网上写了项目相比与Spark ml的优化措施
上面这一步是将原始向量映射为多个hash table,以及每个hash table中记录的bucket值,然后使用explode炸开每一行形成(hash table索引,hash值组合)的元组,这个地方是一组hash值而不是之前一个hash值,以这个元组进行join,这个和传统的Spark ml是一样的,但是在join阶段领英工程师进行了如下优化
- 对bucket join列进行hash编码:由于(index,hash bucket)元组由一个整数索引和多个散列值组成,因此要来回移动元组可能会非常昂贵。我们最终关心的只是索引匹配和散列值匹配。我们不关心索引或散列本身的实际值。因此,我们使用散列技巧将这个元组简单地散列为整数。虽然这保证了相同的元组被映射到相同的整数,但也可能存在冲突,不相等的哈希桶项可能会得到相同的整数值。然而,这并不影响我们算法的“正确性”,它只是增加了我们在暴力步骤中需要检查的元素的数量。
- 自定义连接join优化:由于LSH的hash函数具有随机性,以及可能数据源本身分布的问题,会导致形成某某些桶分布着大量的数据,形成桶倾斜,并且里面大量的数据可能并不值得暴力搜索。领英做了两个优化,第一基于budkct id进行自定义分区join,第二对于桶倾斜,设置bucket limit参数,对于超过limit的倾斜桶,里面的每个元素只和该桶下随机bucket limit数量的实体进行距离计算,而不是所有都计算
- topQueue策略:我们还使用了一个定制的topQueue,它是scala的PriorityQueue的包装器,可以容纳的元素总数是常量。这用于返回前k个最近邻居,而不是阈值距离内的邻居
- 数据对处理成迭代器:在一个bucket中,当通过蛮力返回候选对象时,将内存中的所有对具体化是一种浪费,因为对的数量可能相当大。相反,我们通过构建一个自定义迭代器来处理bucket,从而在按需的基础上生成pair。
ScANNS工程下载
工程下载参考项目的github主页https://github.com/LinkedInAttic/scanns#understanding-the-model-parameters
git clone git@github.com:linkedin/scanns.git
cd scanns
./gradlew build # jar will be built in build/scanns_2.1*/libs
其间可能会报错没有权限,需要在个人github主页settings下设置ssh,本地也要设置,过程报错直接百度,项目构建完成生成jar包scanns_2.11-1.0.0.jar
root@ubuntu:/home/scanns/build/scanns_2.11/libs# ls
scanns_2.11-1.0.0.jar
将jar包引入Maven pom.xml依赖
<dependency>
<groupId>com.linkedin.nn</groupId>
<artifactId>scanns_2.11</artifactId>
<version>2.11</version>
<systemPath>/home/scanns/build/scanns_2.11/libs/scanns_2.11-1.0.0.jar</systemPath>
<scope>system</scope>
</dependency>
IDEA测试导入成功
import com.linkedin.nn.algorithm.CosineSignRandomProjectionNNS
ScANNS输入要求
算法的输入是RDD[Long, org.apache.spark.ml.linalg.Vector)]
. 使用rdd而不是DataFrame/Dataset的原因是保留了一些较低级别的API,允许对算法中执行的连接进行更多的控制。
一个简单的符合算法输入要求的例子
scala> val a = sc.parallelize(Array((1L, dense(1, 2, 3)), (2L, dense(1, 0, 3))))
a: org.apache.spark.rdd.RDD[(Long, org.apache.spark.ml.linalg.Vector)] = ParallelCollectionRDD[5] at parallelize at <console>:28
也可以从DataFrame转化为算法需要的格式
scala> val df = Seq((1L, dense(1, 2, 3)), (2L, dense(2, 3, 4))).toDF("a", "b")
df: org.apache.spark.sql.DataFrame = [a: bigint, b: vector]
scala> val df2 = df.rdd.map(x => (x.getLong(0), x.get(1).asInstanceOf[org.apache.spark.ml.linalg.Vector]))
df2: org.apache.spark.rdd.RDD[(Long, org.apache.spark.ml.linalg.Vector)] = MapPartitionsRDD[4] at map at <console>:26
总结一下算法对格式有严格的要求:首先必须就传入2列,顺序要一致,然后第一列作为实体的标识必须是Long类型
,字符串类型不行,最后必须是RDD
代码实现部署测试
写一个简单的例子实现企业实体之间的embedding相似推荐,首先看一下输入数据
输入的数据包括id(作为算法的Long类型标识列),实体名称(作为推荐结果),16维度的embedding向量(事先离线算好)存储为txt格式写入放在HDFS上,第一步就是Spark读取数据处理算法需要的格式
import org.apache.spark.ml.linalg.Vectors.dense
def loadEntVector(spark: SparkSession, config: Broadcast[Properties]): DataFrame = {
/*离线训练好词向量*/
import spark.implicits._
val df = spark.read.format("csv").option("sep", " ").option("header", false).load(config.value.getProperty("entVectorPath"))
.toDF("id", "ent_name", "vector")
.select($"id".cast("Long"), $"ent_name", $"vector")
.withColumn("vector", split($"vector", ","))
.withColumn("vector", udf((vec: scala.collection.mutable.WrappedArray[String]) => dense(vec.map(_.toDouble).toArray)).apply($"vector"))
return df
}
以上主要是将向量字符串转化为Array再转化为org.apache.spark.ml.linalg.Vectors.dense
准备两张表,一张表作为idMap映射,一张表作为算法输入
val vtScore = loadEntVector(spark, configProperties)
val idMapDF = vtScore.select($"id", $"ent_name")
val modelDF = vtScore.select($"id", $"vector").rdd.map(x => (x.getLong(0), x.get(1).asInstanceOf[org.apache.spark.ml.linalg.Vector]))
下一步定义模型
import com.linkedin.nn.algorithm.CosineSignRandomProjectionNNS
val model = new CosineSignRandomProjectionNNS()
.setNumHashes(300)
.setSignatureLength(15)
.setJoinParallelism(200)
.setBucketLimit(1000)
.setShouldSampleBuckets(true)
.setNumOutputPartitions(100)
.createModel(16)
主要参数内容如下
- setNumHashes
- setSignatureLength
- setJoinParallelism
- setBucketLimit
- setShouldSampleBuckets
- setNumOutputPartitions
- createModel
接着训练模型,这里调用getSelfAllNearestNeighbors
方法及输入数据的每一个实体和自身整个集合寻找最相似的TopK,numCandidates
参数就是K值,join过程是将Long id替换为实体名称
val numCandidates = 50
val nbrs = model.getSelfAllNearestNeighbors(modelDF, numCandidates).toDF("id_a", "id_b", "score")
val res = nbrs
.join(idMapDF.select($"id".as("id_a"), $"ent_name"), Seq("id_a"), "left_outer")
.join(idMapDF.select($"id".as("id_b"), $"ent_name".as("ent_recommend")), Seq("id_b"), "left_outer")
最后一步将处理好的TopK使用Spark的collect_list
算子全部组合在一起形成一个JSON数组入库
val res2 = res.withColumn("recommend", udf((x: String, y: Double) => "[" + x + y.toString + "]").apply($"ent_recommend", $"score"))
.groupBy($"ent_name").agg(collect_list($"recommend").as("recommend"))
任务提交的时候将jar包挂在spark2-submit后面,如下
sudo -u hdfs spark2-submit \
--class ...
--master ...
--conf ...
--jars scanns_2.11-1.0.0.jar \
myproject.jar
任务执行相当快,200万实体16维向量寻找tok50,3分钟跑完
最后到库里看一下计算结果
算法调参
先对算法参数做一个说明,此处的参数相比于Spark ml有不同更加复杂
-
bucketWidth
这个参数是针对欧式距离的LSH所需要的参数,较大的桶长会降低假阴率,如果输入向量是标准化的,1-10
倍的bucketWidth应该是一个合理的值。 -
numHashes
,signatureLength
numHashes就是hash table数量,数量越大容错性越好,可以降低模型的不稳定性,但是计算量增大, signatureLength越大,假正率越低,同时也会提高假阴率,也就是说这个值越大模型对落入同一个桶的要求越严格,这个值越大召回越少,后期需要暴力求解的数据量就越少 -
joinParallelism
联接的并行性控制每个联接分区/联接任务将处理多少数据。考虑到数据集的大小,您希望这是一个合理的大小。在联接中处理的数据集是“分解”数据集,因此在原始数据集本身很大(例如,数千万到数亿项)的情况下,您需要将并行度设置为相当大的值,例如几万甚至几十万。spark中的任务创建和管理会有开销,所以这个值不是越大越好 -
bucketLimit
,shouldSampleBuckets
bucket limit对于解决前面提到的bucket歪斜问题至关重要。当一个bucket包含的项目超过此参数设置的限制时,可以通过适当地设置shouldSampleBucket布尔参数进行选择。无论哪种情况,我们都将丢弃桶中的元素。如果shouldSampleBucket设置为true,则将从传入流中对bucketLimit项目数进行采样。如果设置为false,将保留第一个bucketLimit项目数,而忽略其余项目数。这里的基本原理是,如果我们错过了这个桶中的高相似度邻居,考虑到它们的高相似度,它们最终在另一个没有倾斜的桶中匹配的可能性很大。 -
numOutputPartitions
由于连接的并行性可能很高,因此连接操作生成的输出将具有非常多的分区,即使其大小不大。设置numOutputPartitions将连接的输出重新分区到给定数量的分区中,这样,如果用户试图将这个输出RDD写回文件系统,那么它将被拆分成的文件数是可处理的。