Computing Product Similarity
2021-04-27
郭彦超
Scenarios
- Content-based item recommendation in a recommender system
- Comparing candidate papers in a plagiarism-detection system
Steps
- Tokenize the product titles (a quick segmentation sketch follows this list)
- Generate word vectors
- Iterate over item pairs and compute similarities
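To illustrate the first step, here is a minimal sketch of HanLP segmentation; the sample title is made up, and the actual tokens depend on HanLP's dictionary:
import com.hankcs.hanlp.HanLP
import scala.collection.JavaConverters._
val terms = HanLP.segment("夏季纯棉短袖T恤").asScala
// Keep only tokens longer than one character, as the pipeline below does
val tokens = terms.map(_.word).filter(w => w.length > 1 && !w.startsWith("%"))
println(tokens.mkString(" "))   // e.g. "夏季 纯棉 短袖"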
Source code
import com.hankcs.hanlp.HanLP
import scala.collection.JavaConverters._
import com.hankcs.hanlp.seg.common.Term
import org.apache.spark.ml.feature.{Tokenizer, Word2Vec}
import org.apache.spark.ml.linalg.{DenseVector => DV}
import org.apache.spark.ml.Pipeline
import org.apache.spark.sql.functions.desc
import spark.implicits._ // needed for the Dataset flatMap below
val df = spark.read.parquet("/user/wbb/test/part-00000-b1233115-d581-497e-b885-d6c3e355b4bd-c000.snappy.parquet")
case class WordFeature(title: String, wordsFeature: String)
case class Sims(i: String, j: String, sim: Double)
val wordsSet = df.rdd.map { row =>
  val title = row.getAs[String]("title")
  // Segment the title with HanLP, keeping tokens longer than one character
  val words = HanLP.segment(title).asScala
    .map(_.word)
    .filter(w => w.length > 1 && !w.startsWith("%"))
    .mkString(" ")
  WordFeature(title, words)
}
val wordsDF = spark.createDataFrame(wordsSet)
val tokenizer = new Tokenizer().setInputCol("wordsFeature").setOutputCol("words")
val word2Vec = new Word2Vec().setInputCol("words").setOutputCol("features").setVectorSize(3000).setMinCount(3)
val pipeline = new Pipeline().setStages(Array(tokenizer, word2Vec))
val featureDF = pipeline.fit(wordsDF).transform(wordsDF)
featureDF.show() // expect columns: title, wordsFeature, words, features
// Broadcast all item vectors so every task can compare against the full set
val bitemsFeature = sc.broadcast(featureDF.select("title","features").collect())
val itemsSim = featureDF.select("title","features").flatMap { row =>
  import breeze.linalg._
  val iDense = row.getAs[DV]("features").toSparse
  val bv1 = new SparseVector[Double](iDense.indices, iDense.values, iDense.size)
  bitemsFeature.value.map { row2 =>
    val jDense = row2.getAs[DV]("features").toSparse
    val bv2 = new SparseVector[Double](jDense.indices, jDense.values, jDense.size)
    // Cosine similarity between the two averaged word vectors
    val ijSim = bv1.dot(bv2) / (norm(bv1) * norm(bv2))
    Sims(row.getAs[String]("title"), row2.getAs[String]("title"), ijSim)
  }
}
itemsSim.orderBy(desc("sim")).show()
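For reference, the ijSim computed above is the standard cosine similarity between the two averaged word vectors:

$$\mathrm{sim}(i, j) = \frac{v_i \cdot v_j}{\lVert v_i \rVert \, \lVert v_j \rVert}$$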
The approach above can also be used in a recommender system to compute user-user or item-item similarity; a sketch of the user-user case follows.
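This is a minimal sketch, assuming a hypothetical dataset where each user's clicked item IDs are joined into one space-separated string (the schema and sample data are made up for illustration; the pipeline itself mirrors the one above):
case class UserSeq(userId: String, itemSeq: String)
// Hypothetical sample data: each user's click sequence acts as one "sentence"
val userDF = Seq(
  UserSeq("u1", "i3 i7 i7 i12"),
  UserSeq("u2", "i7 i12 i45"),
  UserSeq("u3", "i3 i12 i9")
).toDF
val userPipeline = new Pipeline().setStages(Array(
  new Tokenizer().setInputCol("itemSeq").setOutputCol("items"),
  new Word2Vec().setInputCol("items").setOutputCol("features").setVectorSize(64).setMinCount(1)
))
// The resulting "features" column can go through the same broadcast/cosine loop as above
val userFeatures = userPipeline.fit(userDF).transform(userDF)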
Results
- MLlib actually provides a more convenient built-in way to compute this, though the ml vectors have to be converted to their mllib counterparts first:
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}
val featureDF = pipeline.fit(wordsDF).transform(wordsDF)
// Use Spark MLlib's built-in column-similarity method to save code
val irm = new IndexedRowMatrix(
  featureDF.select("features").rdd.map { row =>
    // Convert each ml vector to its mllib counterpart
    val iDense = row.getAs[DV]("features").toSparse
    org.apache.spark.mllib.linalg.SparseVector.fromML(iDense)
  }.zipWithIndex.map { case (v, i) => IndexedRow(i, v) }
).toCoordinateMatrix.transpose.toRowMatrix.columnSimilarities()
// Each entry is (item index i, item index j, cosine similarity), with i < j
val irmRDD = irm.entries.map(e => (e.i, e.j, e.value))
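columnSimilarities only returns row indices, so the titles have to be joined back. A minimal sketch, assuming featureDF is cached so that both zipWithIndex passes assign the same indices:
// Rebuild the index -> title mapping with the same zipWithIndex order
val idToTitle = featureDF.select("title").rdd
  .map(_.getAs[String]("title"))
  .zipWithIndex
  .map { case (t, i) => (i, t) }
val titleSims = irmRDD
  .map { case (i, j, sim) => (i, (j, sim)) }
  .join(idToTitle)
  .map { case (_, ((j, sim), titleI)) => (j, (titleI, sim)) }
  .join(idToTitle)
  .map { case (_, ((titleI, sim), titleJ)) => (titleI, titleJ, sim) }

For large catalogs, columnSimilarities(threshold) uses the DIMSUM sampling scheme to skip pairs whose similarity is reliably below the threshold.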