socket的正确使用方式

2016-08-07  本文已影响0人  0dce86ba3565

Android 通过Socket 和服务器通讯,是一种比较常用的通讯方式,时间比较紧,说下大致的思路,希望能帮到使用socket 进行通信的人

(1)开启一个线程发送消息SocketOutputThread

消息放在队列里:有新消息入队时唤醒线程发送,并通过回调反馈发送是否成功

(2)开启一个线程接收服务器消息:SocketInputThread

为了防止一直收数据,浪费电池的电,采用NIO的方式读socket的数据,这个是本文的关键

(3)开启一个线程做心跳,防止socket连接中断:SocketHeartThread

(4)构建 SocketThreadManager对以上三个thread进行管理

(5)构建 TCPClient 发送socket消息

采用NIO的方式实现TCP,特别是在接收服务器的数据时,不用再写个线程定时去读了。


TCPClient ,采用NIO的方式构建


package com.example.socketblockdemo;

import java.io.IOException;

import java.net.ConnectException;

import java.net.InetSocketAddress;

import java.net.SocketAddress;

import java.net.SocketTimeoutException;

import java.nio.ByteBuffer;

import java.nio.channels.ClosedChannelException;

import java.nio.channels.SelectionKey;

import java.nio.channels.Selector;

import java.nio.channels.SocketChannel;/**

* NIO TCP 客户端

**/publicclassTCPClient

{//信道选择器privateSelector selector;//与服务器通信的信道SocketChannel socketChannel;//要连接的服务器Ip地址privateString hostIp;//要连接的远程服务器在监听的端口privateinthostListenningPort;privatestaticTCPClient s_Tcp =null;publicboolean isInitialized =false;publicstaticsynchronized TCPClient instance()

{if(s_Tcp ==null)

{

s_Tcp=newTCPClient(Const.SOCKET_SERVER,

Const.SOCKET_PORT);

}returns_Tcp;

}/**

* 构造函数

*

* @param HostIp

* @param HostListenningPort

* @throws IOException*/publicTCPClient(String HostIp,intHostListenningPort)

{this.hostIp =HostIp;this.hostListenningPort =HostListenningPort;try{

initialize();this.isInitialized =true;

}catch(IOException e)

{this.isInitialized =false;//TODO Auto-generated catch blocke.printStackTrace();

}catch(Exception e)

{this.isInitialized =false;

e.printStackTrace();

}

}/**

* 初始化

*

* @throws IOException*/publicvoidinitialize() throws IOException

{

boolean done=false;try{//打开监听信道并设置为非阻塞模式socketChannel = SocketChannel.open(newInetSocketAddress(hostIp,

hostListenningPort));if(socketChannel !=null)

{

socketChannel.socket().setTcpNoDelay(false);

socketChannel.socket().setKeepAlive(true);//设置 读socket的timeout时间socketChannel.socket().setSoTimeout(

Const.SOCKET_READ_TIMOUT);

socketChannel.configureBlocking(false);//打开并注册选择器到信道selector =Selector.open();if(selector !=null)

{

socketChannel.register(selector, SelectionKey.OP_READ);

done=true;

}

}

}finally{if(!done && selector !=null)

{

selector.close();

}if(!done)

{

socketChannel.close();

}

}

}staticvoidblockUntil(SelectionKey key,longtimeout) throws IOException

{intnkeys =0;if(timeout >0)

{

nkeys= key.selector().select(timeout);

}elseif(timeout ==0)

{

nkeys=key.selector().selectNow();

}if(nkeys ==0)

{thrownewSocketTimeoutException();

}

}/**

* 发送字符串到服务器

*

* @param message

* @throws IOException*/publicvoidsendMsg(String message) throws IOException

{

ByteBuffer writeBuffer= ByteBuffer.wrap(message.getBytes("utf-8"));if(socketChannel ==null)

{thrownewIOException();

}

socketChannel.write(writeBuffer);

}/**

* 发送数据

*

* @param bytes

* @throws IOException*/publicvoidsendMsg(byte[] bytes) throws IOException

{

ByteBuffer writeBuffer=ByteBuffer.wrap(bytes);if(socketChannel ==null)

{thrownewIOException();

}

socketChannel.write(writeBuffer);

}/**

*

* @return*/publicsynchronized Selector getSelector()

{returnthis.selector;

}/**

* Socket连接是否是正常的

*

* @return*/publicboolean isConnect()

{

boolean isConnect=false;if(this.isInitialized)

{

isConnect=this.socketChannel.isConnected();

}returnisConnect;

}/**

* 关闭socket 重新连接

*

* @return*/publicboolean reConnect()

{

closeTCPSocket();try{

initialize();

isInitialized=true;

}catch(IOException e)

{

isInitialized=false;

e.printStackTrace();

}catch(Exception e)

{

isInitialized=false;

e.printStackTrace();

}returnisInitialized;

}/**

* 服务器是否关闭,通过发送一个socket信息

*

* @return*/publicboolean canConnectToServer()

{try{if(socketChannel !=null)

{

socketChannel.socket().sendUrgentData(0xff);

}

}catch(IOException e)

{//TODO Auto-generated catch blocke.printStackTrace();returnfalse;

}catch(Exception e){

e.printStackTrace();returnfalse;

}returntrue;

}/**

* 关闭socket*/publicvoidcloseTCPSocket()

{try{if(socketChannel !=null)

{

socketChannel.close();

}

}catch(IOException e)

{

}try{if(selector !=null)

{

selector.close();

}

}catch(IOException e)

{

}

}/**

* 每次读完数据后,需要重新注册selector,读取数据*/publicsynchronizedvoidrepareRead()

{if(socketChannel !=null)

{try{

selector=Selector.open();

socketChannel.register(selector, SelectionKey.OP_READ);

}catch(ClosedChannelException e)

{

e.printStackTrace();

}catch(IOException e)

{

e.printStackTrace();

}

}

}

}


如何使用



//发送消息,失败或者成功的handlerSocketThreadManager.sharedInstance().sendMsg(str.getBytes(), handler);


线程管理类


package com.example.socketblockdemo;

import android.os.Handler;

import android.text.TextUtils;publicclassSocketThreadManager

{privatestaticSocketThreadManager s_SocketManager =null;privateSocketInputThread mInputThread =null;privateSocketOutputThread mOutThread =null;privateSocketHeartThread mHeartThread =null;//获取单例publicstaticSocketThreadManager sharedInstance()

{if(s_SocketManager ==null)

{

s_SocketManager=newSocketThreadManager();

s_SocketManager.startThreads();

}returns_SocketManager;

}//单例,不允许在外部构建对象privateSocketThreadManager()

{

mHeartThread=newSocketHeartThread();

mInputThread=newSocketInputThread();

mOutThread=newSocketOutputThread();

}/**

* 启动线程*/privatevoidstartThreads()

{

mHeartThread.start();

mInputThread.start();

mInputThread.setStart(true);

mOutThread.start();

mInputThread.setStart(true);//mDnsthread.start();}/**

* stop线程*/publicvoidstopThreads()

{

mHeartThread.stopThread();

mInputThread.setStart(false);

mOutThread.setStart(false);

}publicstaticvoidreleaseInstance()

{if(s_SocketManager !=null)

{

s_SocketManager.stopThreads();

s_SocketManager=null;

}

}publicvoidsendMsg(byte[] buffer, Handler handler)

{

MsgEntity entity=newMsgEntity(buffer, handler);

mOutThread.addMsgToSendList(entity);

}

}


SocketHeartThread 心跳类


package com.example.socketblockdemo;

import java.io.IOException;

import android.text.TextUtils;classSocketHeartThread extends Thread

{

boolean isStop=false;

boolean mIsConnectSocketSuccess=false;staticSocketHeartThread s_instance;privateTCPClient mTcpClient =null;staticfinal String tag ="SocketHeartThread";publicstaticsynchronized SocketHeartThread instance()

{if(s_instance ==null)

{

s_instance=newSocketHeartThread();

}returns_instance;

}publicSocketHeartThread()

{

TCPClient.instance();//连接服务器//mIsConnectSocketSuccess = connect();}publicvoidstopThread()

{

isStop=true;

}/**

* 连接socket到服务器, 并发送初始化的Socket信息

*

* @return*/privateboolean reConnect()

{returnTCPClient.instance().reConnect();

}publicvoidrun()

{

isStop=false;while(!isStop)

{//发送一个心跳包看服务器是否正常boolean canConnectToServer =TCPClient.instance().canConnectToServer();if(canConnectToServer ==false){

reConnect();

}try{

Thread.sleep(Const.SOCKET_HEART_SECOND*1000);

}catch(InterruptedException e)

{

e.printStackTrace();

}

}

}

}


SocketInputThread


package com.example.socketblockdemo;

import java.io.IOException;

import java.io.UnsupportedEncodingException;

import java.nio.ByteBuffer;

import java.nio.channels.CancelledKeyException;

import java.nio.channels.ClosedSelectorException;

import java.nio.channels.SelectionKey;

import java.nio.channels.Selector;

import java.nio.channels.SocketChannel;

import java.nio.charset.CharacterCodingException;

import java.nio.charset.Charset;

import android.content.Intent;

import android.text.TextUtils;/**

* 客户端读消息线程

*

* @author way

**/publicclassSocketInputThread extends Thread

{privateboolean isStart =true;privatestaticString tag ="socket";//private MessageListener messageListener;//消息监听接口对象publicSocketInputThread()

{

}publicvoidsetStart(boolean isStart)

{this.isStart =isStart;

}

@Overridepublicvoidrun()

{while(isStart)

{//手机能联网,读socket数据if(NetManager.instance().isNetworkConnected())

{if(!TCPClient.instance().isConnect())

{

CLog.e(tag,"TCPClient connet server is fail read thread sleep second"+Const.SOCKET_SLEEP_SECOND );try{

sleep(Const.SOCKET_SLEEP_SECOND*1000);

}catch(InterruptedException e)

{//TODO Auto-generated catch blocke.printStackTrace();

}

}

readSocket();//如果连接服务器失败,服务器连接失败,sleep固定的时间,能联网,就不需要sleepCLog.e("socket","TCPClient.instance().isConnect()"+TCPClient.instance().isConnect() );

}

}

}publicvoidreadSocket()

{

Selector selector=TCPClient.instance().getSelector();if(selector ==null)

{return;

}try{//如果没有数据过来,一直柱塞while(selector.select() >0)

{for(SelectionKey sk : selector.selectedKeys())

{//如果该SelectionKey对应的Channel中有可读的数据if(sk.isReadable())

{//使用NIO读取Channel中的数据SocketChannel sc =(SocketChannel) sk.channel();

ByteBuffer buffer= ByteBuffer.allocate(1024);try{

sc.read(buffer);

}catch(IOException e)

{//TODO Auto-generated catch blocke.printStackTrace();//continue;}

buffer.flip();

String receivedString="";//打印收到的数据try{

receivedString= Charset.forName("UTF-8")

.newDecoder().decode(buffer).toString();

CLog.e(tag, receivedString);

Intent i=newIntent(Const.BC);

i.putExtra("response", receivedString);

MainActivity.s_context.sendBroadcast(i );

}catch(CharacterCodingException e)

{//TODO Auto-generated catch blocke.printStackTrace();

}

buffer.clear();

buffer=null;try{//为下一次读取作准备sk.interestOps(SelectionKey.OP_READ);//删除正在处理的SelectionKeyselector.selectedKeys().remove(sk);

}catch(CancelledKeyException e)

{

e.printStackTrace();

}

}

}

}//selector.close();//TCPClient.instance().repareRead();}catch(IOException e1)

{//TODO Auto-generated catch blocke1.printStackTrace();

}catch(ClosedSelectorException e2)

{

}

}

}


SocketOutputThread 类


package com.example.socketblockdemo;

import java.io.IOException;

import java.io.ObjectOutputStream;

import java.net.Socket;

import java.util.List;

import java.util.concurrent.CopyOnWriteArrayList;

import android.os.Bundle;

import android.os.Handler;

import android.os.Message;/**

* 客户端写消息线程

*

* @author way

**/publicclassSocketOutputThread extends Thread

{privateboolean isStart =true;privatestaticString tag ="socketOutputThread";privateListsendMsgList;publicSocketOutputThread( )

{

sendMsgList=newCopyOnWriteArrayList();

}publicvoidsetStart(boolean isStart)

{this.isStart =isStart;

synchronized (this)

{

notify();

}

}//使用socket发送消息publicboolean sendMsg(byte[] msg) throws Exception

{if(msg ==null)

{

CLog.e(tag,"sendMsg is null");returnfalse;

}try{

TCPClient.instance().sendMsg(msg);

}catch(Exception e)

{throw(e);

}returntrue;

}//使用socket发送消息publicvoidaddMsgToSendList(MsgEntity msg)

{

synchronized (this)

{this.sendMsgList.add(msg);

notify();

}

}

@Overridepublicvoidrun()

{while(isStart)

{//锁发送listsynchronized (sendMsgList)

{//发送消息for(MsgEntity msg : sendMsgList)

{

Handler handler=msg.getHandler();try{

sendMsg(msg.getBytes());

sendMsgList.remove(msg);//成功消息,通过hander回传if(handler !=null)

{

Message message=newMessage();

message.obj=msg.getBytes();

message.what=1;

handler.sendMessage(message);//handler.sendEmptyMessage(1);}

}catch(Exception e)

{

e.printStackTrace();

CLog.e(tag, e.toString());//错误消息,通过hander回传if(handler !=null)

{

Message message=newMessage();

message.obj=msg.getBytes();

message.what=0;;

handler.sendMessage(message);

}

}

}

}

synchronized (this)

{try{

wait();

}catch(InterruptedException e)

{//TODO Auto-generated catch blocke.printStackTrace();

}//发送完消息后,线程进入等待状态}

}

}

}

上一篇下一篇

猜你喜欢

热点阅读