akka编程demo

2020-02-12  本文已影响0人  烂泥_119c

AKKA

akka基于actor模型, 是一个用于构建可扩展的弹性的快速响应的应用程序的平台;
actor模型:是一个并行计算模型。 它把actor作为基本元素来对待:未响应一个接收到的消息,一个actor能够自己做出一些决策,如创建更多的actor或者发送更多的消息

image.png

概念介绍

Actor:

actor是akka中最核心的概念,它是一个封装了状态和行为的对象,actor之间可以通过交换消息的方式进行通信,每个actor都有自己的收件箱,通过actor能够简化锁及线程管理,actor具有如下特性:

类介绍

ActorSystem

在Akka中,ActorSystem是一个重量级的结构,他需要分配多个线程,所以实际应用中,actorSystem一般是单例对象,我们通过ActorSystem创建很多actor,负责创建和监督actor

Actor

Actor负责通信,它包含一些重要的生命周期方法:

Demo

使用akka做一个简易的通信模型,实现一个主从结构通信

Master

主对象类,即注册中心,统计当前在线的worker数目

package akkaDemo

import akka.actor._
import com.typesafe.config.ConfigFactory
import org.apache.commons.cli.{GnuParser, Options}

import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration._
/**
  * @author phil.zhang
  */
class Master extends Actor {

  // workMap
  private val workerMap = new mutable.HashMap[Int, WorkerInfo]()

  private val workerList = new ListBuffer[WorkerInfo]()

  override def preStart(): Unit = {
    println("master 已经启动")

    import context.dispatcher
    // 循环检查心跳
    context.system.scheduler.schedule(0 millis, 10 seconds, self, Check)
  }

  override def receive: Receive = {
    // 接受注册信息并统计
    case RegisterMessage(workId, memory, cores) => {
      val info = new WorkerInfo(workId, memory, cores)
      info.lastHeartBeatTime = System.currentTimeMillis()
      workerMap.put(workId, info)
      workerList += info
      val size = workerList.size
      println(info)
      println(s"worker$workId 注册成功,当前worker共:$size")
      sender ! RegisterdMessage("注册成功")
    }
      // 检查心跳
    case Check => {
      val now = System.currentTimeMillis()
      val outTimeList = workerList.filter(worker => now - worker.lastHeartBeatTime > 5000)
      outTimeList.foreach(workerInfo => {
        workerList -= workerInfo
        workerMap.remove(workerInfo.workerId)
        println("移除" + workerInfo.workerId)
      })
    }
      // 接受心跳后更新心跳时间
    case SendHeartBeat(workId) => {
      if (workerMap.contains(workId)) {
        val workerInfo = workerMap(workId)
        workerInfo.lastHeartBeatTime=System.currentTimeMillis()
      }
    }
  }
}

object Master {

  def main(args: Array[String]): Unit = {
    val options = new Options()
    options.addOption("h", true, "host")
    options.addOption("p", true, "port")

    val parser = new GnuParser()
    val line = parser.parse(options, args)

    val host = line.getOptionValue("h")
    val port = line.getOptionValue("p").toInt

    val configStr=
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
      """.stripMargin

    val config=ConfigFactory.parseString(configStr)
    val actorSystem = ActorSystem("actorSystem", config)
    val master = actorSystem.actorOf(Props(new Master), "master")
  }

}

Worker

工作对象类, 向主类注册,并保持心跳

package akkaDemo

import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import org.apache.commons.cli.{GnuParser, Options}

import scala.concurrent.duration._

/**
  * @author phil.zhang
  * @date 2020/2/6
  */
class Worker(val memory:Int,val cores:Int,val masterHost:String,val masterPort:String) extends Actor{

  var master:ActorSelection = _

  override def preStart(): Unit = {

    // actorSystem 是master的ActorSystem的名字, master是masterActor的名字
    master=context.actorSelection(s"akka.tcp://actorSystem@$masterHost:$masterPort/user/master")
    // 向主类注册
    master ! RegisterMessage(1, memory, cores)
    println("worker注册")
  }

  override def receive: Receive = {
    // 主类注册返回信息
    case RegisterdMessage(message) => {
      println("worker" + message)

      import context.dispatcher
      // 循环发起心跳
      context.system.scheduler.schedule(0 millis, 2 seconds,self, HeartBeat)
    }
      // 发送心跳
    case HeartBeat => {
      master ! SendHeartBeat(1)
    }
  }
}

object Worker {

  def main(args: Array[String]): Unit = {
    val options = new Options()
    options.addOption("mh",true, "master host")
    options.addOption("mp",true, "master port")
    options.addOption("h",true, "host")
    options.addOption("p",true, "host")
    options.addOption("m",true, "memory")
    options.addOption("c",true, "cores")

    val parser = new GnuParser()
    val line = parser.parse(options, args)

    val m_host = line.getOptionValue("mh")
    val m_port = line.getOptionValue("mp")
    val host = line.getOptionValue("h")
    val port = line.getOptionValue("p")
    val memory = line.getOptionValue("m").toInt
    val cores = line.getOptionValue("c").toInt

    val configStr=
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
      """.stripMargin

    val config=ConfigFactory.parseString(configStr)

    val actorSystem = ActorSystem("workerActorSystem", config)

    val worker = actorSystem.actorOf(Props(new Worker(memory,cores,m_host,m_port)),"worker")

  }
}

WorkerInfo

工作对象信息类, 用于描述工作对象

package akkaDemo

/**
  * @author phil.zhang
  * @date 2020/2/6
  */
class WorkerInfo(val workerId:Int,val memory: Int,val cores:Int) {

  // 用于记录上次心跳时间
  var lastHeartBeatTime:Long = _
  
  override def toString = s"$workerId,$memory,$cores"
}

Message

定义了一些信息类型

package akkaDemo

/**
  * @author phil.zhang
  * @date 2020/2/6
  */
trait Message extends Serializable{

}

// slave发给master的心跳信息
case class SendHeartBeat(workId: Int) extends Message

// slave发给master的注册信息
case class RegisterMessage(workId: Int, memory: Int, cores: Int) extends Message

// master发给slave的注册反馈信息
case class RegisterdMessage(message: String) extends Message

// master发给自己的检查信息, 所以不需要序列化
case object Check

// slave发给自己的心跳信息,所以不需要序列化
case object HeartBeat

主要maven依赖

    <dependency>
      <groupId>commons-cli</groupId>
      <artifactId>commons-cli</artifactId>
      <version>1.2</version>
    </dependency>

    <dependency>
      <groupId>com.typesafe.akka</groupId>
      <artifactId>akka-actor_2.11</artifactId>
      <version>2.5.3</version>
    </dependency>
    <dependency>
      <groupId>com.typesafe.akka</groupId>
      <artifactId>akka-remote_2.11</artifactId>
      <version>2.5.3</version>
    </dependency>
上一篇下一篇

猜你喜欢

热点阅读