netty笔记
Netty是一个高性能事件驱动的异步的非堵塞的IO(NIO)框架,用于建立TCP等底层的连接,基于Netty可以建立高性能的Http服务器。
非堵塞IO(NIO):NIO和IO有相同的作用和目的,但实现方式不同,NIO主要用到的是块,所以NIO的效率要比IO高很多。
Java API中提供了两套NIO,一套是针对标准输入输出NIO,另一套就是网络编程NIO。
**Buffer和Channel是标准NIO中的核心对象
Channel是对原IO中流的模拟,任何来源和目的数据都必须通过一个Channel对象。
一个Buffer实质上是一个容器对象,发给Channel的所有对象都必须先放到Buffer中;同样的,从Channel中读取的任何数据都要读到Buffer中。
网络编程NIO中还有一个核心对象Selector,它可以注册到很多个Channel上,监听各个Channel上发生的事件,并且能够根据事件情况决定Channel读写。
通过一个线程管理多个Channel,就可以处理大量网络连接了。 image就是注册对各种 I/O 事件兴趣的地方,而且当那些事件发生时,就是这个对象告诉你所发生的事件。
Selector selector = Selector.open(); //创建一个selector
为了能让Channel和Selector配合使用,我们需要把Channel注册到Selector上。通过调用channel.register()方法来实现注册:
channel.configureBlocking(false); //设置成异步IO
SelectionKey key=channel.register(selector,SelectionKey.OP_READ); //对所关心的事件进行注册(connet,accept,read,write)
SelectionKey 代表这个通道在此 Selector 上的这个注册。
CallBack:回调是异步处理经常用到的编程模式,回调函数通常被绑定到一个方法上,并且在方法完成之后才执行,这种处理方式在javascript当中得到了充分的运用。回调给我们带来的难题是当一个问题处理过程中涉及很多回调时,代码是很难读的。
Futures:Futures是一种抽象,它代表一个事情的执行过程中的一些关键点,我们通过Future就可以知道任务的执行情况,比如当任务没完成时我们可以做一些其它事情。它给我们带来的难题是我们需要去判断future的值来确定任务的执行状态。
netty到底怎么工作的呢?
public class EchoServer {
private final static int port = 8007;
public void start() throws InterruptedException{
//引导辅助程序
ServerBootstrap bootstrap = new ServerBootstrap();
//通过nio的方式接受连接和处理连接
EventLoopGroup group = new NioEventLoopGroup();
try {
bootstrap.group(group)
.channel(NioServerSocketChannel.class) //设置nio类型的channel
.localAddress(new InetSocketAddress(port)) //设置监听端口
//有连接到达时会创建一个channel
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// pipline 管理channel中的handler,在channel队列中添加一个handler来处理业务
ch.pipeline().addLast("myHandler", new EchoServerHandler());
//ch.pipeline().addLast("idleStateHandler",new IdleStateHandler(0, 0, 180));
}
});
//配置完成,绑定server,并通过sync同步方法阻塞直到绑定成功
ChannelFuture future = bootstrap.bind().sync();
System.out.println(EchoServer.class.getName() + " started and listen on " + future.channel().localAddress());
future.channel().closeFuture().sync(); //应用程序会一直等待,直到channel关闭
} catch (Exception e) {
e.getMessage();
}finally {
group.shutdownGracefully().sync();
}
}
- 创建一个ServerBootstrap实例
- 创建一个EventLoopGroup来处理各种事件,如处理链接请求,发送接收数据等。
- 定义本地InetSocketAddress( port)好让Server绑定
- 创建childHandler来处理每一个链接请求
- 所有准备好之后调用ServerBootstrap.bind()方法绑定Server
handler 处理核心业务
@Sharable //注解@Sharable可以让它在channels间共享
public class EchoServerHandler extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception {
ByteBuf buf = ctx.alloc().buffer();
buf.writeBytes("Hello World".getBytes());
ctx.write(buf);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
客户端 连接
public class EchoClient {
private final int port;
private final String hostIp;
public EchoClient(int port, String hostIp) {
this.port = port;
this.hostIp = hostIp;
}
public void start() throws InterruptedException {
Bootstrap bootstrap = new Bootstrap();
EventLoopGroup group = new NioEventLoopGroup();
try {
bootstrap.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(hostIp, port))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture future = bootstrap.connect().sync();
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
System.out.println("client connected");
} else {
System.out.println("server attemp failed");
future.cause().printStackTrace();
}
}
});
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully().sync();
}
}
客户端handler
@Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf>{
/**
*此方法会在连接到服务器后被调用
* */
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("Netty rocks!");
ctx.write(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8));
}
/**
* 接收到服务器数据时调用
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println("Client received: " + ByteBufUtil.hexDump(msg.readBytes(msg.readableBytes())));
}
/**
*捕捉到异常
* */
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
BOOTSTRAP
Netty 应用程序通过设置 bootstrap(引导)类的开始,该类提供了一个 用于应用程序网络层配置的容器。
通过Bootstrap可以轻松的去配置并启动应用
服务器端
imageServerChannel实现负责创建子 Channel,它代表接受连接
客户端
image在调用 bind() 或 connect() 之后,Bootstrap 类负责创建管道给客户或应用程序,
CHANNEL
底层网络传输 API 必须提供给应用 I/O操作的接口,如读,写,连接,绑定等等。
对于我们来说,这层结构几乎总是会成为一个“socket”(一个通道代表了一个 socket 链接,或者能够进行IO处理的组件,因此这里用EventLoop来管理)。 Netty 中的接口 Channel 定义了与 socket 丰富交互的操作集:bind, close, config, connect, isActive, isOpen, isWritable, read, write 等等。 Netty 提供大量的 Channel 实现来专门使用。这些包括AbstractChannel,AbstractNioByteChannel,AbstractNioChannel,EmbeddedChannel, LocalServerChannel,NioSocketChannel 等等。
CHANNELHANDLER
ChannelHandler支持很多协议,并且提供用于数据处理的容器。Netty使用handler回调对象去处理特定的事件
我们已经知道 ChannelHandler 由特定事件触发。 ChannelHandler 可专用于几乎所有的动作,包括将一个对象转为字节(或相反),执行过程中抛出的异常处理。
常用的一个接口是 ChannelInboundHandler,这个类型接收到入站事件(包括接收到的数据)可以处理应用程序逻辑。当你需要提供响应时,你也可以从 ChannelInboundHandler 冲刷数据。一句话,业务逻辑经常存活于一个或者多个 ChannelInboundHandler。
ChannelInitializer
ChannelInitializer:那我们怎么去绑定 ChannelHandler 去处理我们需要发送或者接收的消息呢?这里就用到ChannelInitializer,它的指责就是将 ChannelHandler 的实现加入到 ChannelPipeline。(事实上ChannelInitializer本身就是一个ChannelHandler,只不过这个handler会在加入其他handler的同时将自己从ChannelPipeline中移除)
CHANNELPIPELINE
ChannelPipeline 提供了一个容器给 ChannelHandler 链并提供了一个API 用于管理沿着链入站和出站事件的流动。每个 Channel 都有自己的ChannelPipeline,当 Channel 创建时自动创建的。 ChannelHandler 是如何安装在 ChannelPipeline? 主要是实现了ChannelHandler 的抽象 ChannelInitializer。ChannelInitializer子类 通过 ServerBootstrap 进行注册。当它的方法 initChannel() 被调用时,这个对象将安装自定义的 ChannelHandler 集到 pipeline。当这个操作完成时,ChannelInitializer 子类则 从 ChannelPipeline 自动删除自身。
image image- Channel 绑定到 ChannelPipeline
- ChannelPipeline 绑定到 包含 ChannelHandler 的 Channel
- ChannelHandler
- 当添加 ChannelHandler 到 ChannelPipeline 时,ChannelHandlerContext 被创建
EVENTLOOP
EventLoop 用于处理 Channel 的 I/O 操作。一个单一的 EventLoop通常会处理多个 Channel 事件。一个 EventLoopGroup 可以含有多于一个的 EventLoop 和 提供了一种迭代用于检索清单中的下一个。
简单点来说,EventLoopGroup 是一个线程池,EventLoop是其中一个线程
但netty并不仅限于此
一旦 Channel 是分配给一个 EventLoop,它将使用这个 EventLoop 在它的生命周期里和同样的线程。你可以,也应该,依靠这个,因为它可以确保你不需要担心同步(包括线程安全、可见性和同步)在你 ChannelHandler实现。
而 EventLoopGroup 包含了了多个 EventLoop ,并能用于去获取 EventLoop。
CHANNELFUTURE
Netty 所有的 I/O 操作都是异步。因为一个操作可能无法立即返回,我们需要有一种方法在以后确定它的结果。出于这个目的,Netty 提供了接口 ChannelFuture,它的 addListener 方法注册了一个 ChannelFutureListener ,当操作完成时,可以被通知(不管成功与否)。
EventLoop
Netty 是一个非阻塞的,事件驱动的网络框架。初看,Netty是用多线程来处理IO事件的。接触过多线程编程的人可能会想,在这样需要同步我们的代码。但事实上,Netty的设计使我们不需要做过多的这些考虑。
image- Netty使用 EventLoopGroup 的组件里面有一个或者多个 EventLoop。
- 一,当一个通道(Channel)被注册进来,从EventLoopGroup中获取到一个EventLoop-n。
- 二,Netty会绑定这个通道到EventLoop (当然也是在一个单独的线程中),并且这个通道的生命周期只会与这一个 EventLoop绑定。这也就是为什么在我们的应用在Netty框架下不需要做同步处理(所有的IO操作都是在给定的通道及同一个线程中)
- 新来的消息交给EventLoop-n处理
如图:EventLoop 和 EventLoopGroup 是一种 "is-a"关系,EventLoopGroup是EventLoop的一个子类
BootStrap & ServeBootStrap
BootStrap:用于创建客户端;
ServerBootStrap:用于创建服务端;
不同点一:
ServerBootStrap 绑定到一个端口去监听客户端的链接;BootStrap 通常调用 connect() / bind(),然后在稍后使用 Channel (包含在ChannelFuture中)来进行连接。
不同点二:
客户端 BootStrap 使用一个单独的EventLoopGroup;然而,ServerBootStrap 使用两个 EventLoopGroup (事实上使用同一个也是可以的),第一个集合包含一个单独的 ServerChannel 代表服务端自己的socket(这个socket被绑定到本地的一个端口上了),第二个集合包含所有的服务端接收的链接通道。
image如图,EventLoopGroupA 唯一的目的是接收链接然后将它们交付到 EventLoopGroupB。
Netty这样做的根本目的是为了客服链接瓶颈。在一个高并发的场景下,可能会有极其多的链接接入,当只有一个Group时,处理已有链接已经很繁忙,以至于无法接收新的链接,这最终会导致很多链接会超时。而使用两个Group,接收链接和处理链接分开,这样所有的链接都可以被接收。
EventLoopGroup 可能包含多个EventLoop(不过也取决与我们的具体配置),每一个通道会有一个 EventLoop 与它绑定并且在整个生命周期内都不会更换。不过,由于 EventLoopGroup 中的 EventLoop 会比通道少,所以会有很多通道共享一个 EventLoop,这也意味着在同一个 EventLoop 中,一个通道处理繁忙的话,将不允许去处理其他的通道,因此不要使用阻塞EventLoop的原因。
image
如图,当只有一个group时,同一个实例会被使用两次。
ChannelHandler
我们很容易想到 ChannelHandler 是用来处理数据流的,但是实际上 ChannelHandler 还能有很多其他的应用。
image
如图,从类继承关系上可以看出,我们有两种 ChannelHandler,也反映出数据流是双向的(数据可以从我们的应用向外流出,也能从远端流入我们的应用)。
数据从一段流到另一端的过程中,会经过一个或者多个 ChannelHandler 的处理。这个 ChannelHandler 会被加入到应用中,并且它们加入的顺序决定了它们处理数据的顺序。
既然会设计到多个 ChannelHandler 协作,必然会有一定的规则需要遵守。这里的规则很简单:ChannelPipeline 就是这写 ChannelHandler 的约束。每一个 ChannelHandler 处理完自己的部分后都会将数据传递到同一个 ChannelPipeline 中的下一个 ChannelHandler,直到没有 ChannelHandler 为止。
image。
如图:反映了 ChannelInboundHandler 和 ChannelOutboundHandler 能够同时存在于一个 ChannelPipeline 中。
由于我们的 ChannelHandler 通常实现自 ChannelInboundHandler 或 ChannelOutboundHandler 所以Netty会知道各个handler的类型,这样在一个流出的事件中就可以跳过所有的 ChannelInboundHandler。
每一个加入 ChannelPipeline 中的 ChannelHandler 会得到一个 ChannelHandlerContext。通常获得 ChannelHandlerContext 的引用是安全的,但是在 UDP 协议下可能不一定。 这个 ChannelHandlerContext 可以用于获取底层的 channel 用于 write/send 消息。这样就存在两种方式来发送消息:直接写到通道 或者 通过 ChannelHandlerContext 来写消息,它们的主要区别是,直接写到通道中的消息会从 ChannelPipeline 的尾部开始,写到 ChannelHandlerContext 中的消息会传递给下一个handler
通过回调方法中携带的 ChannelHandlerContext 参数,我们可以将一个事件可以定向到下一个 ChannelInboundHandler 或者 前一个 ChannelOutboundHandler 中。(Netty为我们提供的抽象基类 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter 只提供单方向的传递,但是我们不需要手动调用传递方法)
Encoder & Decoder
每一个通道都有传递Netty事件的职责,Netty类中 Adapter 结尾的类帮我们实现了这一过程,这样我们不需要去关注这部分的工作,我们只需要去处理我们感兴趣的部分。除了 Adapter 的类外,同样还有很多其他功能扩展的类我们可以使用,比如 encode/decode 消息。
当我们接收到消息时,我们必须将其从 bytes 转化成 Java对象。当发送消息时,我们同样需要将消息从Java对象转换成bytes。这样的操作很频繁,因此Netty为我们提供了很多基础类,类似于 ByteToMessageDecoder 和 MessageToByteEncoder 就提供这样的功能。我们应用中用的最多的可能是读取消息并解码然后再进行一系列的其他处理,我们可以继承 SimpleChannelInboundHandler<T> (T 就是我们要处理的消息类型),这个handler的主要方法channelRead0(ChannelHandlerContext,T),不能何时调用该方法,T 对象就是我们要处理的消息。
。
在IO线程中,不能进行阻塞的操作。Netty 允许在添加 ChannelHandler 到 ChannelPipeline 中时指定一个 EventExecutorGroup, 它会被用于获取一个 EventExecutor 对象,这个 EventExecutor 将用于执行所有的ChannelHandler的操作(EventExecutor 会使用一个另外的线程)