基于JavaNIO的服务间的通讯

2017-10-27  本文已影响0人  tukangzheng

1、服务端的代码如下所示:

public classTCPServer {

private static finalIntegerPORT=9090;

public static voidstart()throwsException{

/**

*开启一个ServerSocketChannel

*/

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

serverSocketChannel.configureBlocking(false);

serverSocketChannel.bind(newInetSocketAddress(PORT));

/**

*创建一个selector

*/

Selector selector = Selector.open();

/**

*将创建的serverSocketChannel注册到selector的选择器上,指定这个channel只关心OP_ACCEPT事件

*/

serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);

while(true){

/**

* select()操作,默认是阻塞模式的,当没有accept或者read事件道来的时候,将一直阻塞不会继续向下执行

*/

intreadChannels = selector.select();

if(readChannels <0){

continue;

}

/**

*从selector上获取IO事件,可能是accept,也可能是read

*/

Set selectionKeySet = selector.selectedKeys();

Iterator iter = selectionKeySet.iterator();

/**

*循环遍历SelectionKeys中所有的SelectionKey

*/

while(iter.hasNext()){

SelectionKey ket = iter.next();

if(ket.isAcceptable()){//处理OP_ACCEPT事件

SocketChannel socketChannel = serverSocketChannel.accept();

socketChannel.configureBlocking(false);

socketChannel.register(selector,SelectionKey.OP_READ);

}else if(ket.isReadable()){//处理OP_READ事件

SocketChannel socketChannel = (SocketChannel)ket.channel();

StringBuilder sb =newStringBuilder();

ByteBuffer buffer = ByteBuffer.allocate(1024);

intreadBytes =0;

intret =0;

/**

*注意读取数据的时候,ByteBuffer的操作,需要flip(),反转buffer,clear进行指针位置的调整

*/

while((ret = socketChannel.read(buffer)) >0){

readBytes += ret;

buffer.flip();

sb.append(Charset.forName("UTF-8").decode(buffer).toString());

buffer.clear();

}

if(readBytes ==0){

System.out.println("handle oppsite close Exception!!");

socketChannel.close();

}

String message = sb.toString();

System.out.println("Message from client : "+ message);

if(Constants.CLIENT_CLOSE.equalsIgnoreCase(message.toString().trim())){

System.out.println("client is going to shutdown!");

socketChannel.close();

}else if(Constants.SERVER_CLOSE.equalsIgnoreCase(message.trim())){

System.out.println("server is going to shutdown!");

socketChannel.close();

serverSocketChannel.close();

selector.close();

}else{

String outMessage ="Server response : "+ message;

socketChannel.write(Charset.forName("UTF-8").encode(outMessage));

}

}

//将selector上当前已经监听到的并且已经处理了的时间标记清除

iter.remove();

}

}

}

public static voidmain(String[] args){

try{

start();

}catch(Exception e) {

e.printStackTrace();

}

}

}

2、客户端的代码如下所示:

public classTCPClient {

private static finalStringHOST="127.0.0.1";

private static finalIntegerPORT=9090;

public static voidstart(String message)throwsException{

/**

*创建一个SocketChannel和一个Selector,将SocketChannel注册到Selector上面,

*并注册OP_CONNECT事件,设置SocketChannel为非阻塞模式

*/

SocketChannel socketChannel = SocketChannel.open();

socketChannel.configureBlocking(false);

//连接到指定的地址

socketChannel.connect(newInetSocketAddress(HOST,PORT));

Selector selector = Selector.open();

socketChannel.register(selector,SelectionKey.OP_CONNECT);

while(true){

if(socketChannel.isConnected()){

socketChannel.write(Charset.forName("UTF-8").encode(message));

if(message ==null|| message.equalsIgnoreCase(Constants.CLIENT_CLOSE)){

socketChannel.close();

selector.close();

System.out.println("See you,客户端退出系统了");

System.exit(0);

}

}

// select()进行IO事件选择操作

intnSelectedKeys = selector.select();

if(nSelectedKeys >0){

for(SelectionKey key : selector.selectedKeys()){

/**

*判断检测到的Channel是不是可连接的,将对应的Channel注册到选择器上面,指定关心的事件类型为OP_READ

*/

if(key.isConnectable()){

SocketChannel connChannel = (SocketChannel) key.channel();

connChannel.configureBlocking(false);

connChannel.register(selector,SelectionKey.OP_READ);

connChannel.finishConnect();

}

/**

*若检测到的IO事件是读事件,则处理相关数据的读相关的业务逻辑

*/

else if(key.isReadable()){

SocketChannel readChannel = (SocketChannel) key.channel();

StringBuilder sb =newStringBuilder();

ByteBuffer buffer = ByteBuffer.allocate(1024);

intreadBytes =0;

intret =0;

/**

*注意对ByteBuffer的读操作,需要关心的是flip,clear操作等等

*/

while((ret = readChannel.read(buffer)) >0){

readBytes += ret;

buffer.flip();

sb.append(Charset.forName("UTF-8").decode(buffer).toString());

buffer.clear();

}

String result = sb.toString();

System.out.println("Message from server : "+ result);

if(readBytes ==0){

System.out.println("handle opposite close Exception");

readChannel.close();

}

}

}

/**

*一次监听的事件处理完之后,需要将已经记录的事件清除掉,准备下一轮的事件标记

*/

selector.selectedKeys().clear();

}else{

System.out.println("handle select timeout Exception");

socketChannel.close();

}

}

}

public static voidmain(String[] args){

try{

Scanner sc =newScanner(System.in);

String message = sc.nextLine();

start(message);

}catch(Exception e) {

e.printStackTrace();

}

}

}

上一篇下一篇

猜你喜欢

热点阅读