Reactor模式与NIO[译]

2019-03-26  本文已影响0人  库洛琪

Reactor模式

Reactor模式并不是近年来的一个新发明,在1995年出版的由Jim Coplien和Douglas C. Schmidt的叫做“Parttern of Program Design”的书中曾描述过。简而言之,该模式用于处理并发服务请求,这些请求被多路分解并同步分派给相关的请求处理程序。

现在我们来看看这意味着什么。并发,这意味着同时有大量的请求并且资源是有限的(例如CPU时间)。

更进一步,它是关于service和request handler的。在基于reactor模式提供服务的应用中,对于每个service的请求都将会引入一个独立的request handler来处理。传入的请求将被注册并排队等待处理。

多路分解和分派的任务由event loop完成。

img

总的来说,reactor模式的好处就是可以避免对处理每个请求的线程的重复创建。

Vert.x

Vert.x是运行于JVM上的reactor工具包。它是基于Netty的事件驱动、非阻塞异步框架。支持多种语言。例如Java、Javascript、Scala、Groovy等。Vert.x由任职于VMware的Tim Fox在2011年发起。该项目2013年移到Eclipse Foundation,现在成熟版本为3.4.x。

Reactor模式描述了单个线程在循环中运行以将事件分发给处理程序。对于现在的计算机,一般都是多核,所以Vert.x扩展了Reactor模式,实现了Multi-reactor模式,每一个Vert.x实例都可以维护多个event loop,默认为每个CPU核心两个event loop。

因为这个模式十分重要,Vert.x也帮助你正确的实现它。阻塞event loop对于Vert.x是不可取的,并且你会收到WARNING,提示你慎重考虑该线程和你自己的编程模型。

WARNING: The Thread[vert.x-eventloo-thread-1,5,min] has been blocked for 13223 ms, time limit is 2000

io.vertx.core.VertxException: Thread blocked

C10k问题

现在回到阻塞和C10k问题,这两个问题经常与本主题同时提到。C10k指的是处理10000(10k)并发连接的问题。

首先,我们来看看典型的web server/servlet容器如何处理http请求。当接收到请求,web服务器将建立TCP连接,读取、解析请求中的content。之后,该请求移交到特定的业务逻辑处理,在IO相关的应用中,通常是访问文件系统、网络资源或者数据库。众所周知,与数据的计算处理相比,IO操作相当慢。我们可以通过数据来看这一点Google Research

回到上面的http请求,当前由于CPU需要等待IO设备加载所有数据而阻塞,它除了等待并不能做其他事情。这意味着CPU资源空闲吗? 当然。这不是我们希望看到的,我们希望整个过程尽可能利用CPU,最终将相应返回给客户端。

这仅是一个请求或者是一个线程的情况,在并发的情况下操作系统使用多任务和上下文切换避免过长的等待时间。当某一线程阻塞等待IO时,切换到另一线程。但是,这带来了额外的成本,因为线程本身在内存和上下文切换方面也有所需要的开销。

img

现在,我们讨论了C10k问题的核心。同时处理数千客户端连接和线程需要利用到非阻塞IO,这是C10k问题的可伸缩解决方案。

Reactive vs. blocking实现

我们用一个分别通过reactive non-blocking和blocking方式实现的例子来感受两者之间的区别。

这个例子是一简单的基于HTTP的服务,它计算给定id文件的校验和,并通过休眠的方式来模拟阻塞。这些文件大小基本相同,约为200KB,因此不会因为计算某一文件的校验和而产生较大影响。

HTTP请求类似于这样:

http://localhost:8080/hashser?id=17

请求的响应类似于下面:

90491c182f1f9ac6a86845c9b34c6f41e1485e8dfc5f4bbfd646dc6f661ce7986eed287c3a7314a368e55a2167b8597c2a974ef2c7c536e44455b1316474bcd4

Servlet实现

首先来看看基于servlet的实现方式:

@WebServlet(name = "hashServlet", urlPatterns = {"/hasher"}, loadOnStartup = 1)
public class HashingServlet extends HttpServlet { 
     @Override
     protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws
ServletException, IOException {
       try {     
        Thread.sleep(1000);  
        } catch (InterruptedException e) {     
          //ignore
        }   
        String id = req.getParameter("id");
        String pathToFile = getFileBasedOnId(id);
         try (FileInputStream fileInputStream = new FileInputStream(new
File(pathToFile)); PrintWriter writer = resp.getWriter()) {
           String sha512Hex = DigestUtils.sha512Hex(fileInputStream);
           resp.setContentType("text/plain");
           writer.println(sha512Hex);
         } catch (Exception e) {
           System.err.println(e.getMessage());
           }
  }
private String getFileBasedOnId(String id) {
   return "D:/tmp/pic_" + id + ".png";
 }
}

继承HttpServlet并实现doGet方法。首先,通过休眠1s模拟远程调用,这会阻塞执行这个请求的线程。接下来,从文件系统读取请求的文件并使用标准库commons-codec中的DigestUtils计算文件的校验和。计算结果作为palin text写入到响应中。这段代码稍后将会部署到Servlet容器中以处理请求。

Vert.x实现

接下来,我们看看基于Vert.x的实现(在spring boot中集成Vert.x)。

package cmcc.iot.reactor;

import cmcc.iot.reactor.verticle.HashingVerticle;
import io.vertx.core.Vertx;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import javax.annotation.PostConstruct;

@SpringBootApplication
public class ReactorApplication {

    @Autowired
    private HashingVerticle hashingVerticle;

    @PostConstruct
    public void deployVerticle() {
        Vertx.vertx().deployVerticle(hashingVerticle);
    }

    public static void main(String[] args) {
        SpringApplication.run(ReactorApplication.class, args);
    }

}

package cmcc.iot.reactor.verticle;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class HashingVerticle extends AbstractVerticle {
    @Override
    public void start() throws Exception {
        Router router = Router.router(vertx);

        router.route().handler(BodyHandler.create());
        router.get("/hasher").handler(this::handleHash);

        vertx.createHttpServer().requestHandler(router::accept).listen(8080);
    }

    private void handleHash(RoutingContext routingContext) {
        String id = routingContext.request().getParam("id");
        String pathToFile = getFileBaseOnId(id);

        HttpServerResponse response = routingContext.response();
        vertx.setTimer(1000, timer ->{
            vertx.fileSystem().readFile(pathToFile, handler -> {
                if (handler.succeeded()) {
                    byte[] bytes = handler.result().getBytes();
                    String hash = DigestUtils.sha512Hex(bytes);
                    response.putHeader("content-type", "text/plain").end(hash);
                } else {
                    log.error(handler.cause().getMessage());
                }
            });
        });
    }

    private String getFileBaseOnId(String id) {
        return "D:/tmp/pic_" + id + ".png";
    }
}

Vert.x附带可选的部署模型,在该模型下,你的应用由一组Verticle构成。甚至你可以使用不同的类加载器,以便部署不同版本的Verticle。在一本介绍微服务的书中,Eberhard Wolff将Vert.x描述为Nanoservice(对比Microservice),因为它与微服务有相似之处,但事实上它们可以在一个JVM中并行运行,使得它们更小(对于微服务,通常独立应用,独立的JVM)。

在Vert.x中有不同类型的Verticle,但鉴于我们的例子相对简单,使用标准的Verticle就足够了。Verticle需要被指定给event loop线程,通过继承AbstractVerticle时实现start方法来实现。Vert.x保证Verticle中的所有代码在同一个event loop中执行。因此,你不必担心同步、其他线程、扩展这些问题。

start方法中,创建了一个router用来定义哪个Handler处理哪个路由。之后,创建了一个监听在8080端口的http服务器。并不需要Servlet容器或者类似的容器。

handleHash方式使用timer来实现阻塞,相对于前面的Thread.sleep实现。timer并不会阻塞event loop线程,1s之后调用handler从文件系统读取请求的文件并使用标准库commons-codec中的DigestUtils计算文件的校验和,并调用end方法将计算结果写入到响应中。由于是异步处理,在reactive实现中会非常频繁地使用lambda回调函数。

负载测试

为了测试和对比这两种HTTP service的实现,我们将使用Gatling作为负载测试框架。有了Gatling,将能使用强大的DSL来表达测试用例。它是基于Netty并使用Scala开发的非阻塞HTTP,也非常适用于这篇博文。

对于我们的service,使用Gatling非常容易编写负载测试:

class LoadRunner extends Simulation {
  val idFeeder = Iterator.continually(
    Map("randomId" -> Random.nextInt(4000))
  )
  val httpConf = http
    .baseURL("http://localhost:8080")
  val scn = scenario("Hashing LoadTest")
      .feed(idFeeder)
      .exec(http("Hasher").get("/hasher?id=${randomId}")
      .check(status.is(200)))
  setUp(    
      scn.inject(rampUsersPerSec(10) to 300 during (60 seconds))
   ).protocols(httpConf)
}

LoadRunner继承于Simulation基类,并且需要setUp方法和针对具体protocol(httpConf)sceario(scn)scenario描述了我们的测试用例,使用随机id执行HTTP get调用。随机id来自于idFeeder,生成一个4000以内的随机正整数,因为需要我们计算校验和的文件数为4000。最后,使用inject方法设置进行压力测试的用户数。Gatling使用起来非常灵活,提供了广泛的可注入的条件。在我们的例子中,进行了在60s内从10个用户每秒到300用户每秒请求的测试。

gatling-maven-plugin提供了一种简单的执行simulation的方法:mvn gatling:excute

结论

基于servlet实现的例子运行在Apache Tomcat 8.5.5上,并设置了每个HTTP connetor最多100线程。

运行上面的压力测试得到了以下表格中的结果。执行了9300请求,没有失败请求。最短耗时1002ms,平均值约为16s。

执行结果
Total OK KO
9300 9300 9300 0
平均 req/s 90.201 90.201
响应时间(ms)
Total OK KO
最小 1002 1002
50百分位 14273 14214
75百分位 27748 27745
95百分位 39557 39557
99百分位 42030 42030
最大 42685 42685
平均值 16489 16489
标准差 13158 13158

观察下面的统计图可以获取到更多细节

img

橙色的线代表了活跃用户数,当达到最大响应时间约40s时,用户数上升到4000。开始时,响应时间比预期略高1s。随着请求的增加响应时间发生了戏剧性的增长。

[图片上传失败...(image-a3553e-1553563532908)]

在这张图中,可以看出从每秒100请求开始,由于1s的阻塞调用,servlet方式无法跟上。在这样小的规模上, 我们也面临着C10k问题。

img

将响应数量控制在100(即将servlet容器线程限制为100),耗时45s完成所用用户的请求。

同样的场景下使用Vert.x服务器运行10个Verticle实例。意味着10个event loop线程,是Tomcat线程的1/10。下面的结果展示了在相同数量请求下,Vert.x更短的响应时间。

执行结果
Total OK KO
9300 9300 9300 0
平均 req/s 152.459 152.459
响应时间(ms)
Total OK KO
最小 1001 1001
50百分位 1004 1004
75百分位 1007 1007
95百分位 1012 1012
99百分位 1035 1035
最大 1252 1252
平均值 1006 1006
标准差 8 8
img

观察图标中的响应时间,几乎是一条直线,仅仅只有少数波动,这也证实了8ms的低标准差。下面的两张图也显示出,随着负载的增加,响应和请求能维持一个较好的状态。

img img

使用timer是一个非常理想的解决方案,但是相比使用Thread.sleep并不是100%公平,但是在我们的示例中,关注的焦点在阻塞和非阻塞上。为了使这两种实现更加公平,移除vertx.timer和Thread.sleep再进行一次压力测试。

从配置的角度来看,在Tomcat中的HTTP connector配置最大500线程,同时保持10个Verticle实例,稍多于8核的机器。Gatling测试中,在30s内每秒10到200个用户,每个用户100个请求,让系统的负载增大。

结论

基于servlet实现的结果如下:

执行结果
Total OK KO
31500 31500 0
平均 req/s 2460.938 2460.938
响应时间(ms)
Total OK KO
最小 0 0
50百分位 903 905
75百分位 1204 1204
95百分位 1890 1890
99百分位 2592 2592
最大 22319 22319
平均值 897 897
标准差 584 584

可以看出,平均响应时间稍高于1s,基本上每个请求完成时间为2.6s,最大响应时间超过22s。

[图片上传失败...(image-cbd802-1553563532908)]

约40%的请求800ms内完成,约25%的请求超过1200ms。

[图片上传失败...(image-6e9b43-1553563532908)]

在响应时间中,除了两个峰值外,整体比较平缓。

img

在每秒响应数上也没有比较特殊的点,下面我们看看reactive的实现方式。

Vert.x实现方式的数据:

执行结果
Total OK KO Servlet.impl Diff
31500 31500 0 31500 0
平均 req/s 2863.636 2863.636 2460.938 -402.698
响应时间(ms)
Total OK KO Servlet.impl Diff
最小 0 0
50百分位 751 751 905 154
75百分位 935 935 1204 269
95百分位 1263 1263 1890 627
99百分位 1528 1528 2592 1064
最大 2085 2085 22319 20234
平均值 694 694 897 203
标准差 367 367 584 217

相比于servlet的实现方式,有了更好的测试结果。

img

50%请求小于800ms,很小的百分比大于1200ms。

img

响应时间这次没有峰值,50%线至90%线和活跃用户数很接近,表现出了良好的性能。

img

随着时间的推移,每秒的响应数也很稳定。

从这些图表,很难解释为什么数据有区别。仅仅知道servlet实现方式在这种场景下,随着负载的增加没有表现良好的性能。因此需要更多的数据来分析这个结果,例如,在两次运行中使用Profiler获取的数据。

img img

基于servlet的实现方式,上面的数据显示出有500 http-nio-thread 不仅处于运行状态(绿色),还经常处于阻塞状态(红色)。查看这些线程的dump数据,它们都阻塞在摘要计算中,等待底层资源,因为所有线程都希望被同时处理。

img

在reactive实现方式中,则完全不一样。使用了10个event loop线程利用可用的CPU资源,而不是500个线程。同时,Vert.x维护了一个默认20线程的线程池用来处理阻塞操作。这非常清楚的表明了为什么非阻塞模式完全优于阻塞模式,这正如reactor模式所描述的那样。

总结

我们经历了从深入reactor模式理论,到使用提供该模式高级API的Vert.x实现。通过了解C10k问题,研究了两种负载测试场景,第一种场景中比较了两种极端的blocking版本和non-blocking版本的实现。第二种场景中切换到了更加实际的例子,请求需要大量的IO操作并且竞争CPU资源,这种情况下reactive non-blocking方式也有良好的表现。

原文

The reactor pattern and non blocking IO

上一篇下一篇

猜你喜欢

热点阅读