Greedy Step Averaging A paramete

2016-12-26  本文已影响0人  kiminh

Fregdata

https://github.com/TalkingData/Fregata

Combining Features Logistic Regression
imageimage imageimage imageimage

真正计算的模块

Logistic regression derived from approximation formula
imageimage
Averaging Scheme
imageimage imageimage
运行接口,通过参数传入真正的
//fregata.spark.model.classification
class LogisticRegressionModel(val model:LLogisticRegressionModel) extends ClassificationModel

object LogisticRegression {

  /**
    *
    * @param data
    * @param localEpochNum the local model epoch num of every partition
    * @param epochNum
    * @return
    */
  def run(data:RDD[(Vector,Num)],
          localEpochNum:Int = 1 ,
          epochNum:Int = 1) = {
    //class LogisticRegression extends ModelTrainer      
    val trainer = new LLogisticRegression
    new SparkTrainer(trainer)
      .run(data,epochNum,localEpochNum)
    
    //def buildModel(ps:ParameterServer) = new LogisticRegressionModel(ps.get(0))
    new LogisticRegressionModel(trainer.buildModel(trainer.ps))
  }
}


//fregata.spark.model.SparkTrainer
class SparkTrainer(trainer:ModelTrainer) {

  def run(data:RDD[(Vector,Num)],epochNum:Int,localEpochNum:Int) {
    (0 until epochNum).foreach{
      i =>
        run(data,localEpochNum)
    }
  }

  def run(data:RDD[(Vector,Num)],localEpochNum:Int) {
    val _trainer = this.trainer
    val br_opt = data.sparkContext.broadcast(_trainer)
    val pn = data.partitions.length
    
    
    //  def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U],preservesPartitioning: Boolean = false): RDD[U] = withScope {
    //  val cleanedF = sc.clean(f)
    //  new MapPartitionsRDD(
    //  this,
    //  (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
    //  preservesPartitioning)
    //  }
    
    //每个分区中的内容将以Iterator[T]传递给输入函数f,f的输出结果是Iterator[U]。最终的RDD由所有分区经过输入函数处理后的结果合并起来的
    
    val ws = data.mapPartitions{
      it =>
        val local_opt = br_opt.value
        
        //真正的run函数
        local_opt.run(it.toIterable,localEpochNum)
        Iterator( local_opt.ps.get )
    }.treeReduce{
      (a,b) =>
        a.zip(b).map{
          case (w1,w2) => w1 + w2
        }
    }
    ws.foreach{
      w =>
      val values = w match {
        case w : DenseVector => w.data
        case w : SparseVector => w.data
      }
      var i = 0
      while( i < values.length ) {
        values(i) /= pn
        i += 1
      }
    }
    trainer.ps.set(ws)
  }
}

通过parameter server获取参数
class LogisticGradient(ps:ParameterServer) extends Gradient {
  val thres = 0.95
  val update = Array(0.0)
  var stepSize = 0.0
  var i = 0.0
  
  //重载caculate,计算梯度更新值
  def calculate(x:Vector,label:Num) : Array[Num] = {
    var weight = ps.get
    if( weight == null ) {
      //更新权重 1*(n+1) 带bias
      ps.init(1,x.length + 1)
      weight = ps.get
    }
    val lambda = i / ( i + 1 )
    i += 1
    val margin = VectorUtil.wxpb(weight(0),x,1.0)
    val p1 = 1.0 / ( 1.0 + math.exp( - margin ) )
    val p0 = 1 - p1
    val b1 = math.exp(p1)
    val b0 = math.exp(p0)
    val x2 = math.pow(norm(x),2.0)
    // compute greedy step size
    val greedyStep = if( label == 1 ) {
      (p1 - thres) / ( thres * (1 - p0 * b0 - p1 * b1) + p1 * (1 - b0) ) / x2
    }else{
      (p0 - thres) / ( thres * (1 - p0 * b0 - p1 * b1 ) + p0 * (1 - b1)) / x2
    }
    // compute averaged step size
    stepSize = lambda * stepSize + (1 - lambda) * greedyStep
    update(0) = 2 * ( p1 - label ) * stepSize
    update
  }
}

//LogisticRegerssion.scala
class LogisticRegressionModel(val weights:Vector) extends ClassificationModel{

  var threshold = 0.5
  def setThreshold(t:Double) : this.type = {
    this.threshold = t
    this
  }
  def classPredict(x: Vector): (Num, Num) = {
    val margin = VectorUtil.wxpb(weights,x,1.0)
    val p = 1.0 / ( 1.0 + math.exp( - margin ) )
    val c = if( p > threshold ) 1.0 else 0.0
    (asNum(p),asNum(c))
  }
}

class LogisticRegression extends ModelTrainer {
  override type M = LogisticRegressionModel
  val ps = newPs    //ps
  val gradient = new LogisticGradient(ps)   //定义计算梯度
  def buildModel(ps:ParameterServer) = new LogisticRegressionModel(ps.get(0))
  def run( data:Iterable[(Vector,Num)] ) = {
  
    !!! 在run函数中通过Target这个case class传入gradient 和ps
    val target = Target(gradient,ps)
    new AdaptiveSGD()
      .minimize(target)
      .run(data)
    new LogisticRegressionModel(ps.get(0))
  }
}

//ModelTrainer.scala  core
trait ModelTrainer extends Serializable{

  type M

  def newPs = ParameterServer.create   //创建parameter serever
  def ps : ParameterServer
  def buildModel(ps:ParameterServer) : M


    //http://stackoverflow.com/questions/38289353/using-scala-trait-as-a-callback-interface
    //callback 语法糖
  def run(data:Iterable[(Vector,Num)],epochNum:Int ,callback : (M,Int) => Unit = null) : M = {
    var model : Any = null
    (0 until epochNum).foreach{
      i =>
        model = run(data)
        if( callback != null ) callback(model.asInstanceOf[M],i)
    }
    model.asInstanceOf[M]
  }

  def run(data:Iterable[(Vector,Num)]) : M

}


定义一个case class,其中需要Gradient和Parameter Server
case class Target(val gradient : fregata.optimize.Gradient, val ps : fregata.param.ParameterServer) extends scala.AnyRef with scala.Product with scala.Serializable {
}


trait Gradient extends Serializable{
  def calculate(x:Vector,label:Num) : Array[Num]
}

SGD优化方法
//AdaptiveSGD.scala
Ad

class AdaptiveSGD extends StochasticGradientDescent {

  override def stepSize(i:Int,x:Vector) = asNum(1d)

}

//StochasticGradientDescent.scala
class StochasticGradientDescent extends Minimizer {

  private var eta = asNum(.1)
  def setStepSize(eta:Num) : this.type = {
    this.eta = eta
    this
  }
  protected def stepSize(itN:Int,x:Vector) = eta

  def run(data:TraversableOnce[(Vector,Num)]) = {
    var i = 0
    data.foreach{
      case (x,label) =>
        val gradients = target.gradient.calculate(x,label)
        val step = stepSize(i,x)
        val delta = gradients.map( v => asNum( v * step ) )
        target.ps.adjust(delta,x)
        i += 1
    }
  }
}

//Minimizer.scala
trait Minimizer extends Optimizer {
  private[this] var _target : Target = _
  def minimize(target: Target) : this.type = {
    this._target = target
    this
  }
  def target = _target
}

//Optimizer.scala
trait Optimizer extends Serializable {
  def run(data:TraversableOnce[(Vector,Num)])
}
object ParameterServer {
  def create : ParameterServer = new LocalParameterServer
}

trait ParameterServer extends Serializable {
  def init(rows:Int,cols:Int)
  def adjust(delta:Array[Num],x:Vector)
  def get : Array[Vector]
  def set(ps:Array[Vector])
}

class LocalParameterServer extends ParameterServer {

  private[this] var ps : Array[DenseVector] = null
  private[this] var values : Array[Array[Num]] = null

  def init(rows:Int,cols:Int) = {
    values = Array.fill(rows)( Array.ofDim[Num](cols) )
    ps = values.map( new DenseVector(_) )
  }
  
  //Array(Array(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0))
  //Array(DenseVector(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0))

  def set(ps:Array[Vector]) = this.ps = ps.map( _.toDenseVector )
  def adjust(delta:Array[Num],x:Vector) = {
    var k = 0
    while( k < delta.length ) {
      val d = delta(k)
      VectorUtil.forV(x,(i,xi) =>{
        values(k)(i) -= d * xi
      })
      k += 1
    }
  }
  def get : Array[Vector] = ps.asInstanceOf[Array[Vector]]
}

上一篇下一篇

猜你喜欢

热点阅读