Akkanetty

Akka分片集群的实现-邓草原

2018-04-25  本文已影响0人  mango_knight

最近在学习 邓草原用akka实现的实时流式消息处理的架构,整合一下演讲和ppt,方便自己理解。
http://www.infoq.com/cn/presentations/akka-cluster-realization/

Actor模式,一种计算粒度,


Actor三定则(actor行为的改变围绕异步消息流驱动)当actor收到一条消息:
1.创建另外一些actor
2.向已知的actor发送消息
3.指定接收下一条消息的行为


Actor - 适合并行计算的最小的粒度 也是做增量计算的最小粒度
原因:

基于以上两点 单个actor总是线程安全的

并行是多个actor的行为


在企业级应用、互联网应用中
Entity(实体通常是带状态的)应该是actor
是actor可以带来以下的优点


在 Akka中 actor的实现


分片 - IdExtractor / ShardResolver
根据实体的id按一定的规则分散到不同的节点中,下面是解决该问题定义的几个基本的type

    type EntryId = String //entity id
    type ShardId = String  //region id 每个节点上有很多region
    type Msg = Any     //往每个entity actor 发送的消息
    type IdExtractor = PartialFunction[Msg, (EntryId, Msg)] //id 抽取
    type ShardResolver = Msg => ShardId //根据消息 转换成 region id

具体例子:

    //把消息定义成Command
    sealed trait Command extends Msg with Serializable {
      def sessionId: String  
    }
    // cluster 按 sessionId 与 actor 一一对应,按需即时创建或定位转发
    lazy val idExtractor: ShardRegion.IdExtractor = {
      case cmd: Command => (cmd.sessionId, cmd)
    }
    // cluster 依据 sessionId ,按一定规则,将 actor 分片到 Region
    // 比如 100 个 regions , cluster 会在每个节点分配若干个 Regions
    lazy val shardResolver: ShardRegion.ShardResolver = {
      case cmd: Command =>
        (math.abs(cmd.sessionId.hashCode) % 100).toString
        //分片方式
    }

持久化的实现 persist /recover

    class ClusterConnectionActive(val namespaceMediator: ActorRef, 
                                  val broadcastMediator: ActorRef) extends
                ConnectionActive with EventsourcedProcessor {
      override def receiveRecover: Receive = {
        case event: Event => updateState(event) // 重演持久化的消息历史以恢复状态
      }
      // 只持久化会改变状态的消息
      override def receiveCommand: Receive = {
        case connected: Connected =>
          persist(connected)(updateState(_))
        case packets: UpdatePackets =>
          persist(packets)(updateState(_))
        case _ => // 处理其它消息
       }
      def updateState(event: Event) = {//具体的业务逻辑
        event match {
          case x: Connected =>
            connectionContext.foreach(_.bindTransport(x.transport))
          case x: UpdatePackets =>
            pendingPackets = immutable.Queue(x.packets: _*)
        }
      }

spray - socket .io 集群架构


故障场景

上一篇下一篇

猜你喜欢

热点阅读