nio

socket应用之多线程与NIO

2016-11-22  本文已影响424人  等流星的牧羊人

by 等流星的牧羊人

在本次网络编程作业中,对HTTPServer一共采用了两种方案进行性能改进。

第一种是比较常规的多线程,第二种则是采用了NIO的多路复用模式。

多线程

在现有的HTTPServer中,一个很大的问题在于,它只有一个用户线程。当接受一个HTTPClient的请求,并进行处理的时候,由于响应设计I/O操作,需要一定响应时间。这期间,用户线程可以看作是阻塞的,无法响应新的Client提交过来的请求。

多线程可以解决这个问题,用户线程只要负责不断接受新来的请求,而对每个请求的处理,则是通过新启的子线程处理,这样就不会导致用户线程阻塞。从而服务器可以支持并发请求。

通过implements Runnable接口封装了一个响应请求的独立的线程类。

HTTPServer.java

package com.zaper.sea.river.socketwork;

import java.net.*;
/**
 * Created by Zaper Ocean on 2016/11/16.
 */
public class HTTPServer{
    public static void main(String args[]) {
        System.out.println(HTTPServer.class.getResource("me/a.json"));
        int port;
        ServerSocket serverSocket;

        try {
            port = Integer.parseInt(args[0]);
        }catch (Exception e) {
            System.out.println("port = 8080 (默认)");
            port = 8080; //默认端口为8080
        }

        try{
            serverSocket = new ServerSocket(port);
            System.out.println("服务器正在监听端口:" + serverSocket.getLocalPort());

            while(true) { //服务器在一个无限循环中不断接收来自客户的TCP连接请求
                try{
                    //等待客户的TCP连接请求
                    final Socket socket = serverSocket.accept();
                    System.out.println("建立了与客户的一个新的TCP连接,该客户的地址为:"+
                            socket.getInetAddress()+":" + socket.getPort());
                    //启动一个新线程响应客户请求
                    Thread myHTTPServer=new Thread(new HTTPServerRunnable(socket));
                    myHTTPServer.start();
                }catch(Exception e){e.printStackTrace();}
            } //#while
        }catch (Exception e) {e.printStackTrace();}
    }
}


HTTPServerRunnable.java

package com.zaper.sea.river.socketwork;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

/**
 * Created by Zaper Ocean on 2016/11/17.
 */
public class HTTPServerRunnable implements Runnable {
    private Socket socket = null;

    public HTTPServerRunnable(Socket socket){this.socket = socket;}

    public void run() {
        try {
            /*读取HTTP请求信息*/
            InputStream socketIn= null; //获得输入流
            socketIn = this.socket.getInputStream();
            Thread.sleep(500);  //睡眠500毫秒,等待HTTP请求
            int size=socketIn.available();
            byte[] requestBuffer=new byte[size];
            socketIn.read(requestBuffer);
            String request=new String(requestBuffer);
            System.out.println(request); //打印HTTP请求数据

            /*解析HTTP请求*/
            //获得HTTP请求的第一行
            String firstLineOfRequest=request.substring(0,request.indexOf("\r\n"));
            //解析HTTP请求的第一行
            String[] parts=firstLineOfRequest.split(" ");
            String uri=parts[1]; //获得HTTP请求中的uri

            /*决定HTTP响应正文的类型*/
            String contentType;
            if(uri.indexOf("html")!=-1 || uri.indexOf("htm")!=-1)
                contentType="text/html";
            else if(uri.indexOf("jpg")!=-1 || uri.indexOf("jpeg")!=-1)
                contentType="image/jpeg";
            else if(uri.indexOf("gif")!=-1)
                contentType="image/gif";
            else
                contentType="application/octet-stream";


            /*创建HTTP响应结果 */
            //HTTP响应的第一行
            String responseFirstLine="HTTP/1.1 200 OK\r\n";
            //HTTP响应头
            String responseHeader="Content-Type:"+contentType+"\r\n\r\n";
            //获得读取响应正文数据的输入流
            System.out.println(uri);
            System.out.println(HTTPServer.class.getResource("/root/"+uri));
            InputStream in=HTTPServer.class.getResourceAsStream("/root/"+uri);

            /*发送HTTP响应结果 */
            OutputStream socketOut=socket.getOutputStream(); //获得输出流
            //发送HTTP响应的第一行
            socketOut.write(responseFirstLine.getBytes());
            //发送HTTP响应的头
            socketOut.write(responseHeader.getBytes());
            //发送HTTP响应的正文
            int len=0;
            byte[] buffer=new byte[128];
            while((len=in.read(buffer))!=-1)
                socketOut.write(buffer,0,len);

            Thread.sleep(1000);  //睡眠1秒,等待客户接收HTTP响应结果
            socket.close(); //关闭TCP连接
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

运行结果

HTTPServer运行 HTTPClient运行

NIO

除了第一种常规的MultiThread方案,还采取了NIO方案。

在我们的HTTPServer中,需要进行的IO操作是非常多的。读写文件与读写socket都是,而众所周知,IO操作是最耗时间的操作。那么如何减少这部分时间呢?JDK在1.4之后为我们提供了一个新思路,NIO。

对于一个network IO (这里我们以read举例),它会涉及到两个系统对象,一个是调用这个IO的process (or thread),另一个就是kernel。当一个read操作发生时,它会经历两个阶段:

  1. 等待数据准备
  2. 将数据从内核拷贝到进程中

接着介绍一下IO的几种模型,最权威的总结来自Stevens的UNP(Unix Network Progamming),有以下五种:

而无论原始的HTTPServer还是多线程方案都属于第一种BIO

NIO

当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据。对于network io来说,很多时候数据在一开始还没有到达(比如,还没有收到一个完整的UDP包),这个时候kernel就要等待足够的数据到来。而在用户进程这边,整个进程会被阻塞。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。
所以,blocking IO的特点就是在IO执行的两个阶段都被block了。

所以,很明显BIO对性能的影响很明显。

回到第一种方案,我们之所以使用多线程,主要原因在于socket.accept()、socket.read()、socket.write()三个主要函数都是同步阻塞的,当一个连接在处理I/O的时候,系统是阻塞的,如果是单线程的话必然就挂死在那里;但CPU是被释放出来的,开启多线程,就可以让CPU去处理更多的事情。其实这也是所有使用多线程的本质:

IO multiplexing

在NIO中,单个process就可以同时处理多个网络连接的IO。它的基本原理就是select/epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。

当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。
这个图和blocking IO的图其实并没有太大的不同,事实上,还更差一些。因为这里需要使用两个system call (select 和 recvfrom),而blocking IO只调用了一个system call (recvfrom)。但是,用select的优势在于它可以同时处理多个connection。

所以,如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。 select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。

NIO模型

BIO模型之所以需要多线程,是因为在进行I/O操作的时候,一是没有办法知道到底能不能写、能不能读,只能"傻等",即使通过各种估算,算出来操作系统没有能力进行读写,也没法在socket.read()和socket.write()函数中返回,这两个函数无法进行有效的中断。所以除了多开线程另起炉灶,没有好的办法利用CPU。

NIO的读写函数可以立刻返回,这就给了我们不开线程利用CPU的最好机会:如果一个连接不能读写(socket.read()返回0或者socket.write()返回0),我们可以把这件事记下来,记录的方式通常是在Selector上注册标记位,然后切换到其它就绪的连接(channel)继续进行读写。

下面具体看下如何利用事件模型单线程处理所有I/O请求:

NIO的主要事件有几个:读就绪、写就绪、有新连接到来。

我们首先需要注册当这几个事件到来的时候所对应的处理器。然后在合适的时机告诉事件选择器:我对这个事件感兴趣。对于写操作,就是写不出去的时候对写事件感兴趣;对于读操作,就是完成连接和系统没有办法承载新读入的数据的时;对于accept,一般是服务器刚启动的时候;而对于connect,一般是connect失败需要重连或者直接异步调用connect的时候。

其次,用一个死循环选择就绪的事件,会执行系统调用,还会阻塞的等待新事件的到来。新事件到来的时候,会在selector上注册标记位,标示可读、可写或者有连接到来。

注意,select是阻塞的,无论是通过操作系统的通知(epoll)还是不停的轮询(select,poll),这个函数是阻塞的。所以我们可以放心大胆地在一个while(true)里面调用这个函数而不用担心CPU空转。

了解完NIO的multiplexing原理,然后还有几个概念。

Buffer ,是一个对象, 它包含一些要写入或者刚读出的数据.最常用的缓冲区类型是 ByteBuffer。常用状态变量包括 position,limit和capacity
Channel ,是一个对象,可以通过它读取和写入数据。拿 NIO 与原来的 I/O 做个比较,通道就像是流。
所有数据都通过 Buffer 对象来处理。我们永远不会将字节直接写入通道中,相反,是将数据写入包含一个或者多个字节的缓冲区。同样,也不会直接从通道中读取字节,而是将数据从通道读入缓冲区,再从缓冲区获取这个字节。

最后就是代码了。

NioHTTPServer

package com.zaper.sea.river.socketwork.niosocket;

import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.logging.Logger;

/**
 * Created by Zaper Ocean on 2016/11/19.
 */
public class NioHTTPServer {
    private static final Logger log = Logger.getLogger(NioHTTPServer.class.getName());
    private Selector selector;

    public NioHTTPServer bindInet(String ip,int port) throws IOException {
        ServerSocketChannel serverChannel=ServerSocketChannel.open();
        /**
         *  与Selector一起使用时,Channel必须处于非阻塞模式下。
         */
        serverChannel.configureBlocking(false);
        serverChannel.socket().bind(new InetSocketAddress(ip,port));

        /**Opens a selector.
         *
         */
        selector=Selector.open();

        /**
         * Operation-set bit for socket-accept operations.
         *
         * <p> Suppose that a selection key's interest set contains
         * <tt>OP_ACCEPT</tt> at the start of a <a
         * href="Selector.html#selop">selection operation</a>.  If the selector
         * detects that the corresponding server-socket channel is ready to accept
         * another connection, or has an error pending, then it will add
         * <tt>OP_ACCEPT</tt> to the key's ready set and add the key to its
         * selected-key set.  </p>
         *
         * 通过Selector监听Channel时对连接感兴趣
         */
        serverChannel.register(selector,SelectionKey.OP_ACCEPT);
        return this;
    }

    public void polling() throws IOException {
        log.info("Nio HTTP Server stated polling :");
        while (true){
            if(selector.select()>0){
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()){
                    SelectionKey sk=it.next();
                    if(sk.isAcceptable()){
                        log.info("Nio HTTP Server: SelectionKey is acceptable.");

                        ServerSocketChannel serverSocketChannel = (ServerSocketChannel)sk.channel();

                        /**
                         * Accepts a connection made to this channel's socket.
                         *
                         * <p> If this channel is in non-blocking mode then this method will
                         * immediately return <tt>null</tt> if there are no pending connections.
                         * Otherwise it will block indefinitely until a new connection is available
                         * or an I/O error occurs.
                         *
                         * 获得客户端连接通道
                         */
                        SocketChannel socketChannel = serverSocketChannel.accept();

                        log.info("Nio HTTP Server: accept client socket " + socketChannel);
                        socketChannel.configureBlocking(false);
                        socketChannel.register(sk.selector(), SelectionKey.OP_READ);
                    }
                    else if(sk.isReadable()){
                        log.info("Nio HTTP Server: SelectionKey is readable.");

                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                        SocketChannel socketChannel = (SocketChannel)sk.channel();

                        /**SocketChannel.read()将数据从SocketChannel读到Buffer中。
                         * read()方法返回的int值表示读了多少字节进Buffer里。
                         * 如果返回的是-1,表示已经读到了流的末尾(连接关闭了)。
                         */
                        while(true) {
                            int readBytes = socketChannel.read(byteBuffer);
                            if(readBytes>0) {
                                log.info("Nio HTTP Server: readBytes = " + readBytes);
                                String request=new String(byteBuffer.array(), 0, readBytes);
                                log.info("Nio HTTP Server: data = \n" + request);
                                byteBuffer.flip();
                                socketChannel.write(getResponseBuffer(request));
                                //socketChannel.write(byteBuffer);
                                sk.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                                break;
                            }
                        }
                        socketChannel.close();
                    }
                    else if(sk.isWritable()){
                        log.info("Nio HTTP Server: SelectionKey is writable.");

                        //获取与该信道关联的缓冲区,里面有之前读取到的数据
                        ByteBuffer buf = (ByteBuffer) sk.attachment();
                        //重置缓冲区,准备将数据写入信道
                        buf.flip();
                        SocketChannel clntChan = (SocketChannel) sk.channel();
                        //将数据写入到信道中
                        clntChan.write(buf);
                        if (!buf.hasRemaining()){
                            //如果缓冲区中的数据已经全部写入了信道,则将该信道感兴趣的操作设置为可读
                            sk.interestOps(SelectionKey.OP_READ);
                        }
                        //为读入更多的数据腾出空间
                        buf.compact();
                    }
                    /**
                     * 注意每次迭代末尾的keyIterator.remove()调用。
                     */
                    it.remove();

                }
            }
        }
    }

    public ByteBuffer getResponseBuffer(String request) throws IOException {
        //清空ByteBuffer
//        byteBuffer.clear();

        /*解析HTTP请求*/
        //获得HTTP请求的第一行
        String firstLineOfRequest=request.substring(0,request.indexOf("\r\n"));
        //解析HTTP请求的第一行
        String[] parts=firstLineOfRequest.split(" ");
        String uri=parts[1]; //获得HTTP请求中的uri

        /*决定HTTP响应正文的类型*/
        String contentType;
        if(uri.indexOf("html")!=-1 || uri.indexOf("htm")!=-1)
            contentType="text/html";
        else if(uri.indexOf("jpg")!=-1 || uri.indexOf("jpeg")!=-1)
            contentType="image/jpeg";
        else if(uri.indexOf("gif")!=-1)
            contentType="image/gif";
        else
            contentType="application/octet-stream";

        /*创建HTTP响应结果 */
        //HTTP响应的第一行
        String responseFirstLine="HTTP/1.1 200 OK\r\n";
        //HTTP响应头
        String responseHeader="Content-Type:"+contentType+"\r\n\r\n";
        //获得读取响应正文数据的输入流
        System.out.println(uri);
        System.out.println(NioHTTPServer.class.getResource("/root/"+uri));

        String path=NioHTTPServer.class.getResource("/root/"+uri).getPath();
        FileInputStream fis = new FileInputStream(path);
        Charset charset = Charset.forName("GBK");// 创建GBK字符集
        // 得到文件通道
        FileChannel fc = fis.getChannel();
        // 分配与文件尺寸等大的缓冲区
        ByteBuffer bf = ByteBuffer.allocate((int) fc.size());

        try {
            // 整个文件内容全读入缓冲区,即是内存映射文件
            fc.read(bf);
            System.out.println(bf.position());
            // 把缓冲中当前位置回复为零
            bf.rewind();
            System.out.println(bf.position());
            // 输出缓冲区中的内容
            System.out.println(charset.decode(bf));
        } catch (IOException e) {
            e.printStackTrace();
        }
        bf.flip();
        return bf;
    }

    public static void main(String[] args) throws IOException {
        new NioHTTPServer().bindInet("localhost", 8080).polling();
    }
}

运行结果

NioHTTPServer运行 HTTPClient运行
上一篇下一篇

猜你喜欢

热点阅读