Netty中级篇

2020-08-09  本文已影响0人  贪挽懒月

一、Netty核心模块

它们的常用方法有:

- group:设置线程组
- channel:指定通道的实现类
- option:给channel添加配置
- childOption:给接收到的channel添加配置
- handler:设置bossGroup的handler
- childHandler:设置workGroup的handler
- addFirst:把handler添加到链表第一个位置
- addLast:把handler添加到链表的最后一个位置

欢迎大家关注我的公众号 javawebkf,目前正在慢慢地将简书文章搬到公众号,以后简书和公众号文章将同步更新,且简书上的付费文章在公众号上将免费。


二、用Netty实现聊天室功能

之前说过用NIO实现聊天室,现在来看看用netty如何实现聊天室。这里我将新建两个maven项目,一个服务端,一个客户端,最后可以打成jar包,服务端jar包运行在你电脑上,客户端jar包自己跑一份,还可以发给你的同事,然后就可以愉快的聊天了。

1、服务端:

<dependencies>
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.50.Final</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass>com.zhusl.study.chatroom.NettyChatRoomClient</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

客户端的pom.xml唯一的区别就是 <mainClass>换成了客户端启动类。

public class NettyChatRoomServer {
    
    public void run () throws Exception {
        // 创建两个线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            // 配置参数
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workGroup)
                     .channel(NioServerSocketChannel.class)
                     .option(ChannelOption.SO_BACKLOG, 128)
                     .childOption(ChannelOption.SO_KEEPALIVE, true)
                     .childHandler(new NettyChatRoomServerInitializer());
            // 监听端口
            ChannelFuture cf = bootstrap.bind(6666).sync(); 
            cf.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
    
    public static void main(String[] args) throws Exception {
        NettyChatRoomServer ncrs = new NettyChatRoomServer();
        ncrs.run();
    }
}
public class NettyChatRoomServerInitializer extends ChannelInitializer<SocketChannel>{

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast("decode",new StringDecoder());//解码器
        pipeline.addLast("encode",new StringEncoder());//编码器
        pipeline.addLast("handler",new NettyChatRoomServerHandler());
        
    }
}
public class NettyChatRoomServerHandler extends SimpleChannelInboundHandler<String>{

    private  static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    
    /**
     * 当有channel加入时执行该方法(即当有客户端连接时)
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        String ip = ctx.channel().remoteAddress().toString().substring(9, 13);
        System.out.println("【" + ip + "】" + "进入聊天室");
        for (Channel channel : channelGroup) {
            // 给别的客户端提醒:xxx加入群聊
            if (ctx.channel() != channel) {
                channel.writeAndFlush("【" + ip + "】" + "进入聊天室");
            }
        }
        // 将当前channel加入到channelGroup中
        channelGroup.add(ctx.channel());
    }
    
    /**
     * 当有channel删除时执行该方法(即客户端断开连接)
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        String ip = ctx.channel().remoteAddress().toString().substring(9, 13);
        System.out.println("【" + ip + "】" + "离开聊天室");
        for (Channel channel : channelGroup) {
            // 给别的客户端提醒:xxx加入群聊
            if (ctx.channel() != channel) {
                channel.writeAndFlush("【" + ip + "】" + "离开聊天室");
            }
        }
        // 这里不需要channelGroup.remove,会自动remove
    }
    
    /**
     * 当有数据读取时执行该方法
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        String ip = ctx.channel().remoteAddress().toString().substring(9, 13);
        System.out.println("【" + ip + "】" + ":" + msg);
        for (Channel channel : channelGroup) {
            // 将消息转发给别的客户端
            if (ctx.channel() != channel) {
                channel.writeAndFlush("【" + ip + "】"  + ":" + msg);
            } else {
                channel.writeAndFlush("【我】:" + msg);
            }
        }
    }
    
    /**
     * 异常处理
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

2、客户端:

public class NettyChatRoomClient {

    @SuppressWarnings("resource")
    public void run() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                     .channel(NioSocketChannel.class)
                     .handler(new NettyChatRoomClientInitializer());
            // 连接服务器
            ChannelFuture channelFuture = bootstrap.connect("192.168.2.36", 7777).sync();
            Channel channel = channelFuture.channel();
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                String msg = scanner.nextLine();
                channel.writeAndFlush(msg);
            }
            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
    
    public static void main(String[] args) throws Exception {
        NettyChatRoomClient ncrc = new NettyChatRoomClient();
        ncrc.run();
    }
}
public class NettyChatRoomClientInitializer extends ChannelInitializer<SocketChannel>{

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast("decode",new StringDecoder());//解码器
        pipeline.addLast("encode",new StringEncoder());
        pipeline.addLast("handler",new NettyChatRoomClientHandler());
    }
}
public class NettyChatRoomClientHandler extends SimpleChannelInboundHandler<String>{
    
    /**
     * 异常处理
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        // 将从服务端接收到的消息打印出来
        System.out.println(msg);
    }
}

三、Netty心跳检测机制

客户端与服务端连接是否正常,需要有一个机制来检测,Netty提供了心跳检测机制。
1、Netty心跳检测案例:

客户端和以前一样,没有变换,主要是服务端加了日志handler以及childHandler重写了一个用于检测心跳的方法userEventTriggered,服务端代码如下:

public class HeartBeatServer {

    public static void main(String[] args) throws Exception {
        // 1. 创建boss group (boss group和work group含有的子线程数默认是cpu数 * 2)
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // 2. 创建work group
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            // 3. 创建服务端启动对象
            ServerBootstrap bootstrap = new ServerBootstrap();
            // 4. 配置启动参数
            bootstrap.group(bossGroup, workGroup) // 设置两个线程组
                    .channel(NioServerSocketChannel.class) // 使用NioSocketChannel 作为服务器的通道
                    .handler(new LoggingHandler(LogLevel.INFO)) // 日志处理器
                    .childHandler(new ChannelInitializer<SocketChannel>() { // 创建通道初始化对象
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            sc.pipeline().addLast(new IdleStateHandler(3, 5, 7, TimeUnit.SECONDS));
                            sc.pipeline().addLast(new HeartBeatServerHandler());
                        }
                    });
            // 5. 启动服务器并绑定端口
            ChannelFuture cf = bootstrap.bind(6666).sync();
            // 6. 对关闭通道进行监听
            cf.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}
public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter{
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            String info = null;
            switch (event.state()) {
            case READER_IDLE:
                info = "读空闲";
                break;
            case WRITER_IDLE:
                info = "写空闲";
                break;
            case ALL_IDLE:
                info = "读写空闲";
                break;
            }
            System.out.println(ctx.channel().remoteAddress() + ":" + info);
        }
    }
}

四、WebSocket长连接开发

1、http协议和websocket协议的区别:

2、案例代码:

public class WebSocketServer {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workGroup) // 设置两个线程组
                    .channel(NioServerSocketChannel.class) // 使用NioSocketChannel 作为服务器的通道
                    .handler(new LoggingHandler(LogLevel.INFO)) // 日志处理器
                    .childHandler(new ChannelInitializer<SocketChannel>() { // 创建通道初始化对象
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            sc.pipeline().addLast(new HttpServerCodec()); // 使用http的编码解码器
                            sc.pipeline().addLast(new ChunkedWriteHandler()); // 是以块方式写,添加ChunkedWriteHandler处理器
                            sc.pipeline().addLast(new HttpObjectAggregator(8192)); // http数据在传输的时候是分段的,用这个处理器就可聚集分段
                            // 请求的url就是:ws://localhost:6666/hello
                            sc.pipeline().addLast(new WebSocketServerProtocolHandler("/hello"));
                            sc.pipeline().addLast(new WebSocketServerHandler());
                        }
                    });
            ChannelFuture cf = bootstrap.bind(80).sync();
            System.out.println("服务端准备好了");
            cf.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}
public class WebSocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        System.out.println("服务器收到消息:" + msg.text());
        ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器时间:" + LocalDateTime.now()) + ",msg:" + msg.text());
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("handlerAdded被调用:" + ctx.channel().id().asLongText());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("handlerRemoved被调用:" + ctx.channel().id().asLongText());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("exceptionCaught被调用:" + ctx.channel().id().asLongText());
        ctx.close();
    }
}

然后编写一个页面,用来发送websocket请求:


<body>
  <script type="text/javascript">
     var socket;
     if(window.WebSocket) {
         socket = new WebSocket("ws://127.0.0.1/hello");
         // 接收服务器端返回的消息,显示在responseText中
         socket.onmessage = function(ev){
             var rt = document.getElementById("responseText");
             rt.value = rt.value + "\n" + ev.data;
         }
         // 相当于开启连接
         socket.onopen = function(ev){
             var rt = document.getElementById("responseText");
             rt.value = "连接开启了";
         }
         // 连接关闭
         socket.onclose = function(ev){
             var rt = document.getElementById("responseText");
             rt.value = rt.value + "\n" + "连接关闭了";
         }
     } else {
         alert("浏览器不支持websocket");
     }
     
     function send(message){
         if (!window.socket){
             return;
         }
         if (socket.readyState == WebSocket.OPEN){
             socket.send(message);
         } else {
             alert("连接没有开启");
         }
     }
  </script>

  <form onsubmit="return false">
     <textarea name="message" style="height:300px;width: 300px"></textarea>
     <input type="button" value="发送消息" onclick="send(this.form.message.value)">
     <textarea id="responseText" style="height:300px;width: 300px"></textarea>
  </form>
</body>

访问这个页面,服务端启动或者关闭会在框框中显示出来,同样,如果客户端关闭,服务端也会在控制台打印出来。

五、protobuf

1、编解码问题:
数据在网络中是以二进制字节码传输的,发送的数据需要编码,服务端收到后需要解码。Netty提供了StringDecoder、ObjectDecoder,底层采用的是java序列化技术,java序列化本身效率较低,而且无法跨语言,所以就有了protobuf。

2、protobuf简介:
它是Google的开源项目,轻便高效的结构化数据存储格式,可用于数据的序列化,且支持跨平台跨语言,很适合做数据存储和RPC数据交换格式。

3、protobuf的使用:

下面开始编码:



<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.6.1</version>
</dependency>
syntax = "proto3"; // 版本
option java_outer_classname = "StudentPOJO"; // 外部类名
message Student { // 内部类名
   int32 id = 1; // 1不是值,而是序号
   string name = 2;
}
protoc.exe --java_out=. Student.proto

执行完后会在protoc.exe所在目录生成一个StudentPOJO.java文件,这就是我们要的文件,复制到项目中。

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    System.out.println("client:" + ctx);
    // 发送student对象到服务端
    StudentPOJO.Student student = StudentPOJO.Student.newBuilder().setId(666).setName("张三").build();
    ctx.writeAndFlush(student);
}

NettyClient中添加protobuf的编码器:

@Override
protected void initChannel(SocketChannel sc) throws Exception {
    // 加入protobuf的编码器
    sc.pipeline().addLast("encoder", new ProtobufEncoder());
    sc.pipeline().addLast(new NettyClientHandler());
}
@Override
protected void initChannel(SocketChannel sc) throws Exception {
    // 加入解码器,指定解码对象
    sc.pipeline().addLast("decoder", new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));
    // 传入自定义的handler
    sc.pipeline().addLast(new NettyServerHandler());
    // 在这里,可以将SocketChannel sc保存到集合中,别的线程拿到集合就可以调用channel的方法了
} 

NettyServerHandler中读取student对象:

// 读取数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 读取客户端发送的student
    StudentPOJO.Student student = (Student) msg;
    System.out.println("客户端发送的数据是:id=" + student.getId() + ", name=" + student.getName());
}

启动服务端,再启动客户端,就可以看到服务端后台打印出了如下信息:

客户端发送的数据是:id=666, name=张三
上一篇 下一篇

猜你喜欢

热点阅读