Java-NIO:用Channel代替Stream

2019-05-05  本文已影响0人  Cris_Ma

简介

NIO主要组成部分:

Channel有以下类型:

Buffer有以下类型:

Selector:

Selector可以理解为Channel的管理器,一个Selector可以管理多个Channel。

Channel和Buffer的使用:

FileChannel

FileChannel.force(boolean meta)方法将通道里尚未写入磁盘的数据强制写到磁盘上,参数表示是否包含元数据。
FileChannel.truncate(int len)可以用来截取指定长度的数据

public static boolean writeToFile(byte[] bytes, String destination) {

        File f = new File(destination);
        if (!f.exists()) {
            try {
                f.createNewFile();
            } catch (IOException e) {
                e.printStackTrace();
                return false;
            }
        }

        ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
        buffer.put(bytes);
        buffer.flip();

        RandomAccessFile raf = null;
        FileChannel channel = null;

        try {
            raf = new RandomAccessFile(f, "rw");
            channel = raf.getChannel();
            channel.write(buffer);
            channel.
            return true;
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (raf != null) {
                    raf.close();
                }
                if (channel != null) {
                    channel.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return false;
    }


public static byte[] readFileBytes(String path) {
        File f = new File(path);

        if (!f.exists() || f.isDirectory()) {
            return null;
        }

        RandomAccessFile raf = null;
        FileChannel channel = null;
        try {
            raf = new RandomAccessFile(f, "r");
            channel = raf.getChannel();
            ByteBuffer buffer = ByteBuffer.allocate((int) raf.length());
            channel.read(buffer);
            return buffer.array();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (raf != null) {
                    raf.close();
                }
                if (channel != null) {
                    channel.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        return null;
    }

这是使用FileChannel来写入/读取字节码的函数,Buffer使用的ByteBuffer。

ByteBuffer

ByteBuffer核心为put和get两个方法,put用来讲字节写入,get用来读出

它的成员变量和主要方法如下:

Selector

使用selector对channel进行管理时,首先要将chennel注册到Selector:

Selector selector = Selector.open();
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, Selectionkey.OP_READ);

register方法的第二个参数表示Selector对channel的什么事件进行监听,主要有以下四种:

可以使用 | 符号对多个事件进行监听:

int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

Selector属性

Selector实际上维护了一个channel的集合,用SelectionKey对channel进行管理。SelectionKey主要有以下属性:

int interestSet = selectionKey.interestOps();

boolean isInterestedInAccept  = (interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = (interestSet & SelectionKey.OP_CONNECT)  == SelectionKey.OP_ACCEPT;
boolean isInterestedInRead    = (interestSet & SelectionKey.OP_READ)  == SelectionKey.OP_ACCEPT;
boolean isInterestedInWrite   = (interestSet & SelectionKey.OP_WRITE)  == SelectionKey.OP_ACCEPT; //false

key.interestOps(SelectionKey.OP_WRITE);
//key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);//注销WRITE事件
int ops = key.interestOps();
isInterestedInWrite   = (ops & SelectionKey.OP_WRITE)  == SelectionKey.OP_ACCEPT;//true
int rops = key.readyOps();
boolean isAcceptable = (rops & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT;
if(isAcceptable){
    ///...
    }

或者更简单的方式:

  if (key.isAcceptable()) {
      //....
  }
Channel  channel  = selectionKey.channel();//获取channel以后可以进行读写等操作

Selector主要方法

Selector创建以后,可以通过select方法检测是否有channel符合当前检测的事件,它有以下三种方式:

通过select方法获知当前有就绪事件以后,可以通过selectedKeys()方法获取当前就绪的channel对应的key

 while (true) {

    int readyChannels = selector.select();
    if (readyChannels == 0) {
            continue;
        }

    Set<SelectionKey> selectedKeys = selector.selectedKeys();
    Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

    while(keyIterator.hasNext()) {
        SelectionKey key = keyIterator.next();
        if(key.isAcceptable()) {
        //key.channel()
        // a connection was accepted by a ServerSocketChannel.
        } else if (key.isConnectable()) {
        // a connection was established with a remote server.
        } else if (key.isReadable()) {
        // a channel is ready for reading
        } else if (key.isWritable()) {
        // a channel is ready for writing
        }
        keyIterator.remove();
    }
}

注意每次迭代末尾的keyIterator.remove()。Selector不会自己从已选择键集中移除SelectionKey实例。必须在处理完通道时自己移除。下次该通道变成就绪时,Selector会再次将其放入selectedKeys。

其他方法:

Pipe

Pipe是一个阻塞队列,可以用于线程间的通讯,一个pipe内部包含一个队列,所以它只能进行单向的数据传递。

 static void piletest(){

        try {
            Pipe p= Pipe.open();
            final Pipe.SinkChannel sinkChannel = p.sink();
            final Pipe.SourceChannel souceChannel = p.source();

            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        sinkChannel.write(ByteBuffer.wrap("Message from sender Thread".getBytes()));
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }).start();

            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        int len = souceChannel.read(buffer);
                        System.out.println("get message--->" + new String(buffer.array(),0,len));
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }).start();

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

也可以用stream的方式:

final PipedOutputStream pOut = new PipedOutputStream();
final PipedInputStream pIn = new PipedInputStream();

new Thread(new Runnable() {
    @Override
    public void run() {
        try {
            pOut.connect(pIn);
            pOut.write("Message from sender Thread".getBytes());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}).start();

new Thread(new Runnable() {
    @Override
    public void run() {
        try {
            byte[] buffer = new byte[1024];
            int len = pIn.read(buffer);
            System.out.println("get message--->" + new String(buffer,0,len));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}).start();

或者Reader和Writer:

        final PipedWriter writer = new PipedWriter();
        final PipedReader reader = new PipedReader();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    writer.connect(reader);
                    writer.write("message from other thread");
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    char[] r = new char[1024];
                    int len = reader.read(r);
                    System.out.println("get message--->" + new String(r, 0, len));

                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();

上一篇 下一篇

猜你喜欢

热点阅读