Pipe 学习

2019-08-26  本文已影响0人  Astimegoes

A pair of channels that implements a unidirectional pipe.

A pipe consists of a pair of channels:
A writable Pipe.SinkChannel
A readable Pipe.SourceChannel
Once some bytes are written to the sink channel they can be read from the source channel in exactly the
order in which they were written.

Pipe类上的注释,大致意思就是Pipe包含一对 channel:
一个只可写的SinkChannel,一个只可读的SourceChannel,写入SinkChannel的byte可以从SourceChannel中按照写入顺序读出。

Pipe 主要方法

//获取 SourceChannel 
public abstract SourceChannel source();

//获取 SinkChannel
public abstract SinkChannel sink();

//打开一个新的Pipe
public static Pipe open() throws IOException;

SinkChannel声明


public abstract static class SinkChannel extends AbstractSelectableChannel implements WritableByteChannel, GatheringByteChannel

AbstractSelectableChannel 可以 register 到一个 Selector上
WritableByteChannel 可以写入
GatheringByteChannel 支持gathering, 我理解就是可以一次写入多个ByteBuffer

SourceChannel声明


public abstract static class SourceChannel extends AbstractSelectableChannel implements ReadableByteChannel, ScatteringByteChannel

AbstractSelectableChannel 可以 register 到一个 Selector上
ReadableByteChannel 可以读取
GatheringByteChannel 支持gathering, 我理解就是可以一次读入多个ByteBuffer

一个简单的列子

Charset charset = Charset.forName("utf-8");

Pipe pipe = Pipe.open();
Pipe.SinkChannel sinkChannel = pipe.sink();
Pipe.SourceChannel sourceChannel = pipe.source();

//写入的buffer
byte[] bytes = "message".getBytes(charset);
ByteBuffer writeBuffer = ByteBuffer.wrap(bytes);

//读入的buffer
ByteBuffer readBuffer = ByteBuffer.allocate(bytes.length);

sinkChannel.write(writeBuffer);
sourceChannel.read(readBuffer);

//读模式
readBuffer.flip();

System.out.println(charset.decode(readBuffer).toString());

设置阻塞、非阻塞


//设置 非阻塞 且 sinkChannel 无写入
sourceChannel.configureBlocking(false);
System.out.println("read ..");
sourceChannel.read(readBuffer);
// 输出 read end ..0
System.out.println("read end .." + readBuffer.flip().remaining());

readBuffer.clear();

// 设置 阻塞 且 sinkChannel 无写入
sourceChannel.configureBlocking(true);
System.out.println("read ..");
sourceChannel.read(readBuffer);
// 上一句阻塞,不会输出
System.out.println("read end ..");

和Selector一起

Selector selector = Selector.open();
sourceChannel.configureBlocking(false);     //默认是阻塞,不设置会报错
sourceChannel.register(selector, SelectionKey.OP_READ);

Pipe 在 OpenJDK 10 下的实现 PipeImpl


/**
 * A simple Pipe implementation based on a socket connection.
 * (基于socket实现)
 */

class PipeImpl extends Pipe
{
    ...
    // 生成握手的随机 bytes
    private static final Random RANDOM_NUMBER_GENERATOR = new SecureRandom();

    // SourceChannel  SinkChannel  
    private SourceChannel source;
    private SinkChannel sink;

    PipeImpl(final SelectorProvider sp) throws IOException {
        AccessController.doPrivileged(new Initializer(sp));
    }
    public SourceChannel source() {
        return source;
    }
    public SinkChannel sink() {
        return sink;
    }

    private class Initializer implements PrivilegedExceptionAction<Void> {

        @Override
        public Void run() throws IOException {
            LoopbackConnector connector = new LoopbackConnector();
            connector.run();
        }

        private class LoopbackConnector implements Runnable {

            @Override
            public void run() {
                //sc1、sc2 连接建立使用,完成后会关闭
                ServerSocketChannel ssc = null;

                //SourceChannel 中使用,读取
                SocketChannel sc1 = null;

                // SinkChannel  中使用,写入
                SocketChannel sc2 = null;

                try {
                    // 握手用的两个ByteBuffer
                    ByteBuffer secret = ByteBuffer.allocate(NUM_SECRET_BYTES);
                    ByteBuffer bb = ByteBuffer.allocate(NUM_SECRET_BYTES);

                    // 本机地址
                    InetAddress lb = InetAddress.getByName("127.0.0.1");
                   
                    InetSocketAddress sa = null;
                    for(;;) {    
                        if (ssc == null || !ssc.isOpen()) {
                            //打开ServerSocketChannel等待连接
                            ssc = ServerSocketChannel.open();
                            ssc.socket().bind(new InetSocketAddress(lb, 0));
                            sa = new InetSocketAddress(lb, ssc.socket().getLocalPort());
                        }

                        // 建立连接
                        sc1 = SocketChannel.open(sa);

                        //生成随机握手信息
                        RANDOM_NUMBER_GENERATOR.nextBytes(secret.array());
                        //发送
                        do {
                            sc1.write(secret);
                        } while (secret.hasRemaining());
                        secret.rewind();

                        // 获取连接
                        sc2 = ssc.accept();
                        do {
                            //读取握手信息
                            sc2.read(bb);
                            //依据ByteBuffer长度保证不会少读
                        } while (bb.hasRemaining());
                        bb.rewind();
                        
                        if (bb.equals(secret))
                            //握手成功,跳出循环
                            break;

                        sc2.close();
                        sc1.close();
                    }

                    source = new SourceChannelImpl(sp, sc1);
                    sink = new SinkChannelImpl(sp, sc2);
                } catch (IOException e) {
                    ...
                } finally {
                    ...
                }
            }
        }
    }
}

SinkChannelImpl

class SourceChannelImpl extends Pipe.SourceChannel implements SelChImpl{
  ...
  public int read(ByteBuffer dst) throws IOException {
        try {
            return sc.read(dst);
        } catch (AsynchronousCloseException x) {
            close();
            throw x;
        }
    }
  ...
}

SourceChannelImpl 基本也这样。



P.P.S. 水平有限,如有错误请指正。

上一篇下一篇

猜你喜欢

热点阅读