socket应用之多线程与NIO
by 等流星的牧羊人
在本次网络编程作业中,对HTTPServer一共采用了两种方案进行性能改进。
第一种是比较常规的多线程,第二种则是采用了NIO的多路复用模式。
多线程
在现有的HTTPServer中,一个很大的问题在于,它只有一个用户线程。当接受一个HTTPClient的请求,并进行处理的时候,由于响应设计I/O操作,需要一定响应时间。这期间,用户线程可以看作是阻塞的,无法响应新的Client提交过来的请求。
多线程可以解决这个问题,用户线程只要负责不断接受新来的请求,而对每个请求的处理,则是通过新启的子线程处理,这样就不会导致用户线程阻塞。从而服务器可以支持并发请求。
通过implements Runnable接口封装了一个响应请求的独立的线程类。
HTTPServer.java
package com.zaper.sea.river.socketwork;
import java.net.*;
/**
* Created by Zaper Ocean on 2016/11/16.
*/
public class HTTPServer{
public static void main(String args[]) {
System.out.println(HTTPServer.class.getResource("me/a.json"));
int port;
ServerSocket serverSocket;
try {
port = Integer.parseInt(args[0]);
}catch (Exception e) {
System.out.println("port = 8080 (默认)");
port = 8080; //默认端口为8080
}
try{
serverSocket = new ServerSocket(port);
System.out.println("服务器正在监听端口:" + serverSocket.getLocalPort());
while(true) { //服务器在一个无限循环中不断接收来自客户的TCP连接请求
try{
//等待客户的TCP连接请求
final Socket socket = serverSocket.accept();
System.out.println("建立了与客户的一个新的TCP连接,该客户的地址为:"+
socket.getInetAddress()+":" + socket.getPort());
//启动一个新线程响应客户请求
Thread myHTTPServer=new Thread(new HTTPServerRunnable(socket));
myHTTPServer.start();
}catch(Exception e){e.printStackTrace();}
} //#while
}catch (Exception e) {e.printStackTrace();}
}
}
HTTPServerRunnable.java
package com.zaper.sea.river.socketwork;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
/**
* Created by Zaper Ocean on 2016/11/17.
*/
public class HTTPServerRunnable implements Runnable {
private Socket socket = null;
public HTTPServerRunnable(Socket socket){this.socket = socket;}
public void run() {
try {
/*读取HTTP请求信息*/
InputStream socketIn= null; //获得输入流
socketIn = this.socket.getInputStream();
Thread.sleep(500); //睡眠500毫秒,等待HTTP请求
int size=socketIn.available();
byte[] requestBuffer=new byte[size];
socketIn.read(requestBuffer);
String request=new String(requestBuffer);
System.out.println(request); //打印HTTP请求数据
/*解析HTTP请求*/
//获得HTTP请求的第一行
String firstLineOfRequest=request.substring(0,request.indexOf("\r\n"));
//解析HTTP请求的第一行
String[] parts=firstLineOfRequest.split(" ");
String uri=parts[1]; //获得HTTP请求中的uri
/*决定HTTP响应正文的类型*/
String contentType;
if(uri.indexOf("html")!=-1 || uri.indexOf("htm")!=-1)
contentType="text/html";
else if(uri.indexOf("jpg")!=-1 || uri.indexOf("jpeg")!=-1)
contentType="image/jpeg";
else if(uri.indexOf("gif")!=-1)
contentType="image/gif";
else
contentType="application/octet-stream";
/*创建HTTP响应结果 */
//HTTP响应的第一行
String responseFirstLine="HTTP/1.1 200 OK\r\n";
//HTTP响应头
String responseHeader="Content-Type:"+contentType+"\r\n\r\n";
//获得读取响应正文数据的输入流
System.out.println(uri);
System.out.println(HTTPServer.class.getResource("/root/"+uri));
InputStream in=HTTPServer.class.getResourceAsStream("/root/"+uri);
/*发送HTTP响应结果 */
OutputStream socketOut=socket.getOutputStream(); //获得输出流
//发送HTTP响应的第一行
socketOut.write(responseFirstLine.getBytes());
//发送HTTP响应的头
socketOut.write(responseHeader.getBytes());
//发送HTTP响应的正文
int len=0;
byte[] buffer=new byte[128];
while((len=in.read(buffer))!=-1)
socketOut.write(buffer,0,len);
Thread.sleep(1000); //睡眠1秒,等待客户接收HTTP响应结果
socket.close(); //关闭TCP连接
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
HTTPServer运行 HTTPClient运行运行结果
NIO
除了第一种常规的MultiThread方案,还采取了NIO方案。
在我们的HTTPServer中,需要进行的IO操作是非常多的。读写文件与读写socket都是,而众所周知,IO操作是最耗时间的操作。那么如何减少这部分时间呢?JDK在1.4之后为我们提供了一个新思路,NIO。
对于一个network IO (这里我们以read举例),它会涉及到两个系统对象,一个是调用这个IO的process (or thread),另一个就是kernel。当一个read操作发生时,它会经历两个阶段:
- 等待数据准备
- 将数据从内核拷贝到进程中
接着介绍一下IO的几种模型,最权威的总结来自Stevens的UNP(Unix Network Progamming),有以下五种:
- blocking IO
- nonblocking IO
- IO multiplexing
- signal driven IO
- asynchronous IO
而无论原始的HTTPServer还是多线程方案都属于第一种BIO
NIO当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据。对于network io来说,很多时候数据在一开始还没有到达(比如,还没有收到一个完整的UDP包),这个时候kernel就要等待足够的数据到来。而在用户进程这边,整个进程会被阻塞。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。
所以,blocking IO的特点就是在IO执行的两个阶段都被block了。
所以,很明显BIO对性能的影响很明显。
回到第一种方案,我们之所以使用多线程,主要原因在于socket.accept()、socket.read()、socket.write()三个主要函数都是同步阻塞的,当一个连接在处理I/O的时候,系统是阻塞的,如果是单线程的话必然就挂死在那里;但CPU是被释放出来的,开启多线程,就可以让CPU去处理更多的事情。其实这也是所有使用多线程的本质:
-
利用多核。
-
当I/O阻塞系统,但CPU空闲的时候,可以利用多线程使用CPU资源。
在NIO中,单个process就可以同时处理多个网络连接的IO。它的基本原理就是select/epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。
当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。
这个图和blocking IO的图其实并没有太大的不同,事实上,还更差一些。因为这里需要使用两个system call (select 和 recvfrom),而blocking IO只调用了一个system call (recvfrom)。但是,用select的优势在于它可以同时处理多个connection。
所以,如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。 select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。
NIO模型BIO模型之所以需要多线程,是因为在进行I/O操作的时候,一是没有办法知道到底能不能写、能不能读,只能"傻等",即使通过各种估算,算出来操作系统没有能力进行读写,也没法在socket.read()和socket.write()函数中返回,这两个函数无法进行有效的中断。所以除了多开线程另起炉灶,没有好的办法利用CPU。
NIO的读写函数可以立刻返回,这就给了我们不开线程利用CPU的最好机会:如果一个连接不能读写(socket.read()返回0或者socket.write()返回0),我们可以把这件事记下来,记录的方式通常是在Selector上注册标记位,然后切换到其它就绪的连接(channel)继续进行读写。
下面具体看下如何利用事件模型单线程处理所有I/O请求:
NIO的主要事件有几个:读就绪、写就绪、有新连接到来。
我们首先需要注册当这几个事件到来的时候所对应的处理器。然后在合适的时机告诉事件选择器:我对这个事件感兴趣。对于写操作,就是写不出去的时候对写事件感兴趣;对于读操作,就是完成连接和系统没有办法承载新读入的数据的时;对于accept,一般是服务器刚启动的时候;而对于connect,一般是connect失败需要重连或者直接异步调用connect的时候。
其次,用一个死循环选择就绪的事件,会执行系统调用,还会阻塞的等待新事件的到来。新事件到来的时候,会在selector上注册标记位,标示可读、可写或者有连接到来。
注意,select是阻塞的,无论是通过操作系统的通知(epoll)还是不停的轮询(select,poll),这个函数是阻塞的。所以我们可以放心大胆地在一个while(true)里面调用这个函数而不用担心CPU空转。
了解完NIO的multiplexing原理,然后还有几个概念。
Buffer ,是一个对象, 它包含一些要写入或者刚读出的数据.最常用的缓冲区类型是 ByteBuffer。常用状态变量包括 position,limit和capacity。
Channel ,是一个对象,可以通过它读取和写入数据。拿 NIO 与原来的 I/O 做个比较,通道就像是流。
所有数据都通过 Buffer 对象来处理。我们永远不会将字节直接写入通道中,相反,是将数据写入包含一个或者多个字节的缓冲区。同样,也不会直接从通道中读取字节,而是将数据从通道读入缓冲区,再从缓冲区获取这个字节。
最后就是代码了。
NioHTTPServer
package com.zaper.sea.river.socketwork.niosocket;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.logging.Logger;
/**
* Created by Zaper Ocean on 2016/11/19.
*/
public class NioHTTPServer {
private static final Logger log = Logger.getLogger(NioHTTPServer.class.getName());
private Selector selector;
public NioHTTPServer bindInet(String ip,int port) throws IOException {
ServerSocketChannel serverChannel=ServerSocketChannel.open();
/**
* 与Selector一起使用时,Channel必须处于非阻塞模式下。
*/
serverChannel.configureBlocking(false);
serverChannel.socket().bind(new InetSocketAddress(ip,port));
/**Opens a selector.
*
*/
selector=Selector.open();
/**
* Operation-set bit for socket-accept operations.
*
* <p> Suppose that a selection key's interest set contains
* <tt>OP_ACCEPT</tt> at the start of a <a
* href="Selector.html#selop">selection operation</a>. If the selector
* detects that the corresponding server-socket channel is ready to accept
* another connection, or has an error pending, then it will add
* <tt>OP_ACCEPT</tt> to the key's ready set and add the key to its
* selected-key set. </p>
*
* 通过Selector监听Channel时对连接感兴趣
*/
serverChannel.register(selector,SelectionKey.OP_ACCEPT);
return this;
}
public void polling() throws IOException {
log.info("Nio HTTP Server stated polling :");
while (true){
if(selector.select()>0){
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()){
SelectionKey sk=it.next();
if(sk.isAcceptable()){
log.info("Nio HTTP Server: SelectionKey is acceptable.");
ServerSocketChannel serverSocketChannel = (ServerSocketChannel)sk.channel();
/**
* Accepts a connection made to this channel's socket.
*
* <p> If this channel is in non-blocking mode then this method will
* immediately return <tt>null</tt> if there are no pending connections.
* Otherwise it will block indefinitely until a new connection is available
* or an I/O error occurs.
*
* 获得客户端连接通道
*/
SocketChannel socketChannel = serverSocketChannel.accept();
log.info("Nio HTTP Server: accept client socket " + socketChannel);
socketChannel.configureBlocking(false);
socketChannel.register(sk.selector(), SelectionKey.OP_READ);
}
else if(sk.isReadable()){
log.info("Nio HTTP Server: SelectionKey is readable.");
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
SocketChannel socketChannel = (SocketChannel)sk.channel();
/**SocketChannel.read()将数据从SocketChannel读到Buffer中。
* read()方法返回的int值表示读了多少字节进Buffer里。
* 如果返回的是-1,表示已经读到了流的末尾(连接关闭了)。
*/
while(true) {
int readBytes = socketChannel.read(byteBuffer);
if(readBytes>0) {
log.info("Nio HTTP Server: readBytes = " + readBytes);
String request=new String(byteBuffer.array(), 0, readBytes);
log.info("Nio HTTP Server: data = \n" + request);
byteBuffer.flip();
socketChannel.write(getResponseBuffer(request));
//socketChannel.write(byteBuffer);
sk.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
break;
}
}
socketChannel.close();
}
else if(sk.isWritable()){
log.info("Nio HTTP Server: SelectionKey is writable.");
//获取与该信道关联的缓冲区,里面有之前读取到的数据
ByteBuffer buf = (ByteBuffer) sk.attachment();
//重置缓冲区,准备将数据写入信道
buf.flip();
SocketChannel clntChan = (SocketChannel) sk.channel();
//将数据写入到信道中
clntChan.write(buf);
if (!buf.hasRemaining()){
//如果缓冲区中的数据已经全部写入了信道,则将该信道感兴趣的操作设置为可读
sk.interestOps(SelectionKey.OP_READ);
}
//为读入更多的数据腾出空间
buf.compact();
}
/**
* 注意每次迭代末尾的keyIterator.remove()调用。
*/
it.remove();
}
}
}
}
public ByteBuffer getResponseBuffer(String request) throws IOException {
//清空ByteBuffer
// byteBuffer.clear();
/*解析HTTP请求*/
//获得HTTP请求的第一行
String firstLineOfRequest=request.substring(0,request.indexOf("\r\n"));
//解析HTTP请求的第一行
String[] parts=firstLineOfRequest.split(" ");
String uri=parts[1]; //获得HTTP请求中的uri
/*决定HTTP响应正文的类型*/
String contentType;
if(uri.indexOf("html")!=-1 || uri.indexOf("htm")!=-1)
contentType="text/html";
else if(uri.indexOf("jpg")!=-1 || uri.indexOf("jpeg")!=-1)
contentType="image/jpeg";
else if(uri.indexOf("gif")!=-1)
contentType="image/gif";
else
contentType="application/octet-stream";
/*创建HTTP响应结果 */
//HTTP响应的第一行
String responseFirstLine="HTTP/1.1 200 OK\r\n";
//HTTP响应头
String responseHeader="Content-Type:"+contentType+"\r\n\r\n";
//获得读取响应正文数据的输入流
System.out.println(uri);
System.out.println(NioHTTPServer.class.getResource("/root/"+uri));
String path=NioHTTPServer.class.getResource("/root/"+uri).getPath();
FileInputStream fis = new FileInputStream(path);
Charset charset = Charset.forName("GBK");// 创建GBK字符集
// 得到文件通道
FileChannel fc = fis.getChannel();
// 分配与文件尺寸等大的缓冲区
ByteBuffer bf = ByteBuffer.allocate((int) fc.size());
try {
// 整个文件内容全读入缓冲区,即是内存映射文件
fc.read(bf);
System.out.println(bf.position());
// 把缓冲中当前位置回复为零
bf.rewind();
System.out.println(bf.position());
// 输出缓冲区中的内容
System.out.println(charset.decode(bf));
} catch (IOException e) {
e.printStackTrace();
}
bf.flip();
return bf;
}
public static void main(String[] args) throws IOException {
new NioHTTPServer().bindInet("localhost", 8080).polling();
}
}
NioHTTPServer运行 HTTPClient运行运行结果