【Akka 精选】Akka 架构简介
1.Akka 是什么
Akka 基于 Actor 模型,提供了一个在 JVM 上构建高并发、分布式和高容错应用程序的平台。
① 高并发的事件驱动模型:
● 并发/并行程序的简单、高级抽象;
● 异步、非阻塞、高性能的事件驱动编程模型;
● 轻量级事件驱动(1G 可容纳数百万个 Actor);
② 高容错的分布式系统:
● “let-it-crash” 容错语义的监督层次体系;
● 监督层次体系跨越多个 JVM;
● 异常重启的情况下,支持消息重放;
2.Actor 模型
Actor 模型是计算机科学并行计算领域的数学模型,在消息驱动的模式下,实现消息发送者与已经发送的消息解耦。
Actor:基本的计算单元,具有唯一的地址。Actor 接收消息后,根据自定义的行为决定如何计算、创建更多的 Actor 或者发送创建更多的数据到属于它的邮箱 Mailbox。同时,Actor 也会维护自身的私有状态,并且不会直接被其他的 Actor 修改;
Mailbox:邮箱是存储消息的地方。当 Actor 发送计算结果的时候,Actor 会把数据先发送到自身的 Mailbox,然后 Mailbox 再发送到下一个 Actor。因此,实现了 Actor 之间的消息异步发送。
image.png3.基于 Actor 模型的 Akka 设计方案
3.1 Akka 中的 Actor 及其生命周期
trait Actor {
import Actor._
type Receive = Actor.Receive
implicit val context: ActorContext = {
val contextStack = ActorCell.contextStack.get
if ((contextStack.isEmpty) || (contextStack.head eq null))
throw ActorInitializationException(
s"You cannot create an instance of [${getClass.getName}] explicitly using the constructor (new). " +
"You have to use one of the 'actorOf' factory methods to create a new actor. See the documentation.")
val c = contextStack.head
ActorCell.contextStack.set(null :: contextStack)
c
}
implicit final val self = context.self //MUST BE A VAL, TRUST ME
final def sender(): ActorRef = context.sender()
def receive: Actor.Receive // 这个是在子类中一定要实现的抽象方法
protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = receive.applyOrElse(msg, unhandled)
protected[akka] def aroundPreStart(): Unit = preStart()
protected[akka] def aroundPostStop(): Unit = postStop()
protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = preRestart(reason, message)
protected[akka] def aroundPostRestart(reason: Throwable): Unit = postRestart(reason)
def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy
@throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
def preStart(): Unit = () // 启动Actor之前需要执行的操作,默认为空实现,可以重写该方法
@throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
def postStop(): Unit = () // 终止Actor之前需要执行的操作,默认为空实现,可以重写该方法
@throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
def preRestart(reason: Throwable, message: Option[Any]): Unit = { // 重启Actor之前需要执行的操作,默认终止该Actor所监督的所有子Actor,然后调用postStop()方法,可以重写该方法
context.children foreach { child ⇒
context.unwatch(child)
context.stop(child)
}
postStop()
}
@throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
def postRestart(reason: Throwable): Unit = { // 重启Actor之前需要执行的操作,默认执行preStart()的实现逻辑,可以重写该方法
preStart()
}
def unhandled(message: Any): Unit = {
message match {
case Terminated(dead) ⇒ throw new DeathPactException(dead)
case _ ⇒ context.system.eventStream.publish(UnhandledMessage(message, sender(), self))
}
}
}
Akka 采用树状监督体系,其中每个节点都是路径的组成部分,从跟节点深度遍历组成了一个完整的 URL,即 Actor 的地址。Actor 初始化的时候,首先分配 1 个 Path 和 1个 UUID 添加到监督体系中,然后如果 Actor 实例异常,会重新创建替换旧实例,同时提供 ActorRef 用于其它 Actor 的调用。Actor 完整的生命周期,如下图所示。
image.png当 Actor 异常终止且重启失败,Actor 会自我销毁或者被它的监管者销毁。Actor 会释放所占用的资源,它引用中的邮箱会被系统的“死信邮箱“(dead letter mailbox)所替代,并将邮箱中所有未处理的消息和新消息转移到该邮箱,即将所有消息作为死信重定向到事件流中。
3.2 Akka 监督体系
监管描述了 Actors 之间的依赖关系。监管者将任务委托给 Child Actor,并对 Child Acto r的失败状况响应。当某个 Child Actor 抛出一个异常,它自己会将自己和自己所有的下属挂起,然后向其自身的监管者发送一个提示失败的消息。
Akka 提供了 4 种失败处理的方式:
① 恢复下属,保持 Child Actor 当前积累的内部状态;
② 重启下属,清除 Child Actor 当前积累的内部状态;
③ 永久停止下属;
④ 向上传递失败,从而失败自身。
因此恢复 Actor 会恢复其所有下属,重启一个 Actor 也必须重启其所有下属,类似地终止一个 Actor 会终止其所有下属。
Akka 提供了 2 种的监管策略,OneForOneStrategy:只将所获得的指令应用在发生故障的 Child Actor 上。AllForOneStrategy:将所获得的指令应用在所有 Child Actor 上
如下图所示,Actor 系统在其创建过程中至少要启动 3 个 Actor。
① /: 根守护者
监督所有在 Actor 路径的顶级作用域中定义的特殊 Actor,发现任何 Exception 就终止所有的子 Actor。所有的 Actor都有一个监管者,但是根守护者没有父 Actor。因此使用一个虚拟的 ActorRef,在发现问题后立即停掉根守护者,并在根守护者完全终止之后,立即把 Actor 系统的 isTerminated 置为 true。
② /user: 守护Actor
为所有用户创建 Actor 的父 Actor,其中使用 system.actorOf() 创建的 Actor 都是其子 Actor。
③ /system: 系统守护者
为了实现正确的关闭顺序。
3.3 Akka Remoting 和 Cluster
Akka Cluster 是一个容错(Fault-Tolerant)、去中心化(Decentralized)、基于 P2P 的集群服务,而且不会出现单点故障(SPOF, Single Point Of Failure)。Akka 基于 Gossip 实现集群服务,而且支持服务自动失败检测。
Akka 提供了 2 种方式来使用 Remoting 功能:
① ActorSelection 方法搜索一个 Actor,该方法输入的参数的模式为:akka.<protocol>://<actor system>@<hostname>:<port>/<actor path>
② ActorOf 方法创建一个 Actor