深入浅出Netty源码剖析netty程序员

Netty4(三):快速入门

2018-03-15  本文已影响166人  聪明的奇瑞
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.22.Final</version>
</dependency>

TIME 协议(服务器)

目标:编写一个 TIME 协议,服务器端在接收到客户端的连接时会向客户端发送一个 32 位的时间戳,并且一旦消息发送成功就会立即关闭

编写 Handler

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(final ChannelHandlerContext ctx) {
        final ByteBuf time = ctx.alloc().buffer(4);
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
        final ChannelFuture f = ctx.writeAndFlush(time);
        f.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        });
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

编写服务器类

public class Server {

    private int port;

    public Server(int port) {
        this.port = port;
    }

    public void run() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();         // 处理客户端连接事件的线程池
        EventLoopGroup workerGroup = new NioEventLoopGroup();       // 处理连接后所有事件的线程池
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();      // NIO 服务的辅助启动类
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)          // 指定连接该服务器的 Channel 类型为 NioServerSocketChannel
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new TimeServerHandler());              // 指定需要执行的 Handler
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)          // 设置 bossGroup 的相关参数
                    .childOption(ChannelOption.SO_KEEPALIVE, true);         // 设置 workerGroup 相关参数

            ChannelFuture f = bootstrap.bind(port).sync();          // 绑定端口,调用 ChannelFuture 的 sync() 阻塞方法等待绑定完成
            // 调用 closeFuture() 方法返回此通道关闭时的 ChannelFuture
            // 调用 ChannelFuture 的 sync() 阻塞方法直到服务端关闭链路之后才退出 main() 函数
            f.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 优雅退出机制。。。退出线程池(该方法源码没读过,也不知怎么个优雅方式)
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }


    public static void main(String[] args) {
        int port = (args.length > 0) ? Integer.parseInt(args[0]) : 8080;
        new Server(port).run();
    }
}

TIME 协议(客户端)

目标:连接服务端并接收服务端发送的时间戳消息,输出到控制台

编写 Handler

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg; 
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

编写客户端类

public class Client {
    public static void main(String[] args) throws Exception {
        String host = (args.length == 1) ? args[0] : "localhost";
        int port = (args.length == 2) ? Integer.parseInt(args[1]) : 8080;

        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(workerGroup);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
                }
            });

            ChannelFuture f = bootstrap.connect(host, port).sync();     // 连接服务端,调用 ChannelFuture 的 sync() 阻塞方法等待连接完成
            // 调用 closeFuture() 方法返回此通道关闭时的 ChannelFuture
            // 调用 ChannelFuture 的 sync() 阻塞方法直到客户端关闭链路之后才退出 main() 函数
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

处理基于流的传输

回到 TIME 客户端例子,服务端发送的数据是一个 32位 的时间戳,如果服务端发送了 16位 的数据呢,那客户端读取的数据就不准确了

解决方法一

public class TimeClientHandler extends ChannelInboundHandlerAdapter {

    private ByteBuf byteBuf;

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        byteBuf = ctx.alloc().buffer(4);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        byteBuf.release();
        byteBuf = null;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        byteBuf.writeBytes(m);
        m.release();

        if (byteBuf.readableBytes() >= 4){
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

解决方法二

public class TimeDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        if (byteBuf.readableBytes() < 4) {
            return;
        }
        list.add(byteBuf.readBytes(4));
    }
}
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TimeDecoder(),new TimeClientHandler());
    }
});

用 POJO 代替 ByteBuf

之前例子使用 ByteBuf 作为协议消息的数据结构,目前读取的仅仅是一个 32 位 的数据,直接使用 ByteBuf 不是问题,然而在真实的协议中,数据量肯定不止如此,通过 ByteBuf 处理数据将变的复杂困难,因此下面介绍如何使用 POJO(普通 Java 对象) 代替 ByteBuf

public class UnixTime {
    private final long value;

    public UnixTime() {
        this(System.currentTimeMillis() / 1000L + 2208988800L);
    }

    public UnixTime(long value) {
        this.value = value;
    }

    public long value() {
        return value;
    }

    @Override
    public String toString() {
        return new Date((value() - 2208988800L) * 1000L).toString();
    }
}
protected void decode(ChannelHandlerContext channelHandlerContext,ByteBuf byteBuf, List<Object> list) throws Exception {
    if (byteBuf.readableBytes() < 4) {
        return;
    }
    list.add(new UnixTime(byteBuf.readInt()));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    UnixTime unixTime = (UnixTime) msg;
    ctx.close();
}
@Override
public void channelActive(final ChannelHandlerContext ctx) {
    ChannelFuture f = ctx.writeAndFlush(new UnixTime());
    f.addListener(ChannelFutureListener.CLOSE);
}
public class TimeEncoder extends ChannelOutboundHandlerAdapter{
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        UnixTime m = (UnixTime) msg;
        ByteBuf encoded = ctx.alloc().buffer(4);
        encoded.writeInt((int)m.value());
        ctx.write(encoded, promise); 
    }
}
public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
    @Override
    protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
        out.writeInt((int)msg.value());
    }
}

关闭应用

上一篇 下一篇

猜你喜欢

热点阅读