Java技术Java Web架构设计

浅谈netty

2019-08-03  本文已影响90人  梦想成为一个码农

要理解netty,我们需要先了解I/O Models和JAVA NIO,还有观察者模式、多Reactors线程模型等等这些内容。

I/O Models

在这里我们先要回顾一些操作系统的IO相关基础知识:

当一个输入(input)操作发生时,这里会经历两个不同的阶段:
1.等待数据就绪。
2.将数据从内核的缓冲区拷贝到进程中。

下面是5种IO模型
1.blocking I/O
2.nonblocking I/O
3.I/O multiplexing (select and poll)
4.signal driven I/O (SIGIO)
5.asynchronous I/O (the POSIX aio_functions)

Blocking I/O Model
image.png

整个读取IO的数据是同步、阻塞的。

Nonblocking I/O Model
image.png

当一个应用程序像这样对一个非阻塞描述符循环调用recvfrom时,我们称之为轮询(polling),应用程序持续轮询内核,以查看某个操作是否就绪,这么做往往耗费大量CPU时间。通常是在专门提供某一种功能的系统中才有

I/O Multiplexing Model
image.png
signal driven I/O (SIGIO)
image.png
asynchronous I/O (the POSIX aio_functions)

告知内核启动某个操作,并让内核在整个操作(包括将数据从内核复制到我们自己的缓冲区)完成后通知我们。信号驱动式I/O是由内核通知我们何时可以启动一个I/O操作,而异步I/O模型是由内核通知我们I/O操作何时完成。通过状态、通知和回调来通知调用者的输入输出操作。

image.png
各种I/O模型的比较
image.png

随着linux内核的不断发展,IO也在不断发展,所以后面有了IO多路复用模型。IO 多路复用是通过linux内核的select、poll、epoll这些来完成的。

select函数

该函数允许进程指示内核等待多个事件中的任何一个发生,并只在有一个或多个事件发生或经历一段指定的时间后才唤醒它。
例如:我们可以调用select,告知内核仅在下列情况发生时才返回:

#include <sys/select.h>
#include <sys/time.h>

//返回:若有就绪描述符则为其数目,若超时则为0,若出错则为-1
int select(int maxfdp1,fd_set *readset,fd_set *writeset, fd_set *exceptest,const struct timeval *timeout);

struct timeval{
    long tv_sec;       /* seconds */
    long tv_used;    /* microseconds */
}

select以后最大的优势是用户可以在一个线程内同时处理多个socket的IO请求
pselect : 能够处理信号阻塞并提供了更高时间分辨率的select的增强版本

poll函数
#include <poll.h>

//返回:若有就绪描述符则为其数目,若超时则为0,若出错则为-1
int poll (struct pollfd *fdarray, unsigned long nfds, int timeout);

//一个pollfd结构体表示一个被监视的文件描述符
struct pollfd {
int fd; /* descriptor to check */
short events; /* events of interest on fd */
short revents; /* events that occurred on fd */
};

select机制的问题
1.每次调用select,都需要把fd_set集合从用户态拷贝到内核态,如果fd_set集合很大时,那这个开销也很大
2.同时每次调用select都需要在内核遍历传递进来的所有fd_set,如果fd_set集合很大时,那这个开销也很大
3.为了减少数据拷贝带来的性能损坏,内核对被监控的fd_set集合大小做了限制,并且这个是通过宏控制的,大小不可改变(限制为1024) 【poll用数组结构体解决了大小限制问题】

它没有最大连接数的限制,原因是它是基于链表来存储的,但是同样有一些缺点:

epoll函数

epoll在Linux2.6内核正式提出,是基于事件驱动的I/O方式,相对于select来说,epoll没有描述符个数限制,使用一个文件描述符管理多个描述符,将用户关心的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。

int epoll_create(int size);  // epoll_create 函数创建一个epoll句柄
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); // epoll_ctl 函数注册要监听的事件类型
//  epoll_wait 函数等待事件的就绪,成功时返回就绪的事件数目,调用失败时返回 -1,等待超时返回 0
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
struct epoll_event {    
    __uint32_t events;  /* Epoll events */    
    epoll_data_t data;  /* User data variable */};

typedef union epoll_data {    
   void *ptr;   
    int fd;    
    __uint32_t u32;   
    __uint64_t u64;
} epoll_data_t;

epoll优点:
1.没有最大并发连接的限制,能打开的FD的上限远大于1024(1G的内存上能监听约10万个端口);
2.效率提升,不是轮询的方式,不会随着FD数目的增加效率下降。只有活跃可用的FD才会调用callback函数;即Epoll最大的优点就在于它只管你“活跃”的连接,而跟连接总数无关,因此在实际的网络环境中,Epoll的效率就会远远高于select和poll。

  1. 内存拷贝,利用mmap()文件映射内存加速与内核空间的消息传递;即epoll使用mmap减少复制开销。

它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。

对比:


image.png
总结

综上,在选择select,poll,epoll时要根据具体的使用场合以及这三种方式的自身特点。

1.表面上看epoll的性能最好,但是在连接数少并且连接都十分活跃的情况下,select和poll的性能可能比epoll好,毕竟epoll的通知机制需要很多函数回调。

2.select低效是因为每次它都需要轮询。但低效也是相对的,视情况而定,也可通过良好的设计改善。

JAVA NIO

Java NIO提供了与标准IO不同的IO工作方式:

java nio 的几个重要组件:


image.png

Channels and Buffers

基本上,所有的 IO 在NIO 中都从一个Channel 开始。Channel 有点象流。 数据可以从Channel读到Buffer中,也可以从Buffer 写到Channel中。


image.png

常见的几种channel:

常见的几种Buffer:

selector

Selector允许单线程处理多个 Channel。如果你的应用打开了多个连接(通道),但每个连接的流量都很低,使用Selector就会很方便。例如,在一个聊天服务器中。


image.png
             A Thread uses a Selector to handle 3 Channel's 
IO VS NIO
image.png

netty原理

Netty是一个事件驱动、异步IO的网络框架。高性能,吞吐量更高,延迟更低、高性能之处主要来自于其I/O 模型和线程处理模型(Reactor),前者决定如何收发数据,后者决定如何处理数据。


image.png
Reactor模式
image.png

Reactor模式(反应器模式)是一种处理一个或多个客户端并发交付服务请求的事件设计模式。当请求抵达后,服务处理程序使用I/O多路复用策略,然后同步地派发这些请求至相关的请求处理程序。

核心组件交互图如下:

image.png
Basic Reactor Design
image.png
NIO实现Reactor
image.png
image.png image.png
image.png
image.png

上面是用java nio实现基本Reactor模式,需要自己写很多代码。

Worker Thread Pools 版 Reactor模式
image.png
多Reactor模式
image.png

netty就是使用的就是多Reactor模式:

image.png
Netty的异步处理:
image.png
常见操作:
image.png
netty功能特性
image.png
Netty核心组件
image.png
ChannelPipeline处理入站事件和出站操作
image.png
image.png
image.png
Netty Reactor 工作架构图
image.png
image.png
image.png

下面有一个基于netty的简单的IM demo,可以简单了解netty的编程方法和思想:
Server:

package com.yuanjia.im.netty.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.net.InetSocketAddress;

/**
 * Created by bruce on 2019/6/10.
 */
public class Server {
    private int port;

    public Server(int port) {
        this.port = port;
    }

    public void start() {

        // netty服务端ServerBootstrap启动的时候,默认有两个eventloop分别是bossGroup和 workGroup

        EventLoopGroup boosGroup = new NioEventLoopGroup(1);   // bossGroup
        EventLoopGroup workerGroup = new NioEventLoopGroup();  // workGroup
        try {
            ServerBootstrap sbs = new ServerBootstrap().group(boosGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast("decoder", new StringDecoder());
                            ch.pipeline().addLast("encoder", new StringEncoder());
                            //ch.pipeline().addLast(new DiscardInboundHandler());
                            ch.pipeline().addLast(new ServerHandler());
                        };
                    }).option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture future = sbs.bind(port).sync();
            System.out.println("Server start listen at " + port);
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            boosGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8090;
        }
        new Server(port).start();
    }
}

Client:

package com.yuanjia.im.netty.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Scanner;

/**
 * Created by bruce on 2019/6/10.
 */
public class Client {

    //server 's ip 这里需要用户根据自己server的ip来做修改,例如我这里是10.1.132.194
    private static final String HOST = System.getProperty("host", "10.1.132.194");
    //port 8090
    private static final int PORT = Integer.parseInt(System.getProperty("port", "8090"));

    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>(){
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            p.addLast("decoder", new StringDecoder());
                            p.addLast("encoder", new StringEncoder());
                            p.addLast(new ClientHandler());
                        }
                    });
            ChannelFuture future = b.connect(HOST, PORT).sync();
            //控制台输入消息给服务端让服务端转给给另外一个客户端
            //消息如:  认识你真高兴我的小伙伴@10.1.8.30
            //消息就转发给了10.1.8.30
            Scanner sc = new Scanner(System.in);
            while(sc.hasNext()){
                String message = sc.nextLine();
                future.channel().writeAndFlush(message);
            }
            future.channel().closeFuture().sync();
        } finally {
            group.spliterator();
        }
    }

}

ServerHandler:

package com.yuanjia.im.netty.server;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.internal.PlatformDependent;

import java.util.concurrent.ConcurrentMap;

/**
 * Created by bruce on 2019/6/10.
 */
@ChannelHandler.Sharable
public class ServerHandler implements ChannelInboundHandler {

    //存放客户端和服务端之间的连接
    private static ConcurrentMap<String,ChannelHandlerContext> channelConcurrentMap = PlatformDependent.newConcurrentHashMap();


    @Override
    public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {

    }

    @Override
    public void channelUnregistered(ChannelHandlerContext channelHandlerContext) throws Exception {

    }

    @Override
    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        //获取客户端的ip
        String hostString = ((SocketChannel)channelHandlerContext.channel()).remoteAddress().getHostString();
        System.out.println(hostString + " online");
        //将客户端和服务端之间的连接存放在concurrentHashMap中
        channelConcurrentMap.put(hostString,channelHandlerContext);
    }

    @Override
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {

    }

    @Override
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
        System.out.println("ServerHandler channelRead....");
        //客户端通过Terminal连接后的输入格式为  message@ip ,这个消息接收者ip会收到message消息
        //例如:   你最近还好吗,Bruce@10.1.128.1
        String messageString = o.toString();
        String[] messages = messageString.split("@");
        String message = messages[0];
        String targetHost = messages[1];
        System.out.println(channelHandlerContext.channel().remoteAddress()+"->Server :"+o.toString());
        ChannelHandlerContext targetChannelHandlerContext = channelConcurrentMap.get(targetHost);
        targetChannelHandlerContext.write(channelHandlerContext.channel().remoteAddress() + " say : " + message);
        targetChannelHandlerContext.flush();
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {

    }

    @Override
    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {

    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {

    }

    @Override
    public void handlerAdded(ChannelHandlerContext channelHandlerContext) throws Exception {

    }

    @Override
    public void handlerRemoved(ChannelHandlerContext channelHandlerContext) throws Exception {

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) throws Exception {

    }
}

ClientHandler

package com.yuanjia.im.netty.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Created by bruce on 2019/6/10.
 */
public class ClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Client01Handler Active");
        //ctx.fireChannelActive();  // 若把这一句注释掉将无法将event传递给下一个ClientHandler
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

代码地址:https://github.com/bruceChi/nettyIM
参考资料:
1、http://tutorials.jenkov.com/java-nio
2、 UNIX Network Programming
3、 https://www.jianshu.com/p/63a006e5e22d
4、http://tutorials.jenkov.com/netty
5、http://www.dre.vanderbilt.edu/~schmidt/PDF/reactor-siemens.pdf
6、 https://www.cnblogs.com/winner-0715/p/8733787.html
7、 http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf

上一篇 下一篇

猜你喜欢

热点阅读