Netty基础原理常用API分析以及Liunx句柄数修改

2020-03-04  本文已影响0人  强某某

Netty介绍

Netty是由JBOSS提供的一个java开源框架,是业界最流行的NIO框架,整合了多种协议(包括FTP、SMTP、HTTP等各种二进制文本协议)的实现经验,精心设计的框架,在多个大型商业项目中得到充分验证。

那些主流框架产品在用?

BIO时间返回器

public class BioServer {
    public static final int PORT=3456;
    public static void main(String[] args) throws IOException {
        ServerSocket server=null;
        try {
            server=new ServerSocket(PORT);
            Socket socket=null;
            while (true) {
                socket= server.accept();
                new Thread(new TimerServerHandler(socket)).start();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            if (server != null) {
                server.close();
            }
        }
    }
}
public class TimerServerHandler implements Runnable {

    private Socket socket;

    public TimerServerHandler(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        BufferedReader in = null;
        PrintWriter out = null;

        try {
            in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
            out = new PrintWriter(this.socket.getOutputStream(), true);

            String body=null;
            while ((body = in.readLine()) != null && body.length() != 0) {
                System.out.println("客户端发送:"+body);
                out.println(new Date().toString());
            }

        } catch (Exception e) {

        } finally {
            if (in!=null) {
                try {
                    in.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (out!=null) {
                try {
                    out.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if (this.socket != null) {
                try {
                    this.socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
public class BioClient {

    public static final int PORT=3456;

    public static void main(String[] args) {
        Socket socket=null;
        BufferedReader in=null;
        PrintWriter  out=null;
        try {
            socket=new Socket("127.0.0.1",PORT);
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            out = new PrintWriter(socket.getOutputStream(), true);
            out.println("i am client");
            String s = in.readLine();
            System.out.println("服务器当前时间:"+s);
        } catch (Exception e) {

        } finally {
            if (in!=null) {
                try {
                    in.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (out!=null) {
                try {
                    out.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if (socket != null) {
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

BIO优缺点

案例:web服务器Tomcat7之前,都是使用BIO,7之后就使用NIO

改进:伪NIO,使用线程池去处理业务逻辑

网络IO模型

同步异步、堵塞和非堵塞

IO详解

权威:RFC标准,或者书籍 《UNIX Network Programming》中文名《UNIX网络编程-卷一》第六章
    1)阻塞式I/O;
    2)非阻塞式I/O;
    3)I/O复用(select,poll,epoll...);
I/O多路复用是阻塞在select,epoll这样的系统调用没有阻塞在真正的I/O系统调用如recvfrom进程受阻于select,等待可能多个套接口中的任一个变为可读
    
IO多路复用使用两个系统调用(select和recvfrom)
blocking IO只调用了一个系统调用(recvfrom)
select/epoll 核心是可以同时处理多个connection,而不是更快,所以连接数不高的话,性能不一定比多线程+阻塞IO好
            
多路复用模型中,每一个socket,设置为non-blocking,
阻塞是在select这
几个核心点:
   阻塞非阻塞说的是线程的状态(重要)
   同步和异步说的是消息的通知机制(重要)
   
   同步需要主动读写数据,异步是不需要主动读写数据
   同步IO和异步IO是针对用户应用程序和内核的交互
   异步需要内核层次的支持

IO多路复用技术

什么是IO多路复用:I/O多路复用,I/O是指网络I/O, 多路指多个TCP连接(即socket或者channel),复用指复用一个或几个线程。简单来说:就是使用一个或者几个线程处理多个TCP连接,最大优势是减少系统开销小,不必创建过多的进程/线程,也不必维护这些进程/线程

select:
    基本原理:监视文件3类描述符: writefds、readfds、和exceptfds,调用后select
    函数会阻塞住,等有数据 可读、可写、出异常 或者 超时 就会返回,select函数正常返回后,通过遍历fdset整个数组才能发现哪些句柄发生了事件,来找到
    就绪的描述符fd,然后进行对应的IO操作,几乎在所有的平台上支持,跨平台支持性好
    
缺点:
    1)select采用轮询的方式扫描文件描述符,全部扫描,随着文件描述符FD数量增多而性能下降            
    2)每次调用 select(),需要把 fd 集合从用户态拷贝到内核态,并进行遍历(消息传递都是从内核到用户空间)
    3)最大的缺陷就是单个进程打开的FD有限制,默认是1024,这个指的是jvm的限制,而不是linux的限制(可修改宏定义,但是效率仍然慢)                
    static final  int MAX_FD = 1024
poll:
    基本流程:
 select() 和 poll() 系统调用的大体一样,处理多个描述符也是使用轮询的方式,根据描述符的状态进行处理,一样需要把 fd 集合从用户态拷贝到内核态,并进行遍历。最大区别是: poll没有最大文件描述符限制(使用链表的方式存储fd)

select和poll基本没啥区别,主要是一个链表一个数组。

Epoll讲解

epoll 基本原理:
     在2.6内核中提出的,对比select和poll,epoll更加灵活,没有描述符限制,用户态拷贝到内核态只需要一次
     使用事件通知,通过epoll_ctl注册fd,一旦该fd就绪,内核就会采用callback的回调机制来激活对应的fd
   
     优点:
         1)没fd这个限制,所支持的FD上限是操作系统的最大文件句柄数,1G内存大概支持10万个句柄 
         2)效率提高,使用回调通知而不是轮询的方式,不会随着FD数目的增加效率下降
         3)通过callback机制通知,内核和用户空间mmap同一块内存实现
  
         Linux内核核心函数
         1)epoll_create()  在Linux内核里面申请一个文件系统 B+树,返回epoll对象,也是一个fd
         2)epoll_ctl() 操作epoll对象,在这个对象里面修改添加删除对应的链接fd, 绑定一个callback函数
         3)epoll_wait()  判断并完成对应的IO操作
  
     缺点:
         编程模型比select/poll 复杂
         例子:100万个连接,里面有1万个连接是活跃,在 select、poll、epoll分别是怎样的表现                
         select:不修改宏定义,则需要 1000个进程才可以支持 100万连接
         poll:100万个链接,遍历都响应不过来了,还有空间的拷贝消耗大量的资源
         epoll:通过回调通知,性能相比之下提升很大

Java的I/O演进历史

Netty线程模型和Reactor模式

使用场景: 对应小业务则适合,编码简单;对于高负载、大并发的应用场景不适合,一个NIO线程处理太多请求,则负载过高,并且可能响应变慢,导致大量请求超时,而且万一线程挂了,则不可用了

实际上的Reactor模式,是基于Java NIO的,在他的基础上,抽象出来两个组件——Reactor和Handler两个组件:

(1)Reactor:负责响应IO事件,当检测到一个新的事件,将其发送给相应的Handler去处理;新的事件包含连接建立就绪、读就绪、写就绪等。

(2)Handler:将自身(handler)与事件绑定,负责事件的处理,完成channel的读入,完成处理业务逻辑后,负责将结果写出channel。

总结:上面的单线程Reactor其实就可以看着一个特殊的handler。而多线程Reactor则分为两部分,一部分是Reactor(可以为多线程,线程组或者单线程),而handler也就是上面说的IO线程,必须是线程组或者多线程。

附属资料:
 为什么Netty使用NIO而不是AIO,是同步非阻塞还是异步非阻塞?
           
 答案:
 在Linux系统上,AIO的底层实现仍使用EPOLL,与NIO相同,因此在性能上没有明显的优势
 Netty整体架构是reactor模型,采用epoll机制,所以往深的说,还是IO多路复用模式,所以也可说netty是同步非阻塞模型(看的层次不一样)

 很多人说这是netty是基于Java NIO 类库实现的异步通讯框架
 特点:异步非阻塞、基于事件驱动,性能高,高可靠性和高可定制性。
   
 参考资料:
  https://github.com/netty/netty/issues/2515

基于netty搭建echo服务

常用服务组件

代码

public class EchoServer {
   private int port;

    public EchoServer(int port) {
        this.port = port;
    }

    /**
     * 启动流程
     */
    public void run() throws InterruptedException {
        //配置服务端线程组
        EventLoopGroup bossGroup=new NioEventLoopGroup();
        EventLoopGroup workGroup=new NioEventLoopGroup();

        try {
            //启动类
            ServerBootstrap serverBootstrap=new ServerBootstrap();
            serverBootstrap.group(bossGroup,workGroup).channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG,1024)
                    .option(ChannelOption.TCP_NODELAY,true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //串联很多要处理的handler
                            ch.pipeline().addLast(new EchoHandler());
                        }
                    });
            //绑定端口,同步等待成功
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            //等待服务端监听端口关闭
            channelFuture.channel().closeFuture().sync();
        }finally {
            //优雅退出,释放线程池
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }


    public static void main(String[] args) throws InterruptedException {
        int port=8080;
        if (args.length > 0) {
            port=Integer.parseInt(args[0]);
        }
        new EchoServer(port).run();
    }
}
public class EchoHandler extends ChannelInboundHandlerAdapter {

    //读取数据
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//        Channel channel = ctx.channel();
//        channel.writeAndFlush()


//        ChannelPipeline pipeline = ctx.pipeline();
//        pipeline.writeAndFlush()

        ByteBuf data= (ByteBuf) msg;
        System.out.println("服务端收到数据:"+data.toString(CharsetUtil.UTF_8));
        ctx.writeAndFlush(data);
    }

    //读取完成
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("EchoServerHandler channelReadComplete");
    }

    //异常捕获
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();//关闭管道
    }
}
public class EchoClient {
    private String host;
    private int port;

    public EchoClient(String host, int port) {
        this.host = host;
        this.port = port;
    }
    //https://blog.csdn.net/fd2025/article/details/79740226
    public void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress(host, port))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel channel) throws Exception {
                            channel.pipeline().addLast(new EchoClientHandler());
                        }
                    });

            //连接到服务端,connect是异步连接,再调用同步async,等待连接成功从
            ChannelFuture channelFuture = bootstrap.connect().sync();
            //阻塞,直到客户端通道关闭
            channelFuture.channel().closeFuture().sync();
        } finally {
            //优雅退出,释放nio线程
            group.shutdownGracefully();
        }

    }

    public static void main(String[] args) throws InterruptedException {
        new EchoClient("127.0.0.1", 8080).start();

    }
}
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {


    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf msg) throws Exception {
        System.out.println("Client Received: "+msg.toString(CharsetUtil.UTF_8));
    }

    //channel激活的时候
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelActive");
        ctx.writeAndFlush(Unpooled.copiedBuffer("哈哈测试",CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("EchoClientHandler Complate");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>net.xdclass</groupId>
    <artifactId>echo-project</artifactId>
    <version>1.0-SNAPSHOT</version>
    <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
    <dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.32.Final</version>
        </dependency>
    </dependencies>
</project>

Netty的核心链路源码

剖析EventLoop和EventLoopGroup线程模型

线程模型

1. 传统IO模型:
每个请求都分配一个线程用来处理该请求,关于该请求
的read,handle,和send都放在一个线程中进行处理
2. 基于线程池的伪异步IO模型
针对传统IO模型中会造成线程资源极大浪费的缺点,通
过线程池来复用线程处理客户端连接和数据处理.

* 会有一个阻塞线程负责socket连接,即acceptor;
*会有一个线程池维护n个活跃线程和一个消息队列,来
处理socketTask,所以资源是可控的,所以无论客户端
多少并发连接,都会导致系统资源耗尽和宕机;

缺点:

- 无法解决通信阻塞的问题,因为socket.read()方法是
流式数据读取,因此只能读取完所有数据后才能正确处理,如果一个socket发送数据需要60秒那么该线程处理数
据至少要60秒,那么这段时间内的io事件,该线程是
无法及时处理的,如果这样的io事件出现多次,很可
能造成消息队列阻塞;

- 只有一个acceptor负责socket连接,如果线程池阻塞队列阻塞之后,那么所有新的客户端连接也将会被拒绝;如果大量连接拒绝,就可能会认定为系统故障;

3. Reactor模型(实时响应)
前面已经讲过这个模型;
IO复用结合线程池复用就是Reactor模型设计的基本思想

总结:线程模型其实就是IO模型的相关运用,可能还会搭配线程池服用,例如Reactor模型

Bootstrap模块讲解

设置channel通道类型NioServerSocketChannel、OioServerSocketChannel

粗略的理解为option是给bossGroup配置的,childOption是给workerGroup配置的;这两个线程组对应reactor模型的Acceptor和handler

Channel模块

一个Channel包含一个ChannelPipeline,所有ChannelHandler都会顺序加入到ChannelPipeline中 创建Channel时会自动创建一个ChannelPipeline,每个Channel都有一个管理它的pipeline,这关联是永久性的

特别注意:执行顺序channelRegistered-》channelActive=》channelInactive=》channelUnregistered

ChannelHandler和ChannelPipeline模块讲解

ChannelHandlerContext模块

入站出站Handler执行顺序

总结:需要保证最后一个outhandler的的上下文可以有next的指向,否则最后一个outhandler就不会执行了,也就是说最后一个inhanlder之后的outhandler都不会执行。所以一般最后都要有一个inhandler。

模块ChannelFuture

Netty网络数据传输编解码

1)无法跨语言
2) 序列化后的码流太大,也就是数据包太大
3) 序列化和反序列化性能比较差

解码器Decoder

编码器Encoder

编解码器类Codec

 组合解码器和编码器,以此提供对于字节和消息都相同的操作
       
        优点:成对出现,编解码都是在一个类里面完成    
        缺点:耦合在一起,拓展性不佳

        Codec:组合编解码
            1)ByteToMessageCodec
    
            2)MessageToMessageCodec
    
        decoder:解码
             1)ByteToMessageDecoder
    
             2)MessageToMessageDecoder
        
        encoder:编码
             1)ByteToMessageEncoder
    
            2)MessageToMessageEncoder

TCP粘包拆包

什么是粘包拆包

1)TCP拆包: 一个完整的包可能会被TCP拆分为多个包进行发送
2)TCP粘包: 把多个小的包封装成一个大的数据包发送, client发送的若干数据包 Server接收时粘成一包
    
发送方和接收方都可能出现这个原因
        
发送方的原因:TCP默认会使用Nagle算法
        
接收方的原因: TCP接收到数据放置缓存中,应用程序从缓存中读取 
       
UDP: 是没有粘包和拆包的问题,有边界协议

TCP半包读写常见解决方案

发送方:可以关闭Nagle算法
接受方: TCP是无界的数据流,并没有处理粘包现象的机制, 且协议本身无法避免粘包,半包读写的发生需要在应用层进行处理
     应用层解决半包读写的办法
     1)设置定长消息 (10字符)
        xdclass000xdclass000xdclass000xdclass000
                        
     2)设置消息的边界 ($$ 切割)
        sdfafwefqwefwe$$dsafadfadsfwqehidwuehfiw$$879329832r89qweew$$
    
     3)使用带消息头的协议,消息头存储消息开始标识及消息的长度信息
        Header+Body

Netty自带解决TCP半包读写方案

DelimiterBasedFrameDecoder: 指定消息分隔符的解码器

public void run() throws Exception{

        //配置服务端的线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup();

        EventLoopGroup workGroup = new NioEventLoopGroup();

        try{
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workGroup)

                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG,128)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                        ch.pipeline().addLast(new StringDecoder());
                        ch.pipeline().addLast(new ServerHandler());
                    }
                });

            System.out.println("Echo 服务器启动");
            //绑定端口,同步等待成功
            ChannelFuture channelFuture =  serverBootstrap.bind(port).sync();
            //等待服务端监听端口关闭
            channelFuture.channel().closeFuture().sync();

        }finally {
            //优雅退出,释放线程池
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }

    }

LineBasedFrameDecoder解决TCP半包读写

自定义分隔符解决TCP读写问题

自定义长度半包读写器LengthFieldBasedFrameDecoder

maxFrameLength 数据包的最大长度
    
lengthFieldOffset 长度字段的偏移位,长度字段开始的地方,意思是跳过指定长度个字节之后的才是消息体字段

lengthFieldLength 长度字段占的字节数, 帧数据长度的字段本身的长度

lengthAdjustment 
    一般 Header + Body,添加到长度字段的补偿值,如果为负数,开发人员认为这个 Header的长度字段是整个消息包的长度,则Netty应该减去对应的数字

initialBytesToStrip 从解码帧中第一次去除的字节数, 获取完一个完整的数据包之后,忽略前面的指定位数的长度字节,应用解码器拿到的就是不带长度域的数据包

failFast 是否快速失败

缓冲ByteBuf

ByteBuf是为解决ByteBuffer的问题和满足网络应用程序开发人员的日常需求而设计的

JDK ByteBuffer的缺点:

增强

ByteBuf操作

1.png 2.png

ByteBuf动态扩容

capacity默认值:256字节,最大值:Integet.MAX_VALUE(2GB)
write*方法调用时,通过AbstractByteBuf.ensureWritable0进行检查
容量计算方法:AbstractByteBufAllocator.calculateNewCapacity(新capacity的最小要求,capacity最大值)

根据新capacity的最小值要求,对应有两套计算方法:
没超过4M:从64字节开始,每次增加一倍,直至计算出来的newCpacity满足新容量最小要求

示例:当前大小256,写250,继续写10字节数据,需要的容量最小要求是261,则新容量是6422*2=512

超过4M:新容量=新容量最小要求/4M*4M+4M

示例:当前大小3M,已写3M,继续写2M数据,需要的容量最小要求是5M,则新容量是9M(不能超过最大值)

4M的来源:一个固定的阈值AbstractByteBufAllocator.CALCULATE_THRESHOLD

ByteBuf实现

3.png

所谓池化,其实就是内存复用

Unsafe的实现

4.png

PooledByteBuf对象、内存复用

5.png

零拷贝机制

Netty的零拷贝机制,是一种应用层的实现。和底层JVM、操作系统内存机制并无过多的关联。
使用ByteBuf时netty高性能很重哟的一个原因。


6.png

说明:例如2.就是buffer持有array的引用,实际上数据没动,3也是,数据没动,只是其中ll的引用被buffer持有;还有1,如果是常规jdk的数组合并,其实是拷贝数据,同时新开内存生成新的数组

ByteBuf创建方法和常用的模式

 ByteBuf:传递字节数据的容器
    
   ByteBuf的创建方法
    1)ByteBufAllocator
      池化(Netty4.x版本后默认使用 PooledByteBufAllocator提高性能并且最大程度减少内存碎片
    
      非池化UnpooledByteBufAllocator: 每次返回新的实例
    
    2)Unpooled: 提供静态方法创建未池化的ByteBuf,可以创建堆内存和直接内存缓冲区
​           
     ByteBuf使用模式
        堆缓存区HEAP BUFFER:
            优点:存储在JVM的堆空间中,可以快速的分配和释放
            缺点:每次使用前会拷贝到直接缓存区(也叫堆外内存)
    
        直接缓存区DIRECR BUFFER:
            优点:存储在堆外内存上,堆外分配的直接内存,不会占用堆空间
            缺点:内存的分配和释放,比在堆缓冲区更复杂
    
        复合缓冲区COMPOSITE BUFFER:
            可以创建多个不同的ByteBuf,然后放在一起,但是只是一个视图
            选择:大量IO数据读写,用“直接缓存区”; 业务消息编解码用“堆缓存区”

Netty内部设计模式

Builder构造器模式:ServerBootstap 

责任链设计模式:pipeline的事件传播
            
工厂模式: 创建Channel
            
适配器模式:HandlerAdapter

单机百万连接

必备知识

Netty单机百万连接Linux内核参数优化

局部文件句柄限制(单个进程最大文件打开数)
    ulimit -n 一个进程最大打开的文件数 fd 不同系统有不同的默认值

    root身份编辑   vim /etc/security/limits.conf
                增加下面
                root soft nofile 1000000
                root hard nofile 1000000
                * soft nofile 1000000
                * hard nofile 1000000
    * 表示当前用户,修改后要重启
    
全局文件句柄限制(所有进程最大打开的文件数,不同系统是不一样,可以直接echo临时修改)
    查看命令
        cat /proc/sys/fs/file-max
    永久修改全局文件句柄, 修改后生效 sysctl -p
        vim  /etc/sysctl.conf
        增加 fs.file-max = 1000000
    
    启动
        java -jar millionServer-1.0-SNAPSHOT.jar  -Xms5g -Xmx5g -XX:NewSize=3g -XX:MaxNewSize=3g
上一篇下一篇

猜你喜欢

热点阅读