第九节 netty前传-NIO pipe
2018-10-29 本文已影响2人
勃列日涅夫
pipe管道作为线程之间通信的一种方式
- 首先作为对比我们先了解下再BIO模式下的pipe的使用
Pipe为运行在同一个JVM中的两个线程提供了通信的能力,注意是同一个jvm上,如果在不同的jvm就是不同的进程了,pipe就无能为力了。
- Java IO中管道的创建
Java IO中的PipedOutputStream和PipedInputStream创建管道。PipedInputStream流应该和PipedOutputStream流相关联。一个线程通过PipedOutputStream写入的数据可以被另一个线程通过相关联的PipedInputStream读取出来。
- Java IO 管道的使用
read()方法和write()方法,分别是写入数据和读取数据,但是需要注意的是
这两个方法调用时会导致流阻塞,就是说如果尝试在一个线程中同时进行读和写,可能会导致线程死锁。
- 管道输出流和管道输入流之间建立连接
PipedOutputStream outputPipe = new PipedOutputStream();
//构造器方式
PipedInputStream inputPipe = new PipedInputStream(output);
或者使用两者共有的方法connect()方法连接
4 完整示例如下:
public static void main(String[] args) throws IOException {
//分别创建输入和输出的管道流
final PipedOutputStream outputPipe = new PipedOutputStream();
//PipedInputStream(output);构造器将管道数据流和管道输入流建立通信
final PipedInputStream inputPipe = new PipedInputStream(output);
//在第一个线程将数据写入管道输出流
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
try {
outputPipe.write("Hello world, pipe!".getBytes());
} catch (IOException e) {
}
}
});
//第二个线程从管道输入流中读取数据
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
try {
int data = input.read();
while(data != -1){
System.out.print((char) data);
data = inputPipe .read();
}
} catch (IOException e) {
}
}
});
thread1.start();
thread2.start();
}
-
下面为NIO模式下的管道使用
管道pipe和线程之间的关系如下图:
图片.png
由此可以看出NIO的管道和BIO模式下的管道是不同的,在NIO模式下没有输入输出流的概念但是使用发送sink和读取source的channe。使用同一个pipe实现线程之间数据的流转
- 创建pipe管道
//多个线程之间使用同一个管道
Pipe pipe = Pipe.open();
- 向管道写数据。先获取sinkChannel ,然后将数据写入sinkChannel
//1、先获取sinkChannel
Pipe.SinkChannel sinkChannel = pipe.sink();
//2、将数据写入sinkChannel
String newData = "New String to write to file..." + System.currentTimeMillis();
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());
buf.flip();
while(buf.hasRemaining()) {
sinkChannel.write(buf);
}
- 从管道中读取数据。先获取sourceChannel ,然后从sourceChannel 读取数据
//1、先获取sourceChannel
Pipe.SourceChannel sourceChannel = pipe.source();
//2、然后从sourceChannel 读取数据
ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = inChannel.read(buf);
2、3两步的读写分别在不同的线程实现
- 完整实例:
public static void pipeExample(){
Pipe pipe = null;
ExecutorService exec = Executors.newFixedThreadPool(2);
try{
//创建pipe
pipe = Pipe.open();
final Pipe pipeTemp = pipe;
//线程一向管道中写入数据
exec.submit(new Callable<Object>(){
@Override
public Object call() throws Exception
{
Pipe.SinkChannel sinkChannel = pipeTemp.sink();//向通道中写数据
while(true){
TimeUnit.SECONDS.sleep(1);
String newData = "hello world"+System.currentTimeMillis();
ByteBuffer buf = ByteBuffer.allocate(1024);
buf.clear();
buf.put(newData.getBytes());
//反转后可读
buf.flip();
while(buf.hasRemaining()){
System.out.println(buf);
sinkChannel.write(buf);
}
}
}
});
//线程二读取管道中的数据
exec.submit(new Callable<Object>(){
@Override
public Object call() throws Exception
{
Pipe.SourceChannel sourceChannel = pipeTemp.source();//向通道中读数据
while(true){
TimeUnit.SECONDS.sleep(1);
ByteBuffer buf = ByteBuffer.allocate(1024);
buf.clear();
//可读大小
int bytesRead = sourceChannel.read(buf);
System.out.println("bytesRead="+bytesRead);
while(bytesRead >0 ){
buf.flip();
byte b[] = new byte[bytesRead];
int i=0;
while(buf.hasRemaining()){
b[i]=buf.get();
System.out.printf("%X",b[i]);
i++;
}
String s = new String(b);
System.out.println("========>>>>>>"+s);
//无数据时跳出当前循环体
bytesRead = sourceChannel.read(buf);
}
}
}
});
}catch(IOException e){
e.printStackTrace();
}finally{
exec.shutdown();
}
}