Android SocketChannel使用
一 概述:
SocketChannel是一种面向流连接的socket套接字的可选择通道。它基于TCP连接传输,主要用来处理网络I/O;由于实现了可选择通道,它可以注册到Selector上被多路复用。
二 特征:
1 对于已经存在的socket不能创建SocketChannel
2 SocketChannel中提供的无参open()接口创建的Channel并没有建立网络连接,需要使用connect接口连接到指定地址;带地址参数的open(SocketAddress)则会在创建的同时完成连接
3 未进行连接的SocketChannel执行I/O操作时,会抛出NotYetConnectedException
4 SocketChannel支持两种I/O模式:阻塞式和非阻塞式
5 SocketChannel支持异步关闭。如果SocketChannel在一个线程上read阻塞,另一个线程对该SocketChannel调用shutdownInput,则读阻塞的线程将返回-1,表示没有读取到任何数据;如果SocketChannel在一个线程上write阻塞,另一个线程对该SocketChannel调用shutdownOutput,则写阻塞的线程将抛出AsynchronousCloseException
三 SocketChannel的使用:
(1)创建
Selector mSelector = Selector.open();
InetSocketAddress inetSocketAddress =new InetSocketAddress(mRemoteIp, Constants.TCP_PORT);
SocketChannel socketChannel= SocketChannel.open(inetSocketAddress);
socketChannel.configureBlocking(false);
socketChannel.register(mSelector, SelectionKey.OP_READ);
(2) 连接校验
socketChannel.isOpen();// 测试SocketChannel是否为open状态
socketChannel.isConnected();//测试SocketChannel是否已经被连接
socketChannel.isConnectionPending();//测试SocketChannel是否正在进行连接
socketChannel.finishConnect();//校验正在进行套接字连接的SocketChannel是否已经完成连接
(3) 读写模式
前面提到SocketChannel支持阻塞和非阻塞两种模式:
socketChannel.configureBlocking(false);
主要是通过以上方法设置SocketChannel的读写模式。false表示非阻塞,true表示阻塞。
(4) 读写
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("www.baidu.com",8080));
ByteBuffer byteBuffer = ByteBuffer.allocate(16);
socketChannel.read(byteBuffer);
socketChannel.close();
System.out.println("test end!");
以上为阻塞式读:当执行到read处时,若通道中暂无数据可读,线程将一直阻塞,在读到数据之前控制台不会打印test end!。
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("www.baidu.com",8080));
socketChannel.configureBlocking(false);
ByteBuffer byteBuffer = ByteBuffer.allocate(16);
socketChannel.read(byteBuffer);
socketChannel.close();
System.out.println("test end!");
以上为非阻塞读,控制台将打印test end!。
读写都是面向缓冲区,这个读写方式与前文中的FileChannel一样,这里不再赘述。
(5) 设置和获取参数
socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE,Boolean.TRUE).setOption(StandardSocketOptions.TCP_NODELAY,Boolean.TRUE);
通过setOption方法可以设置socket套接字的相关参数;该方法返回SocketChannel本身,因此可以链式调用。
socketChannel.getOption(StandardSocketOptions.SO_KEEPALIVE);
socketChannel.getOption(StandardSocketOptions.SO_RCVBUF);
可以通过getOption获取相关参数的值。如默认的接收缓冲区大小是8192byte。
四 一个完整的例子
package com.zongmu.rpa.probes;
import android.text.TextUtils;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.zongmu.rpa.constant.Constants;
import com.zongmu.rpa.model.ProtoPackage;
import com.zongmu.rpa.probes.callback.TcpConnCallback;
import com.zongmu.rpa.utils.ByteUtils;
import com.zongmu.rpa.utils.LogUtil;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
public class TcpProbes implements ZmProbes {
private static String TAG = "TcpProbes";
private Selector mSelector;
private SocketChannel mSocketChannel;
private String mRemoteIp;
private int mRecvLength = -1;
private TcpConnCallback mCallback;
private ProtoPackage mProtoPackage;
public void setTcpConnCallback(TcpConnCallback callback){
mCallback = callback;
}
public Selector getSelector() {
return mSelector;
}
public String getRemoteIp() {
return mRemoteIp;
}
public void setRemoteIp(String ip) {
this.mRemoteIp = ip;
}
@Override
public void sendData(final byte[] pkg) {
if (mSocketChannel.isOpen() && mSocketChannel.isConnected()) {
new Thread(new Runnable() {
@Override
public void run() {
try {
mSocketChannel.write(ByteBuffer.wrap(pkg));
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
}
}
@Override
public int receData(Message responsePrototype,RpcCallback<Message>done) {
if (mSocketChannel == null || !mSocketChannel.isOpen()|| !mSocketChannel.isConnected()){
return Constants.SOCKET_NOT_CONNECT;
}
int ret = -1;
try {
ret = mSelector.select(3000);
} catch (IOException e) {
e.printStackTrace();
}
if(ret < 0){
return Constants.RECEIVE_DATA_TIMEOUT;
}
byte[] recvData = new byte[512];
Iterator<SelectionKey> iterator = mSelector.selectedKeys().iterator();
while(iterator.hasNext()) {
SelectionKey key = iterator.next();
if(key.isValid()) {
if(key.isReadable()) {
LogUtil.e(TAG, "receive data");
try {
mRecvLength = mSocketChannel.read(ByteBuffer.wrap(recvData));
} catch (IOException e) {
e.printStackTrace();
}
LogUtil.e(TAG, "Received data size: " + mRecvLength);
}
}else {
LogUtil.e(TAG, "wait server response timeout");
mRecvLength = -1;
recvData = null;
}
if (!iterator.hasNext()) {
break;
}
}
if(mProtoPackage == null){
mProtoPackage = new ProtoPackage();
}
String raw_data = new String(recvData, 0, mRecvLength);
LogUtil.e(TAG, "RawPackage:" + raw_data);
LogUtil.e(TAG, "RawPackage:" + ByteUtils.bytes2HexString(ByteUtils.getSubArrays(recvData,0,mRecvLength)));
String data = new String(recvData, 0, 13);
LogUtil.e(TAG, "data:"+data);
if (TextUtils.equals("ZongMuService", data)) {
LogUtil.e(TAG, "receive server response");
mProtoPackage.setPackageBuffer(recvData);
mProtoPackage.parsePackage();
if(mRecvLength != mProtoPackage.getPackageSize()){
LogUtil.e(TAG,"data length parse Error");
recvData = null;
mRecvLength = -1;
return -3;
}
byte[] serviceParamPackage = mProtoPackage.getParamPackage();
try {
responsePrototype = responsePrototype.getParserForType().parseFrom(serviceParamPackage);
done.run(responsePrototype);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
recvData = null;
mRecvLength = -1;
LogUtil.d(TAG,"callback finish");
}else {
LogUtil.d(TAG,"receive error");
recvData = null;
mRecvLength = -1;
return Constants.RECEIVE_DATA_ERROR;
}
return Constants.RECEIVE_DATA_SUCCESS;
}
@Override
public void start() {
if(TextUtils.isEmpty(mRemoteIp)){
LogUtil.i( TAG,"remote ip is null");
return;
}
new Thread(new Runnable() {
@Override
public void run() {
try {
mSelector = Selector.open();
InetSocketAddress inetSocketAddress = new InetSocketAddress(mRemoteIp, Constants.TCP_PORT);
mSocketChannel = SocketChannel.open(inetSocketAddress);
mSocketChannel.configureBlocking(false);
mSocketChannel.register(mSelector, SelectionKey.OP_READ);
LogUtil.e(TAG, "tcp socket connect success");
mCallback.onTcpConnect(Constants.SOCKET_CONN_SUCCESS);
} catch (IOException e) {
e.printStackTrace();
LogUtil.e(TAG, "tcp socket connect failed");
mCallback.onTcpConnect(Constants.SOCKET_CONN_FAILED);
}
}
}).start();
}
@Override
public void stop() {
try {
if (mSocketChannel != null && mSocketChannel.isConnected()) {
mSocketChannel.finishConnect();
mSelector.close();
mSocketChannel.close();
LogUtil.e( TAG,"tcp socket closed");
}
} catch (IOException e) {
e.printStackTrace();
LogUtil.e( TAG,"tcp socket closed:"+e.toString());
}
}
}