商品相似性计算

2021-04-27  本文已影响0人  郭彦超

场景

操作

源码


import com.hankcs.hanlp.HanLP;
import com.hankcs.hanlp.seg.common.Term;

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{Word2Vec, Tokenizer}
import org.apache.spark.ml.linalg.{DenseVector=>DV}
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}
import org.apache.spark.sql.functions.desc

import scala.collection.JavaConversions._
// import spark.implicits._

// Load the item table; assumes the parquet file has a `title` column — TODO confirm schema.
val df = spark.read.parquet("/user/wbb/test/part-00000-b1233115-d581-497e-b885-d6c3e355b4bd-c000.snappy.parquet")
 
// One item: original title plus its space-separated segmented words.
// NOTE(review): "Fearture" is a typo for "Feature", but the name (and its
// field) is referenced by the pipeline below, so it is kept unchanged.
case class wordFearture(title:String, wordsFearture:String)
// One similarity pair: titles i and j with their cosine similarity.
case  class sims(i:String, j:String, sim:Double)

// Segment each title with HanLP into a space-separated word string.
// Rewrite of the original: replaces the mutable `var` + O(n^2) string
// concatenation loop with filter/map/mkString, and fetches the title once
// with a typed getAs instead of getAs(...).toString().
val wordsSet = df.rdd.map { row =>
  val title = row.getAs[String]("title")
  // Keep only tokens longer than one character that are not placeholder
  // tokens starting with "%" (same filter as the original loop).
  val kept = HanLP.segment(title)
    .map(_.word)
    .filter(w => w.length > 1 && !w.startsWith("%"))
  // Each kept word is followed by a single space, reproducing the original
  // concatenation output exactly (trailing space included).
  wordFearture(title, kept.map(_ + " ").mkString)
}

// Lift the segmented RDD into a DataFrame with columns (title, wordsFearture).
val wordsDF = spark.createDataFrame(wordsSet)
  
// Split the space-joined word string back into an array column "words".
val tokenizer = new Tokenizer().setInputCol("wordsFearture").setOutputCol("words")
 
// Learn word embeddings and produce one document vector per title
// (Spark's Word2Vec transform averages the word vectors). Words seen fewer
// than 3 times are ignored.
// NOTE(review): vectorSize=3000 is unusually large for Word2Vec — confirm intent.
val word2Vec = new Word2Vec().setInputCol("words").setOutputCol("features").setVectorSize(3000).setMinCount(3)

// Chain tokenizer -> word2vec so a single fit/transform runs both stages.
val pipeline = new Pipeline().setStages(Array(tokenizer, word2Vec))

// Fit and apply: adds "words" and "features" columns to wordsDF.
val featureDF = pipeline.fit(wordsDF).transform(wordsDF)
featureDF.show()

// Broadcast every (title, features) row so each executor can compare its
// items against the full catalog without a shuffle.
// NOTE(review): collect() pulls the whole table to the driver and the
// comparison is O(n^2) — acceptable only for small catalogs.
val bitemsFeature = sc.broadcast(featureDF.select("title", "features").collect())

// Pairwise cosine similarity of every item against every broadcast item.
val itemsSim = featureDF.select("title", "features").flatMap { row =>
  import breeze.linalg._
  // Fix: the column holds an ML DenseVector, so use getAs[DV] directly
  // instead of the original's misleading getAs[String](...).asInstanceOf[DV]
  // (which only compiled because of type erasure).
  val iSparse = row.getAs[DV]("features").toSparse
  val bv1 = new SparseVector[Double](iSparse.indices, iSparse.values, iSparse.size)
  // Hoisted out of the inner loop: norm(bv1) is invariant across all row2.
  val iNorm = norm(bv1)
  val iTitle = row.getAs[String]("title")
  bitemsFeature.value.map { row2 =>
    val jSparse = row2.getAs[DV]("features").toSparse
    val bv2 = new SparseVector[Double](jSparse.indices, jSparse.values, jSparse.size)
    val denom = iNorm * norm(bv2)
    // Guard the zero-vector case so we emit 0.0 instead of NaN.
    val ijSim = if (denom == 0.0) 0.0 else bv1.dot(bv2) / denom
    sims(iTitle, row2.getAs[String]("title"), ijSim)
  }
}

// `desc` comes from org.apache.spark.sql.functions (added to the imports:
// it was never in scope in the original snippet).
itemsSim.orderBy(desc("sim")).show()

上述方式也可以用在推荐系统里计算用户间或物品间的相似性


// Standalone variant (repeated fit/transform as in the article's snippet).
val featureDF = pipeline.fit(wordsDF).transform(wordsDF)

// Use MLlib's built-in column-similarity routine instead of the manual
// broadcast cross product above — far less code.
// Fix: the DataFrame has no "id" column (schema: title, wordsFearture,
// words, features), so select only "features"; the original
// select("id","features") would fail analysis. zipWithIndex supplies the
// row index used by IndexedRow.
val irm = new IndexedRowMatrix(
  featureDF.select("features").rdd
    .map { row =>
      // Convert the ML vector to the mllib type IndexedRowMatrix requires.
      val sparse = row.getAs[DV]("features").toSparse
      org.apache.spark.mllib.linalg.SparseVector.fromML(sparse)
    }
    .zipWithIndex
    .map { case (v, i) => IndexedRow(i, v) }
).toCoordinateMatrix.transpose.toRowMatrix.columnSimilarities

// Flatten the (upper-triangular) similarity entries to (rowIdx, colIdx, sim).
val irmRDD = irm.entries.map(e => (e.i, e.j, e.value))
上一篇下一篇

猜你喜欢

热点阅读