netty

Netty多客户端通信机制

2017-06-06  本文已影响0人  东升的思考

上篇文章讲解了客户端与服务端通信示例,本篇来讲解下多客户端之间是如何通信的,我们以一个聊天室的程序为例。
具体需求:
客户端1、2、3(通过remoteAddress来标识),当客户端1上线后,发送一条消息给服务端,当客户端2上线后,通知客户端1:“客户端2已经上线”,当客户端3上线后,通知客户端1和客户端2:“客户端3已经上线”。

按照Netty服务构建步骤进行,可以参见Netty构建服务的基本步骤文章来了解构建过程以及具体说明。

  1. 首先构建聊天室服务端入口,多个客户端通信都是经由服务端作为传输和通信载体。
    示例代码:
public class MyCatServer {

    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup =  new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new MyChatServerInitializer());

            ChannelFuture channelFuture = bootstrap.bind(8899).sync();
            channelFuture.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
  1. 创建MyChatClientInitializer,添加编解码处理器ChannelPipeline.
    示例代码:
public class MyChatServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline channelPipeline = ch.pipeline();

        // 添加基于\r \n界定符的解码器
        channelPipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));
        channelPipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); // 添加字符串解码器
        channelPipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); // 添加字符串编码器
        channelPipeline.addLast(new MyChatServerHandler()); // 自定义处理器
    }
}
  1. 创建自定义处理器MyChatServerHandler, 这里涉及到一个重要的组件ChannelGroup,它是线程安全的,ChannelGroup存储了已连接的Channel,Channel关闭会自动从ChannelGroup中移除,无需担心Channel生命周期。同时,可以对这些Channel做各种批量操作,可以以广播的形式发送一条消息给所有的Channels,调用它的writeAndFlush方法来实现。
    ChannelGroup可以进一步理解为设计模式中的发布-订阅模型,其底层是通过ConcurrentHashMap进行存储所有Channel的。
    示例代码:
public class MyChatServerHandler extends SimpleChannelInboundHandler<String> {

    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        // 这里要区分下是否是自己发的消息
        Channel channel = ctx.channel();
        // 这里使用了Java8的lambda表达式
        channelGroup.forEach(ch -> {
            if (ch == channel) { // 两个channel对象地址相同
                System.out.println("服务器端转发聊天消息:【自己】发送的消息, 内容:" + msg + "\n");
                ch.writeAndFlush("【自己】发送的消息, 内容:" + msg + "\n");
            } else {
                System.out.println("服务器端转发聊天消息:"+ ch.remoteAddress() + "发送的消息,内容:" + msg + "\n");
                ch.writeAndFlush(ch.remoteAddress() + "发送的消息,内容:" + msg + "\n");
            }
        });
    }

    // -----------以下覆写的方法是ChannelInboundHandlerAdapter中的方法---------------
    @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();

        channelGroup.writeAndFlush("[服务器] - " + channel.remoteAddress() + " 加入了\n");

        // 先写入到客户端,最后再将自己添加到ChannelGroup中
        channelGroup.add(channel);
    }

    @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();

        channelGroup.writeAndFlush("[服务器] - " + channel.remoteAddress() + " 离开了\n");

        // 这里channelGroup会自动进行调用,所以这行代码不写也是可以的。
        channelGroup.remove(channel);
    }

    /**
     * 只要有客户端连接就会执行
     *
     * @param ctx
     * @throws Exception
     */
    @Override public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + " 上线了\n");
    }

    @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + " 下线了\n");
    }

    @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }
}
  1. 构建客户端入口,连接服务端8899端口,示例中通过控制台输入形式给服务端发送消息。
    示例代码:
public class MyCatClient {

    public static void main(String[] args) {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new MyChatClientInitalizer());

            ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
            // channelFuture.channel().closeFuture().sync();

            // 从控制台不断的读取输入
            boolean running = true;
            try (BufferedReader br = new BufferedReader(new InputStreamReader(System.in))){
                while (running) {
                    channelFuture.channel().writeAndFlush(br.readLine() + "\r\n");
                }
            } catch (IOException e) {
                e.printStackTrace();
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}
  1. 创建客户端MyChatClientInitializer,跟服务端基本类似的处理器。
    示例代码:
public class MyChatClientInitalizer extends ChannelInitializer<SocketChannel> {

    @Override protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline channelPipeline = ch.pipeline();

        channelPipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));
        channelPipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        channelPipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        channelPipeline.addLast(new MyChatClientHandler());
    }
}
  1. 创建自定义处理器MyChatClientHandler, 很简单,只是输出一条消息。
    示例代码:
public class MyChatClientHandler extends SimpleChannelInboundHandler<String> {

    @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(msg);
    }
}

以上几个步骤就完成了聊天室程序的编码工作,下面运行服务端和客户端程序,运行结果如下:

MyChatClient(1) MyChatClient(2) MyChatClient(3) MyChatServer
Run None None /127.0.0.1:51049 上线了
[服务器] - /127.0.0.1:51055 加入了 Run None /127.0.0.1:51055 上线了
[服务器] - /127.0.0.1:51114 加入了 [服务器] - /127.0.0.1:51114 加入了 Run /127.0.0.1:51114 上线了

None:表示未运行 Run:表示运行

上一篇下一篇

猜你喜欢

热点阅读