java特性之NIO及neety

2018-10-15  本文已影响0人  机器不能学习

NIO及new io是java1.4之后的引入的api,之所以是新的io API,那么它可以代替标准的IO。

首先先比较新IO和标准IO

新IO是面向Buffer和Channel的,而标准是面向流的。面向流意味着每次从流中读取数据不缓存在任何地方,所以不能对读取到的数据进行移动操作。而面向Buffer可以做到这一点。

最重要的一点是阻塞与非阻塞。首先了解下同步异步,阻塞非阻塞。同步:请求后必须等待结果,结果返回后才能做其他事。异步:请求后不必等待结果,结果产生后会通知调用者。阻塞非阻塞是描述线程的状态,阻塞是指请求结果后,线程挂起的状态,非阻塞指不能立即返回结果但不会阻塞当前线程。

NIO是非阻塞,IO是阻塞的。在标准IO时,每次调用write(),read()都会使该线程阻塞。直到操作结束。在这个过程中线程只有等待。为了解决这个问题只有用多线程来保障同时的读写操作,但是多线程的引入使资源大量被浪费(线程是珍贵资源),而且频繁的进行上下文切换使得CPU大量浪费。由此引入了NIO,NIO的操作是无阻塞的,这个意味着一个线程可以多个读取,从而可以做到一个线程管理多个Channel。

标准IO NIO

标准IO虽然有缺点,但是它的读写效率是高于NIO的(在对一个流大量读写时),所以它们各有使用场景。NIO用于处理大量的链接,但是带宽都不大的场景。而标准IO用于少量连接带宽超大的场景。



NIO关键概念

Buffer

缓存区是与通道进行数据传输的工具。NIO提供了多个缓存区

byteBuffer

CharBuffer

DoubleBuffer

FloatBuffer

IntBuffer

LongBuffer

ShortBuffer

其包含了各个数据类型。

Buffer的内部有position,limit,capacity。分别代表了,当前指针位置,最大指针位置,容量。在写操作时,limit=capacity。读操作是position不能超过limit。

生成一个buffer(以byte类型为例)  ByteBuffer buf = ByteBuffer.allocate(48);//指定了大小

写入buffer,从Channel中读入读取,put写入数据。

channel.read(buffer)  或者 buffer.put("content")

读取buffer,Channel从buffer中读取数据,get()获取数据

channel.write(buffer)或者buffer.get()

读写转换,一个buffer既可以读也可以写,其原理实际是改变了position指针。

调用flip()

rewind()方法,使position归零,可以实现重读的操作。

clear(),compact()。前者是清除所有的数据,后者是清除刚读完的数据。实际上数据都没有被清除,而是操作了指针。

Channel

通道类似于流。但是通道既可以双向读写,也是异步操作。而且通道读写都必须经过buffer。

这些是Java NIO中最重要的通道的实现:

FileChannel:从文件中读写数据

DatagramChannel:能通过UDP读写网络中的数据

SocketChannel:能通过TCP读写网络中的数据

ServerSocketChannel:可以监听新进来的TCP连接,像Web服务器那样。对每一个新进来的连接都会创建一个SocketChannel

通道也可以对buffer进行多个的读写,叫做scatter(分散)和Gather(聚合)。前者是一个通道写入多个buffer,后者是多个buffer读取一个通道。读写都有前后顺序。

channel.read({buffer1,buffer2}) buffer1被读取完后buffer2才会进行。

通道间的传输。NIO提供了两个函数

transferFrom(position, count, FromChannel)  ,  transferTo(position, count, toChannel),可以由名字知道,第一个函数从FromChannel运输数据到操作通道,数据量为position+count。函数二则是相反,从操作通道运输到toChannel。



文件传输

可以连接到一个文件,对文件进行操作,但是是阻塞式的。不能设为无阻塞。

RandomAccessFile aFile =newRandomAccessFile("data/nio-data.txt","rw");

  FileChannel inChannel = aFile.getChannel();

先建立一个地址,再创建一个文件传输通道

它有read(),write(),close(),size(),force()方法 就不一一介绍了。



网络编程

通道

服务器channel -> serverSocketChannel

是一个用来监听请求的通道。

客户端channel -> SocketChannel

它是一个网络连接的套接字通道,它会连接到服务器的ServerSocketChannle并会创建一个SocketChannel与之连接。

Selector

通道管理者,可以监听多个通道,并且知晓读写每个通道的读写操作,所以个一个Selector可以管理多条通道。

channle通过注册,把自己交给通道管理者,在注册之前要设置自己的阻塞属性为非阻塞。因为Selector只支持非阻塞的操作。注册的时,还需要填写自己感兴趣的操作,方便Selector进行监听并分配到相应的方法。操作包括了

Connect

Accept

Read

Write

其中client端只有Connet操作,server端四个都可以使用。但是只有ServerSocketChannel才能使用Accept操作。后面会细说这个Accept属性。

通道触发了一个事件意思是该事件已经就绪。所以,某个channel成功连接到另一个服务器称为“连接就绪”。一个server socket channel准备好接收新进入的连接称为“接收就绪”。一个有数据可读的通道可以说是“读就绪”。等待写数据的通道可以说是“写就绪”。

这四种事件用SelectionKey的四个常量来表示:

SelectionKey.OP_CONNECT

SelectionKey.OP_ACCEPT

SelectionKey.OP_READ

SelectionKey.OP_WRITE

对channle注册完之后,会返回一个SelectorKey,这个对象像一个标识,每个client请求的封装类。里面包含了多个属性

interest集合                                                所支持的兴趣属性

ready集合                                                    已经就绪的兴趣属性

Channel                                                      所对应的Channel

Selector                                                      所对应的selector

附加的对象(可选)                                可以添加一些附加信息,来作为唯一标识

interes()可以查看所有interest集合

read集合可以调用Bool的方法来判断属于哪类兴趣

selectionKey.isAcceptable();

selectionKey.isConnectable();

selectionKey.isReadable();

selectionKey.isWritable();

从SelectionKey访问Channel和Selector很简单。如下:

Channel  channel  = selectionKey.channel();

Selector selector = selectionKey.selector(); 

选择通道  方法select(),select(time),selectNew()

这个方法会像Selector的keys集合返回有多少个已经就绪的通道。如果没有这个方法,会报错 java.nio.channels.IllegalBlockingModeException。select()也是一个阻塞方法,它会阻塞到你有已经就绪的通道且它们的SelectionKey是你感兴趣的。而selectNow()是非阻塞,返回一切就绪通道。

wakeUp(),有阻塞就有唤醒,这个函数会唤醒阻塞方法。

实例程序:

package NIO;

import java.io.IOException;

import java.net.InetSocketAddress;

import java.net.Socket;

import java.nio.Buffer;

import java.nio.ByteBuffer;

import java.nio.channels.SelectionKey;

import java.nio.channels.Selector;

import java.nio.channels.ServerSocketChannel;

import java.nio.channels.SocketChannel;

import java.util.Iterator;

import java.util.Set;

public class Socket_server {

private ServerSocketChannel server;

private Selector selector;

public Socket_server() throws IOException {

initSelector();

initServer();

run();

}

public void initSelector() throws IOException {

//创建一个selector

selector=Selector.open();

System.out.println("initSelector");

}

public void initServer() throws IOException {

System.out.println("initServer");

//开启serverChannel

server=ServerSocketChannel.open();

//SocketChannel的打开需要一个Address,指定地址和端口

InetSocketAddress address=new InetSocketAddress("localhost",9091);

//绑定一个监听端口

server.bind(address);

//设定是否用阻塞

server.configureBlocking(false);

//将管道注册到Selector中,并指定该管道具有accept操作,这意味着

server.register(selector, SelectionKey.OP_ACCEPT);

}

public void run() throws IOException {

while(true) {

selector.select();

//取出每一个准备好的i/o,server不在内,但server会不断的调用isAcceptable对应的方法

Set<SelectionKey> sets=selector.selectedKeys();

//将之变为一个iterator,方便遍历

Iterator<SelectionKey> iterator=sets.iterator();

SelectionKey key=null;

while(iterator.hasNext()) {

key=iterator.next();

Hadle(key);

iterator.remove();

}

selector.select();

}

}

public void Hadle(SelectionKey key) throws IOException {

//根据key的属性,决定其操作

if(key.isAcceptable()) {

//这个方法会被反复调用

HadleAp(key);

}else if (key.isReadable()) {

HadleRead(key);

}else if (key.isWritable()) {

HableWrite(key);

}else if (key.isConnectable()) {

}

}

//客户端传来一个可读的通道

public void HadleRead(SelectionKey key) throws IOException {

//通过key得到一个通道

SocketChannel channel=(SocketChannel) key.channel();

ByteBuffer buffer=ByteBuffer.allocate(1024);

int i=channel.read(buffer);

while(i!=0) {

i=channel.read(buffer);

}

//dosomething hadle the buffer date

channel.close();

}

//可写文件,将要返回的内容写入文件

public void HableWrite(SelectionKey key) throws IOException {

SocketChannel channel=(SocketChannel) key.channel();

System.out.println("1"+Thread.currentThread().getName());

ByteBuffer buffer=ByteBuffer.allocate(1024);

String content="需要返回的内容";

buffer.put(content.getBytes());

channel.write(buffer);

while(buffer.hasRemaining()) {

channel.write(buffer);

}

channel.close();

}

public void HadleAp(SelectionKey key) throws IOException {

System.out.println("dasd");

//通过key得到一个serverSocketChannel

//ServerSocketChannel channel=(ServerSocketChannel) key.channel();

//ServerSocketChannle可以通过accept方法,获得连接到服务器的连接

//SocketChannel client=channel.accept();

//设置为非阻塞

//client.configureBlocking(false);

//将client注册到selector,selector将会对其任务进行处理

//client.register(selector, SelectionKey.OP_WRITE);

//key.interestOps(SelectionKey.OP_ACCEPT);

}

public static void main(String[] args) throws IOException {

Socket_server server=new Socket_server();

}

}


Netty

netty是一个NIO框架爱,它可以简单快速的开发网络应用程度。

一般的网络框架都是基于Reactor的。Reactor模式分为单线程模式,多线程模式,主从模式。

单线程模式,同一个Reactor线程作为客户端 服务端。

单线程模式

多线程模式,专门有一个Reactor线程负责处理accept线程,其他读写等操作交给Reactor线程池处理。一个Reactor线程可以同时处理多条链路,但个链路的相关操作只能一个Reactor负责,这样避免了并发问题。

多线程模型

主从模型,使用一个独立的Reactor线程来处理连接操作,连接成功后又使用一个Reactor来处理accept操作。链路创建后,再用一条Reactor来处理相关操作

,主从模型

Netty线程模型同时支持了三种模型,可以通过配置来切换。

大致的模型构建

package netty;

import java.nio.channels.NonWritableChannelException;

import java.nio.channels.SocketChannel;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.Channel;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInboundHandlerAdapter;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelOption;

import io.netty.channel.ChannelOutboundHandlerAdapter;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.nio.NioServerSocketChannel;

//服务器端代码

public class Model_Run {

private NioEventLoopGroup Accept_pool;

private NioEventLoopGroup Handle_pool;

public Model_Run() {

init();

}

public void init() {

//第一个创建的是接受client连接的线程组,1表示多线程模式,n(n>1)表示主从模式

Accept_pool=new NioEventLoopGroup(1);

//第二个创建处理具体业务的线程组,默认值为CPU*2,建议设置为空闲CPU*2

Handle_pool=new NioEventLoopGroup(4);

//用于配置服务的类

ServerBootstrap bootstrap=new ServerBootstrap();

//设置group

bootstrap.group(Accept_pool,Handle_pool)

//设置Channel的类,服务器端用的NioServerSocketChannel

.channel(NioServerSocketChannel.class)

//绑定具体的事件

.childHandler(new ChannelInitializer<Channel>() {

@Override

protected void initChannel(Channel sc) throws Exception {

//可以绑定多个或者一个

((Channel) sc).pipeline().addLast(new Server_Handle());

sc.pipeline().addLast(new ChannelOutboundHandlerAdapter())

.addLast(new ChannelInboundHandlerAdapter());

}

})

/*

* 每次client发来的数据要和服务器进行三次握手后才能进入准备状态,

* 前两此握手时,服务器会将所有客户加入一个A队列,通过最后一次握手,

* 服务器会将该客户放入B队列,AB队列size之和为backlog,如果a+b>backlog,就会

* 拒绝其他客户的请求,所以这个数应该相应大些

*/

.option(ChannelOption.SO_BACKLOG, 128)

//保持通信

.option(ChannelOption.SO_KEEPALIVE, true)

//接受的缓存区

.option(ChannelOption.SO_RCVBUF, 32*1024)

//发送的缓存区

.option(ChannelOption.SO_SNDBUF, 32*1024);

//绑定监控port,一个或者多个

try {

//并设置为异步

ChannelFuture future=bootstrap.bind(9091).sync();

future.channel().closeFuture().sync();

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}finally {

//线程组优雅退出

Accept_pool.shutdownGracefully();

Handle_pool.shutdownGracefully();

}

}

}

逻辑代码一般是写入childHandler的,在childHandler中先会创建一个ChannelPipeline,它是ChannelHandle的执行列表,用于处理通道中接受发送和过滤。

对于每个Channel,都会创建一个ChannelPipeline赋予它,Channel有唯一个ChannelPipeline。

Pipeline是一个ChannelHandler的列表,它可以控制Handler的流程,和执行顺序还可以动态的添加 删除 替代pipeline里的ChannelHandler。

ChanelHandlerContext:每一个ChannelHandler被添加到Pipeline后都会创建一个ChanelHandlerContext中,这个上下文可以和其他的ChannelHandler进行通信,由此可以控制事件的执行顺序。

netty5将ChannelInboundHandlerAdapter,ChannelOutboundHandlerAdapter合并到ChannelHandlerAdapter中

上一篇下一篇

猜你喜欢

热点阅读