Netty学习--传输

2019-01-09  本文已影响0人  何何与呵呵呵
传输迁移
public class PlainOioServer {
    public void serve(int port) throws IOException {
        // 将服务器绑定到指定端口
        final ServerSocket socket = new ServerSocket(port);
        try {
            for (;;) {
                final Socket clientSocket = socket.accept(); // 接受连接
                System.out.println("Accepted connection from " + clientSocket);
                new Thread(new Runnable() { // 创建一个新的线程来处理该连接
                    @Override
                    public void run() {
                        OutputStream out;
                        try {
                            out = clientSocket.getOutputStream();
                            out.write("Hi!\r\n".getBytes(Charset.forName("UTF-8"))); // 将消息写给已连接的客户端
                            out.flush();
                            clientSocket.close(); // 关闭连接
                        }
                        catch (IOException e) {
                            e.printStackTrace();
                        }
                        finally {
                            try {
                                clientSocket.close();
                            }
                            catch (IOException ex) {
                                // ignore on close
                            }
                        }
                    }
                }).start();
            }
        }
        catch (IOException e) {
            e.printStackTrace();
        }
    }
}
public void serve(int port) throws IOException {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        ServerSocket ssocket = serverChannel.socket();
        InetSocketAddress address = new InetSocketAddress(port);
        ssocket.bind(address); // 将服务器绑定到选定的端口
        Selector selector = Selector.open(); // 打开Selector来处理 Channel
        serverChannel.register(selector, SelectionKey.OP_ACCEPT); // 将ServerSocket注册到Selector以接受连接
        final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());
        for (;;) {
            try {
                selector.select(); // 等待需要处理的新事件;阻塞将一直持续到下一个传入事件
            } catch (IOException ex) {
                ex.printStackTrace();
                // handle exception
                break;
            }
            Set<SelectionKey> readyKeys = selector.selectedKeys(); // 获取所有接收事件的Selection-Key实例
            Iterator<SelectionKey> iterator = readyKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                try {
                    if (key.isAcceptable()) { // 检查事件是否是一个新的已经就绪可以被接受的连接
                        ServerSocketChannel server =
                                (ServerSocketChannel)key.channel();
                        SocketChannel client = server.accept();
                        client.configureBlocking(false);
                        // 接受客户端,并将它注册到选择器
                        client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, msg.duplicate());
                        System.out.println("Accepted connection from " + client);
                    }
                    if (key.isWritable()) { // 检查套接字是否已经准备好写数据
                        SocketChannel client =
                                (SocketChannel)key.channel();
                        ByteBuffer buffer =
                                (ByteBuffer)key.attachment();
                        while (buffer.hasRemaining()) {
                            if (client.write(buffer) == 0) { // 将数据写到已连接的客户端
                                break;
                            }
                        }
                        client.close();
                    }
                } catch (IOException ex) {
                    key.cancel();
                    try {
                        key.channel().close();
                    } catch (IOException cex) {
                      // ignore on close
                    }
                }
            }
        }
    }
 public void server(int port) throws Exception {
        final ByteBuf buf = Unpooled.unreleasableBuffer(
                Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));
        EventLoopGroup group = new OioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // 创建Server-Bootstrap
            b.group(group)
                    .channel(OioServerSocketChannel.class) // 使用OioEventLoopGroup以允许阻塞模式(旧的I/O)
                    .localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() { // 指定Channel-Initializer,对于每个已接受的连接都调用它
                        @Override
                        public void initChannel(SocketChannel ch)
                                throws Exception {
                            ch.pipeline().addLast(
                                    new ChannelInboundHandlerAdapter() {
                                        @Override
                                        public void channelActive(
                                                ChannelHandlerContext ctx)
                                                throws Exception {
                                            ctx.writeAndFlush(buf.duplicate()) // 将消息写到客户端,并添加ChannelFutureListener,以便消息一被写完就关闭连接
                                                    .addListener(
                                                            ChannelFutureListener.CLOSE);
                                        }
                                    });
                        }
                    });
            ChannelFuture f = b.bind().sync(); // 绑定服务器以接受连接
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();// 释放所有的资源
        }
    }
 public void server(int port) throws Exception {
        final ByteBuf buf = Unpooled.unreleasableBuffer(
                Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));
       EventLoopGroup group = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // 创建Server-Bootstrap
            b.group(group).channel(NioServerSocketChannel.class) // 使用OioEventLoopGroup以允许阻塞模式(旧的I/O)
                    .localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() { // 指定Channel-Initializer,对于每个已接受的连接都调用它
                        @Override
                        public void initChannel(SocketChannel ch)
                                throws Exception {
                            ch.pipeline().addLast(
                                    new ChannelInboundHandlerAdapter() {
                                        @Override
                                        public void channelActive(
                                                ChannelHandlerContext ctx)
                                                throws Exception {
                                            ctx.writeAndFlush(buf.duplicate()) // 将消息写到客户端,并添加ChannelFutureListener,以便消息一被写完就关闭连接
                                                    .addListener(
                                                            ChannelFutureListener.CLOSE);
                                        }
                                    });
                        }
                    });
            ChannelFuture f = b.bind().sync(); // 绑定服务器以接受连接
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();// 释放所有的资源
        }
    }

通过代码比较,netty基本不受影响(通用性强).

传输API
Channel 接口的层次结构

如图所示,每个Channel 都将会被分配一个ChannelPipeline 和ChannelConfig。ChannelConfig 包含了该Channel 的所有配置设置,并且支持热更新。由于特定的传输可能具有独特的设置,所以它可能会实现一个ChannelConfig 的子类型。Comparable保证每一个channel都是独一无二的.
ChannelPipeline 持有所有将应用于入站和出站数据以及事件的ChannelHandler 实例.

  • 将数据从一种格式转换为另一种格式;
  • 提供异常的通知;
  • 提供Channel 变为活动的或者非活动的通知;
  • 提供当Channel 注册到EventLoop 或者从EventLoop 注销时的通知;
  • 提供有关用户自定义事件的通知。

channel是线程安全的,多个线程可以共用一个channel.

内置的传输
NIO——非阻塞I/O
选择并处理状态的变化
Epoll—用于Linux 的本地非阻塞传输

流程和NIO一样,失去了NIO的通用性,但运行在linux上更快,只需要将NioEventLoopGroup替换成EpollEventLoopGroup,并且将NioServerSocketChannel.class 替换为EpollServerSocketChannel.class 即可.

OIO—旧的阻塞I/O

建立在java.net 包的阻塞实现之上,不是异步的.Netty利用了SO_TIMEOUT这个Socket标志,它指定了等待一个I/O操作完成的最大毫秒数。如果操作在指定的时间间隔内没有完成,则将会抛出一个SocketTimeout Exception。Netty将捕获这个异常并继续处理循环。在EventLoop下一次运行时,它将再次尝试。这实际上也是类似于Netty这样的异步框架能够支持OIO的唯一方式.

OIO 的处理逻辑
用于JVM 内部通信的Local 传输

用于在同一个JVM 中运行的客户端和服务器程序之间的异步通信.

Embedded 传输

可以将一组ChannelHandler 作为帮助器类嵌入到其他的ChannelHandler 内部。通过这种方式,你将可以扩展一个ChannelHandler 的功能,而又不需要修改其内部代码。

上一篇下一篇

猜你喜欢

热点阅读