Netty基础(未完待续)

2020-09-22  本文已影响0人  笔记本一号

不多BB,我们开门见山
Netty是一个提供了易于使用的API的NIO框架,具有高并发、高性能的特点

Netty的Reactor线程模型

https://blog.csdn.net/bingxuesiyang/article/details/89888664

单线程模型

所有的IO操作都是由一个NIO线程处理。

Reactor内部通过selector 监控连接事件,收到事件后通过dispatch进行分发,如果是连接建立的事件,则由Acceptor处理,Acceptor通过accept接受连接,并创建一个Handler来处理连接后续的各种事件,如果是读写事件,直接调用连接对应的Handler来处理。
Handler完成read->业务处理(decode->compute->encode)->send的全部流程。
这种模型好处是简单,坏处却很明显,当某个Handler阻塞时,会导致其他客户端的handler和accpetor都得不到执行,无法做到高性能,只适用于业务处理非常快速的场景。

缺点:单线程NIO负载过重,并发高时产生任务堆积,延迟过高,不适合并发高的场景

多线程模型

由一个NIO线程处理客户端连接,由一组NIO线程池处理IO操作

主线程中,Reactor对象通过selector监控连接事件,收到事件后通过dispatch进行分发,如果是连接建立事件,则由Acceptor处理,Acceptor通过accept接收连接,并创建一个Handler来处理后续事件,而Handler只负责响应事件,不进行业务操作,也就是只进行read读取数据和write写出数据,业务处理交给一个线程池进行处理。
线程池分配一个线程完成真正的业务处理,然后将响应结果交给主进程的Handler处理,Handler将结果send给client。
单Reactor承当所有事件的监听和响应,而当我们的服务端遇到大量的客户端同时进行连接,或者在请求连接时执行一些耗时操作,比如身份认证,权限检查等,这种瞬时的高并发就容易成为性能瓶颈。

缺点:连接处理能力有限,客户连接并发高时,会产生连接延迟过高

主从线程模型

一组NIO线程池处理连接,一组NIO线程池处理IO操作

存在多个Reactor,每个Reactor都有自己的selector选择器,线程和dispatch。
主线程中的mainReactor通过自己的selector监控连接建立事件,收到事件后通过Accpetor接收,将新的连接分配给某个子线程。
子线程中的subReactor将mainReactor分配的连接加入连接队列中通过自己的selector进行监听,并创建一个Handler用于处理后续事件
Handler完成read->业务处理->send的完整业务流程。

缺点:无

Netty的核心概念

EventLoop和EventLoopGroup

EventLoopGroup是一个线程组,包含了一组NIO线程,主要用于处理网络事件,例如客户端连接,Channel的事件。每个EventLoop负责处理多个 Channel 上的事件,而一个 Channel 只对应于一个EventLoop。EventLoopGroup实际上就是Reactor线程组,Netty 的 IO 线程 NioEventLoop 由于聚合了多路复用器 Selector,一组EventLoopGroup可以同时并发处理成百上千个客户端连接,另一组EventLoopGroup可以处理客户端的通道事件

ChannelHandler和ChannelPipeline

ChannelHandler是一个接口,专门处理 I/O 或拦截 I/O 操作,并将其转发到其 ChannelPipeline(业务处理链)中的下一个处理程序。ChannelPipeline是ChannelHandler的容器,而每个Channel都会有一个ChannelPipeline,负责ChannelHandler的管理和事件拦截与调度。Channel会将事件扔到ChannelPipeline中,然后事件会被ChannelPipeline安排一系列的ChannelHandler拦截处理,例如编解码事件、TCP的粘包拆包事件、用户自定义Handler等,经过一系列加工后,事件的消息会被添加缓冲区中等待Channel的刷新和发送。

事件出站(Outbound)和入站(Inbound)

ChannelPipeline为ChannelHandler链提供了一个容器并定义了用于沿着链传播入站和出站事件流的API,当一个数据流进入 ChannlePipeline 时,它会从 ChannelPipeline 头部开始传给第一个 ChannelInboundHandler ,当第一个处理完后再传给下一个,一直传递到管道的尾部。
与之相对应的是,当数据被写出时,它会从管道的尾部开始,先经过管道尾部的 “最后” 一个ChannelOutboundHandler,当它处理完成后会传递给前一个 ChannelOutboundHandler 。

事件出站

ChannelHandlerContext

保存 Channel 相关的所有上下文信息,同时关联一个 ChannelHandler 对象。I/O 事件由 ChannelInboundHandler 或 ChannelOutboundHandler 处理,并通过调用 ChannelHandlerContext 中定义的事件传播方法。

一个 Channel 包含了一个 ChannelPipeline,而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表,并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler。入站事件和出站事件在一个双向链表中,入站事件会从链表 head 往后传递到最后一个入站的 handler,出站事件会从链表 tail 往前传递到最前一个出站的 handler,两种类型的 handler 互不干扰。

ServerBootstrap和Bootstrap

ServerBootstrap是服务端的启动助手,Bootstrap是客户端的启动助手,它们的目的主要是降低开发的复杂度

TCP的粘包和拆包

在TCP协议中一个完整的数据包可能会被TCP拆分为多个包发送,或者将多个小的数据包封装成大的数据包发送,这就会发生TCP的粘包和拆包的问题。
产生原因:

TCP的粘包和拆包的解决方法

粘包问题解决

Netty的默认线程数

Netty 默认是 CPU 处理器数的两倍,bind 完之后启动。

Netty高性能的原因

Netty的序列化协议:

Netty实战:1、利用Netty实现的聊天室 2、利用Netty实现Http协议 3、利用Netty实现Websocket协议

1、利用Netty实现的聊天室:

Sever端

public class ChatServer {
    private int port;

    public ChatServer(int port) {
        this.port = port;
    }

    public void start(){
        EventLoopGroup boss=new NioEventLoopGroup();
        EventLoopGroup works=new NioEventLoopGroup();
        try {
            ServerBootstrap boot=new ServerBootstrap();
            boot.group(boss,works)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            ChannelPipeline pipeline = sc.pipeline();
                            pipeline.addLast("line",new LineBasedFrameDecoder(1024));
                            pipeline.addLast("encode", new StringEncoder());
                            pipeline.addLast("decode", new StringDecoder());
                            pipeline.addLast(new ServerHander());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG,128)
                    .option(ChannelOption.SO_KEEPALIVE,true);
            ChannelFuture future = boot.bind(this.port).sync();
            System.out.println("服务已经启动...............");
            future.channel().closeFuture().sync();
            System.out.println("服务已经关闭...............");
        }catch (Exception e){

        }finally {
            boss.shutdownGracefully();
            works.shutdownGracefully();
        }

    }

    public static void main(String[] args) {
        new ChatServer(8888).start();
    }
}

SeverHander

public class ServerHander extends SimpleChannelInboundHandler<String> {
    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * 每当从客户端有消息写入时
     *
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        Channel inComing = channelHandlerContext.channel();
        for (Channel channel : channels) {
            if (channel != inComing) {
                channel.writeAndFlush("[用户" + inComing.remoteAddress() + " 说:]" + s + "\n");
            } else {
                channel.writeAndFlush("[我说:]" + s + "\n");
            }
        }
    }

    /**
     * 当有客户端连接时,handlerAdded会执行,就把该客户端的通道记录下来,加入队列
     *
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel inComing = ctx.channel();//获得客户端通道
        //通知其他客户端有新人进入
        for (Channel channel : channels) {
            if (channel != inComing) {
                channel.writeAndFlush("[欢迎: " + inComing.remoteAddress() + "] 进入聊天室!\n");
            }
        }
        channels.add(inComing);//加入队列
    }

    /**
     * 断开连接
     *
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel outComing = ctx.channel();//获得客户端通道
        //通知其他客户端有人离开
        for (Channel channel : channels) {
            if (channel != outComing) {
                channel.writeAndFlush("[再见: ]" + outComing.remoteAddress() + " 离开聊天室!\n");
            }
        }

        channels.remove(outComing);
    }


    /**
     * 当服务器监听到客户端活动时
     *
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel inComing = ctx.channel();
        System.out.println("[" + inComing.remoteAddress() + "]: 进入聊天室");
    }

    /**
     * 离线
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel inComing = ctx.channel();
        System.out.println("[" + inComing.remoteAddress() + "]: 离线");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        Channel inComing = ctx.channel();
        System.out.println(inComing.remoteAddress() + "通讯异常!");
        ctx.close();
    }
}

Client端

public class ChatClient {
    private String host;
    private int port;

    public ChatClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void start() {
        EventLoopGroup works = new NioEventLoopGroup();
        try {
            Bootstrap boot = new Bootstrap();
            boot.group(works).channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            ChannelPipeline pipeline = sc.pipeline();
                            pipeline.addLast("line",new LineBasedFrameDecoder(1024));
                            pipeline.addLast("encode", new StringEncoder());//编码器
                            pipeline.addLast("decode", new StringDecoder());//解码器
                            pipeline.addLast(new ClientHander());
                        }
                    });
            ChannelFuture future = boot.connect(this.host, this.port).sync();
            System.out.println("客户端已经连接");
            future.channel().closeFuture().sync();
            System.out.println("客户端已经关闭");
        } catch (Exception e) {

        } finally {
            works.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new ChatClient("localhost",8888).start();
    }
}

ClientHander

public class ClientHander extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        System.out.println(s);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        new Thread(()-> {
                Scanner scanner = new Scanner(System.in);
                while (scanner.hasNextLine()) {
                    String s = scanner.nextLine()+"\n";
                    ByteBuf buffer = Unpooled.buffer(s.length());
                    buffer.writeBytes(s.getBytes());
                    ctx.writeAndFlush(buffer);
                }
        }).start();
    }
}

启动Sever

image.png

启动一个Client

image.png

启动第二个Client

image.png image.png image.png image.png

2、利用Netty实现的Http协议:

Server端

public class HttpServer {

    public static void main(String[] args) throws Exception {

        // 定义一对线程组
        // 主线程组, 用于接受客户端的连接,但是不做任何处理,跟老板一样,不做事
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // 从线程组, 老板线程组会把任务丢给他,让手下线程组去做任务
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        
        try {
            // netty服务器的创建, ServerBootstrap 是一个启动类
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)           // 设置主从线程组
                            .channel(NioServerSocketChannel.class)
                            .childHandler(new HttpServerInitializer()); 
            
            // 启动server,并且设置8088为启动的端口号,同时启动方式为同步
            ChannelFuture channelFuture = serverBootstrap.bind(8088).sync();
            
            // 监听关闭的channel,设置位同步方式
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

}

public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        // 通过SocketChannel去获得对应的管道
        ChannelPipeline pipeline = channel.pipeline();
        
        // 通过管道,添加handler
        // HttpServerCodec是由netty自己提供的助手类,可以理解为拦截器
        // 当请求到服务端,我们需要做解码,响应到客户端做编码
        pipeline.addLast("HttpServerCodec", new HttpServerCodec());
        
        // 添加自定义的助手类,返回 "hello netty~"
        pipeline.addLast("customHandler", new CustomHandler());
    }

}
public class CustomHandler extends SimpleChannelInboundHandler<HttpObject> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) 
            throws Exception {
        // 获取channel
        Channel channel = ctx.channel();
        
        if (msg instanceof HttpRequest) {
            // 显示客户端的远程地址
            System.out.println(channel.remoteAddress());
            
            // 定义发送的数据消息
            ByteBuf content = Unpooled.copiedBuffer("Hello netty~", CharsetUtil.UTF_8);
            
            // 构建一个http response
            FullHttpResponse response = 
                    new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, 
                            HttpResponseStatus.OK, 
                            content);
            // 为响应增加数据类型和长度
            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
            response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
            
            // 把响应刷到客户端
            ctx.writeAndFlush(response);
        }
        
    }
/*
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel注册到NioEventLoop");
        super.channelRegistered(ctx);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel取消和NioEventLoop的绑定");
        super.channelUnregistered(ctx);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel准备就绪");
        super.channelActive(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel被关闭");
        super.channelInactive(ctx);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel读数据完成");
        super.channelReadComplete(ctx);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println("用户事件触发。。。");
        super.userEventTriggered(ctx, evt);
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel可写更改");
        super.channelWritabilityChanged(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("捕获到异常");
        super.exceptionCaught(ctx, cause);
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("新事件被添加");
        super.handlerAdded(ctx);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("事件被移除");
        super.handlerRemoved(ctx);
    }
*/
}

3、利用Netty实现的Websocket协议:

由于Websocket的握手需要使用http,所以在pipline中需要注册支持http的事件HttpServerCodec
Server端

public class WSServer {

    public static void main(String[] args) throws Exception {
        
        EventLoopGroup mainGroup = new NioEventLoopGroup();
        EventLoopGroup subGroup = new NioEventLoopGroup();
        
        try {
            ServerBootstrap server = new ServerBootstrap();
            server.group(mainGroup, subGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new WSServerInitialzer());
            
            ChannelFuture future = server.bind(8088).sync();
            
            future.channel().closeFuture().sync();
        } finally {
            mainGroup.shutdownGracefully();
            subGroup.shutdownGracefully();
        }
    }
    
}

public class WSServerInitialzer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        
        // websocket 基于http协议,所以要有http编解码器
        pipeline.addLast(new HttpServerCodec());
        // 对写大数据流的支持 
        pipeline.addLast(new ChunkedWriteHandler());
        // 对httpMessage进行聚合,聚合成FullHttpRequest或FullHttpResponse
        // 几乎在netty中的网络编程,都会使用到此hanler
        pipeline.addLast(new HttpObjectAggregator(1024*64));

    // 针对客户端,如果在1分钟时没有向服务端发送读写心跳(ALL),则主动断开
        // 如果是读空闲或者写空闲,不处理
        pipeline.addLast(new IdleStateHandler(8, 10, 12));
        // 自定义的空闲状态检测
        pipeline.addLast(new HeartBeatHandler());



        /**
         * websocket 服务器处理的协议,用于指定给客户端连接访问的路由 : /ws
         * 本handler会帮你处理一些繁重的复杂的事
         * 会帮你处理握手动作: handshaking(close, ping, pong) ping + pong = 心跳
         * 对于websocket来讲,都是以frames进行传输的,不同的数据类型对应的frames也不同
         */
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
        
        // 自定义的handler
        pipeline.addLast(new ChatHandler());
    }

}

TextWebSocketFrame: 在netty中,是用于为websocket专门处理文本的对象,frame是消息的载体

public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    // 用于记录和管理所有客户端的channle
    private static ChannelGroup clients = 
            new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) 
            throws Exception {
        // 获取客户端传输过来的消息
        String content = msg.text();
        System.out.println("接受到的数据:" + content);
        
//      for (Channel channel: clients) {
//          channel.writeAndFlush(
//              new TextWebSocketFrame(
//                      "[服务器在]" + LocalDateTime.now() 
//                      + "接受到消息, 消息为:" + content));
//      }
        // 下面这个方法,和上面的for循环效果是一样的
        clients.writeAndFlush(
                new TextWebSocketFrame(
                        "[服务器在]" + LocalDateTime.now() 
                        + "接受到消息, 消息为:" + content));
        
    }

    /**
     * 当客户端连接服务端之后(打开连接)
     * 获取客户端的channle,并且放到ChannelGroup中去进行管理
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        clients.add(ctx.channel());
    }

      @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        
        String channelId = ctx.channel().id().asShortText();
        System.out.println("客户端被移除,channelId为:" + channelId);
         //System.out.println("客户端断开,channle对应的长id为:" + ctx.channel().id().asLongText());
        //System.out.println("客户端断开,channle对应的短id为:" + ctx.channel().id().asShortText());
        // 当触发handlerRemoved,ChannelGroup会自动移除对应客户端的channel
        clients.remove(ctx.channel());
    }
/**
发生异常时进行捕获
*/
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        // 发生异常之后关闭连接(关闭channel),随后从ChannelGroup中移除
        ctx.channel().close();
        clients.remove(ctx.channel());
    }   
    
}

心跳检测

/**
 * @Description: 用于检测channel的心跳handler 
 *               继承ChannelInboundHandlerAdapter,从而不需要实现channelRead0方法
 */
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        
        // 判断evt是否是IdleStateEvent(用于触发用户事件,包含 读空闲/写空闲/读写空闲 )
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent)evt;     // 强制类型转换
            
            if (event.state() == IdleState.READER_IDLE) {
                System.out.println("进入读空闲...");
            } else if (event.state() == IdleState.WRITER_IDLE) {
                System.out.println("进入写空闲...");
            } else if (event.state() == IdleState.ALL_IDLE) {
                
                System.out.println("channel关闭前,users的数量为:" + ChatHandler.users.size());
                
                Channel channel = ctx.channel();
                // 关闭无用的channel,以防资源浪费
                channel.close();
                
                System.out.println("channel关闭后,users的数量为:" + ChatHandler.users.size());
            }
        }
        
    }
    
}
上一篇下一篇

猜你喜欢

热点阅读