Android开发经验谈Android技术知识Android进阶之路

Android SocketChannel使用

2019-04-18  本文已影响6人  寒江楓雨

一 概述:

SocketChannel是一种面向流连接的sockets套接字的可选择通道。它基于TCP连接传输,主要用来处理网络I/O,实现了可选择通道,可以被多路复用。

二 特征:

1 对于已经存在的socket不能创建SocketChannel

2 SocketChannel中提供的无参open()接口创建的Channel并没有进行网络连接,需要使用connect接口连接到指定地址;带地址参数的open(SocketAddress)则会在创建后直接连接到该地址

3 未进行连接的SocketChannel执行I/O操作时,会抛出NotYetConnectedException

4 SocketChannel支持两种I/O模式:阻塞式和非阻塞式

5 SocketChannel支持异步关闭。如果SocketChannel在一个线程上read阻塞,另一个线程对该SocketChannel调用shutdownInput,则读阻塞的线程将返回-1表示没有读取任何数据;如果SocketChannel在一个线程上write阻塞,另一个线程对该SocketChannel调用shutdownOutput,则写阻塞的线程将抛出AsynchronousCloseException

三 SocketChannel的使用:

(1)创建

  // Selector that will multiplex readiness events for the channel.
  Selector mSelector = Selector.open();
  // open(SocketAddress) creates the channel and connects it to the given address.
  SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(mRemoteIp, Constants.TCP_PORT));
  // Switch to non-blocking mode, then register for read-readiness notifications.
  socketChannel.configureBlocking(false);
  socketChannel.register(mSelector, SelectionKey.OP_READ);

(2) 连接校验

socketChannel.isOpen();              // whether the SocketChannel is open
socketChannel.isConnected();         // whether the SocketChannel is connected
socketChannel.isConnectionPending(); // whether a connection attempt is in progress
socketChannel.finishConnect();       // checks whether an in-progress connection has completed

(3) 读写模式

前面提到SocketChannel支持阻塞和非阻塞两种模式:

// false selects non-blocking mode; true selects blocking mode.
socketChannel.configureBlocking(false);

主要是通过以上方法设置SocketChannel的读写模式。false表示非阻塞,true表示阻塞。

(4) 读写

// Blocking read: the channel stays in its default (blocking) mode.
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("www.baidu.com", 8080));
ByteBuffer byteBuffer = ByteBuffer.allocate(16);
socketChannel.read(byteBuffer);
socketChannel.close();
System.out.println("test end!");

以上为阻塞式读,当执行到read处,线程将阻塞,在有数据可读或连接关闭之前,控制台不会打印test end!。

// Non-blocking read: configureBlocking(false) is applied before reading.
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("www.baidu.com", 8080));
socketChannel.configureBlocking(false);
ByteBuffer byteBuffer = ByteBuffer.allocate(16);
socketChannel.read(byteBuffer);
socketChannel.close();
System.out.println("test end!");

以上为非阻塞读,控制台将打印test end!。

读写都是面向缓冲区,这个读写方式与前文中的FileChannel一样,这里不再赘述。

(5) 设置和获取参数

// Set SO_KEEPALIVE first, then TCP_NODELAY, both to TRUE.
socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, Boolean.TRUE);
socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, Boolean.TRUE);

通过setOption方法可以设置socket套接字的相关参数。

socketChannel.getOption(StandardSocketOptions.SO_KEEPALIVE);
socketChannel.getOption(StandardSocketOptions.SO_RCVBUF);

可以通过getOption获取相关参数的值。如默认的接收缓冲区大小是8192byte。

四 一个完整的例子

package com.zongmu.rpa.probes;

import android.text.TextUtils;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.zongmu.rpa.constant.Constants;
import com.zongmu.rpa.model.ProtoPackage;
import com.zongmu.rpa.probes.callback.TcpConnCallback;
import com.zongmu.rpa.utils.ByteUtils;
import com.zongmu.rpa.utils.LogUtil;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class TcpProbes implements ZmProbes {
    private static String TAG = "TcpProbes";
    private Selector mSelector;
    private SocketChannel mSocketChannel;
    private String mRemoteIp;
    private int mRecvLength = -1;
    private TcpConnCallback mCallback;
    private ProtoPackage mProtoPackage;

    public void setTcpConnCallback(TcpConnCallback callback){
        mCallback = callback;
    }

    public Selector getSelector() {
        return mSelector;
    }

    public String getRemoteIp() {
        return mRemoteIp;
    }

    public void setRemoteIp(String ip) {
        this.mRemoteIp = ip;
    }

    @Override
    public void sendData(final byte[] pkg) {
        if (mSocketChannel.isOpen() && mSocketChannel.isConnected()) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        mSocketChannel.write(ByteBuffer.wrap(pkg));
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }

    @Override
    public int receData(Message responsePrototype,RpcCallback<Message>done) {
        if (mSocketChannel == null || !mSocketChannel.isOpen()|| !mSocketChannel.isConnected()){
            return Constants.SOCKET_NOT_CONNECT;
        }
        int ret = -1;
        try {
            ret = mSelector.select(3000);
        } catch (IOException e) {
            e.printStackTrace();
        }
        if(ret < 0){
            return Constants.RECEIVE_DATA_TIMEOUT;
        }
        byte[] recvData = new byte[512];
        Iterator<SelectionKey> iterator = mSelector.selectedKeys().iterator();
        while(iterator.hasNext()) {
            SelectionKey key = iterator.next();
            if(key.isValid()) {
                if(key.isReadable()) {
                    LogUtil.e(TAG, "receive data");
                    try {
                        mRecvLength = mSocketChannel.read(ByteBuffer.wrap(recvData));
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    LogUtil.e(TAG, "Received data size: " + mRecvLength);
                }
            }else {
                LogUtil.e(TAG, "wait server response timeout");
                mRecvLength = -1;
                recvData = null;
            }
            if (!iterator.hasNext()) {
                break;
            }
        }
        
        if(mProtoPackage == null){
            mProtoPackage = new ProtoPackage();
        }
        String raw_data = new String(recvData, 0, mRecvLength);
        LogUtil.e(TAG, "RawPackage:" + raw_data);
        LogUtil.e(TAG, "RawPackage:" + ByteUtils.bytes2HexString(ByteUtils.getSubArrays(recvData,0,mRecvLength)));
        String data = new String(recvData, 0, 13);
        LogUtil.e(TAG, "data:"+data);
        if (TextUtils.equals("ZongMuService", data)) {
            LogUtil.e(TAG, "receive server response");
            mProtoPackage.setPackageBuffer(recvData);
            mProtoPackage.parsePackage();
            
            if(mRecvLength != mProtoPackage.getPackageSize()){
                LogUtil.e(TAG,"data length parse Error");
                recvData = null;
                mRecvLength = -1;
                return -3;
            }

            byte[] serviceParamPackage = mProtoPackage.getParamPackage();
            try {
                responsePrototype = responsePrototype.getParserForType().parseFrom(serviceParamPackage);
                done.run(responsePrototype);
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
            recvData = null;
            mRecvLength = -1;
            LogUtil.d(TAG,"callback finish");
        }else {
            LogUtil.d(TAG,"receive error");
            recvData = null;
            mRecvLength = -1;
            return Constants.RECEIVE_DATA_ERROR;
        }
        return Constants.RECEIVE_DATA_SUCCESS;
    }

    @Override
    public void start() {
        if(TextUtils.isEmpty(mRemoteIp)){
            LogUtil.i( TAG,"remote ip is null");
            return;
        }
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    mSelector = Selector.open();
                    InetSocketAddress inetSocketAddress = new InetSocketAddress(mRemoteIp, Constants.TCP_PORT);
                    mSocketChannel = SocketChannel.open(inetSocketAddress);
                    mSocketChannel.configureBlocking(false);
                    mSocketChannel.register(mSelector, SelectionKey.OP_READ);
                    LogUtil.e(TAG, "tcp socket connect success");
                    mCallback.onTcpConnect(Constants.SOCKET_CONN_SUCCESS);
                } catch (IOException e) {
                    e.printStackTrace();
                    LogUtil.e(TAG, "tcp socket connect failed");
                    mCallback.onTcpConnect(Constants.SOCKET_CONN_FAILED);
                }
            }
        }).start();
    }

    @Override
    public void stop() {
        try {
            if (mSocketChannel != null && mSocketChannel.isConnected()) {
                mSocketChannel.finishConnect();
                mSelector.close();
                mSocketChannel.close();
                LogUtil.e( TAG,"tcp socket closed");
            }
        } catch (IOException e) {
            e.printStackTrace();
            LogUtil.e( TAG,"tcp socket closed:"+e.toString());
        }
    }

}

上一篇 下一篇

猜你喜欢

热点阅读