则不达的Java专题

3 java的IO

2016-12-04  本文已影响0人  则不达

java nio

1 通道和缓冲器

1.1 简介

java.nio.*包的引入是为了提高速度,并且旧的IO包已经用nio重新实现过,所以即使你不用nio,也已经收益了

下面的格式可能比较乱,需要配合GetChannel例子来理解

例子,代码比较短,直接贴过来

package com.cowthan.nio;

import java.nio.*;
import java.nio.channels.*;
import java.io.*;

public class GetChannel {
    private static final int BSIZE = 1024;

    public static void main(String[] args) throws Exception {
        
        // 写文件
        FileChannel fc = new FileOutputStream("data.txt").getChannel();
        fc.write(ByteBuffer.wrap("Some text ".getBytes()));  //
        fc.close();
        
        // 写文件:append
        fc = new RandomAccessFile("data.txt", "rw").getChannel();
        fc.position(fc.size()); // Move to the end
        fc.write(ByteBuffer.wrap("Some more".getBytes()));
        fc.close();
        
        // 读文件
        fc = new FileInputStream("data.txt").getChannel();
        ByteBuffer buff = ByteBuffer.allocate((int) fc.size());
        fc.read(buff);
        buff.flip();
        
        
        System.out.println("读取:");
        String str = new String(buff.array(), "utf-8");
        System.out.println(str);
        
        System.out.println("读取2:");
        while (buff.hasRemaining()){
            System.out.print((char) buff.get());
        }
    }
} /*
 * Output: Some text Some more
 */// :~

1.2 更多:flip, clear,compact和mark,reset操作

///复制文件的部分代码(更优化的复制文件是用transfer接口,直接通道相连)
ByteBuffer buff = ByteBuffer.allocate(1024); //1K
while(src.read(buff) != -1){
    buff.flip(); //准备卸车
    dest.write(buff); //卸车了
    buff.clear(); //其实这才是真正的卸车,并送回通道那头(可以再次read(buff)了)
}

缓冲器细节:四大索引

看图:

对应的方法:


public final Buffer flip() {
    limit = position;
    position = 0;
    mark = UNSET_MARK;
    return this;
}

public final Buffer rewind() {
    position = 0;
    mark = UNSET_MARK;
    return this;
}

public final boolean hasRemaining() {
    return position < limit;
}

public final Buffer clear() {
    position = 0;
    mark = UNSET_MARK;
    limit = capacity;
    return this;
}


public final Buffer mark() {
    mark = position;
    return this;
}

public final Buffer reset() {
    if (mark == UNSET_MARK) {
        throw new InvalidMarkException("Mark not set");
    }
    position = mark;
    return this;
}

例子:交换相邻的两个字符

/**
 * 给一个字符串,交换相邻的两个字符
 */
private static void symmetricScramble(CharBuffer buffer) {
    while (buffer.hasRemaining()) {
        buffer.mark();
        char c1 = buffer.get();
        char c2 = buffer.get();
        buffer.reset();
        buffer.put(c2).put(c1);
    }
}

/*
思考:如果没有mark和reset功能,你怎么做?用postion方法记录和恢复刚才位置
*/
private static void symmetricScramble2(CharBuffer buffer) {
    while (buffer.hasRemaining()) {
        int position = buffer.position();
        char c1 = buffer.get();
        char c2 = buffer.get();
        buffer.position(position);
        buffer.put(c2).put(c1);
    }
}

1.3 连接通道

上面说过,nio通过大块数据的移动来加快读写速度,前面这个大小都由ByteBuffer来控制,
其实还有方法可以直接将读写两个Channel相连

这也是实现文件复制的更好的方法

public class TransferTo {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.println("arguments: sourcefile destfile");
            System.exit(1);
        }
        FileChannel in = new FileInputStream(args[0]).getChannel(), out = new FileOutputStream(
                args[1]).getChannel();
        in.transferTo(0, in.size(), out);
        // 或者:
        // out.transferFrom(in, 0, in.size());
    }
} // /:~

1.4 字符流:CharBuffer和Charset,其实就是byte[]和编码问题

ByteBuffer是最原始的,其实就是字节流,适用于二进制数据的读写,图片文件等

但我们更常用的,其实是字符串

====先休息一下,说说怎么得到编码相关的一些信息吧====

//打印系统支持的所有编码,及其别名
import java.nio.charset.*;
import java.util.*;

public class AvailableCharSets {
    public static void main(String[] args) {
        SortedMap<String, Charset> charSets = Charset.availableCharsets();
        Iterator<String> it = charSets.keySet().iterator();
        while (it.hasNext()) {
            String csName = it.next();
            System.out.print(csName);
            Iterator aliases = charSets.get(csName).aliases().iterator();
            if (aliases.hasNext())
                System.out.print(": ");
            while (aliases.hasNext()) {
                System.out.print(aliases.next());
                if (aliases.hasNext())
                    System.out.print(", ");
            }
            System.out.println();
        }
    }
}
/*
部分输出:
KOI8-U: koi8_u
Shift_JIS: shift_jis, x-sjis, sjis, shift-jis, ms_kanji, csShiftJIS
TIS-620: tis620, tis620.2533
US-ASCII: ANSI_X3.4-1968, cp367, csASCII, iso-ir-6, ASCII, iso_646.irv:1983, ANSI_X3.4-1986, ascii7, default, ISO_646.irv:1991, ISO646-US, IBM367, 646, us
UTF-16: UTF_16, unicode, utf16, UnicodeBig
UTF-16BE: X-UTF-16BE, UTF_16BE, ISO-10646-UCS-2, UnicodeBigUnmarked
UTF-16LE: UnicodeLittleUnmarked, UTF_16LE, X-UTF-16LE
UTF-32: UTF_32, UTF32
UTF-32BE: X-UTF-32BE, UTF_32BE
UTF-32LE: X-UTF-32LE, UTF_32LE
UTF-8: unicode-1-1-utf-8, UTF8
windows-1250: cp1250, cp5346
windows-1251: cp5347, ansi-1251, cp1251
windows-1252: cp5348, cp1252
windows-1253: cp1253, cp5349
*/

=====ByteBuffer.asCharBuffer()的局限:没指定编码,容易乱码=====

1.5 视图缓冲器:ShortBuffer,IntBuffer, LongBuffer,FloatBuffer,DoubleBuffer,CharBuffer

ByteBuffer系列的类继承关系挺有意思,可以研究研究

ByteArrayBuffer是其最通用子类,一般操作的都是ByteArrayBuffer

ByteBuffer.asLongBuffer(), asIntBuffer(), asDoubleBuffer()等一系列

例子:ViewBuffers.java

1.6 字节序

1.7 Scatter/Gather

一个Channel,多个Buffer,相当于多个运煤车在一个通道工作

读到多个Buffer里:

ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body   = ByteBuffer.allocate(1024);
ByteBuffer[] bufferArray = { header, body };
channel.read(bufferArray);

多个Buffer往channel写:

//注意,Buffer的长度是100,但只有50个数据,就只会写入50,换句话说,只有position和limit之间的内容会被写入(put完先flip一下,才能往channel写???)
ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body   = ByteBuffer.allocate(1024);
ByteBuffer[] bufferArray = { header, body };
channel.write(bufferArray);

1.8 内存映射文件:大文件的读写

大文件,如2G的文件,没法一下加载到内存中读写

MappedByteBuffer提供了一个映射功能,可以将文件部分载入到内存中,但你使用时,
感觉文件都在内存中了

MappedByteBuffer继承了ByteBuffer,所以可以像上面那样使用

MappedByteBuffer性能很高,远高于FileInputStream,FileOutputStream,RandomAccessFile的原始方式的读写,百倍速度

public static void main(String[] args) throws Exception {
        
    //创建个文件,大小是128M
    MappedByteBuffer out = new RandomAccessFile("test.dat", "rw")
            .getChannel().map(FileChannel.MapMode.READ_WRITE, 0, length);
    
    //写入
    for (int i = 0; i < length; i++)
        out.put((byte) 'x');
    
    System.out.println("写入完毕");
    
    //读取
    for (int i = length / 2; i < length / 2 + 6; i++)
        System.out.println((char) out.get(i));
}

1.9 文件加锁

用法:

public static void main(String[] args) throws Exception {
    FileOutputStream fos = new FileOutputStream("file.txt");
    FileLock fl = fos.getChannel().tryLock();//---------
    if (fl != null) {
        System.out.println("Locked File");
        TimeUnit.MILLISECONDS.sleep(100);
        fl.release();//---------------------------------
        System.out.println("Released Lock");
    }
    fos.close();
}

更多例子

package com.cowthan.nio;

//: io/LockingMappedFiles.java
// Locking portions of a mapped file.
// {RunByHand}
import java.nio.*;
import java.nio.channels.*;
import java.io.*;

public class LockingMappedFiles {
    static final int LENGTH = 0x8FFFFFF; // 128 MB
    static FileChannel fc;

    public static void main(String[] args) throws Exception {
        fc = new RandomAccessFile("test.dat", "rw").getChannel();
        MappedByteBuffer out = fc
                .map(FileChannel.MapMode.READ_WRITE, 0, LENGTH);
        for (int i = 0; i < LENGTH; i++)
            out.put((byte) 'x');
        new LockAndModify(out, 0, 0 + LENGTH / 3);
        new LockAndModify(out, LENGTH / 2, LENGTH / 2 + LENGTH / 4);
    }

    private static class LockAndModify extends Thread {
        private ByteBuffer buff;
        private int start, end;

        LockAndModify(ByteBuffer mbb, int start, int end) {
            this.start = start;
            this.end = end;
            mbb.limit(end);
            mbb.position(start);
            buff = mbb.slice();
            start();
        }

        public void run() {
            try {
                // Exclusive lock with no overlap:
                FileLock fl = fc.lock(start, end, false);
                System.out.println("Locked: " + start + " to " + end);
                // Perform modification:
                while (buff.position() < buff.limit() - 1)
                    buff.put((byte) (buff.get() + 1));
                fl.release();
                System.out.println("Released: " + start + " to " + end);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
} // /:~

2 异步IO

2.1 旧IO处理Socket的方式

要读取Socket上的Stream,就得在read时阻塞,所以每一个Socket都得一个线程管理,对于服务器来说,能开的线程数是有限的

2.2 不使用Selector,自己想法管理SocketChannel

@Override
public void run() {
    while(!isClosed && !Thread.interrupted()){
        for(String key: map.keySet()){
            SocketChannel sc = map.get(key);
            
            ByteBuffer buf = ByteBuffer.allocate(1024);
            try {
                int bytesRead = sc.read(buf);
                buf.flip();
                if(bytesRead <= 0){

                }else{
                    System.out.println("收到消息(来自" + key + "):" + Charset.forName("utf-8").decode(buf));
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

2.3 Selector

使用Selector的完整示例:

Selector selector = Selector.open();
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
while(true) {
  int readyChannels = selector.select();   //就在这阻塞,但已经实现了一个线程管理多个Channel(SocketChannel-读写,connect事件,DatagramChannel-读写事件,SocketServerChannel-accept事件)
  if(readyChannels == 0) continue;
  Set selectedKeys = selector.selectedKeys();
  Iterator keyIterator = selectedKeys.iterator();
  while(keyIterator.hasNext()) {
    SelectionKey key = keyIterator.next();
    if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.
    } else if (key.isConnectable()) {
        // a connection was established with a remote server.
    } else if (key.isReadable()) {
        // a channel is ready for reading
    } else if (key.isWritable()) {
        // a channel is ready for writing
    }
    keyIterator.remove();
  }
}
Selector selector = Selector.open();
SelectionKey selectionKey = sc.register(selector, SelectionKey.OP_READ);

//看Selector对哪些事件感兴趣
int interestSet = selectionKey.interestOps();
boolean isInterestedInAccept  = (interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT) == SelectionKey.OP_CONNECT;
boolean isInterestedInRead    = interestSet & SelectionKey.OP_READ) == SelectionKey.OP_READ;
boolean isInterestedInWrite   = interestSet & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE;

//通道中已经就绪的集合,每一次selection都得先访问这个,知道是因为哪些事件被唤醒的
int readySet = selectionKey.readyOps();
//或者:
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();

//拿到Channel和Selector
Channel  channel  = selectionKey.channel();
Selector selector = selectionKey.selector();

//对应关系是:1个Selector,多个Channel,多个SelectionKey,一个Channel对应一个SelectionKey,而且一个SelectionKey可以添加一个extra数据,以满足特定需求

//select方法:这才是会阻塞的地方,注意,在这里阻塞,是性能最佳的表现
int readyCount = selector.select()  //select()阻塞到至少有一个通道在你注册的事件上就绪了
int readyCount = selector.select(long timeout) //最长会阻塞timeout毫秒(参数)
int readyCount = selector.selectNow() //不会阻塞,无则0
//返回值:有几个通道就绪
/*
select()方法返回的int值表示有多少通道已经就绪。亦即,自上次调用select()方法后有多少通
道变成就绪状态。如果调用select()方法,因为有一个通道变成就绪状态,返回了1,若再次调用select()方法,
如果另一个通道就绪了,它会再次返回1。如果对第一个就绪的channel没有做任何操作,现在就有两个就绪的通
道,但在每次select()方法调用之间,只有一个通道就绪了
*/

//有通道就绪了,就得得到这个Channel,通道存在SelectionKey里,而selector可以获得一个SelectionKey集合
Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
    SelectionKey key = keyIterator.next();
    if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.
    } else if (key.isConnectable()) {
        // a connection was established with a remote server.
    } else if (key.isReadable()) {
        // a channel is ready for reading
        Channel channel = key.channel();
    } else if (key.isWritable()) {
        // a channel is ready for writing
    }
    keyIterator.remove();
}

3 DatagramChannel:UDP通信

略过

4 Pipe

http://ifeve.com/pipe/

看图:

Pipe pipe = Pipe.open();

//写入
Pipe.SinkChannel sinkChannel = pipe.sink();
String newData = "New String to write to file..." + System.currentTimeMillis();
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());

buf.flip();

while(buf.hasRemaining()) {
    sinkChannel.write(buf);
}

//读取
Pipe.SourceChannel sourceChannel = pipe.source();
ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = sourceChannel.read(buf);

5 Okio

github: https://github.com/square/okio

只是对旧IO的封装,没用到Channel,也没用到ByteBuffer

5.1 简介:

5.2 使用

构造BufferedSink和BufferedSource

//创建Source
Source source = Okio.source(final InputStream in, final Timeout timeout);
source(InputStream in); //new Timeout()
source(File file);
source(Path path, OpenOption... options); //java7
source(Socket socket);

//创建Sink
Sink sink = Okio.sink(OutputStream out);
sink(final OutputStream out, final Timeout timeout);
sink(File file)
appendingSink(File file)
sink(Path path, OpenOption... options)
sink(Socket socket)

//创建BufferedSource:
BufferedSource pngSource = Okio.buffer(Source source); //返回RealBufferedSource对象
BufferedSink pngSink = Okio.buffer(Sink sink); //返回RealBufferedSink对象

//从BufferedSource读取
看例子吧

//往BufferedSink写入
看例子吧

//ByteString
看例子吧


//Buffer
看例子吧

5.3 例子:来自官网

package com.cowthan.nio.okio;

import java.io.IOException;
import java.io.InputStream;

import okio.Buffer;
import okio.BufferedSource;
import okio.ByteString;
import okio.Okio;

public class Test1_png {

    public static void main(String[] args) throws IOException {
        InputStream in = Test1_png.class.getResourceAsStream("/com/demo/1.png");
        decodePng(in);
    }

    private static final ByteString PNG_HEADER = ByteString
            .decodeHex("89504e470d0a1a0a");

    public static void decodePng(InputStream in) throws IOException {
        BufferedSource pngSource = Okio.buffer(Okio.source(in));

        ByteString header = pngSource.readByteString(PNG_HEADER.size());
        if (!header.equals(PNG_HEADER)) {
            throw new IOException("Not a PNG.");
        }

        while (true) {
            Buffer chunk = new Buffer();

            // Each chunk is a length, type, data, and CRC offset.
            int length = pngSource.readInt();
            String type = pngSource.readUtf8(4);
            pngSource.readFully(chunk, length);
            int crc = pngSource.readInt();

            decodeChunk(type, chunk);
            if (type.equals("IEND"))
                break;
        }

        pngSource.close();
    }

    private static void decodeChunk(String type, Buffer chunk) {
        if (type.equals("IHDR")) {
            int width = chunk.readInt();
            int height = chunk.readInt();
            System.out.printf("%08x: %s %d x %d%n", chunk.size(), type, width,
                    height);
        } else {
            System.out.printf("%08x: %s%n", chunk.size(), type);
        }
    }

}
上一篇下一篇

猜你喜欢

热点阅读