socket的正确使用方式
Android 通过Socket 和服务器通讯,是一种比较常用的通讯方式,时间比较紧,说下大致的思路,希望能帮到使用socket 进行通信的人
(1)开启一个线程发送消息SocketOutputThread
消息是放在队列里的,当有消息后,进入队列,线程唤醒,发送消息,并反馈发送是否成功的回调
(2)开启一个线程接受服务器消息SocketInputThread
为了防止一直收数据,浪费电池的电,采用NIO的方式读socket的数据,这个是本文的关键
(3)开启一个线程,做心跳,防止socket连接终断 ,SocketHeartThread
(4)构建 SocketThreadManager对以上三个thread进行管理
(5)构建 TCPClient 发送socket消息
在NIO的方式实现TCP,特别是在接收服务器的数
据,不用写个线程定时去读了。
TCPClient ,采用NIO的方式构建
package com.example.socketblockdemo;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;/**
* NIO TCP 客户端
**/publicclassTCPClient
{//信道选择器privateSelector selector;//与服务器通信的信道SocketChannel socketChannel;//要连接的服务器Ip地址privateString hostIp;//要连接的远程服务器在监听的端口privateinthostListenningPort;privatestaticTCPClient s_Tcp =null;publicboolean isInitialized =false;publicstaticsynchronized TCPClient instance()
{if(s_Tcp ==null)
{
s_Tcp=newTCPClient(Const.SOCKET_SERVER,
Const.SOCKET_PORT);
}returns_Tcp;
}/**
* 构造函数
*
* @param HostIp
* @param HostListenningPort
* @throws IOException*/publicTCPClient(String HostIp,intHostListenningPort)
{this.hostIp =HostIp;this.hostListenningPort =HostListenningPort;try{
initialize();this.isInitialized =true;
}catch(IOException e)
{this.isInitialized =false;//TODO Auto-generated catch blocke.printStackTrace();
}catch(Exception e)
{this.isInitialized =false;
e.printStackTrace();
}
}/**
* 初始化
*
* @throws IOException*/publicvoidinitialize() throws IOException
{
boolean done=false;try{//打开监听信道并设置为非阻塞模式socketChannel = SocketChannel.open(newInetSocketAddress(hostIp,
hostListenningPort));if(socketChannel !=null)
{
socketChannel.socket().setTcpNoDelay(false);
socketChannel.socket().setKeepAlive(true);//设置 读socket的timeout时间socketChannel.socket().setSoTimeout(
Const.SOCKET_READ_TIMOUT);
socketChannel.configureBlocking(false);//打开并注册选择器到信道selector =Selector.open();if(selector !=null)
{
socketChannel.register(selector, SelectionKey.OP_READ);
done=true;
}
}
}finally{if(!done && selector !=null)
{
selector.close();
}if(!done)
{
socketChannel.close();
}
}
}staticvoidblockUntil(SelectionKey key,longtimeout) throws IOException
{intnkeys =0;if(timeout >0)
{
nkeys= key.selector().select(timeout);
}elseif(timeout ==0)
{
nkeys=key.selector().selectNow();
}if(nkeys ==0)
{thrownewSocketTimeoutException();
}
}/**
* 发送字符串到服务器
*
* @param message
* @throws IOException*/publicvoidsendMsg(String message) throws IOException
{
ByteBuffer writeBuffer= ByteBuffer.wrap(message.getBytes("utf-8"));if(socketChannel ==null)
{thrownewIOException();
}
socketChannel.write(writeBuffer);
}/**
* 发送数据
*
* @param bytes
* @throws IOException*/publicvoidsendMsg(byte[] bytes) throws IOException
{
ByteBuffer writeBuffer=ByteBuffer.wrap(bytes);if(socketChannel ==null)
{thrownewIOException();
}
socketChannel.write(writeBuffer);
}/**
*
* @return*/publicsynchronized Selector getSelector()
{returnthis.selector;
}/**
* Socket连接是否是正常的
*
* @return*/publicboolean isConnect()
{
boolean isConnect=false;if(this.isInitialized)
{
isConnect=this.socketChannel.isConnected();
}returnisConnect;
}/**
* 关闭socket 重新连接
*
* @return*/publicboolean reConnect()
{
closeTCPSocket();try{
initialize();
isInitialized=true;
}catch(IOException e)
{
isInitialized=false;
e.printStackTrace();
}catch(Exception e)
{
isInitialized=false;
e.printStackTrace();
}returnisInitialized;
}/**
* 服务器是否关闭,通过发送一个socket信息
*
* @return*/publicboolean canConnectToServer()
{try{if(socketChannel !=null)
{
socketChannel.socket().sendUrgentData(0xff);
}
}catch(IOException e)
{//TODO Auto-generated catch blocke.printStackTrace();returnfalse;
}catch(Exception e){
e.printStackTrace();returnfalse;
}returntrue;
}/**
* 关闭socket*/publicvoidcloseTCPSocket()
{try{if(socketChannel !=null)
{
socketChannel.close();
}
}catch(IOException e)
{
}try{if(selector !=null)
{
selector.close();
}
}catch(IOException e)
{
}
}/**
* 每次读完数据后,需要重新注册selector,读取数据*/publicsynchronizedvoidrepareRead()
{if(socketChannel !=null)
{try{
selector=Selector.open();
socketChannel.register(selector, SelectionKey.OP_READ);
}catch(ClosedChannelException e)
{
e.printStackTrace();
}catch(IOException e)
{
e.printStackTrace();
}
}
}
}
如何使用
//发送消息,失败或者成功的handlerSocketThreadManager.sharedInstance().sendMsg(str.getBytes(), handler);
线程管理类
package com.example.socketblockdemo;
import android.os.Handler;
import android.text.TextUtils;publicclassSocketThreadManager
{privatestaticSocketThreadManager s_SocketManager =null;privateSocketInputThread mInputThread =null;privateSocketOutputThread mOutThread =null;privateSocketHeartThread mHeartThread =null;//获取单例publicstaticSocketThreadManager sharedInstance()
{if(s_SocketManager ==null)
{
s_SocketManager=newSocketThreadManager();
s_SocketManager.startThreads();
}returns_SocketManager;
}//单例,不允许在外部构建对象privateSocketThreadManager()
{
mHeartThread=newSocketHeartThread();
mInputThread=newSocketInputThread();
mOutThread=newSocketOutputThread();
}/**
* 启动线程*/privatevoidstartThreads()
{
mHeartThread.start();
mInputThread.start();
mInputThread.setStart(true);
mOutThread.start();
mInputThread.setStart(true);//mDnsthread.start();}/**
* stop线程*/publicvoidstopThreads()
{
mHeartThread.stopThread();
mInputThread.setStart(false);
mOutThread.setStart(false);
}publicstaticvoidreleaseInstance()
{if(s_SocketManager !=null)
{
s_SocketManager.stopThreads();
s_SocketManager=null;
}
}publicvoidsendMsg(byte[] buffer, Handler handler)
{
MsgEntity entity=newMsgEntity(buffer, handler);
mOutThread.addMsgToSendList(entity);
}
}
SocketHeartHread 心跳类
package com.example.socketblockdemo;
import java.io.IOException;
import android.text.TextUtils;classSocketHeartThread extends Thread
{
boolean isStop=false;
boolean mIsConnectSocketSuccess=false;staticSocketHeartThread s_instance;privateTCPClient mTcpClient =null;staticfinal String tag ="SocketHeartThread";publicstaticsynchronized SocketHeartThread instance()
{if(s_instance ==null)
{
s_instance=newSocketHeartThread();
}returns_instance;
}publicSocketHeartThread()
{
TCPClient.instance();//连接服务器//mIsConnectSocketSuccess = connect();}publicvoidstopThread()
{
isStop=true;
}/**
* 连接socket到服务器, 并发送初始化的Socket信息
*
* @return*/privateboolean reConnect()
{returnTCPClient.instance().reConnect();
}publicvoidrun()
{
isStop=false;while(!isStop)
{//发送一个心跳包看服务器是否正常boolean canConnectToServer =TCPClient.instance().canConnectToServer();if(canConnectToServer ==false){
reConnect();
}try{
Thread.sleep(Const.SOCKET_HEART_SECOND*1000);
}catch(InterruptedException e)
{
e.printStackTrace();
}
}
}
}
SocketInputThread
package com.example.socketblockdemo;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import android.content.Intent;
import android.text.TextUtils;/**
* 客户端读消息线程
*
* @author way
**/publicclassSocketInputThread extends Thread
{privateboolean isStart =true;privatestaticString tag ="socket";//private MessageListener messageListener;//消息监听接口对象publicSocketInputThread()
{
}publicvoidsetStart(boolean isStart)
{this.isStart =isStart;
}
@Overridepublicvoidrun()
{while(isStart)
{//手机能联网,读socket数据if(NetManager.instance().isNetworkConnected())
{if(!TCPClient.instance().isConnect())
{
CLog.e(tag,"TCPClient connet server is fail read thread sleep second"+Const.SOCKET_SLEEP_SECOND );try{
sleep(Const.SOCKET_SLEEP_SECOND*1000);
}catch(InterruptedException e)
{//TODO Auto-generated catch blocke.printStackTrace();
}
}
readSocket();//如果连接服务器失败,服务器连接失败,sleep固定的时间,能联网,就不需要sleepCLog.e("socket","TCPClient.instance().isConnect()"+TCPClient.instance().isConnect() );
}
}
}publicvoidreadSocket()
{
Selector selector=TCPClient.instance().getSelector();if(selector ==null)
{return;
}try{//如果没有数据过来,一直柱塞while(selector.select() >0)
{for(SelectionKey sk : selector.selectedKeys())
{//如果该SelectionKey对应的Channel中有可读的数据if(sk.isReadable())
{//使用NIO读取Channel中的数据SocketChannel sc =(SocketChannel) sk.channel();
ByteBuffer buffer= ByteBuffer.allocate(1024);try{
sc.read(buffer);
}catch(IOException e)
{//TODO Auto-generated catch blocke.printStackTrace();//continue;}
buffer.flip();
String receivedString="";//打印收到的数据try{
receivedString= Charset.forName("UTF-8")
.newDecoder().decode(buffer).toString();
CLog.e(tag, receivedString);
Intent i=newIntent(Const.BC);
i.putExtra("response", receivedString);
MainActivity.s_context.sendBroadcast(i );
}catch(CharacterCodingException e)
{//TODO Auto-generated catch blocke.printStackTrace();
}
buffer.clear();
buffer=null;try{//为下一次读取作准备sk.interestOps(SelectionKey.OP_READ);//删除正在处理的SelectionKeyselector.selectedKeys().remove(sk);
}catch(CancelledKeyException e)
{
e.printStackTrace();
}
}
}
}//selector.close();//TCPClient.instance().repareRead();}catch(IOException e1)
{//TODO Auto-generated catch blocke1.printStackTrace();
}catch(ClosedSelectorException e2)
{
}
}
}
SocketOutPutThread 类
package com.example.socketblockdemo;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.net.Socket;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import android.os.Bundle;
import android.os.Handler;
import android.os.Message;/**
* 客户端写消息线程
*
* @author way
**/publicclassSocketOutputThread extends Thread
{privateboolean isStart =true;privatestaticString tag ="socketOutputThread";privateListsendMsgList;publicSocketOutputThread( )
{
sendMsgList=newCopyOnWriteArrayList();
}publicvoidsetStart(boolean isStart)
{this.isStart =isStart;
synchronized (this)
{
notify();
}
}//使用socket发送消息publicboolean sendMsg(byte[] msg) throws Exception
{if(msg ==null)
{
CLog.e(tag,"sendMsg is null");returnfalse;
}try{
TCPClient.instance().sendMsg(msg);
}catch(Exception e)
{throw(e);
}returntrue;
}//使用socket发送消息publicvoidaddMsgToSendList(MsgEntity msg)
{
synchronized (this)
{this.sendMsgList.add(msg);
notify();
}
}
@Overridepublicvoidrun()
{while(isStart)
{//锁发送listsynchronized (sendMsgList)
{//发送消息for(MsgEntity msg : sendMsgList)
{
Handler handler=msg.getHandler();try{
sendMsg(msg.getBytes());
sendMsgList.remove(msg);//成功消息,通过hander回传if(handler !=null)
{
Message message=newMessage();
message.obj=msg.getBytes();
message.what=1;
handler.sendMessage(message);//handler.sendEmptyMessage(1);}
}catch(Exception e)
{
e.printStackTrace();
CLog.e(tag, e.toString());//错误消息,通过hander回传if(handler !=null)
{
Message message=newMessage();
message.obj=msg.getBytes();
message.what=0;;
handler.sendMessage(message);
}
}
}
}
synchronized (this)
{try{
wait();
}catch(InterruptedException e)
{//TODO Auto-generated catch blocke.printStackTrace();
}//发送完消息后,线程进入等待状态}
}
}
}