Network Embedding

什么是Network Embedding?

\frac{\partial O_{2}}{\partial \vec{u}_{i}} = -w_{ij}[\vec{u}_{j}^{'}(1-\sigma(\vec{u}_{j}^{'T}\cdot \vec{u}_{i})) -\sum_{k=1}^{K}\vec{u}_{k}^{'}(\vec{u}_{k}^{'T}\cdot \vec{u}_{i})]

\frac{\partial O_{2}}{\partial \vec{u}_{j}^{'}} = -w_{ij}\vec{u}_{i}[1-\sigma(\vec{u}_{j}^{'T}\cdot \vec{u}_{i})]

\frac{\partial O_{2}}{\partial \vec{u}_{k}^{'}} = w_{ij}\vec{u}_{i}\sigma(\vec{u}_{k}^{'T}\cdot \vec{u}_{i})

Update parameter \vec{u}_{i},\vec{u}_{j}^{'},\vec{u}_{k}^{'}:

\vec{u}_{i} = \vec{u}_{i} - \rho \frac{\partial O_{2}}{\partial \vec{u}_{i}}

\vec{u}_{j}^{'} = \vec{u}_{j}^{'} \rho \frac{\partial O_{2}}{\partial \vec{u}_{j}^{'}}

\vec{u}_{k}^{'} = \vec{u}_{k}^{'} - \rho \frac{\partial O_{2}}{\partial \vec{u}_{k}^{'}}

The above is the result of optimizing O_{2}, and the obtained U is the result of the second-order similarity. The optimization of O_{1} is similar to optimization of O_{2}, only one variable U needs to be updated. Just change \vec{u}_{j}^{'} to


The objective function is not convex, and we separate the
optimization to four subproblems and iteratively optimize them, which guarantees each subproblem converges to the local minima.

objective function:
\min_{M,U,H,C}=||S-MU||_{F}^{2}+\alpha||H-UC^{T}||_{F}^{2}-\beta tr(H^{T}BH)

s.t.,M\geq 0,U\geq0,H\geq,C\geq,tr(H^{T}H)=n

M-subproblem: With other parameters in objective function fixed leads to a standard NMF formulation,the updating rule for M is:

M \leftarrow M \odot \frac{SU}{MU^{T}U}

U-subproblem: Updating U with other parameters in objective function
fixed leads to a joint NMF problem,the updating rule is:

U \leftarrow U \odot \frac{S^{T}M+\alpha HC}{U(M^{T}M+\alpha C^{T}C)}

C-subproblem: Updating C with other parameters in objective function
fixed also leads to a standard NMF formulation,the updating rule of C is:

C \leftarrow C \odot \frac{H^{T}U}{CU^{T}U}

H-subproblem: This is the fixed point equation that the solution must satisfy at convergence. Given an initial value of H, the successive updating rule of H is:

H \leftarrow H \odot \sqrt{ \frac{-w\beta B_{1}H+\sqrt{ \bigtriangleup}}{8\lambda HH^{T}H}}

where \bigtriangleup = 2\beta(B_{1}H) \odot 2\beta(B_{1}H) + 16\lambda(HH^{T}H)\odot(2\beta AH+2\alpha UC^{T}+(4\lambda - 2\alpha)H)


LINE-O2 Spark实现

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import breeze.linalg._
import breeze.numerics._
import breeze.stats.distributions.Rand
import scala.math._

object LINE {

    def RandList(Range:Int,num:Int) : List[Int] = {
        var resultList:List[Int]=Nil
        while (resultList.length < num){
            val randomNum = (new util.Random).nextInt(Range)
            if(!resultList.exists(s => s==randomNum )){
        return resultList

    def RandNumber(Range:Int) : Int = {
        val randomNum = (new util.Random).nextInt(Range)
        return randomNum

    def Sigmoid(In:Double): Double = {
        var Out:Double = 1.0/(math.exp(-1.0*In)+1)
        return Out

    def main(args: Array[String]) {
        if (args.length < 4) {
            System.err.println("Usage: LINE <Adjacent Matrix> <Adjacent Edge> <Negative Sample> <dimension>")

        val NS = args(2).toInt
        println("Negative Sample: "+NS)

        val Dim = args(3).toInt
        println("Embedding dimension: "+Dim)

        val conf = new SparkConf().setAppName("LINE")
        val sc = new SparkContext(conf)

        val InputFile = sc.textFile(args(0),3)
        val EgdeFile = sc.textFile(args(1),3)

        val InputFileCount = InputFile.count().toInt
        println("InputFileCount(number of lines): "+InputFileCount)

        val sample_rate : Double = 0.1

        val HashTableSize: Int = 50000
        println("HashTableSize: "+HashTableSize)

        //LINE O_2 的二阶相似度变量 
        var U_vertex = DenseMatrix.rand(InputFileCount, Dim, Rand.uniform)
        var U_context = DenseMatrix.rand(InputFileCount, Dim, Rand.uniform)

        val Adjacent = => line.split(",")).map(splitline => => word.toDouble))
        val EgdeSet = => line.split(",")).map(splitline => => word.toDouble))

        //当数据量变大,collect操作将会有崩溃 待优化点1
        val AdjacentCollect = Adjacent.collect()

        val rows = AdjacentCollect.length
        val cols = AdjacentCollect(0).length

        val flattenAdjacent = AdjacentCollect.flatten

        //邻接矩阵转为 breeze 矩阵
        val AdjacentMatrix = new DenseMatrix(cols,rows,flattenAdjacent).t
        // Adjacent.foreach{
        //  rdd => println(rdd.toList)
        // }

        val VertexDegree = => line.reduce((x,y) => x+y))

        var SumOfDegree = VertexDegree.reduce((x,y)=>x+y)

        //var SumOfDegree = sc.accumulator(0)
        //VertexDegree.foreach(x => SumOfDegree += x)  

        val SmoothProbability = => degree/SumOfDegree).map(math.pow(_,0.75))

        val p : Array[Double] = SmoothProbability.collect()
        val CumulativeProbability : Array[Double] = new Array[Double](InputFileCount)
        for(i <- 0 to InputFileCount-1) {
            var inner_sum : Double = 0.0
            for(j <- 0 to i){
                inner_sum = inner_sum + p(j)
            CumulativeProbability(i) = inner_sum

        val HashProbability : Array[Int] = new Array[Int](InputFileCount)
        var max_cpro = CumulativeProbability(InputFileCount-1)
        for(i <- 0  to InputFileCount-1)
            HashProbability(i) = ((CumulativeProbability(i)/max_cpro)*HashTableSize).toInt
        val HashTable : Array[Int] = new Array[Int](HashTableSize+1)

        for(i <- 0 to InputFileCount-1) {
            if (i==0) {
                var start : Int = 0
                var end : Int = HashProbability(1)
                for(j <- start to end) {
                    HashTable(j) = i
            else {
                var start : Int = HashProbability(i-1)
                var end : Int = HashProbability(i)
                for(j <- start to end) {
                    HashTable(j) = i


        val sample_num = (sample_rate*InputFileCount).toInt
        println("sample_num "+sample_num)
        var O2_Array: Array[Double] = new Array[Double](100)
        for(iterator <- 0 to 99)
            //println("the iterator is "+iterator)
            var learningrate = 0.1
            var O_2 = 0.0
            //false表示无放回采样 选取预先选定的采样数量
            var sampling = EgdeSet.takeSample(false,sample_num)
            for(i <- 0 to sample_num-1)
                var objective = 0.0
                //println("i is " + i)
                var row:Int = sampling(i)(0).toInt
                var col:Int = sampling(i)(1).toInt
                var u_j_context = U_context(col,::).t
                var u_j_context_t = U_context(col,::)
                var u_i_vertex = U_vertex(row,::).t
                var part1=(-1)*sampling(i)(2)*u_j_context*(1-Sigmoid((u_j_context_t*u_i_vertex).toDouble))
                //println("part1: "+part1)

                var negativeSampleSum = DenseVector.zeros[Double](Dim)
                var RandomSet : List[Int] = RandList(50000,NS)
                //println("RandomSet is:"+RandomSet)
                for(j <- 0 to RandomSet.length-1){
                    var u_k_context = U_context(HashTable(RandomSet(j)),::).t
                    var u_k_context_t = U_context(HashTable(RandomSet(j)),::)
                    negativeSampleSum = negativeSampleSum + u_k_context*Sigmoid((u_k_context_t*u_i_vertex).toDouble)
                //println("negativeSampleSum: "+negativeSampleSum)
                var part2 = sampling(i)(2)*negativeSampleSum
                //println("part2: "+part2)
                var d_O2_ui = part1-part2
                //println("d_O2_ui: "+d_O2_ui)

                var tmp1 = u_i_vertex - learningrate*(d_O2_ui)
                //println(tmp1(0)+" "+tmp1(1))

                // println("previous U_context(row,::): "+U_context(row,::))
                for(k1 <- 0 to Dim-1){
                    U_vertex(row,k1) = tmp1(k1)
                //println("after U_context(row,::): "+U_context(row,::))

                var d_O2_uj_context = (-1)*sampling(i)(2)*u_i_vertex*(1-Sigmoid((u_j_context_t*u_i_vertex).toDouble))

                var tmp2 = u_j_context - learningrate*(d_O2_uj_context)
                for(k2 <- 0 to Dim-1){
                    U_context(row,k2) = tmp2(k2)

                var negative_cal = 0.0
                for(j <- 0 to RandomSet.length-1){
                    var u_k_context = U_context(HashTable(RandomSet(j)),::).t
                    var u_k_context_t = U_context(HashTable(RandomSet(j)),::)

                    var sigmoid_uk_ui = Sigmoid((u_k_context_t*u_i_vertex).toDouble)
                    negative_cal = negative_cal + math.log(sigmoid_uk_ui)

                    var d_O2_uk_context = sampling(i)(2)*u_i_vertex*sigmoid_uk_ui

                    var tmp3 = u_k_context - learningrate*d_O2_uk_context
                    for(k3 <- 0 to Dim-1){
                        U_context(HashTable(RandomSet(j)),k3) = tmp2(k3)

                objective = (-1)*sampling(i)(2)*(math.log(Sigmoid((u_j_context_t*u_i_vertex).toDouble)) + negative_cal)
                O_2 = O_2 + objective

            O2_Array(iterator) = O_2
        val U2_HDFS = sc.parallelize(U_vertex.toArray,3)
        val O2_HDFS = sc.parallelize(O2_Array,3)

        //a(::, 2)  
