Android 开发进阶

在Android项目中原生写游戏APP,使用Java-WebSocket

2021-02-01  本文已影响0人  JeffreyWorld

基于自己参与所开发的游戏项目用到的 Java-WebSocket,做一些经验总结。
本文相关基础代码依赖于:
https://github.com/TooTallNate/Java-WebSocket
https://github.com/0xZhangKe/WebSocketDemo
项目里使用 Java-WebSocket + Protocol Buffers 做了基础实现。在游戏场景里,当前用户同一时刻只会处于一个游戏房间;不像 IM 的最新会话列表那样同时存在多个聊天会话(群聊、单聊),因此不需要维护多个聊天室或房间。

1.关于心跳的维护

客户端每隔15s(这些参数以我们项目里的设置为准,本文后面的数据也一样)向服务器发送一次心跳类型请求,让服务器知道客户端的连接一直没有断开;如果服务器连续30s收不到心跳,就认为用户离线。
比如,在房间内游戏里的玩家,就会在UI展现上看到对方离线的标记。

/**
 * Singleton helper that drives the client-side heartbeat.
 *
 * <p>While running, a background {@link Timer} calls
 * {@link HeartTimerListener#sendHeartData()} one second after start and then every
 * 15 seconds. The callback runs on the timer thread, while start/stop/listener methods are
 * called from other threads, so the listener reference is {@code volatile} and the
 * timer lifecycle methods are synchronized.
 */
public class HeartTimerUtils {
    /** Delay before the first heartbeat, in milliseconds. */
    private static final long FIRST_BEAT_DELAY_MS = 1000L;
    /** Interval between two heartbeats, in milliseconds. */
    private static final long BEAT_PERIOD_MS = 15000L;

    private static final HeartTimerUtils ourInstance = new HeartTimerUtils();
    private Timer timer;
    private TimerTask timerTask;
    /** Read on the timer thread, written by callers; volatile for cross-thread visibility. */
    private volatile HeartTimerListener listener;

    /**
     * @return the process-wide instance
     */
    public static HeartTimerUtils getInstance() {
        return ourInstance;
    }

    private HeartTimerUtils() {
    }

    /**
     * (Re)starts the heartbeat. Any timer that is already running is cancelled first, so
     * calling this repeatedly never leaves more than one timer alive.
     */
    public synchronized void startHeartData() {
        stopTimer();
        timer = new Timer();
        timerTask = new TimerTask() {
            @Override
            public void run() {
                // Snapshot the field: closeTimer()/setListener(null) may clear it between the
                // null check and the call, which would otherwise throw on the timer thread.
                HeartTimerListener current = listener;
                if (current != null) {
                    current.sendHeartData();
                }
            }
        };
        timer.schedule(timerTask, FIRST_BEAT_DELAY_MS, BEAT_PERIOD_MS);
    }

    /**
     * Cancels the running timer and its task, if any. Safe to call when nothing is running.
     */
    public synchronized void stopTimer() {
        if (timer != null) {
            timer.cancel();
            timer = null;
        }
        if (timerTask != null) {
            timerTask.cancel();
            timerTask = null;
        }
    }

    /**
     * Sets the callback invoked on every heartbeat tick.
     *
     * @param listener the callback, or {@code null} to make ticks a no-op
     */
    public void setListener(HeartTimerListener listener) {
        this.listener = listener;
    }

    /** Callback that performs the actual heartbeat send. */
    public interface HeartTimerListener {
        /** Called on the timer thread each time a heartbeat is due. */
        void sendHeartData();
    }

    /**
     * Stops the heartbeat and drops the listener reference.
     */
    public void closeTimer() {
        stopTimer();
        listener = null;
    }

}
/**
 * Base Activity for screens that use the shared WebSocket connection.
 *
 * <p>On creation it makes sure the WebSocket is initialised (or re-sends the login request if
 * it is already connected), registers itself as the heartbeat listener and attaches
 * {@code socketListener} to the default WebSocket manager. Parts of the class are elided in
 * this listing (the {@code ...省略} markers).
 */
public abstract class WebSocketActivity extends BaseActivity implements HeartTimerUtils.HeartTimerListener {
    ...省略
    /**
     * Initialises or reuses the WebSocket connection and registers the listeners.
     */
    @Override
    protected void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        if (!MainApplication.getInstance().isWebSocketManagerConnected()) {
            MainApplication.getInstance().initWebSocket();
        } else {
            // Already connected: stop the running heartbeat and send the login request again.
            HeartTimerUtils.getInstance().stopTimer();
            onConnectedEvent();
        }
        // NOTE(review): the singleton keeps this Activity as listener; nothing visible in this
        // listing clears it in onDestroy() — confirm it is replaced or cleared elsewhere.
        HeartTimerUtils.getInstance().setListener(this);
        try {
            if (WebSocketHandler.getDefault() != null) {
                WebSocketHandler.getDefault().addListener(socketListener);
            }
        } catch (Exception e) {
            // NOTE(review): the exception is silently ignored here.
        }
    }

    /**
     * Sends a custom proto payload as binary data. Does nothing when no default WebSocket
     * manager exists.
     *
     * @param text the encoded bytes to send
     */
    public void sendData(byte[] text) {
        if (WebSocketHandler.getDefault() != null) {
            WebSocketHandler.getDefault().send(text);
        }
    }

    /**
     * Called once the WebSocket is connected: sends a login-type request to the server.
     *
     * <p>The user id is parsed from {@code CommonUtil.getUid()} and falls back to 0 when that
     * value is empty; {@link Integer#parseInt(String)} throws {@link NumberFormatException}
     * if the uid is not a valid integer.
     */
    public void onConnectedEvent() {
        LoginProtocalRequest request = new LoginProtocalRequest();
        int userId;
        if (!TextUtils.isEmpty(CommonUtil.getUid())) {
            userId = Integer.parseInt(CommonUtil.getUid());
        } else {
            userId = 0;
        }
        LoginRequest req = LoginRequest.newBuilder().setToken(Md5Utils.encrypt(CommonUtil.getAccessToken())).setUserId(userId).setRoomId(roomId).build();
        request.op = ProtocolMap.LOGIN_OP_REQ;
        request.data = req.toByteArray();
        sendData(request.getByteData());
    }

    /**
     * Detaches {@code socketListener} from the default WebSocket manager.
     */
    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (WebSocketHandler.getDefault() != null) {
            WebSocketHandler.getDefault().removeListener(socketListener);
        }
    }

    /**
     * Stops the heartbeat, then disconnects and destroys the default WebSocket manager if one
     * exists.
     */
    public void destoryWebSocket() {
        HeartTimerUtils.getInstance().stopTimer();
        if (WebSocketHandler.getDefault() != null) {
            WebSocketHandler.getDefault().disConnect();
            WebSocketHandler.getDefault().destroy();
        }
    }

    /**
     * Heartbeat callback: sends a protocol message whose op is
     * {@code ProtocolMap.SOCKET_HEART}.
     */
    @Override
    public void sendHeartData() {
        BaseProtocol protocol = new BaseProtocol();
        protocol.op = ProtocolMap.SOCKET_HEART;
        sendData(protocol.getByteData());
    }

    ...省略
}

所用到具体业务的 Activity,只需要继承 WebSocketActivity,去实现自己发消息和收消息的业务。
WebSocket 协议本身已经封装好了 ping/pong 事件来维护心跳:客户端发送 ping 帧,服务端响应 pong 帧,以此确保连接可用。只是我们项目里没有用它,而是用自己写的逻辑来维护心跳。

/**
 * 接收到 Ping 数据
 */
public class PingResponse implements Response<Framedata> {

    private Framedata framedata;

    PingResponse() {
    }

    @Override
    public Framedata getResponseData() {
        return framedata;
    }

    @Override
    public void setResponseData(Framedata responseData) {
        this.framedata = responseData;
    }

    @Override
    public void onResponse(IResponseDispatcher dispatcher, ResponseDelivery delivery) {
        dispatcher.onPing(framedata, delivery);
    }

    @Override
    public void release() {
        framedata = null;
        ResponseFactory.releasePingResponse(this);
    }

    @Override
    public String toString() {
        return String.format("[@PingResponse%s->Framedata:%s]",
                hashCode(),
                framedata == null ?
                        "null" :
                        framedata.toString());
    }
/**
 * Response wrapper for a received Pong frame.
 */
public class PongResponse implements Response<Framedata> {

    private Framedata framedata;

    PongResponse() {
    }

    /**
     * @param responseData the received Pong frame
     */
    @Override
    public void setResponseData(Framedata responseData) {
        framedata = responseData;
    }

    /**
     * @return the wrapped frame, or {@code null} if none is set or after {@link #release()}
     */
    @Override
    public Framedata getResponseData() {
        return this.framedata;
    }

    /**
     * Forwards the wrapped frame to {@link IResponseDispatcher#onPong}.
     */
    @Override
    public void onResponse(IResponseDispatcher dispatcher, ResponseDelivery delivery) {
        dispatcher.onPong(framedata, delivery);
    }

    /**
     * Clears the wrapped frame and hands this instance back to {@link ResponseFactory}.
     */
    @Override
    public void release() {
        this.framedata = null;
        ResponseFactory.releasePongResponse(this);
    }

    @Override
    public String toString() {
        final String frameText;
        if (framedata != null) {
            frameText = framedata.toString();
        } else {
            frameText = "null";
        }
        return String.format("[@PongResponse%s->Framedata:%s]", hashCode(), frameText);
    }
}
/**
 * Factory for {@link Response} objects, backed by small per-type object pools.
 *
 * <p>{@code createXxx} reuses a pooled instance when one is available and allocates a new
 * one otherwise; {@code releaseXxx} returns an instance to its pool. Each pool keeps at most
 * {@link #POOL_SIZE} idle instances; surplus instances are simply dropped.
 *
 * <p>NOTE(review): the pools are plain {@link ArrayDeque}s with no synchronization. If
 * responses are created and released on different threads this needs locking — confirm
 * against the callers.
 */
public class ResponseFactory {

    /** Maximum number of idle instances kept per pool. */
    private static final int POOL_SIZE = 7;
    private static final Queue<ErrorResponse> ERROR_RESPONSE_POOL = new ArrayDeque<>(POOL_SIZE);
    private static final Queue<TextResponse> TEXT_RESPONSE_POOL = new ArrayDeque<>(POOL_SIZE);
    private static final Queue<ByteBufferResponse> BYTE_BUFFER_RESPONSE_POOL = new ArrayDeque<>(POOL_SIZE);
    private static final Queue<PingResponse> PING_RESPONSE_POOL = new ArrayDeque<>(POOL_SIZE);
    private static final Queue<PongResponse> PONG_RESPONSE_POOL = new ArrayDeque<>(POOL_SIZE);

    /**
     * @return a pooled or newly created {@link ErrorResponse}
     */
    public static ErrorResponse createErrorResponse() {
        ErrorResponse response = ERROR_RESPONSE_POOL.poll();
        if (response == null) {
            response = new ErrorResponse();
        }
        return response;
    }

    /**
     * @return a pooled or newly created text response
     */
    public static Response<String> createTextResponse() {
        Response<String> response = TEXT_RESPONSE_POOL.poll();
        if (response == null) {
            response = new TextResponse();
        }
        return response;
    }

    /**
     * @return a pooled or newly created binary response
     */
    public static Response<ByteBuffer> createByteBufferResponse() {
        Response<ByteBuffer> response = BYTE_BUFFER_RESPONSE_POOL.poll();
        if (response == null) {
            response = new ByteBufferResponse();
        }
        return response;
    }

    /**
     * @return a pooled or newly created Ping response
     */
    public static Response<Framedata> createPingResponse() {
        Response<Framedata> response = PING_RESPONSE_POOL.poll();
        if (response == null) {
            response = new PingResponse();
        }
        return response;
    }

    /**
     * @return a pooled or newly created Pong response
     */
    public static Response<Framedata> createPongResponse() {
        Response<Framedata> response = PONG_RESPONSE_POOL.poll();
        if (response == null) {
            response = new PongResponse();
        }
        return response;
    }

    static void releaseErrorResponse(ErrorResponse response) {
        recycle(ERROR_RESPONSE_POOL, response);
    }

    static void releaseTextResponse(TextResponse response) {
        recycle(TEXT_RESPONSE_POOL, response);
    }

    static void releaseByteBufferResponse(ByteBufferResponse response) {
        recycle(BYTE_BUFFER_RESPONSE_POOL, response);
    }

    static void releasePingResponse(PingResponse response) {
        recycle(PING_RESPONSE_POOL, response);
    }

    static void releasePongResponse(PongResponse response) {
        recycle(PONG_RESPONSE_POOL, response);
    }

    /**
     * Returns {@code response} to {@code pool} unless it is {@code null} or the pool already
     * holds {@link #POOL_SIZE} idle instances. The ArrayDeque constructor argument is only an
     * initial capacity, so the bound has to be enforced here.
     */
    private static <T> void recycle(Queue<T> pool, T response) {
        if (response != null && pool.size() < POOL_SIZE) {
            pool.offer(response);
        }
    }

}

2.关于断线重连

断线重连,分为几种情况:
一、客户端处于无网状态或弱网的情况,这时断开了。当客户端监听到网络恢复,重新给 socket 重连上;
二、当客户端遇到弱网的情况,导致 socket 连接断开 onDisconnect ,此时网络是连接的状态。这时客户端发消息和收消息都会失败。客户端重连可能也连接不上,这时需要客户端多次重连网络恢复到正常时,才能连接成功。
三、客户端发送协议消息时发送失败(onSendDataError),由此触发重连操作。

/**
 * Contract for a reconnect strategy.
 */
public interface ReconnectManager {

    /**
     * Whether a reconnect is currently in progress.
     */
    boolean reconnecting();

    /**
     * Starts reconnecting.
     */
    void startReconnect();

    /**
     * Stops reconnecting.
     */
    void stopReconnect();

    /**
     * Notifies the manager that the connection succeeded.
     */
    void onConnected();

    /**
     * Notifies the manager that a connection attempt failed.
     *
     * @param th the cause of the failure
     */
    void onConnectError(Throwable th);

    /**
     * Releases the resources held by the manager.
     */
    void destroy();

    /**
     * Callback for the outcome of connecting (success or failure).
     */
    interface OnConnectListener {
        void onConnected();

        void onDisconnect();
    }
}

/**
 * WebSocket 管理类
 */
public class WebSocketManager {

    private static final String TAG = "WSManager";

    private WebSocketSetting mSetting;

    private WebSocketWrapper mWebSocket;

    /**
     * 注册的监听器集合
     */
    private ResponseDelivery mDelivery;
    private ReconnectManager mReconnectManager;

    private SocketWrapperListener mSocketWrapperListener;
    /**
     * 当前是否已销毁
     */
    private boolean destroyed = false;
    /**
     * 用户调用了 disconnect 方法后为 true
     */
    private boolean disconnect = false;

    private WebSocketEngine mWebSocketEngine;
    private ResponseProcessEngine mResponseProcessEngine;

    WebSocketManager(WebSocketSetting setting,
                     WebSocketEngine webSocketEngine,
                     ResponseProcessEngine responseProcessEngine) {
        this.mSetting = setting;
        this.mWebSocketEngine = webSocketEngine;
        this.mResponseProcessEngine = responseProcessEngine;

        mDelivery = mSetting.getResponseDelivery();
        if (mDelivery == null) {
            mDelivery = new MainThreadResponseDelivery();
        }
        mSocketWrapperListener = getSocketWrapperListener();
        if (mWebSocket == null) {
            mWebSocket = new WebSocketWrapper(this.mSetting, mSocketWrapperListener);
        }
        start();
    }

    /**
     * 启动,调用此方法开始连接
     */
    public WebSocketManager start() {
        if (mWebSocket == null) {
            mWebSocket = new WebSocketWrapper(this.mSetting, mSocketWrapperListener);
        }
        if (mWebSocket.getConnectState() == 0) {
            reconnect();
        }
        return this;
    }

    /**
     * WebSocket 是否已连接
     */
    public boolean isConnect() {
        return mWebSocket != null && mWebSocket.getConnectState() == 2;
    }

    /**
     * 设置重连管理类。
     * 用户可根据需求设置自己的重连管理类,只需要实现接口即可
     */
    public void setReconnectManager(ReconnectManager reconnectManager) {
        this.mReconnectManager = reconnectManager;
    }

    /**
     * 通过 {@link ReconnectManager} 开始重接
     */
    public WebSocketManager reconnect() {
        disconnect = false;
        if (mReconnectManager == null) {
            mReconnectManager = getDefaultReconnectManager();
        }
        if (!mReconnectManager.reconnecting()) {
            mReconnectManager.startReconnect();
        }
        return this;
    }

    /**
     * 使用新的 Setting 重新创建连接,同时会销毁之前的连接
     */
    public WebSocketManager reconnect(WebSocketSetting setting) {
        disconnect = false;
        if (destroyed) {
            LogUtil.e(TAG, "This WebSocketManager is destroyed!");
            return this;
        }
        this.mSetting = setting;
        if (mWebSocket != null) {
            mWebSocket.destroy();
            mWebSocket = null;
        }
        start();
        return this;
    }

    /**
     * 断开连接,断开后可使用 {@link this#reconnect()} 方法重新建立连接
     */
    public WebSocketManager disConnect() {
        disconnect = true;
        if (destroyed) {
            LogUtil.e(TAG, "This WebSocketManager is destroyed!");
            return this;
        }
        if (mWebSocket.getConnectState() != 0) {
            mWebSocketEngine.disConnect(mWebSocket, mSocketWrapperListener);
        }
        return this;
    }

    /**
     * 发送文本数据
     */
    public void send(String text) {
        if (TextUtils.isEmpty(text)) {
            return;
        }
        Request<String> request = RequestFactory.createStringRequest();
        request.setRequestData(text);
        sendRequest(request);
    }

    /**
     * 发送 byte[] 数据
     */
    public void send(byte[] bytes) {
        if (bytes == null || bytes.length == 0) {
            return;
        }
        Request<byte[]> request = RequestFactory.createByteArrayRequest();
        request.setRequestData(bytes);
        sendRequest(request);
    }

    /**
     * 发送 ByteBuffer 数据
     */
    public void send(ByteBuffer byteBuffer) {
        if (byteBuffer == null) {
            return;
        }
        Request<ByteBuffer> request = RequestFactory.createByteBufferRequest();
        request.setRequestData(byteBuffer);
        sendRequest(request);
    }

    /**
     * 发送 Ping
     */
    public void sendPing() {
        sendRequest(RequestFactory.createPingRequest());
    }

    /**
     * 发送 Pong
     */
    public void sendPong() {
        sendRequest(RequestFactory.createPongRequest());
    }

    /**
     * 发送 Pong
     */
    public void sendPong(PingFrame pingFrame) {
        if (pingFrame == null) {
            return;
        }
        Request<PingFrame> request = RequestFactory.createPongRequest();
        request.setRequestData(pingFrame);
        sendRequest(request);
    }

    /**
     * 发送 {@link Framedata}
     */
    public void sendFrame(Framedata framedata) {
        if (framedata == null) {
            return;
        }
        Request<Framedata> request = RequestFactory.createFrameDataRequest();
        request.setRequestData(framedata);
        sendRequest(request);
    }

    /**
     * 发送 {@link Framedata} 集合
     */
    public void sendFrame(Collection<Framedata> frameData) {
        if (frameData == null) {
            return;
        }
        Request<Collection<Framedata>> request = RequestFactory.createCollectionFrameRequest();
        request.setRequestData(frameData);
        sendRequest(request);
    }

    /**
     * 添加一个监听器,使用完成之后需要调用
     * {@link #removeListener(SocketListener)} 方法移除监听器
     */
    public WebSocketManager addListener(SocketListener listener) {
        mDelivery.addListener(listener);
        return this;
    }

    /**
     * 移除一个监听器
     */
    public WebSocketManager removeListener(SocketListener listener) {
        mDelivery.removeListener(listener);
        return this;
    }

    /**
     * 获取配置类,
     * 部分参数支持动态设定。
     */
    public WebSocketSetting getSetting() {
        return mSetting;
    }

    /**
     * 彻底销毁该连接,销毁后改连接完全失效,
     * 请勿使用其他方法。
     */
    public void destroy() {
        destroyed = true;
        if (mWebSocket != null) {
            mWebSocketEngine.destroyWebSocket(mWebSocket);
            mWebSocketEngine = null;
            mWebSocket = null;
        }
        if (mDelivery != null) {
            if (!mDelivery.isEmpty()) {
                mDelivery.clear();
            }
            mDelivery = null;
        }
        if (mReconnectManager != null) {
            if (mReconnectManager.reconnecting()) {
                mReconnectManager.stopReconnect();
            }
            mReconnectManager = null;
        }
        WebSocketHandler.clearDefaultWebSocket();
    }

    /**
     * 重新连接一次,
     * for {@link ReconnectManager}
     */
    void reconnectOnce() {
        if (destroyed) {
            LogUtil.e(TAG, "This WebSocketManager is destroyed!");
            return;
        }
        if (mWebSocket.getConnectState() == 0) {
            mWebSocketEngine.connect(mWebSocket, mSocketWrapperListener);
        } else {
            if (mReconnectManager != null) {
                mReconnectManager.onConnected();
            }
            LogUtil.e(TAG, "WebSocket 已连接,请勿重试。");
        }
    }

    /**
     * 发送数据
     */
    private void sendRequest(Request request) {
        if (destroyed) {
            LogUtil.e(TAG, "This WebSocketManager is destroyed!");
            return;
        }
        LogUtil.d(TAG, "mWebSocketEngine.sendRequest");
        mWebSocketEngine.sendRequest(mWebSocket, request, mSocketWrapperListener);
    }

    /**
     * 获取默认的重连器
     */
    private ReconnectManager getDefaultReconnectManager() {
        return new DefaultReconnectManager(this, new ReconnectManager.OnConnectListener() {
            @Override
            public void onConnected() {
                LogUtil.i(TAG, "重连成功");
     //         mSetting.getResponseDispatcher()
     //                .onConnected(mDelivery);
            }

            @Override
            public void onDisconnect() {
                LogUtil.i(TAG, "重连失败");
                mSetting.getResponseDispatcher()
                        .onDisconnect(mDelivery);
            }
        });
    }

    /**
     * 获取监听器
     */
    private SocketWrapperListener getSocketWrapperListener() {
        return new SocketWrapperListener() {
            @Override
            public void onConnected() {
                if (mReconnectManager != null) {
                    mReconnectManager.onConnected();
                }
                mSetting.getResponseDispatcher()
                        .onConnected(mDelivery);
            }

            @Override
            public void onConnectFailed(Throwable e) {
                //if reconnecting,interrupt this event for ReconnectManager.
                if (mReconnectManager != null &&
                        mReconnectManager.reconnecting()) {
                    mReconnectManager.onConnectError(e);
                }
                mSetting.getResponseDispatcher()
                        .onConnectFailed(e, mDelivery);
            }

            @Override
            public void onDisconnect() {
                LogUtil.i(TAG, "onDisconnect 开始重连");
//                mSetting.getResponseDispatcher()
//                        .onDisconnect(mDelivery);
                if (mReconnectManager != null &&
                        mReconnectManager.reconnecting()) {
                    if (disconnect) {
                        mSetting.getResponseDispatcher()
                                .onDisconnect(mDelivery);
                    } else {
                        mReconnectManager.onConnectError(null);
                    }
                } else {
                    if (!disconnect) {
                        if (mReconnectManager == null) {
                            mReconnectManager = getDefaultReconnectManager();
                        }
                        mReconnectManager.onConnectError(null);
                        mReconnectManager.startReconnect();
                    }
                }
            }
            @Override
            public void onSendDataError(Request request, int type, Throwable tr) {
                ErrorResponse errorResponse = ResponseFactory.createErrorResponse();
                errorResponse.init(request, type, tr);
                if (mSetting.processDataOnBackground()) {
                    mResponseProcessEngine
                            .onSendDataError(errorResponse,
                                    mSetting.getResponseDispatcher(),
                                    mDelivery);
                } else {
                    mSetting.getResponseDispatcher().onSendDataError(errorResponse, mDelivery);
                }
                if (!disconnect && type == ErrorResponse.ERROR_NO_CONNECT) {
                    LogUtil.e(TAG, "数据发送失败,网络未连接,开始重连。。。");
                    reconnect();
                }
            }

            @Override
            public void onMessage(Response message) {
                if (mSetting.processDataOnBackground()) {
                    mResponseProcessEngine
                            .onMessageReceive(message,
                                    mSetting.getResponseDispatcher(),
                                    mDelivery);
                } else {
                    message.onResponse(mSetting.getResponseDispatcher(), mDelivery);
                }
            }
        };
    }
/**
 * Broadcast receiver for connectivity changes: when a network becomes available it asks the
 * WebSocket managers that opted in to reconnect.
 */
public class NetworkChangedReceiver extends BroadcastReceiver {

    private static final String TAG = "WSNetworkReceiver";

    public NetworkChangedReceiver() {
    }

    /**
     * Handles {@code CONNECTIVITY_ACTION}; every other action is ignored. Any exception raised
     * while inspecting the network state or reconnecting is logged, not propagated.
     */
    @Override
    public void onReceive(Context context, Intent intent) {
        if (!ConnectivityManager.CONNECTIVITY_ACTION.equals(intent.getAction())) {
            return;
        }
        ConnectivityManager manager = (ConnectivityManager) context
                .getSystemService(Context.CONNECTIVITY_SERVICE);
        if (manager == null) {
            return;
        }
        try {
            if (!PermissionUtil.checkPermission(context, Manifest.permission.ACCESS_NETWORK_STATE)) {
                return;
            }
            NetworkInfo activeNetwork = manager.getActiveNetworkInfo();
            if (activeNetwork == null) {
                LogUtil.i(TAG, "当前网络断开");
//                        EventBus.getDefault().post(new NetworkChangedReceiverNotify(false));
                return;
            }
            if (!activeNetwork.isConnected()) {
                LogUtil.i(TAG, "当前没有可用网络");
                return;
            }
            logAvailableNetwork(activeNetwork.getType());
            reconnectDefaultManager();
            reconnectRegisteredManagers();
        } catch (Exception e) {
            LogUtil.e(TAG, "网络状态获取错误", e);
        }
    }

    /** Logs which kind of network (WiFi or mobile) became available; other types log nothing. */
    private void logAvailableNetwork(int networkType) {
        if (networkType == ConnectivityManager.TYPE_WIFI) {
            LogUtil.i(TAG, "网络连接发生变化,当前WiFi连接可用,正在尝试重连。");
        } else if (networkType == ConnectivityManager.TYPE_MOBILE) {
            LogUtil.i(TAG, "网络连接发生变化,当前移动连接可用,正在尝试重连。");
        }
    }

    /** Reconnects the default manager when it exists and opted in to network-change reconnects. */
    private void reconnectDefaultManager() {
        WebSocketManager defaultManager = WebSocketHandler.getDefault();
        if (defaultManager == null) {
            return;
        }
        if (defaultManager.getSetting().reconnectWithNetworkChanged()) {
            defaultManager.reconnect();
        }
    }

    /** Reconnects every registered manager that opted in to network-change reconnects. */
    private void reconnectRegisteredManagers() {
        Map<String, WebSocketManager> webSocketManagerMap = WebSocketHandler.getAllWebSocket();
        if (webSocketManagerMap.isEmpty()) {
            return;
        }
        for (WebSocketManager item : webSocketManagerMap.values()) {
            if (item != null && item.getSetting().reconnectWithNetworkChanged()) {
                item.reconnect();
            }
        }
    }

}

3.关于弱网的环境下的一些情况:

一、发送消息失败或收取消息失败(让用户在ui交互上,自己点击重新发送或者下拉刷新通过http接口拉取最新服务器数据,内部封装好重新连接socket的逻辑)
二、手机处于弱网时,房间内很多用户同时向服务器发送同种类型的消息,比如准备协议。这时弱网手机发给服务器的消息,服务器可能已经收到了,但因为网络环境不稳定,客户端不一定能收到对应的返回消息。正常情况下,发送的数据一定会有与之配对的服务器返回数据;但在一些特定场景下(比如弱网环境出现丢包、丢数据),服务器返回的消息客户端可能收不到。

4.关于客户端断开 socket 的时机和策略

断开分为几种情况:
一、客户端自己主动离开房间,正常断开;
二、客户端杀掉APP后台进程,这时服务器依赖于心跳,30s没有收到客户端的心跳请求,会把客户端的连接状态断开;

5.关于消息分发器

接收到的数据,需要通过消息分发器来分发:先进入该类中处理,处理完再发送到下游。

/**
 * Application-level dispatcher: parses incoming text messages as {@code BaseProtocol} JSON,
 * forwards valid ones downstream and converts invalid ones into error responses.
 */
public class AppResponseDispatcher extends SimpleDispatcher {

    /**
     * The JSON payload is malformed.
     */
    public static final int JSON_ERROR = 11;
    /**
     * The op code is outside the valid range.
     */
    public static final int CODE_ERROR = 12;

    /** Lowest op code (inclusive) agreed between client and server. */
    private static final int MIN_VALID_OP = 1000;
    /** Upper bound (exclusive) of the op codes agreed between client and server. */
    private static final int MAX_VALID_OP_EXCLUSIVE = 2000;

    /** Shared parser; Gson instances are thread-safe, so one per message is unnecessary. */
    private static final Gson GSON = new Gson();

    /**
     * Parses {@code message} and delivers it when its op lies in
     * [{@value #MIN_VALID_OP}, {@value #MAX_VALID_OP_EXCLUSIVE}); otherwise reports
     * {@link #CODE_ERROR}. Any exception raised while parsing or delivering is reported as
     * {@link #JSON_ERROR} with the exception as cause.
     *
     * @param message  the raw text message
     * @param delivery where the parsed message or the error is delivered
     */
    @Override
    public void onMessage(String message, ResponseDelivery delivery) {
        try {
            BaseProtocol response = GSON.fromJson(message, BaseProtocol.class);
            if (response == null) {
                // fromJson returns null for empty/"null" input; fail with a clear cause
                // instead of an incidental NullPointerException on response.op.
                throw new IllegalStateException("Message did not contain a JSON object");
            }
            if (response.op >= MIN_VALID_OP && response.op < MAX_VALID_OP_EXCLUSIVE) {
                delivery.onMessage(message, response);
            } else {
                ErrorResponse errorResponse = newErrorFor(message, CODE_ERROR);
                errorResponse.setReserved(response);
                onSendDataError(errorResponse, delivery);
            }
        } catch (Exception e) {
            ErrorResponse errorResponse = newErrorFor(message, JSON_ERROR);
            errorResponse.setCause(e);
            onSendDataError(errorResponse, delivery);
        }
    }

    /**
     * Central place for error handling: fills in a description for known error codes, then
     * delivers the error. UI code can show {@code ErrorResponse#getDescription()} as the hint.
     */
    @Override
    public void onSendDataError(ErrorResponse error, ResponseDelivery delivery) {
        switch (error.getErrorCode()) {
            case ErrorResponse.ERROR_NO_CONNECT:
                error.setDescription("网络错误");
                break;
            case ErrorResponse.ERROR_UN_INIT:
                error.setDescription("连接未初始化");
                break;
            case ErrorResponse.ERROR_UNKNOWN:
                error.setDescription("未知错误");
                break;
            case JSON_ERROR:
                error.setDescription("数据格式异常");
                break;
            case CODE_ERROR:
                error.setDescription("响应码错误");
                break;
            default:
                // Unknown code: deliver without a description.
                break;
        }
        delivery.onSendDataError(error);
    }

    /** Builds an error response carrying the original text message and the given code. */
    private static ErrorResponse newErrorFor(String message, int errorCode) {
        ErrorResponse errorResponse = ResponseFactory.createErrorResponse();
        Response<String> textResponse = ResponseFactory.createTextResponse();
        textResponse.setResponseData(message);
        errorResponse.setResponseData(textResponse);
        errorResponse.setErrorCode(errorCode);
        return errorResponse;
    }
}
/**
 * A minimal WebSocket message dispatcher implementing {@link IResponseDispatcher}.
 *
 * <p>{@code IResponseDispatcher} has many methods, so this base class forwards the simple
 * events straight to the delivery and leaves only the remaining ones to subclasses.
 *
 * <p>Every method tolerates a {@code null} delivery: {@code WebSocketManager.destroy()} clears
 * the delivery it passes to these callbacks, so a late socket event can arrive with
 * {@code null}.
 */
public abstract class SimpleDispatcher implements IResponseDispatcher {

    @Override
    public void onConnected(ResponseDelivery delivery) {
        if (delivery != null) {
            delivery.onConnected();
        }
    }

    @Override
    public void onConnectFailed(Throwable cause, ResponseDelivery delivery) {
        if (delivery != null) {
            delivery.onConnectFailed(cause);
        }
    }

    @Override
    public void onDisconnect(ResponseDelivery delivery) {
        if (delivery != null) {
            delivery.onDisconnect();
        }
    }

    /**
     * Forwards the binary message unchanged, with no converted data object.
     */
    @Override
    public void onMessage(ByteBuffer byteBuffer, ResponseDelivery delivery) {
        if (delivery != null) {
            delivery.onMessage(byteBuffer, null);
        }
    }

    @Override
    public void onPing(Framedata framedata, ResponseDelivery delivery) {
        if (delivery != null) {
            delivery.onPing(framedata);
        }
    }

    @Override
    public void onPong(Framedata framedata, ResponseDelivery delivery) {
        if (delivery != null) {
            delivery.onPong(framedata);
        }
    }

}
/**
 * Delivery interface: a {@link SocketListener} that also manages a set of registered
 * listeners.
 */
public interface ResponseDelivery extends SocketListener {

    /** Registers a listener. */
    void addListener(SocketListener listener);

    /** Unregisters a listener. */
    void removeListener(SocketListener listener);

    /** Removes all registered listeners. */
    void clear();

    /** Whether no listener is currently registered. */
    boolean isEmpty();
}

/**
 * WebSocket event listener.
 */
public interface SocketListener {

    /**
     * The connection was established.
     */
    void onConnected();

    /**
     * The connection attempt failed.
     */
    void onConnectFailed(Throwable e);

    /**
     * The connection was closed.
     */
    void onDisconnect();

    /**
     * Sending data failed.
     *
     * @param errorResponse the failure response
     */
    void onSendDataError(ErrorResponse errorResponse);

    /**
     * A text message was received.
     *
     * @param message the text message
     * @param data    the message converted to a user-defined type; may be null, depending on the
     *                user's {@link com.im.websocket.websocketlib.dispatcher.IResponseDispatcher}
     *                implementation (null by default)
     * @param <T>     the type produced by the conversion in IResponseDispatcher
     */
    <T> void onMessage(String message, T data);

    /**
     * A binary message was received.
     *
     * @param bytes the binary message
     * @param data  the message converted to a user-defined type; may be null, depending on the
     *              user's {@link com.im.websocket.websocketlib.dispatcher.IResponseDispatcher}
     *              implementation (null by default)
     * @param <T>   the type produced by the conversion in IResponseDispatcher
     */
    <T> void onMessage(ByteBuffer bytes, T data);

    /**
     * A ping was received.
     */
    void onPing(Framedata framedata);

    /**
     * A pong was received.
     */
    void onPong(Framedata framedata);
}

6.关于用到的相关源码部分

connectBlocking:WebSocketClient 的 connectBlocking() 会多出一个等待操作,先等连接建立再发送数据,否则在未连接时发送会报错。它内部用到了 CountDownLatch,这个类可以让一个线程等待其他线程各自执行完毕后再继续执行。
在 Java 的 原生 Socket 类里:

// Excerpt of java.net.Socket showing only connect(SocketAddress, int).
public class Socket implements java.io.Closeable {

   /**
    * Connects this socket to the server with a specified timeout value.
    * A timeout of zero is interpreted as an infinite timeout. The connection
    * will then block until established or an error occurs.
    *
    * @param   endpoint the {@code SocketAddress}
    * @param   timeout  the timeout value to be used in milliseconds.
    * @throws  IOException if an error occurs during the connection
    * @throws  SocketTimeoutException if timeout expires before connecting
    * @throws  java.nio.channels.IllegalBlockingModeException
    *          if this socket has an associated channel,
    *          and the channel is in non-blocking mode
    * @throws  IllegalArgumentException if endpoint is null or is a
    *          SocketAddress subclass not supported by this socket
    * @since 1.4
    * @spec JSR-51
    */
   public void connect(SocketAddress endpoint, int timeout) throws IOException {
       // Argument and state validation; the order decides which exception is reported first.
       if (endpoint == null)
           throw new IllegalArgumentException("connect: The address can't be null");

       if (timeout < 0)
         throw new IllegalArgumentException("connect: timeout can't be negative");

       if (isClosed())
           throw new SocketException("Socket is closed");

       if (!oldImpl && isConnected())
           throw new SocketException("already connected");

       if (!(endpoint instanceof InetSocketAddress))
           throw new IllegalArgumentException("Unsupported address type");

       InetSocketAddress epoint = (InetSocketAddress) endpoint;
       InetAddress addr = epoint.getAddress ();
       int port = epoint.getPort();
       checkAddress(addr, "connect");

       // Ask the security manager, if one is installed, whether connecting is permitted.
       SecurityManager security = System.getSecurityManager();
       if (security != null) {
           if (epoint.isUnresolved())
               security.checkConnect(epoint.getHostName(), port);
           else
               security.checkConnect(addr.getHostAddress(), port);
       }
       // Create the underlying implementation on first use.
       if (!created)
           createImpl(true);
       if (!oldImpl)
           impl.connect(epoint, timeout);
       else if (timeout == 0) {
           // The old implementation path is only taken with timeout == 0.
           if (epoint.isUnresolved())
               impl.connect(addr.getHostName(), port);
           else
               impl.connect(addr, port);
       } else
           throw new UnsupportedOperationException("SocketImpl.connect(addr, timeout)");
       connected = true;
       /*
        * If the socket was not bound before the connect, it is now because
        * the kernel will have picked an ephemeral port & a local address
        */
       bound = true;
   }
}

关于 WebSocketImpl 里的 BlockingQueue:BlockingQueue 即阻塞队列,其常用实现(如 LinkedBlockingQueue)基于 ReentrantLock 实现。

在Java中,BlockingQueue是一个接口,它的实现类有ArrayBlockingQueue、DelayQueue、 LinkedBlockingDeque、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等,它们的区别主要体现在存储结构上或对元素操作上的不同,但是对于take与put操作的原理,却是类似的。


/**
 * Represents one end (client or server) of a single WebSocketImpl connection. Takes care of
 * the "handshake" phase, then allows for easy sending of text frames and receiving of frames
 * through an event-based model.
 *
 * <p>This listing is an excerpt; the {@code ...} lines mark elided code.
 */
public class WebSocketImpl implements WebSocket {
    /**
     * Queue of buffers that need to be sent to the client.
     */
    public final BlockingQueue<ByteBuffer> outQueue;
    /**
     * Queue of buffers that need to be processed
     */
    public final BlockingQueue<ByteBuffer> inQueue;  

    /**
     * creates a websocket with client role
     *
     * @param listener The listener for this instance
     * @param draft    The draft which should be used
     */
    public WebSocketImpl( WebSocketListener listener, Draft draft ) {
        ...
        // Both queues use the no-argument constructor, i.e. no explicit capacity limit is set.
        this.outQueue = new LinkedBlockingQueue<ByteBuffer>();
        inQueue = new LinkedBlockingQueue<ByteBuffer>();
        ...
    }
}
LinkedBlockingQueue是一个基于链表实现的可选容量的阻塞队列。队头的元素是插入时间最长的,队尾的元素是最新插入的。新的元素将会被插入到队列的尾部。 LinkedBlockingQueue的容量限制是可选的,如果在初始化时没有指定容量,那么默认使用int的最大值作为队列容量。LinkedBlockingQueue内部是使用链表实现一个队列的,但是却有别于一般的队列,在于该队列至少有一个节点,头节点不含有元素。结构图如下:

LinkedBlockingQueue中维持两把锁,一把锁用于入队,一把锁用于出队,这也就意味着,同一时刻,只能有一个线程执行入队,其余执行入队的线程将会被阻塞;同时,可以有另一个线程执行出队,其余执行出队的线程将会被阻塞。换句话说,虽然入队和出队两个操作同时均只能有一个线程操作,但是可以一个入队线程和一个出队线程共同执行,也就意味着可能同时有两个线程在操作队列,那么为了维持线程安全,LinkedBlockingQueue使用一个AtomicInteger类型的变量表示当前队列中含有的元素个数,所以可以确保两个线程之间操作底层队列是线程安全的。

关于握手协议

在 WebSocketClient 类下:

    /**
     * Builds the client's opening handshake and passes it to the engine.
     * <p>
     * The resource descriptor is the URI's raw path ("/" when the path is
     * absent or empty), followed by "?" and the raw query when a query is
     * present. The "Host" value is the URI host, with ":port" appended unless
     * the port equals one of the two default ports. Entries of {@code headers},
     * if any, are then put into the handshake as well.
     *
     * @throws InvalidHandshakeException an invalid handshake was created
     */
    private void sendHandshake() throws InvalidHandshakeException {
        String rawPath = uri.getRawPath();
        String rawQuery = uri.getRawQuery();

        // Fall back to the root resource when the URI carries no path.
        String resource = ( rawPath == null || rawPath.length() == 0 ) ? "/" : rawPath;
        if( rawQuery != null ) {
            resource = resource + '?' + rawQuery;
        }

        int port = getPort();
        boolean usesDefaultPort = port == WebSocketImpl.DEFAULT_PORT
                || port == WebSocketImpl.DEFAULT_WSS_PORT;
        String hostHeader = uri.getHost() + ( usesDefaultPort ? "" : ":" + port );

        HandshakeImpl1Client request = new HandshakeImpl1Client();
        request.setResourceDescriptor( resource );
        request.put( "Host", hostHeader );
        if( headers != null ) {
            for( Map.Entry<String,String> header : headers.entrySet() ) {
                request.put( header.getKey(), header.getValue() );
            }
        }
        engine.startHandshake( request );
    }
/**
 * Implemented by WebSocketClient and WebSocketServer. The methods within are
 * called by WebSocket. Almost every method takes a first parameter conn, which
 * represents the source of the respective event.
 */
public interface WebSocketListener {

    /**
     * Called on the server side when the socket connection is first established, and the WebSocket
     * handshake has been received. This method allows to deny connections based on the received handshake.<br>
     * By default this method only requires protocol compliance.
     * 
     * @param conn
     *            The WebSocket related to this event
     * @param draft
     *            The protocol draft the client uses to connect
     * @param request
     *            The opening http message sent by the client. Can be used to access additional fields like cookies.
     * @return Returns an incomplete handshake containing all optional fields
     * @throws InvalidDataException
     *             Throwing this exception will cause this handshake to be rejected
     */
    ServerHandshakeBuilder onWebsocketHandshakeReceivedAsServer( WebSocket conn, Draft draft, ClientHandshake request ) throws InvalidDataException;

    /**
     * Called on the client side when the socket connection is first established, and the WebSocketImpl
     * handshake response has been received.
     * 
     * @param conn
     *            The WebSocket related to this event
     * @param request
     *            The handshake initially sent out to the server by this websocket.
     * @param response
     *            The handshake the server sent in response to the request.
     * @throws InvalidDataException
     *             Allows the client to reject the connection with the server in respect of its handshake response.
     */
    void onWebsocketHandshakeReceivedAsClient( WebSocket conn, ClientHandshake request, ServerHandshake response ) throws InvalidDataException;

    /**
     * Called on the client side when the socket connection is first established, and the WebSocketImpl
     * handshake has just been sent.
     * 
     * @param conn
     *            The WebSocket related to this event
     * @param request
     *            The handshake sent to the server by this websocket
     * @throws InvalidDataException
     *             Allows the client to stop the connection from progressing
     */
    void onWebsocketHandshakeSentAsClient( WebSocket conn, ClientHandshake request ) throws InvalidDataException;

    /**
     * Called when an entire text frame has been received. Do whatever you want
     * here...
     * 
     * @param conn
     *            The <tt>WebSocket</tt> instance this event is occurring on.
     * @param message
     *            The UTF-8 decoded message that was received.
     */
    void onWebsocketMessage( WebSocket conn, String message );

    /**
     * Called when an entire binary frame has been received. Do whatever you want
     * here...
     * 
     * @param conn
     *            The <tt>WebSocket</tt> instance this event is occurring on.
     * @param blob
     *            The binary message that was received.
     */
    void onWebsocketMessage( WebSocket conn, ByteBuffer blob );

    /**
     * Called after <var>onHandshakeReceived</var> returns <var>true</var>.
     * Indicates that a complete WebSocket connection has been established,
     * and we are ready to send/receive data.
     * 
     * @param conn The <tt>WebSocket</tt> instance this event is occurring on.
     * @param d The handshake of the websocket instance
     */
    void onWebsocketOpen( WebSocket conn, Handshakedata d );

    /**
     * Called after <tt>WebSocket#close</tt> is explicitly called, or when the
     * other end of the WebSocket connection is closed.
     * 
     * @param ws The <tt>WebSocket</tt> instance this event is occurring on.
     * @param code The codes can be looked up here: {@link CloseFrame}
     * @param reason Additional information string
     * @param remote Returns whether or not the closing of the connection was initiated by the remote host.
     */
    void onWebsocketClose( WebSocket ws, int code, String reason, boolean remote );

    /** Called as soon as no further frames are accepted
     *
     * @param ws The <tt>WebSocket</tt> instance this event is occurring on.
     * @param code The codes can be looked up here: {@link CloseFrame}
     * @param reason Additional information string
     * @param remote Returns whether or not the closing of the connection was initiated by the remote host.
     */
    void onWebsocketClosing( WebSocket ws, int code, String reason, boolean remote );

    /** Sent when this peer sends a close handshake
     *
     * @param ws The <tt>WebSocket</tt> instance this event is occurring on.
     * @param code The codes can be looked up here: {@link CloseFrame}
     * @param reason Additional information string
     */
    void onWebsocketCloseInitiated( WebSocket ws, int code, String reason );

    /**
     * Called if an exception worth noting occurred.
     * If an error causes the connection to fail onClose will be called additionally afterwards.
     *
     * @param conn The <tt>WebSocket</tt> instance this event is occurring on.
     * @param ex
     *            The exception that occurred. <br>
     *            Might be null if the exception is not related to any specific connection. For example if the server port could not be bound.
     */
    void onWebsocketError( WebSocket conn, Exception ex );

    /**
     * Called when a ping frame has been received.
     * This method must send a corresponding pong by itself.
     *
     * @param conn The <tt>WebSocket</tt> instance this event is occurring on.
     * @param f The ping frame. Control frames may contain payload.
     */
    void onWebsocketPing( WebSocket conn, Framedata f );

    /**
     * Called when a pong frame is received.
     *
     * @param conn The <tt>WebSocket</tt> instance this event is occurring on.
     * @param f The pong frame. Control frames may contain payload.
     **/
    void onWebsocketPong( WebSocket conn, Framedata f );

    /** This method is used to inform the selector thread that there is data queued to be written to the socket.
     * @param conn The <tt>WebSocket</tt> instance this event is occurring on.
     */
    void onWriteDemand( WebSocket conn );

    /**
     * @see  WebSocket#getLocalSocketAddress()
     *
     * @param conn The <tt>WebSocket</tt> instance this event is occurring on.
     * @return Returns the address of the endpoint this socket is bound to.
     */
    InetSocketAddress getLocalSocketAddress( WebSocket conn );

    /**
     * @see  WebSocket#getRemoteSocketAddress()
     *
     * @param conn The <tt>WebSocket</tt> instance this event is occurring on.
     * @return Returns the address of the endpoint this socket is connected to, or {@code null} if it is unconnected.
     */
    InetSocketAddress getRemoteSocketAddress( WebSocket conn );
}

WebSocketHandler 是个很重要的概念,我们无论是 WebSocket 的初始化、创建连接、断开连接、数据收发等等都要使用它来实现。

/**
 * User-facing control handle for WebSocket connections.
 * <p>
 * Holds one "default" {@link WebSocketManager} plus a key-indexed map of
 * additional managers. All managers created here share the same
 * {@link WebSocketEngine} and {@link ResponseProcessEngine}.
 * <p>
 * Engine and default-connection state is guarded by the class monitor;
 * the manager map is guarded by {@link #WS_MAP_BLOCK}. The lazily
 * initialised static fields are {@code volatile} so that the unsynchronised
 * null checks in front of the locks observe fully published values.
 */
public class WebSocketHandler {

    private static final String TAG = "WebSocketHandler";

    /**
     * Engine used to send messages.
     */
    private static volatile WebSocketEngine webSocketEngine;
    /**
     * Engine used to process responses.
     */
    private static volatile ResponseProcessEngine responseProcessEngine;
    /**
     * The default WebSocket connection.
     */
    private static volatile WebSocketManager defaultWebSocket;

    /**
     * Lock held while operating on {@link #mWebSocketMap}.
     */
    private static final Object WS_MAP_BLOCK = new Object();

    /**
     * Stores WebSocketManager objects by key so that multiple connections are supported.
     */
    private static volatile Map<String, WebSocketManager> mWebSocketMap;

    private static volatile Logable mLog;

    /**
     * Initialises the default WebSocket connection.
     * <p>
     * Every call creates a new {@link WebSocketManager} and installs it as the
     * default, replacing any previous one. The replacement happens entirely
     * under the class lock, so a concurrent caller never observes a
     * half-reset default.
     *
     * @param setting the settings for this connection
     * @return the newly created default manager
     */
    public static WebSocketManager init(WebSocketSetting setting) {
        synchronized (WebSocketHandler.class) {
            checkEngineNullAndInit();
            WebSocketManager manager = new WebSocketManager(setting,
                    webSocketEngine,
                    responseProcessEngine);
            defaultWebSocket = manager;
            // Return the local reference rather than re-reading the field, which
            // clearDefaultWebSocket() may null out at any time.
            return manager;
        }
    }

    /**
     * Drops the reference to the default WebSocket connection.
     */
    public static void clearDefaultWebSocket() {
        defaultWebSocket = null;
    }

    /**
     * Creates a new WebSocket connection registered under a unique key.
     *
     * @param key     unique identifier of the WebSocketManager; the same key is
     *                used later to look the manager up
     * @param setting the settings for this connection
     * @return the manager already registered under {@code key} if there is one,
     *         otherwise the newly created manager
     */
    public static WebSocketManager initGeneralWebSocket(String key, WebSocketSetting setting) {
        checkEngineNullAndInit();
        checkWebSocketMapNullAndInit();
        synchronized (WS_MAP_BLOCK) {
            if (mWebSocketMap.containsKey(key)) {
                LogUtil.e(TAG, "WebSocketManager exists!do not start again!");
                return mWebSocketMap.get(key);
            }
            WebSocketManager wsm = new WebSocketManager(setting,
                    webSocketEngine,
                    responseProcessEngine);
            mWebSocketMap.put(key, wsm);
            return wsm;
        }
    }

    /**
     * Returns the default WebSocket connection.
     * {@link #init(WebSocketSetting)} must have been called beforehand.
     *
     * @return the default {@link WebSocketManager}; {@code null} if it has not
     *         been initialised or has been cleared
     */
    public static WebSocketManager getDefault() {
        return defaultWebSocket;
    }

    /**
     * Looks up a WebSocketManager by key.
     *
     * @param key the key of the WebSocketManager
     * @return the manager, or {@code null} if none is registered under
     *         {@code key} (never created, or already removed)
     */
    public static WebSocketManager getWebSocket(String key) {
        checkWebSocketMapNullAndInit();
        // Read under the same lock the writers use; HashMap is not safe for
        // unsynchronised reads concurrent with put/remove.
        synchronized (WS_MAP_BLOCK) {
            return mWebSocketMap.get(key);
        }
    }

    /**
     * Returns all WebSocketManagers except the default one.
     * <p>
     * The returned map is the live internal map, not a copy: it is not
     * protected by any lock once it leaves this class, so callers should not
     * iterate or modify it while other threads may be adding or removing
     * connections.
     *
     * @return the internal key-to-manager map
     */
    public static Map<String, WebSocketManager> getAllWebSocket() {
        checkWebSocketMapNullAndInit();
        return mWebSocketMap;
    }

    /**
     * Removes a WebSocketManager.
     *
     * @param key the key of the WebSocketManager
     * @return the removed WebSocketManager, or {@code null} if none was
     *         registered under {@code key}
     */
    public static WebSocketManager removeWebSocket(String key) {
        checkWebSocketMapNullAndInit();
        // Look up and remove in one locked step so the returned manager is
        // exactly the one that was removed.
        synchronized (WS_MAP_BLOCK) {
            return mWebSocketMap.remove(key);
        }
    }

    /**
     * Registers a broadcast receiver for network changes; the WebSocket is
     * reconnected when the network goes from unavailable to available.
     * <p>
     * Registration is skipped (and an error logged) when the
     * ACCESS_NETWORK_STATE permission has not been granted. Any exception
     * thrown during registration is logged and not rethrown.
     *
     * @param context should be the application context, to avoid memory leaks
     *                and other problems
     */
    public static void registerNetworkChangedReceiver(Context context) {
        if (PermissionUtil.checkPermission(context, Manifest.permission.ACCESS_NETWORK_STATE)) {
            try {
                IntentFilter filter = new IntentFilter();
                filter.addAction(ConnectivityManager.CONNECTIVITY_ACTION);
                context.registerReceiver(new NetworkChangedReceiver(), filter);
            } catch (Exception e) {
                LogUtil.e(TAG, "网络监听广播注册失败:", e);
            }
        } else {
            LogUtil.e(TAG, "未获取到网络状态权限,广播监听器无法注册");
        }
    }

    /**
     * Creates the two shared engines if they do not exist yet.
     */
    private static void checkEngineNullAndInit() {
        if (webSocketEngine == null || responseProcessEngine == null) {
            synchronized (WebSocketHandler.class) {
                if (webSocketEngine == null) {
                    webSocketEngine = new WebSocketEngine();
                }
                if (responseProcessEngine == null) {
                    responseProcessEngine = new ResponseProcessEngine();
                }
            }
        }
    }

    /**
     * Creates {@link #mWebSocketMap} if it does not exist yet.
     */
    private static void checkWebSocketMapNullAndInit() {
        if (mWebSocketMap == null) {
            synchronized (WS_MAP_BLOCK) {
                if (mWebSocketMap == null) {
                    mWebSocketMap = new HashMap<>();
                }
            }
        }
    }

    /**
     * Sets the logging implementation. Once set, internal runtime logs are
     * printed through it. The implementation must implement {@link Logable}.
     *
     * @param logable the logging implementation to use
     */
    public static void setLogable(Logable logable) {
        mLog = logable;
    }

    /**
     * Returns the logging implementation, falling back to a new
     * {@link LogImpl} when none has been set.
     *
     * @return the current {@link Logable}
     */
    public static Logable getLogable() {
        if (mLog == null) {
            mLog = new LogImpl();
        }
        return mLog;
    }

}
上一篇下一篇

猜你喜欢

热点阅读