netty

Akka介绍和入门

2019-08-06  本文已影响0人  段永平

注:本文大部分文字摘抄《Akka 实战:快速构建高可用分布式应用》中内容,但此书中所用的akka版本比较老,所以本文中的代码是本文作者基于最新版本的akka以及官网所实现。

文中涉及到的所有Demo都在github上

网页地址为:https://github.com/boomblog/AkkaDemo

clone地址为:https://github.com/boomblog/AkkaDemo.git

Akka简介

Akka 是一款高性能、高容错性的分布式&并行应用框架,遵循 Apache 2 开源许可,底层通过 JVM 上另外一个流行的语言 Scala 实现,提供Java&Scala API 。它基于经典的 Actor 并发模型(即所有的消息都是基于 Actor 组件进行传递),拥有如下特点:

并行与并发:提供对并行与并发的高度抽象。

异步非阻塞:Akka-Actor 消息通信都是基于异步非阻塞。

高容错性:为跨多 JVM 的分布式模型提供强劲的容错处理,号称永不宕机。

持久化:Actor 携带的状态或消息可以被持久化,以便于在 JVM 崩溃后能恢复状态。

轻量级:每个 Actor 大约只占 300bytes,即 1G 内存可容纳接近 300 万个 Actor。

基本上,Akka 从底层就解决了我们大多数分布式&并行程序常见的难题,让工程师更专注于业务实现,同时,它也保留了多个扩展接口及配置,便于满足个性化定制的需要!

Akka应用场景

目前 Akka 已经在多家互联网&软件公司广泛使用,比如 eBay、Amazon、VMWare、PayPal、阿里、惠普、豌豆荚等,涉及行业包括游戏、金融投资、医疗保健、数据分析等。

使用场景包括:

服务后端:比如 rest web,websocket 服务,分布式消息处理等。

并发&并行:比如日志异步处理,密集数据计算等。

总之,对高并发和密集计算的系统,Akka 都是适用的!

Akka架构体系

Akka 采用 Scala 开发,运行于 JVM 之上,提供了 Scala 和 Java 两种 API,目前属于 Lightbend 公司(原名 Typesafe)。它实现了经典的 Actor 模型,同时也提供了丰富的组件,比如邮箱(MailBox)、路由(Routing)、持久化( Persistence )、网络(包括远程、集群)等,在底层对分布式&并行模式进行了高度且统一的抽象,使工程师用很少的代码就可以实现一个完整的分布式应用。

Actor模型

Actor 模型最早在 1973 年由Carl Hewitt 提出,它高度抽象了分布式并行程序的运行模式,从底层屏蔽了线程和锁机制的管理,为开发者提供了简单可依赖的开发方式。

Actor 模型认为,并行计算的最小单元就是一个 Actor 实例,而每个实例拥有自己的状态和行为,在一个大型系统中,可能存在成千上万个 Actor 实例,它们之间通过消息的方式进行通信,每个 Actor 都能发送消息给其他 Actor,也能从其他 Actor 接收消息。当我们在执行某个计算任务时,会给对应的 Actor 实例发送一个相关的消息,该 Actor 在接收消息后开始执行计算任务,由于整个消息通信的过程是异步的,所以不用等到 Actor 执行完整个过程就能执行下一步(发送消息后会马上返回),这种异步通信的方式大大提高了程序的响应性。

体系结构

Actor 是 Akka 最核心的概念,也是最基本的执行单元,所以对 Actor 管理和监控的有效性是极为重要的。在 Akka 中,每个 Actor 都有自己的监管对象,即该 Actor 的创建者,它们通常会负责子 Actor 的失败处理,另外,某些 Actor 也需要对生命周期进行监控(比如该 Actor 的终止),以便及时响应并做正确处理,这些监督和监控者本身也都是一个 Actor。在 Akka 中,整个 Actor 体系被抽象成一个 ActorSystem ,它是一个层级的结构,拥有公共行为的配置和管理。

在ActorSystem 基础上,Akka 也提供了一些配套的组件,比如持久化,HTTP 服务,网络服务等,它们都是构建高可用分布式应用不可或缺的部分,基本架构体系和周边产品如下图所示。

Actor组件

在 Akka 中,Actor 是一个高度抽象的对象引用,它包含以下几个要素:

引用( Actor Reference ):Actor 的引用不同于普通对象的引用,也不能通过传统「new」的方式直接创建一个 Actor 对象。很多时候需要通过 actorOf 或者 actor-Selection 等方式返回一个ActorRef 对象,该对象有可能存在于本地,也可能存在于远程节点,对我们来说,它是位置透明的。Actor 之间的通信和执行任务都是通过消息驱动的(而不是 API 的调用)。

状态(State):Actor 在不同时刻可能有着不同的状态,这些状态用变量来表示。在底层实现上,Actor 是运行于线程池之上的,肯定会存在多个 Actor 共享同一个线程的情况,那么会不会出现并发问题呢?实际上,Akka 为每个 Actor 都抽象出一个轻量级的执行「线程」(不是真的线程),在底层已经实现了隔离,所以基本上不用担心该问题的出现。另外,当 JVM 崩溃时,为了避免 Actor 状态的丢失,我们可以借助持久化方案来对状态进行持久化操作。

行为(Behavior):Actor 有接收和发送消息的能力,每当它接收一条消息后,就可以执行某个业务操作,同时也可以把消息转发到其他节点进行处理。

监管策略( Supervisor Strategy ):Actor 系统是一个层级结构,当任务被某个 Actor 分摊到子 Actor 时,父 Actor 就拥有监管子 Actor 的义务。在监管时,我们需要根据不同的情况选择不同的处理方案(比如停止、重启、恢复或者失败上溯)和策略(比如 1 vs 1、1 vs N 策略)。

邮箱(Mailbox)

每个 Actor 都有自己的邮箱,所有其他 Actor 发送过来的消息都会进入该邮箱。Akka 自带多种邮箱类型,也提供自定义邮箱的接口。

路由(Routing)

消息除了通过普通的 Actor 发送之外,也可以通过路由发送。当通过路由发送消息时,我们可以根据需求来选择不同的路由策略,比如轮询、广播等。路由也可以是一个 Actor。

状态持久化(Persistence )

任何程序都有失败的可能,即便是 JVM 如此强大稳定的平台也一样。当程序出错、JVM 崩溃时,任何关键状态的丢失,对我们后续的业务来讲都可能是致命的打击,所以状态数据的持久化变得非常重要。Akka 提供了 Actor 状态的持久化方案,以便我们在必要时恢复数据。

网络(远程和分布式集群)

网络功能是实现远程 Actor 和分布式集群的基础,这其中包含 I/O、网络通信(TCP/UDP)、序列化配置、分布式通信协议(Gossip)、节点(node)管理、集群分片等内容。

HTTP 模块

Akka 提供了简单易用的 HTTP 模块,支持完整的 HTTP 服务端与客户端开发,可以帮助我们快速构建性能极强的Rest Web 服务。

相关开源项目

Akka 具有高性能、可扩展、设计友好等诸多优点,非常适合用来作为分布式应用的基础框架,而且由于对 HTTP 有非常好的支持,也让它在 Web 服务领域占有一席之地。目前业界已经有多个基于 Akka 实现的开源项目,项目类型涵盖了 Web 开发、微服务、分布式文件或计算服务等。下面是 Akka 中两个具有代表性的开源项目:

Play 框架:一款大名鼎鼎的 Web 开发框架。它不同于其他 Servlet 系的框架,比如 Struts 或者SpringMVC ,底层基于 Akka 构建,天生拥有异步的特点,具有极佳的性能。它默认提供 RESTful 风格的 API,同时也对WebSocket 有不错的支持。

Lagom 框架:在目前 IT 界,最火爆的概念要属「微服务」了,微服务的理念是,把业务功能拆成小的、独立的单元,它们之间能够互相通信而且支持水平扩展。Lagom 就是这样一款微服务框架,它基于异步的消息驱动,对分布式集群、持久化(如 JPA、NoSQL)都有良好的支持。同时,它也拥有完整的集成开发环境,非常便于在线部署和管理。

基本使用

环境搭建

JDK版本最好是在 1.6 以上,不过后续会介绍Akka的一些新特性,所以最好采用1.8的版本。

Maven依赖

使用最新版akka依赖。

<dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-actor_2.12</artifactId><version>2.5.23</version></dependency>

注意Akka中不同组件分为了不同的模块,我们这里暂时只是引用了核心模块,通过它可以使用akka的最基本功能。除开核心模块还包括akka-persistence 、akka-remote 、akka-cluster 等模块,分别用于实现持久化、远程、集群等功能。

创建一个Actor

publicclassActorDemoextendsAbstractActor{privateLoggingAdapterlog=Logging.getLogger(this.getContext().getSystem(),this);@OverridepublicReceivecreateReceive() {returnreceiveBuilder()                .match(String.class,s->log.info(s))                .matchAny(o->{log.info("any"+o.toString());                })                .build();    }publicstaticvoidmain(String[]args) {ActorSystemsystem=ActorSystem.create("sys");ActorRefactorRef=system.actorOf(Props.create(ActorDemo.class),"actorDemo");actorRef.tell("hello world",ActorRef.noSender());// ActorRef.noSender()实际上就是叫做deadLetters的actor}}

通常每个应用程序只需要创建一个ActorSystem对象

在创建 ActorSystem 和 Actor 时,建议都指定名字,如果不知道akka将自动生成,在该示例中,它们的名字分别是 sys 和 actorDemo ,在同一个 ActorSystem 中 Actor 不能重名。

actorOf方法返回的不是Actor对象本身,而是ActorRef,表示Actor对象的引用。我们在向某个Actor发送消息时都是通过该引用进行的。

在Actor内部,可以使用getContext()来创建该Actor的子Actor,比如ActorRef childActor=getContext().actorOf(Props.create(ChildActor.class),"childActor" );

工厂模式

ActorSystem 和 ActorContext 通过接收一个 Props 实例来创建 Actor,而 Props 实例本身有两种方式可以创建:

Props.create(ActorDemo.class, Object... args)

指定一个Props工厂。

对于比较简单的场景使用第一种就足够了,而如果需要按统一的规则创建Actor,我们可以在Actor内定义一个创建Props的方法,比如:

publicstaticPropscreateProps() {returnProps.create(newCreator<Actor>() {@OverridepublicActorcreate()throwsException{returnnewHelloWorldActor();            }        });    }

然后可以使用一下代码来创建Actor:

ActorRefactorRef=system.actorOf(ActorDemo.createProps(),"actorDemo");

可以看出,如果我们使用工厂模式,在创建Actor时调用工厂方法,我们不用关心Actor创建的具体过程了,有Actor自己封装即可。

发送与接收消息

Actor中的createReceive方法是用来接收并处理消息的。

createReceive方法需要放回Receive对象,而Receive对象就是真正用来处理消息的逻辑。

Receive对象可以定义规则使接收到的消息进入不同的处理逻辑:

match(Class

type):消息类型为指定type的进入此分支

matchEquals(P object):消息值与object相等的进入此分支

matchAny():任何消息都可以进入到此分支

当然,一个消息只会进入到一个分支,如果一个消息匹配多个条件优先进入定义靠前的条件。

可以使用tell和ask两种方式发送消息,它们都以异步的方式发送消息,不同的是,前者发完后立即返回,而后者期待得到一个返回结果,假如在设置的时间(Timeout)内没有得到返回结果,发送方会收到一个超时异常。

tell方法

第一个参数即"消息",它可以是任何可序列化的数据或对象,第二个参数表示发送者,noSender()表示无发送者(实际上是一个叫做 deadLetters 的 Actor,后面介绍)。假如想在Actor内部得到发送者,可以调用getSender ()方法。

在调用 tell 方法后,Actor 就会异步的去处理该消息,并不会阻塞后续代码的运行。

ask方法

这是一种“请求-响应”模式,ask方法会将返回结果包装在scala.concurrent.Future中。

对于一个Actor如何将响应返回给消息发送者呢,其实就是想响应作为消息发送给请求消息的发送者:

getSender().tell("hello "+msg,getSelf());

使用样例:

publicclassAskDemoextendsAbstractActor{@OverridepublicReceivecreateReceive() {returnreceiveBuilder().matchAny(o->{System.out.println("发送者是"+getSender());Thread.sleep(1000);getSender().tell("hello "+o,getSelf());        }).build();    }publicstaticvoidmain(String[]args) {ActorSystemsystem=ActorSystem.create("sys");ActorRefaskActor=system.actorOf(Props.create(AskDemo.class),"askActorDemo");Timeouttimeout=newTimeout(Duration.create(2,TimeUnit.SECONDS));Future<Object>f=Patterns.ask(askActor,"Akka Ask",timeout);System.out.println("ask ...");f.onSuccess(newOnSuccess<Object>() {@OverridepublicvoidonSuccess(Objectresult)throwsThrowable{System.out.println("收到消息:"+result);            }        },system.getDispatcher());    }}

消息转发

Actor可以将接收到的消息转发给其他Actor,语法为:

staticclassForwardActorextendsAbstractActor{privateActorReftarget=getContext().actorOf(Props.create(TargetActor.class),"targetActor");@OverridepublicReceivecreateReceive() {returnreceiveBuilder().matchAny((message)->{target.forward(message,getContext());            }).build();        }    }

TargetActor接收到转发过来的消息后,发送者任然是消息原始的发送者,而不是转发者。

查找Actor

对于已存在的 Actor,我们可以根据路径来进行查找,API 如下:

ActorSelection as = [ActorSystem/ActorContext].actorSelection([path]);

在调用它时,必须指定一个绝对或者相对路径(包括远程路径),比如:

/user/parentActor/childActor

../parentActor/childActor

akka.tcp://testuser@127.0.0.1:2554/user/parentActor/childActor

查找结果为ActorSelection,表示一个或多个Actor,所以向ActorSelection发送消息时,对应的一个或多个Actor都将接收到消息。

消息不可变

Akka中都是消息驱动的,所以消息是非常重要。理论上,Actor 可以传递或接收任何类型的消息,但是为了避免出现竞态条件(对共享资源的并发写)和不可预知的数据篡改,最好把消息设计成不可变的对象。在该对象中,我们通常会把所有变量设计成只读,在该对象的整个生命周期内,这些数据将不可修改。所以在 Java 中,我们借助final关键字来实现。

对于集合框架,有必要通过 Collections.unmodifiable 系列的 API 来保证不变性。Collections.unmodifiable 内部使用 UnmodifiableList 、UnmodifiableMap 等 API 对现有的集合进行包装,它们通过重写其中的 set、add、put 等方法来达到限制数据修改的目的。

Actor行为切换

在业务处理过程中,往往会面临一些关键数据的变化,而这些变化也会进一步影响着程序的执行逻辑,很多时候,我们把这些关键数据称之为「状态」。状态的处理需要依赖于具体的业务,而业务本身也可能充满不确定性,一种比较好的做法是,我们把多个状态的处理过程(即行为)封装起来,形成一个个独立的组件,在使用时就可以很方便地组装、重用或切换。在 Actor 中,行为可以通过Receive来定义,一旦抽取完我们所需要的Receive,就可以使用 become/unbecome 方法来切换他们,其中 become 表示切换为某个行为,unbecome 表示修改回上一个行为。

Actor生命周期

Actor 在运行时中会经历不同的阶段,比如创建、运行、重启和销毁等,这一系列的过程或状态,我们称之为生命周期。在理想情况下,Actor 应该像一头老黄牛一样任劳任怨地不停工作,但实际情况往往是,当某个不算常态的错误发生时(比如网络偶尔超时),我们想让它重启一下,然后重复之前的动作;又或者,当一个程序异常被抛出时,我们不得不停止该 Actor,让它不能继续走下去,这些行为有可能是手动触发的,也可能是 Actor 的监督-容错机制导致的。无论怎样,当这些行为发生时,我们都希望能及时感知并做有效处理。

Actor 生命周期主要包括启动(Start)、恢复(Resume)、重启(Restart)、停止(Stop)这四个阶段,每个阶段都会伴随着自身状态信息的变化。

创建并启动

当通过 actorOf 创建并启动 Actor 时,该 Actor 除了默认拥有已指定的 path 外,也会被分配一个 UID(可以通过 getSelf().path().uid()得到该值),作为它的唯一标识。在 Actor 启动后,会默认调用preStart 方法,在该方法里,我们可以做些资源初始化的操作。

恢复运行

当 Actor 出现某种异常后,通过容错机制,可以让该 Actor 恢复并继续运行,此时 Actor 会延用之前的实例,状态也会被保留下来。

重启

当 Actor 出现某种异常后,通过容错机制,可以让该 Actor 执行重启。重启的具体过程如下:

调用旧实例的preRestart 方法,该方法默认情况下会停掉所有子级 Actor,并调用postStop 方法;

创建新实例,并在新实例上调用postRestart 方法,该方法默认情况下会调用preStart 方法。

Actor 重启后,path 和 UID 不变,这意味着假如要继续使用该 Actor,不需要重新获取ActorRef 对象,使用之前那个就可以了。同时也得注意到:Actor 重启后不会保留自身状态。

停止

当 Actor 停止时,会调用postStop 方法,同时会发送一条Terminated信息给自己的监控者,告知自己已处于停止状态。

停掉一个Actor

停止 Actor 大致有三种方式,它们分别是:

调用ActorSystem 或者getContext() 的 stop 方法;

给 Actor 发送一个PoisonPill (毒丸)消息;

给 Actor 发送一个 Kill 的消息,此时会抛出ActorKilledException 异常,并上报到父级supervisor 处理。

Actor 在停止时都会遵循一套比较可靠的流程:

当停止 Actor 时,正在处理的消息会在完全停止之前处理完毕,后续信息将不再进行处理,邮箱(用来保存 Actor 的消息)将被挂起;

给所有子级 Actor 发送终止指令,当子级都停掉后,再停掉自己,停止完毕后会调用postStop 方法,在这里可以清理或释放资源;

向生命周期监控者(DeathWatch )发送Terminated 消息,以便监控者做相应的处理。

可以使用getContext().watch();方法监控某个节点的停止信息,当该节点停止后,监控者会接收到一个Terminated消息。

监督与容错处理

Actor 系统采用"父监督"的模式进行管理,即父 Actor 会监督子 Actor 的异常情况,然后根据默认或者预设的处理逻辑来确定到底是该恢复 Actor、停止 Actor、重启 Actor 还是把错误上溯到父级。

Akka 提供了两种监督策略,分别是 One-For-One Strategy 和 All-For-One Strategy ,前者表示当一个子 Actor 出现异常时,只对该 Actor 做处理,后者则表示对所有子 Actor 都做处理,大部分时候应该选择 One-For-One Strategy (这也是默认的策略),除非子 Actor 之间的业务有很强的关联或者互相依赖。当程序中没有显式指定策略时,会启动一个默认策略:

当抛出ActorInitializationException 和ActorKilledException时,会终止子 Actor;

当抛出Exception 时,会重启子 Actor;

抛出其他类型的Throwable 异常时,会上溯失败到父级。

在 Actor 执行任务时可能会抛出不同类型的异常,很多时候,我们应该知道,哪些异常需要让 Actor 停止或重启,哪些异常可以「视而不见」。为了更加细化这种判断,我们需要自定义监督逻辑。

自定义一个监督逻辑非常简单,只需要在父 Actor 中创建一个SupervisorStrategy 对象,并通过supervisorStrategy ()方法返回出来.

熔断(circuit breaker)

当一个Actor出现异常时,如果用户还继续向这个Actor发送消息,很有可能这个消息还是无法被处理,还额外的增加了服务器的压力,Akka中提供了熔断机制(circuit breaker)。

Circuit Breaker 状态图(图片来自 Akka 官网)

circuit breaker有三个状态,分别是:Closed、Open、Half-Open。

Close 状态

正常情况下,circuit breaker是closed状态:Actor出现异常或调用超过配置的callTimeout,就增加一次失败计数;

成功则重置失败计数为 0;

当失败次数达到maxFailures后,circuit breaker会进入Open状态。

Open 状态

Actor接收到的所有调用将抛出CircuitBreakerOpenException ,立即失败;

在resetTimeout后,circuit breaker 进入Half-Open状态。

Half-Open 状态

第一次调用将尝试运行而不会快速失败;

如果第一个调用成功,会重回 Closed 状态;

如果第一个调用失败,会进入 Open 状态,然后等待下一次resetTimeout 。

配置

Akka 程序在启动时会加载一些默认的配置项,所以我们不需要显式地提供配置文件,假如要个性化配置某些行为,则需要新建配置文件。在默认情况下,程序会加载 classpath 下的 application.conf 、application.json 、application.properties 文件。

在创建ActorSystem 时,可以显式加载自定义的配置文件,比如app.conf ,代码如下:

ActorSystemsystem=ActorSystem.create("myapp",ConfigFactory.load("app.conf"));

Akka 提供的配置项涵盖了几乎所有的核心功能,比如日志、远程、路由、消息序列化、分布式集群等.

上一篇下一篇

猜你喜欢

热点阅读