redis

Redis高性能原理探秘-IO模型

2021-12-25  本文已影响0人  javacoo

Redis的性能由哪些因素决定?

网络通信模型

最终目标: 增加客户端的访问连接数量

Redis 为什么那么快

Redis的高性能主要依赖于几个方面。

从请求处理开始分析

当我们在客户端向Redis Server发送一条指令,并且得到Redis回复的整个过程中,Redis做了什么呢?

redis-req.png

要处理命令,则redis必须完整地接收客户端的请求,并将命令解析出来,再将结果读出来,通过网络回
写到客户端。整个工序分为以下几个部分:

其中解析执行是纯cpu/内存操作,而接收和返回主要是IO操作,首先我们先来看通信的过程。

网络IO的通信原理

网络通信.png

同样,用一幅图来描述网络数据的传输流程

IO多路复用机制

Redis的通信采用的是多路复用机制,什么是多路复用机制呢?

在理解多路复用之前,我们先来了解一下BIO。

BIO模型

在Java中,如果要实现网络通信,我们会采用Socket套接字来完成。
Socket不是一个协议,而是一个通信模型。其实它最初是BSD发明的,主要用于一台电脑的两个进程间通信,后来把它用到了两台电脑的进程间通信。所以,可以把它简单理解为进程间通信,不是什么高级的东西。主要做的事情不就是:

可见,Socket其实就是I/O操作,Socket并不仅限于网络通信,在网络通信中,它涵盖了网络层、传输层、会话层、表示层、应用层——其实这都不需要记,因为Socket通信时候用到了IP和端口,仅这两个就表明了它用到了网络层和传输层;而且它无视多台电脑通信的系统差别,所以它涉及了表示层;一般Socket都是基于一个应用程序的,所以会涉及到会话层和应用层。

构建基础的BIO通信模型
@Slf4j
public class BIOServerSocket {
    public static final int PORT = 8080;
    public static void main(String[] args) {
        try(ServerSocket serverSocket = new ServerSocket(PORT)) {
            log.info("启动服务:监听端口:{}",PORT);
            //阻塞等待监听一个客户端连接,返回的socket表示连接的客户端信息
            Socket socket = serverSocket.accept();
            log.info("客户端:{}连接成功",socket.getPort());
            //阻塞(InputStream是阻塞的)等待获取客户端请求报文
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            String clientStr = bufferedReader.readLine();
            log.info("收到客户端发送的消息:{}",clientStr);
            //构建输出流,写回客户端
            BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
            bufferedWriter.write("server return message:" + clientStr + "\n");
            //清空缓冲区,发送消息
            bufferedWriter.flush();
        } catch (IOException ioException) {
            ioException.printStackTrace();
        }
    }
}

测试效果:

bio-1.png

我们通过对BIOServerSocket进行改造,关注case1和case2部分。

@Slf4j
public class BIOServerSocket2 {
    public static final int PORT = 8080;
    public static void main(String[] args) {
        try(ServerSocket serverSocket = new ServerSocket(PORT)) {
            log.info("启动服务:监听端口:{}",PORT);
            //循环接收请求
            while (true){
                //阻塞等待监听一个客户端连接,返回的socket表示连接的客户端信息
                Socket socket = serverSocket.accept();
                log.info("客户端:{}连接成功",socket.getPort());
                //阻塞(InputStream是阻塞的)等待获取客户端请求报文
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                String clientStr = bufferedReader.readLine();
                log.info("收到客户端发送的消息:{}",clientStr);
                //等待20秒
                Thread.sleep(20*1000);
                //构建输出流,写回客户端
                BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
                bufferedWriter.write("server return message:" + clientStr + "\n");
                //清空缓冲区,发送消息
                bufferedWriter.flush();
            }
        } catch (IOException ioException) {
            ioException.printStackTrace();
        } catch (InterruptedException interruptedException) {
            interruptedException.printStackTrace();
        }
    }
}

客户端代码:BIOClientSocket

@Slf4j
public class BIOClientSocket {
    public static final String HOST = "127.0.0.1";
    public static final int PORT = 8080;
    public static void main(String[] args) {
        try(Socket socket = new Socket(HOST,PORT)) {
            log.info("客户端连接端口:{}:{}",HOST,PORT);
            //构建输出流,请求服务端
            BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
            bufferedWriter.write("client1 msg: hello \n");
            //清空缓冲区,发送消息
            bufferedWriter.flush();
            //阻塞(InputStream是阻塞的)等待获取客户端请求报文
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            String clientStr = bufferedReader.readLine();
            log.info("收到服务端返回的消息:{}",clientStr);

        } catch (IOException ioException) {
            ioException.printStackTrace();
        }
    }
}

接着,把BIOClientSocket复制两份(client1、client2),同时向BIOServerSocket发起请求。

现象: client1先发送请求到Server端,由于Server端等待20s才返回,导致client2的请求一直被阻塞。

bio-2.png

这个情况会导致一个问题,如果服务端在同一个时刻只能处理一个客户端的连接,而如果一个网站同时有1000个用户访问,那么剩下的999个用户都需要等待,而这个等待的耗时取决于前面的请求的处理时长,如图所示。

bio-req.png
基于多线程优化BIO

为了让服务端能够同时处理更多的客户端连接,避免因为某个客户端连接阻塞导致后续请求被阻塞,于是引入多线程技术,代码如下。

BIOServerSocketWithThread

@Slf4j
public class BIOServerSocketWithThread {
    public static TaskExecutor taskExecutor = new TaskExecutor() {
        ExecutorService executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(),
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(), Executors.defaultThreadFactory());
        @Override
        public void execute(Runnable task) {
            executorService.execute(task);
        }
    };
    public static final int PORT = 8080;
    public static void main(String[] args) {
        try(ServerSocket serverSocket = new ServerSocket(PORT)) {
            log.info("启动服务:监听端口:{}",PORT);
            //循环接收请求
            while (true){
                //阻塞等待监听一个客户端连接,返回的socket表示连接的客户端信息
                Socket socket = serverSocket.accept();
                log.info("客户端:{}连接成功",socket.getPort());
                // I/O异步执行
                taskExecutor.execute(new SocketThread(socket));
            }
        } catch (IOException ioException) {
            ioException.printStackTrace();
        }
    }
}

SocketThread

@Slf4j
public class SocketThread implements Runnable{
    private Socket socket;

    public SocketThread(Socket socket) {
        this.socket = socket;
    }
    @Override
    public void run() {
        try {
            //阻塞(InputStream是阻塞的)等待获取客户端请求报文
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            String clientStr = bufferedReader.readLine();
            log.info("收到客户端发送的消息:{}",clientStr);
            //等待20秒
            Thread.sleep(20*1000);
            //构建输出流,写回客户端
            BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
            bufferedWriter.write("server return message:" + clientStr + "\n");
            //清空缓冲区,发送消息
            bufferedWriter.flush();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            //TODO 关闭IO流
        }
    }
}

结果:

bio-3.png

当引入了多线程之后,每个客户端的链接(Socket),我们可以直接给到线程池去执行,而由于这个过程是异步的,所以并不会同步阻塞影响后续链接的监听,因此在一定程度上可以提升服务端链接的处理数量。

bi-req-2.png
NIO非阻塞IO
nio-1.png

客户端

nio-2.png nio-loop.png

上述这种NIO的使用方式,仍然存在一个问题,就是客户端或者服务端需要通过一个线程不断轮询才能
获得结果,而这个轮询过程中会浪费线程资源。

多路复用IO
多路复用.png

单线程Reactor 模型(高性能I/O设计模式)

Reactor模型有三个重要的组件:

多线程单Reactor模型

reactor-2.png

多线程改造-MultiDispatchHandler,我们直接将前面的Reactor单线程模型改成多线程,其实我们就是把IO阻塞的问题通过异步的方式做了优化,源码如下:

@Slf4j
public class MutilDispatchHandler implements Runnable{

    SocketChannel channel;

    private Executor executor= Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    public MutilDispatchHandler(SocketChannel channel) {
        this.channel = channel;
    }

    @Override
    public void run() {
        processor();
    }
    private void processor(){
        executor.execute(new ReaderHandler(channel));
    }
    static class ReaderHandler implements Runnable{
        private SocketChannel channel;

        public ReaderHandler(SocketChannel channel) {
            this.channel = channel;
        }

        @Override
        public void run() {
            log.info("{}------",Thread.currentThread().getName());
            ByteBuffer buffer=ByteBuffer.allocate(1024);
            try {
                Thread.sleep(100000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            int len=0,total=0;
            String msg="";
            try {
                do {
                    len = channel.read(buffer);
                    if(len>0){
                        total+=len;
                        msg+=new String(buffer.array());
                    }
                } while (len > buffer.capacity());
                System.out.println("total:"+total);

                //msg=表示通信传输报文
                //耗时2s
                //登录: username:password
                //ServetRequets: 请求信息
                //数据库的判断
                //返回数据,通过channel写回到客户端
                log.info("{}: Server receive Msg:{}",channel.getRemoteAddress(),msg);
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                if(channel!=null){
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}
上一篇 下一篇

猜你喜欢

热点阅读