Android网络编程+RxJava

Netty 实现长连接消息推送初尝试

2019-05-22  本文已影响0人  ImWiki

由于我最近在研究推送功能,计划在项目中接入5大厂家推送服务,然后有部分厂家推送服务只提供通知栏推送功能,并不提供自定义消息推送(透传),所以我在想,如果我们自己搭建服务器实现是不是成本也不至于很高呢,虽然我们可以利用其中一家推送作为透传推送功能。我了解到Netty是一个很成熟的方案,所以做一些简单的尝试。
https://netty.io/wiki/user-guide-for-4.x.html#wiki-h2-3

什么是 Netty

Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

添加依赖

Gradle

    compile 'io.netty:netty-all:4.1.36.Final'

Maven

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.36.Final</version>
</dependency>

服务端代码

public class DiscardServer {
    private int port;
    public DiscardServer(int port) {
        this.port = port;
    }
    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new DiscardServerHandler(),new TimeServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
    
            ChannelFuture f = b.bind(port).sync(); // (7)
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }
        new DiscardServer(port).run();
    }
}

定义客户端连接Handler:如果收到了客户端的连接就发送给客户端一个小时“你好,欢迎建立长连接”。

public class TimeServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(final ChannelHandlerContext ctx) { // (1)
        final ByteBuf byteBuf = ctx.alloc().buffer(8);
        byteBuf.writeBytes("你好,欢迎建立长连接".getBytes());
        ctx.writeAndFlush(byteBuf,ctx.channel().newPromise());
        System.out.println("TimeServerHandler,有新连接");
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

定义服务端消息接收Handler,如果收到客户端消息就打印出来

public class DiscardServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // Discard the received data silently.
        ByteBuf byteBuf = (ByteBuf) msg;
        try {
            byte[] result = new byte[byteBuf.readableBytes()];
            byteBuf.readBytes(result);
            String text = new String(result);
            System.out.println("收到客户端消息:" + text);
        } finally {
            byteBuf.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

客户端代码

public class TimeClient {
    public static void main(String[] args) throws Exception {
        String host = "127.0.0.1";
        int port = Integer.parseInt("8080");
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });
            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (5)
            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

定义消息接收Handler,如果收到消息就回复服务端“我已经收到了,谢谢你。”。

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf byteBuf = (ByteBuf) msg;
        try {
            byte[] result = new byte[byteBuf.readableBytes()];
            byteBuf.readBytes(result);
            String text = new String(result);
            System.out.println("收到服务端消息:" + text);
        } finally {
            byteBuf.release();
        }
        final ByteBuf time = ctx.alloc().buffer(8);
        time.writeBytes("我已经收到了,谢谢你。".getBytes());
        final ChannelFuture f = ctx.writeAndFlush(time);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
        System.out.println("TimeClientHandler:exceptionCaught");
    }
}

测试结果

先运行服务端服务端,然后再运行客户端

  1. 客户端
收到服务端消息:你好,欢迎建立长连接
  1. 服务端
收到客户端消息:我已经收到了,谢谢你。
上一篇下一篇

猜你喜欢

热点阅读