Netty
介绍
摘自Netty官网:
“Netty是一个NIO C/S框架,能够快速、简单的开发协议服务器和客户端等网络应用。它能够很大程度上简单化、流水线化开发网络应用,例如TCP/UDP socket服务器。”
Netty是一个主要用于编写高并发网络系统、网络应用和服务的Java库。Netty和 标准的Java APIs一个主要的差别在于它的异步API。这个名词对于不同的人来说意味着不同的东西,可能和非阻塞、事件驱动有相同的意义。先不管这些,如果你之前从未使用过异步API,习惯于编写顺序执行的程序,那么转换思路编写Netty程序有点麻烦。在这里简要的介绍我如何解决这个问题。
编写一个Netty示例并启动。和其他Java API一样,发起request很容易。在处理response需要转换思维,因为没有response。几乎每个方法调用都是异步的,这意味着没有返回值,而且调用常常是立刻返回的。结果(如果有的话)是 由 其他线程传回的。这是普通API和异步API的基本区别。假如一个客户端API提 供一个方法来从服务器获取widget的 数量。
标准API
public int getWidgetCount();
当一个线程调用getWidgetCount(),一段时间过后一个int值会被会返回。
异步API
public WidgetCountListener myListener = new WidgetCountListener() {
public void onWidgetCount(intwidgetCount) {
...... doyour thing with the widget count
}
};
在我举例的getWidgeCount API异步版本中,方法调用没有返回数据,而且即刻执行完毕。然而,它接受了一个response处理器参数,当获取到widget数 据会回调这个监听器,然后监听器可以利用这个结果执行任意有用的操作。
这个复杂的附加层看起来是不必要的,但这却是利用Netty等API实现的高性能的应用、服务的一个关键特征。客户端不需要为线程被阻塞在等待服务器响应而浪费资源。当数据可用时会通知他们。对于一个线程发起一个调用的情况,这种方式可能有些过度设计,但是设想有上百个线程百万次的执行这个方法。而且,NIO最重要的好处是Selectors能够用来将我们感兴趣的事件通知委托给底层的操作系统,甚至是硬件来完成。当16bytes已经成功从网络写入到一个服务器或者14bytes经过网络从一个服务器读到本地等操作会被回调通知,很明显是一个底层而且细节的实现方式。但是通过一系列的抽象,开发人员使用Java NIO和Netty API能 够从更宏观、抽象的级别上来处理这些问题。
在本篇介绍中,我会从一些基本的概念讲起,然后列举部分核心的构建代码块,最后介绍一些代码示例。
这些是如何工作的呢?
Netty的基本数据结构是ChannelBuffer。Netty的JavaDocs定义:
一个可以被随机或顺序访问任意个数字节的序列。这个接口为原生byte数组(byte[])和Java NIO buffers提供了一个抽象的概念。
Netty中数据如何被分发也是由ChannelBuffer完成的。如果你的程序只是处理byte数组和byte缓冲区,那么你会很幸运因为使用ChannelBuffer。 然而,如果你需要处理更高级的数据,例如java对象,把这个数据发送到一个远程服务器,然后再接受另外一个返回的对象,那么这些byte计划需要被转义。或者,如果你需要发起一个请求到一个HTTP服务器,你的请求或许是从发起一个字符串类型URL来开始它的生命周期,但是它需要被包装成一个HTTP服 务器能够理解的request,然后分解为原始的byte,最后通过网络发送。HTTP服务器需要接收这些byte,并且把它们解析一个符合格式的HTTP request。一旦请求成功,这个流程会被反过来执行:服务器的响应(或许是一个JPEG图片或者JavaScript文件)必须被包装成一个HTTP response,转换为bytes,然后返回给发起请求的客户端。
在Netty中,byte在网络流通的基本抽象是Channel。 再看一下Netty的API说明:
网络socket或者网络组件的连接器(Nexus)可以实现I/O操作,例如:读、写、连接和绑定。
用Nexus来描述Channel是 很恰当的(来自dictionary.com对nexus的解释):
- a means of connection; tie; link
- a connected series or group
- the core or center, as of a matter or situation
hannel被提供给用户来与Netty交互的API。更符合实际情况的说法,Channel是socket的抽象描述。但是Channel不一定是一个socket,也可以是一个文件、或者其他抽象的东西。因此,Nexus很适合描述Channel。总而言之,Channel提供接口来连接目标,并写入数据。你或许会疑问为什么没有读操作?确实没有。要知道,这和上面提到的异步getWidgetCount方法一样,没有返回值。
所有基于Channel的方法可以分为2种:
- 同步的属性获取方法:用于提供Channel本身的信息。
- I/O操 作,例如:绑定、断开连接、写入。
类型2中的方法是异步的,他们都返回一个ChannelFuture的延迟对象。ChannelFuture是保存未知结果的容器,但是结果可用时肯定会被返回的。ChannelFuture有点像WidgetCountListener,不同的地方在于你需要把ChannelFuture注册到你的监听器中,而不是把监听器注册到你的代码流程中。(更简单的API,对吧?)你可以实现ChannelFutureListener接口来完成一个监听器,这个接口有一个方法会在操作完成的时候被调用:public void operationComplete(ChannelFuture future)。在操作完成后这个方法被调用,但并不意味这操作成功,所以这个延迟对象可以用来查询操作的最终结果-成功或失败。
Netty主要用于NIO,但Netty的Channel实现类支持老版本的、同步的IO(OIO)。OIO有一些优点,而且Netty对它的实现方式和NIO一样,这些实现模块能够被替换和重用。
创建Channel连接
Channel不是直接被创建的,而是通过ChannelFactory来创建。ChannelFactory有2种类别,一种用来实现客户端Channel,另一种用于服务器端的Channel。对于这2种分类,都有不同的实现类来处理相应的I/O通讯协议:
- TCP NIO Channels: NioClientSocketChannelFactory 和 NioServerSocketChannelFactory
- UDP NIO Channels: NioDatagramChannelFactory
- TCP OIO Channels: OioClientSocketChannelFactory和OioServerSocketChannelFactory
- UDP OIO Channels: OioDatagramChannelFactory
UDP的ChannelFactory在客户端和服务器端的实现相同的,因为UDP是无连接协议。还有其他2种类型:
- HTTP 客户端: HttpTunnelingClientSocketChannelFactory: 这个ChannelFactory能够方便地生成channel用于通过指定的Netty Servlet来 连接Netty服 务器。
- Local Channels: DefaultLocalClientChannelFactory 和 DefaultLocalServerChannelFactory 分别是客户端和服务器端In-VM类 型的Channel组 件,这种Channel貌似是网络Channel实际上是在同一个JVM中来处理调用。这样在某些情况下,本地请求可以通过统一的Channel抽象接口被分发、处理。
在本篇博客中主要涉及TCP NIO Channel, 但是要注意创建不同ChannelFactory的方式上有细微的差别。
TCP NIO ChannelFactory的构造方法使用相同类型的参数,还有一些重载方法。基本上,这个Factory需 要2个 线程池/Executors:
- Boss线程:由这个线程池提供的线程是boss种类的,用于创建、连接、绑定socket,然后把这些socket传 给worker线程池。在服务器端每个监听的socket都 有一个boss线 程来处理。在客户端,只有一个boss线程来处理所有的socket。
- Worker线程:Worker线 程执行所有的异步I/O。 他们不是通用的线程,开发人员需要注意不要把与其不同的任务赋给线程,这可能导致线程被阻塞、无法处理他们真正关心的任务,反过来会导致死锁和一些莫名其妙的性能问题。
在客户端只有一个boss线程,为什么NioClientSocketChannelFactory还需要一个Executors?
- boss线程能够被延迟加载,而且没有任务需要处理的时候可以被释放,但是在线程池中保留少量的线程比在需要的时候创建一个新的线程、然后在空闲的时候销毁它更有效率。
- 还有可能多个不同的ChannelFactory被创建,应该让这些ChannelFactory共用一个线程池,而不是每个工厂独享一个线程池。
因为NIO ChannelFactory是唯一可以异步的处理socket连接、服务器端socket的绑定,所以是唯一使用boss线 程池的ChannelFactory。 其他的种类有的使用虚拟连接(Local),有的是同步的连接(OIO) 或者无状态连接(UDP)。HttpTunelingClientSocketChannelFactory是客户端socket ChannelFactory的简单封装,是否使用boss线程是可选的,而且也没给它配置boss线程。
关于ChannelFactory需要注意:在Netty中处理逻辑的过程中,ChannelFactory需要申请资源,包括线程池。如果使用ChannelFactory之 后,一定要调用它的releaseExternalResources()方 法来保证它申请的所有资源被释放。
总之,发送东西到一个监听状态的服务器:
- 创建一个Channel。
- 将Channel连接到远程监听的socket。
- 调用Channel的write(Object message)方法。
传对象到Channel很灵活?不是如此,如果按照下面的方式做会出现什么?
Netty会抛出这个异常:
那支持什么样的对象呢?ChannelBuffer。但是Channel有一个叫做Pipeline的构造器(准确的说是ChannelPipeline)。一个Pipeline是一组拦截器组成的,这些拦截器能够处理和转换传给他们的值。当一个拦截器处理完成后,处理后的值会被传给下一个拦截器。这些拦截器被称作ChannelHandler。Pipeline会严格保证ChannelHandler实例的顺序。通常,第一个ChannelHandler会 接收一个原始的ChannelBuffer,而最后一个ChannelHandler(被称作Sink)会任何转给它的东西输出。在Pipeline的某个流程中,你可以实现一个ChannelHandler来做些有用的操作。ChannelHandler只是一个标识性接口,没有任何方法,因此处理器的实现很灵活,但是任何一个处理器都需要相应或者转发ChannelEvent(这里有很多专业的术语)。
这个ChannelEvent又是什么鬼东西呢?在这两段落中,把ChannelEvent当做一个含有ChannelBuffer的包裹。至于ChannelBuffer已经介绍过,它是Channel的基本数据单位。在下一篇中会有ChannelEvent的详细介绍。
ChannelEvent—真的吗?
ChannelEvent不仅仅包含ChannelBuffer。在Netty几乎所有的东西都是异步的,代码驱动模式中有些代码用来生成事件(例如连接、断开连接、写入),有些代码是在ChannelFactory线程池中执行时处理这些事件。在ChannelEvent的文档中有一个全面的列表介绍所有的事件及区别。事件处理器比较好的例子可以参考SimpleChannelHandler的文档。这个处理器实现类几乎为每种类型的事件都提供了方法。甚至还有一个IdleStateEvent事件,当一个Channel处于空闲状态会触发这个事件,非常有用。
为了实现一个不仅仅能支持简单的ChannelBuffer发送器/接收器,我们需要在Pipeline中添加ChannelHandler。通常情况下,ChannelHandler用作编码器和解码器,功能如下:
- 编码器:将一个非ChannelBuffer类 型的对象转换为适合传送到其他地方的ChannelBuffer。 这个对象或许不会直接被转换为ChannelBuffer,而是部分的被转化,然后由Pipeline中的其他处理器来完成整个流程。不管怎样,如果你不是直接发送ChannelBuffer数 据,那么Pipeline中的处理器会在这个数据被发送出去之前协同将它转换为ChannelBuffer。ObjectEncoder就是一个例子,它将普通的可序列化的Java对 象转换为ChannelBuffer中表示对象的字节。
- 解码器: 编码器的反向流程,将ChannelBuffer中的内容转换为有用的数据。ObjectEncoder对 应的解码器就是ObjectDecoder。
Netty SDK提供了不同种类的编解码器,例如Google ProtoBufs、Compression、Http Request/Response和Base64。
并不是所有的ChannelHandler都是编码器/解码器(尽管核心API中的大多数都是)。ChannelHandler可以用来做各种有用的事情。例如:
- ExecutionHandler: 将ChannelEvent转发给其他线程池。将事件从Worker(I/O 处 理)线程池中踢出来保证它一直有效。
- BlockingReadHandler:允许你的代码将一个Channel以非异步的方式来读取传入数据。
- LoggingHandler:仅仅记录当前哪些事件经过这个处理器了,是非常有用的调试工具……
那么我应该如何使用ChannelHandler来帮助我处理java.util.Data问题呢?答案是你需要在Channel的Pipeline中添加一个ObjectEncoder,它可以将Date转义为ChannelBuffer。下面展示了流程图,稍后会讲解代码:
[图片上传失败...(image-f08358-1530380241068)]
ObjectEncoder是Netty最好的特性之一。只要你在Pipeline的另一端使用Netty ObjectEncoder就能顺利的执行。如果不可以,你需要使用基于Java的标准序列化编码器CompatibleObjectEncoder 。
新的关注点:我们需要一个Channel,可以从ChannelFactory获取,此外还需要定义一个 ChannelPipeline。 有一些方式能够做到这些,但是Netty提 供了一个Bootstrap构建器能够将所有的构件很漂亮的包装在一起。下面展示了这些模块如何组装在一起,创建一个Netty客户端来发送Date。这个示例要实现 NioClientSocketChannelFactory来创建一个NioClientSocketChannel实例。顺便说一句,在大多数情况下Netty公共API只 是简明地返回一个SocketChannel、一个抽象的父类实例。你可以看作一个简单的Channel,实现了他们本身公共的相同行为和公开的功能。和你在其他代码中看到的相比,这个代码可能有些简单,但是我已经讲解的很清楚了。
Executor bossPool = Executors.newCachedThreadPool();
Executor workerPool = Executors.newCachedThreadPool();
ChannelFactory channelFactory = new NioClientSocketChannelFactory(bossPool, workerPool);
ChannelPipelineFactory pipelineFactory = new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(
new ObjectEncoder()
}
};
Bootstrap boostrap = new ClientBootstrap(channelFactory);
boostrap.setPipelineFactory(pipelineFactory);
// Phew. Ok. We built all that. Now what ?
InetSocketAddress addressToConnectTo = new InetSocketAddress(remoteHost, remotePort);
ChannelFuture cf = bootstrap.connect(addressToConnectTo);
这就是如何获取一个Channel。实际上这是一个ChannelFuture。 在Netty中几乎所有的操作都是异步的,当你需要一个连接,创建这个连接的实际流程是异步的。因此,Bootstrap返 回一个ChannelFuture,用于处理连接创建完成的事件。ChannelFuture提供请求处理的状态,假定连接成功创建,并且提供一个Channel。 调用线程有多种方式等待连接创建完成,因为基于channels的异步操作(断开连接、写操作等)也返回ChannelFuture,这些方式也适用于这些操作,下面列出了这些方式:
【A】 等待戈多(戈多是指一次Channel请求)
不考虑等待Channel操作完成的实现策略,操作完成并不意味着操作成功,简单来说,完成时的结果有如下几种类型:
- 成功:操作成功完成。
- 失败:操作失败,可以通过ChannelFuture.getCause()来获取失败的原因(异常)。
- 超时:需要考虑的一种失败结果。
- 取消:调用ChannelFuture.cancle()来取消channel请 求。
话已至此,下面是如何等待操作完成:
-
等待(Await):Await是wait的另一种说法。(或许是为了避免和Object.wait()混淆?)ChannelFuture提供了一些方法是连接线程一直等待直到连接操作完成。注意这并不意味这线程会wait直到连接成功被创建,而在连接创建失败、创建超时、被取消的时候也会结束等待状态。如果感觉内容太多,可以跳过下面的章节,异步操作需要有不同类型的wait方式,但基本分为如下类型:
- Interruptibility:处于wait状态的线程可以被中止。这是线程生 命周期的本质特性,当一个线程被置为wait状态(sleep、join等)时,Java线程API一定会抛出一个需要处理的异常(InterruptedException)。然而ChannelFuture提供了一个不需要try/catch包装的调用方法awaitUninterruptibly,这个方法会忽略interrupt()调用。如果需要可中断的方法,则调用await。
- Timeout:因为各种原因,操作可能因不明原因一直被阻塞,明智的做法是在Channel操作上设置超时时间,可以保证在抛出超时异常之前,线程能够一直处于wait状态。
- 不等待:连接线程可以通过在ChannelFuture上注册一个监听器来实现自动引导模式,在操作完成后这个监听器会被触发。监听器需要实现ChannelFutureListener接口,操作完成之后会将ChannelFuture座位入参回调这个接口。
总之,下面的示例是关于wait种类的,使用连接操作做为唤醒wait的异步事件,同样也适用于大多数操作:
// Waiting on a connect. (Pick one)
ChannelFuture cf = bootstrap.connect(addressToConnectTo);
// A. wait interruptibly
cf.await();
// B. wait interruptibly with a timeout of 2000 ms.
cf.await(2000, TimeUnit.MILLISECONDS);
// C. wait uninterruptibly
cf.awaitUninterruptibly();
// D. wait uninterruptibly with a timeout of 2000 ms.
cf.awaitUninterruptibly(2000, TimeUnit.MILLISECONDS);
// E. add a ChannelFutureListener that writes the Date when the connect is complete
cf.addListener(new ChannelFutureListener(){
public void operationComplete(ChannelFuture future) throws Exception {
// chek to see if we succeeded
if(future.isSuccess()) {
Channel channel = future.getChannel();
channel.write(new Date());
// remember, the write is asynchronous too !
}
}
});
// if a wait option was selected and the connect did not fail,
// the Date can now be sent.
Channel channel = cf.getChannel();
channel.write(new Date());
因为你的客户端线程在等待的过程中可能不会有太多操作,awaitXXX很适合用于客户端应用。在服务器代码中(或许是一些代理服务器),因为很可能会有大量的任务在排队等待处理,线程不应该等待IO事件完成,更好的选择是连接事件的回调。
在Netty文档说明的很清晰, 但依然值得在这里强调:为了保证你的工作线程执行而不是等待,需要避免在工作线程中调用任何await方法。
连接创建完成最后一个或几个步骤是触发一些有用的操作。在上面的示例中,当链接成功创建 后会注册一个ChannelFutureEvent,并且在监听器中执行写操作。此处不清晰的地方当ChannelStateEvent广播的时候operationComplete方法会被回调。ChannelStateEvent是个ChannelEvent的一个实例,当前Channel状态修改后,会广播这个事件。ChannelFuture会调用一个预定义的回调接口处理这个事件,但是ChannelEvent会被传送到pipeline中的所有处理器中,所以你可以在某个处理器中实现连接创建后的操作。我之所以在这里指出这个流程是因为Netty大多示例代码实现方式很不直观。例如,ObjectEchoClient类完成了创建一个Channel所有的任务,却没有任何写入操作。实际的对象回写操作是在pipeline(ObjectEchoClientHandler)中最后一个处理器完成的,当处理器接收表示连接已创建的ChannelSateEvent事件后会被执行。
Server端的执行情况
根据DateSender,我们 只知道服务器端socket是用来监听客服端socket发送的数据。服务器端很像客户端的反面。
[图片上传失败...(image-41005a-1530380241068)]
服务器socket将承载从客户端接收的byte流的ChannelEvent送到pipeline中的处理器sink中。需要为pipeline配置解码器ObjectDecoder来将byte数组转换为Java对象,在本例中,Java对象是Date类型的。当Date被解码之后会被pipeline中的下一个处理器、自定义的DateHandler处理。你可能会注意到虽然Channels有一个write方 法,却没有相应的read方法。在同步API中 很多这种情况:一个线程需要等待直到数据可用才能从OutputStream(译注:这里应该是InputStream) 读取数据。这也是为什么上面的图表中没有一个指向ServerChannel箭头。同时,pipeline中最后一个处理器使用从客户端发送的所有解码 后的信息做一些有用的操作。
假设服务器端的Channel pipeline是 你真实的业务服务DateReceiver的 调用发起者,pipeline为DataReceiver提供合适的解码数据。当然,你可以创建一个只包含一个处理器的pipeline,使用这个处理器完成所有的操作,但是将多个小模块链接在一起来使用更灵活。例如,如果不仅仅发送一个Date,而是一个拥有300Date的数组,我还想在客户端和服务器的pipeline中添加Compression/Decompression处理器来减少网络传输过程中的负载量,实现这个需求只需要简单的修改pipeline的 配置就行了,而不是在那个无所不能的处理器中添加新代码。或者我想增加一些认证功能防止任何客户端都能给服务器端发送Date、调整Date的时区……模块化的方式更容易实现。
服务器端的DateSender很简单。创建一个Channel工厂、pipeline工 厂,将ObjectDecorder和自定义的DateHandler放到pipeline中。 使用与ClientBootstrap相 应的ServerBootstrap, 不用类似在客户端做的连接操作,然后将ServerBootstrap绑 定到服务器端socket来监听来自于客户端的请求。
public static void bootServer() {
// More terse code to setup the server
ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
// Set up the pipeline factory.
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(
new ObjectDecoder(ClassResolvers.cacheDisabled(getClass().getClassLoader())),
new DateHandler()
);
};
});
// Bind and start to accept incoming connections.
bootstrap.bind(new InetSocketAddress("0.0.0.0", 8080));
slog("Listening on 8080");
}
static class DateHandler extends SimpleChannelHandler {
public void messageReceived(ChannelHandlerContext ctx,MessageEvent e) throws Exception {
Date date = (Date)e.getMessage();
// Here's the REALLY important business service at the end of the pipeline
slog("Hey Guys ! I got a date ! [" + date + "]");
// Huh ?
super.messageReceived(ctx, e);
}
}
DateHandler继承了SimpleChannelHandler,这是一种好的方式:当你没有特殊的逻辑需要处理,可以使用定义好的回调。也就是说你不需要监听所有的ChannelEvents、确认是否是你需要的类型。在这种方式下,仅仅需要重 载表示接收数据的messageReceived回 调方法就可以了。
slog和clog是System.out.println的 简单封装,只是输出时的前缀不同:一个是[Server],一个是[Client],可以用来区别不同的输出。
messageReceived简单介绍
之前已经说过,messageReceived是一个当接收到数据时用于回调的方法。这个方法会处理属于ChannelEvent类型的事件:MessageEvent。为了获取MessageEvent中的数据信息,只需要很简单地调用getMessage()方法就能返回一 个java.lang.Object对象,这个对象能被转换为需要的类型数据。注意:如果在这个处理器前面没有转换处理器来转换数据,那么这个数据就是ChannelBuffer类 型的。在这个例子中,ObjectDecorder已经将ChannelBuffer中的数据流转换为java.util.Data类型的日期。至于其他的对象ChannelHandlerContext, 一会就会涉及。
假设业务中关键点就是记录接收 数据的日志,处理器已经从技术上完成了这个要求。那么第30行的代码( super.messageReceived(ctx, e);) 有什么作用呢?因为在pipeline有 可能还有其他的处理器来做进一步的处理,当处理完接收的数据之后总是通过这种方式传送数据是一种很好的方案。如果没有多余的处理 器,pipeline会丢弃这个数据。
如果想了解整个细节,可以去GitHub查 看DateSender的 源码,下面是代码执行的输出:
[Server]:Listening on 8080
[Client]:DateSender Example
[Client]:Issuing Channel Connect...
[Client]:Waiting for Channel Connect...
[Client]:Connected. Sending Date
[Server]:Hey Guys ! I got a date ! [Sat May 19 14:00:58 EDT 2012]
注意这个示例是单向的,只是将日期数据上传导服务器,并没有任何返回。如果 需要返回,在服务器端和客户端的pipeline的都需要相应的处理器, 在后面会讨论。
Server端返回日期数据
客户端和服务器端都需要双向的读写操作。例如DateSender示 例,如果服务器端想将日期数据增加任意的天数,然后将修改后的日期返回,应该如何做呢?有个代码几乎和DateSender相 同的示例:DateModifier ,这个示例的服务器端 修改日期然后将修改后的数据返 回给客户端。为了实现这个功能,需要做如下修改:
- 服务器端需要在pipeline中添加一个ObjectEncoder, 才能返回一个日期数据给客户端。
- 客户端接收从服务器端返回的日期数据需要在pipeline中添加ObjectDecorder。
- 客户端需要添加一个额外的处理器来利用接收到的数据完成一些逻辑。
下面代码创建了新客户端的pipeline:
ChannelPipelineFactory pipelineFactory = new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(
new ObjectEncoder(),
// Next 2 Lines are new
new ObjectDecoder(ClassResolvers.cacheDisabled(getClass().getClassLoader())),
new ClientDateHandler()
);
}
};
客户端的处理器DateHandler非常简单,当客户端接收数据并且将数据解码之后会调用这个处理器
static class ClientDateHandler extends SimpleChannelHandler {
public void messageReceived(ChannelHandlerContext ctx,MessageEvent e) throws Exception {
Date date = (Date)e.getMessage();
clog("Hey Guys ! I got back a modified date ! [" + date + "]");
}
}
服务器端的pipeline几乎和客户端一样:
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(
new ObjectDecoder(ClassResolvers.cacheDisabled(getClass().getClassLoader())),
// Next 2 Lines are new
new ObjectEncoder(),
new ServerDateHandler()
);
};
});
下面的代码是新的服务器端日期处理器DateHandler,里面有一些新的代码:
static class ServerDateHandler extends SimpleChannelHandler {
Random random = new Random(System.nanoTime());
public void messageReceived(ChannelHandlerContext ctx,MessageEvent e) throws Exception {
Date date = (Date)e.getMessage();
// Here's the REALLY important business service at the end of the pipeline
long newTime = (date.getTime() + random.nextInt());
Date newDate = new Date(newTime);
slog("Hey Guys ! I got a date ! [" + date + "] and I modified it to [" + newDate + "]");
// Send back the reponse
Channel channel = e.getChannel();
ChannelFuture channelFuture = Channels.future(e.getChannel());
ChannelEvent responseEvent = new DownstreamMessageEvent(channel, channelFuture, newDate, channel.getRemoteAddress());
ctx.sendDownstream(responseEvent);
// But still send it upstream because there might be another handler
super.messageReceived(ctx, e);
}
}
在第10-13行是一些新代码基于一个精妙的理念,主要用于将修改后的日期返回给调用的客户端。
- 在第10行,从MessageEvent中获取了一个Channel的引用。
- 日期数据即将被异步(会出 现重复调用写操作的风险)写回客户端,因此总会调用ChannelFuture。在第11行,使用Channels创建一个ChannelFuture对象,Channels有 很多有用的类似future的静态方法。
- 第12行创建一个新MessageEvent:DownstreamMessageEvent。基本 上,返回值已经被打包准备回传给客户端。
- 第13行调用ChannelHandlerContext的sendDownstream方法将MessageEvent发送给客户端。
有什么需要注意的吗?ChannelHandlerContext基本上是pipeline功能上的引用。它能和pipeline一样 访问所有的处理器。更重要的是,它提供了一种和接收信息一样的路径来将数据发送回去。通过如下方法实现:
- 注意,在pipeline中除了ServerDateHandler之外可能还有其他的处理器,希望响应客户端将修改后的日期数据发送回去。然后,将当前接收的数据继续传送到pipeline中的下一个处理器。
- 如果直接将返回的日期数据直接写到Channel中,日期数据会从pipeline的上层开始推送到某些不期望被调用的处理器中。
所有这些东西引出了一 些额外的细节:pipeline中的处理器如何处理工作的,并解释了Upstream和Downstream。
Upstream、Downstream
你可能会有疑问ObjectEncoder和ObjectDecorder在pipeline的一个处理流程中为什么不会被混淆调用,毕竟它们都在同一个pipeline中,如何做到不被同时调用呢?这是因为处理器被声明Upstream类型、Downstream类型或者Upstream+Downstream类型。当pipeline发送数据给远端会调用Downstream处理器,当从远端读取数据的时候会调用Upstream处理器。如果这些专业不够直观,可以类比下面的描述:
如果需要学习东西,你需要阅读(read up)然后做笔记(write down)。无论如何,pipeline中的处理器需要直接或者间接实现下面的某个或者所有接口:
- org.jboss.netty.channel.ChannelDownstreamHandler:由downstream调用。
- org.jboss.netty.channel.ChannelUpstreamHandler: 由upstream调用。
因此,在downstream写请求的时候,pipeline安排数据传送给所有实现ChannelDownstreamHandler的处理器,读请求也是相同的流程。需要注意的是,一个处理器可以同时实现这2个接口,就可以处理upstream和download事件了。
[图片上传失败...(image-707ec5-1530380241068)]
Upstream和Downstream作用于客户端和服务器端。在客户端和服务器端的一次会话中,双方都需要一对设定正确顺序的编码器和解码器。下解释了一个简单的HTTP服务器给客户端请求提供文件的流程:
[图片上传失败...(image-2868c4-1530380241068)]
pipleline中ChannelHandlers 的执行顺序
为了实现有序的修改数据,pipeline严格保持处理器的顺序。例如一个pipeline类似这样:
1、JSON 编码器
- 接收: 对象部件
- 生成: 根据对象生成的JSON字符串
2、String 编码器
- 接收: 字符串
- 生成: 容纳编码后的字符串的ChannelBuffer
如果这些编码的顺序偶然被调转,String编码器就会接收到一个对象部件。String编码器认为这是一个字符串,然后就会抛出类型转换异常。有一些构造器负责执行pipeline中的处理顺序。最简单的是org.jboss.netty.channel.Channels类中的pipeline静态方法,它创建了一个用于存储handlers的pipeline,pipeline中handler的顺序以数组中的顺序为准。构造器的声明如下:
static ChannelPipeline Channels.pipeline(ChannelHandler...handlers)
在pipeline的结构允许指定处理器的顺序。最简单的是org.jboss.netty.channel.Channels类中的静态方法 pipeline,会创建一个pipeline然后根据传入的处理器数组元素的顺序将处理器存放到pipeline中。方法:
static ChannelPipeline Channels.pipeline(ChannelHandler...handlers)
在ChannelPipeline接口中提供了可以将处理器添加到指定位置的方法,因此我们可以给pipeline中的处理器指定特定的逻辑名称来标识每个处理器的唯一性。名称可以是任意的字符串,但是有时能够根据名称来访问处理器,或者通过名称来确定处理器的位置。处理器的名称是必须存在的,如果没有人为设定,会被自动指定一个名称。pipeline中的方法如下:
- addAfter(String baseName, String name, ChannelHandler handler):在pipeline中添加一个处理器放置到名为baseName的处理器后面,并且将新添加的处理器命名为name。
- addBefore(String baseName, String name, ChannelHandler handler):在pipeline中添加一个处理器放置到名为baseName的处理器前面,并且将新添加的处理器命名为name。
- addFirst(String name, ChannelHandler handler):在pipeline中添加一个处理器放置到pipeline第一个位置,并且将新添加的处理器命名为name。
- addLast(String name, ChannelHandler handler):在pipeline中添加一个处理器放置到pipeline最后一个位置, 并且将新添加的处理器命名为name。
动态修改pipeline中的ChannelHandler
pipeline另一个特性是可以被修改的,所以在运行过程中可以添加或者删除pipeline的处理器。为了实现这个特性,Netty的pipeline是线程安全的。上面的4个方法可以在运行时被调用来调整pipeline适应某种特定的执行流程或转换状态。
实际上有个不可被修改的pipeline实现:StaticChannelPipeline。如果这个对象被创建后就不可以被修改,因为pipeline在运行过程中不需要考虑会被修改,可以提供更好的性能。
在创建pipeline的过程中已经完成了这些繁琐的步骤,为什么还需要修改pipeline中的处理器呢?这里有个假设示例:在客户端的pipeline中有一串的处理器,最后一个处理器是ZlibEncoder,用于在数据发送之前压缩数据。如果数据量少于1024byte,压缩操作会影响性能:降低性能。这意味着需要有条件的使用压缩处理器处理数据。在将数据传入到处理器之前是不知道数据的大小的,不确定是创建一个支持压缩的pipeline或者一个不支持压缩的pipeline。为了解决这个问题,可以创建一个内容审计处理器用来检测数据的大小来决定是否需要压缩处理器。将这个审计处理放置在压缩处理器之前、其他处理器之后就可以了。
现在,你可以得出有多种方案,这个场景是假设的,请听我继续。
首先,创建一个支持压缩特性的pipeline,然后注意ConditionalCompressionHandler,现在 pipeline是这样的:
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
/// Add all the pre-amble handlers
//pipeline.addLast("Foo", new SomeHandler());
//pipeline.addLast("Bar", new SomeOtherHandler());
// etc.
pipeline.addLast("IAmTheDecider", new ConditionalCompressionHandler(1024, "MyCompressionHandler"));
pipeline.addLast("MyCompressionHandler", new ZlibEncoder());
return pipeline;
}
ConditionalCompressionHandler一定要检查传输的ChannelBuffer的大小,然后决定压缩处理器是否会被调用。然而如果压缩处理器被移除,下次传入的数据需要被压缩,那么就要将压缩处理器重新添加到pipeline中。
public class ConditionalCompressionHandler extends SimpleChannelDownstreamHandler {
/** The minimum size of a payload to be compressed */
protected final int sizeThreshold;
/** The name of the handler to remove if the payload is smaller than specified sizeThreshold */
protected final String nameOfCompressionHandler;
/** The compression handler */
protected volatile ChannelHandler compressionHandler = null;
/**
* Creates a new ConditionalCompressionHandler
* @param sizeThreshold The minimum size of a payload to be compressed
* @param nameOfCompressionHandler The name of the handler to remove if the payload is smaller than specified sizeThreshold
*/
public ConditionalCompressionHandler(int sizeThreshold, String nameOfCompressionHandler) {
this.sizeThreshold = sizeThreshold;
this.nameOfCompressionHandler = nameOfCompressionHandler;
}
/**
* see org.jboss.netty.channel.SimpleChannelDownstreamHandler
*/
public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) {
// If the message is not a ChannelBuffer, hello ClassCastException !
ChannelBuffer cb = (ChannelBuffer)e.getMessage();
// Check to see if we already removed the handler
boolean pipelineContainsCompressor = ctx.getPipeline().getContext(nameOfCompressionHandler)!=null;
if(cb.readableBytes() < sizeThreshold) {
if(pipelineContainsCompressor) {
// The payload is too small to be compressed but the pipeline contains the compression handler
// so we need to remove it.
compressionHandler = ctx.getPipeline().remove(nameOfCompressionHandler);
}
} else {
// We want to compress the payload, let's make sure the compressor is there
if(!pipelineContainsCompressor) {
// Oops, it's not there, so lets put it in
ctx.getPipeline().addAfter(ctx.getName(), nameOfCompressionHandler , compressionHandler);
}
}
}
}
ChannelHandlerContext
在响应所有的处理器事件的时候,会将ChannelHandlerContext传递给处理器。上述的示例中主要展示ChannelHandlerContext是一个用于ChannelHandler和容纳此Handler的ChannelPipeline交互的适配器。而且,还允许一个处理器查询其他处理器,并且间接和其他处理器进行交互。
- 在26行,确认压缩处理器是否在ChannelHandlerContext的pipeline中是否存在,然后从 ChannelHandlerContext中获取压缩处理器.如果处理器不存在,当前调用返回null。
- 在31行,访问pipeline根据处理器的名称删除压缩处理器。
- 在37行,访问pipeline将压缩处理器添加到pipeline中。通过ChannelHandlerContext的getName()方法获取当前处理器的名称。