网络Android-Rxjava&retrofit&dagger设计方案

Okhttp实现WebSocket长连接

2020-02-29  本文已影响0人  唐_夏影

Okhttp实现WebSocket长连接

项目源代码

1.概述


有一些需求是需要建立长连接的,比如零食机付款时就需要建立客户端和服务器端的链接,当付款完成时服务端会向客户端推送消息,结束付款中界面,也有另外一种不通过长连接的实现方式,就是不断循环的向服务器发送接口请求

2.工具


1> 测试接口

WebSocket.org,该网站提供了一个测试接口wss://echo.websocket.org和该接口的简单调试器

测试接口.png

2> 调试工具

如果是其他的WebSocket接口调试,可以使用apizza

调试工具.png

3.实现


1> 配置

添加依赖

implementation 'com.squareup.okhttp3:okhttp:3.8.1'
implementation 'com.squareup.okhttp3:mockwebserver:3.8.1'

添加权限

  <uses-permission android:name="android.permission.INTERNET"/>

2>创建类继承于WebSocketListener

class EchoHeartWebSocketListener : WebSocketListener() {
    //日志TAG
    val TAG: String = "WebSocket"
    
    //连接到长连接地址
    override fun onOpen(webSocket: WebSocket?, response: Response?) {
        super.onOpen(webSocket, response)
        //webSocket.close(1000, "再见");
        Log.d(TAG, "onOpen")
        Log.d(TAG, "发送消息:hello world")
        webSocket?.send("hello world")
        mWebSocket = webSocket
        sendRepeartHeartMessage()//开始发送心跳包
    }

    //接受String数据
    override fun onMessage(webSocket: WebSocket?, text: String?) {
        super.onMessage(webSocket, text)
        Log.d(TAG, "接受到消息:$text")
        mResponsePayLoad = text.toString()
    }

    //接受ByteString数据
    override fun onMessage(webSocket: WebSocket?, bytes: ByteString?) {
        super.onMessage(webSocket, bytes)
        Log.d(TAG, "onMessage byteString:" + bytes!!)
    }

    //长连接关闭中
    override fun onClosed(webSocket: WebSocket?, code: Int, reason: String?) {
        super.onClosed(webSocket, code, reason)
        Log.d(TAG, "onClosing:$code/$reason")
        connect()//重新连接
    }

    //长连接被断开
    override fun onClosing(webSocket: WebSocket?, code: Int, reason: String?) {
        super.onClosing(webSocket, code, reason)
        Log.d(TAG, "onClosed:$code/$reason")
        connect()//重新连接
    }

    //连接失败
    override fun onFailure(webSocket: WebSocket?, t: Throwable?, response: Response?) {
        super.onFailure(webSocket, t, response)
        Log.d(TAG, "onFailure:" + t!!.message)
        connect()//重新连接
    }
}

3 >Actiivty中初始化

/**
 * 简单的WebSocket连接机制
 */
class SimpleActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_simple)
        start.setOnClickListener {
            connect()
        }
    }

    private fun connect() {
        val listener = EchoWebSocketListener()
        val request = Request.Builder()
            .url("ws://echo.websocket.org")
            .build()
        val client = OkHttpClient()
        client.newWebSocket(request, listener)
        client.dispatcher().executorService().shutdown()
    }
}

运行程序观看日志

D/user: onOpen
D/user: 发送消息:hello world
D/user: 接受到消息:hello world

可以看到发送什么就接受到了什么

4.心跳


1> 心跳技术

要保持WebSocket的长时间连接就需要一段时间后向服务器发送一些消息来告诉服务器你还需要保持连接,当长连接意外中断时也可以凭借发送的信息是否正常返回来及时进行长连接重连,这就是心跳包

先写一个延迟执行代码的kotlin扩展方法


/**
 * 写一个kotlin延迟执行扩展函数
 * 延时执行
 */
inline fun delayNotStop(million: Int, crossinline func: () -> Unit) {
    if (million < 0) return
    Observable.just(million)
        .delay(million.toLong(), TimeUnit.MILLISECONDS)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .doOnComplete { func() }
        .subscribe()
}

创建一个WebSocket封装工具类,EchoHeartWebSocketListener接口在后面的代码示例里


object SocketListenerUtil {

    //控制心跳包循环发送的开始和结束
    private var mDisposable: Disposable? = null

    //发送的数据
    private var mPostPayload: String = ""

    //发送后接受的数据
    var mResponsePayLoad: String = ""

    //WebSocket实例对象,建立连接后返回
    var mWebSocket: WebSocket? = null

    //日志TAG
    val TAG: String = "WebSocket"

    /**
     * 建立WebSocket连接
     */
    fun connect() {
        clear()
        //构建Builder()对象
        val listener = EchoHeartWebSocketListener()
        val request = Request.Builder()
            .url("ws://echo.websocket.org")
            .build()
        val client = OkHttpClient()
        client.newWebSocket(request, listener)
    }


    /**
     * 发送心跳包
     */
    fun sendRepeartHeartMessage() {
        //停止发送心跳,重置状态
        if (mDisposable != null) {
            mDisposable?.dispose()
            mDisposable = null
        }
        mDisposable = Observable.interval(0, 3, TimeUnit.SECONDS)
            .subscribe {
                Log.d(TAG,"")
                /**
                 * 如果mPostPayload发送文本和mResponsePayload相应文本不相等,说明8秒之前发送的心跳没有返回,            * 
                 * mResponsePayload仍然是上上次返回的内容,这时候长连接应该就是断了的,开始尝试重新连接
                 * (备用方案,相当于自己实现了判断长连接是否断开的逻辑)
                 */
                if (TextUtils.equals(mPostPayload, mResponsePayLoad)) {
                    mPostPayload = System.currentTimeMillis().toString()
                    //发送的数据为当前时间的时间戳
                    mPostPayload = System.currentTimeMillis().toString()
                    sendMessageDetail(mPostPayload)
                } else {
                    //延迟6000秒后重连
                    delayNotStop(6000) {
                        connect()
                    }
                }
            }
    }

    private fun clear() {
        mPostPayload = ""
        mResponsePayLoad = ""
        if (mDisposable != null) {
            mDisposable?.dispose()
            mDisposable = null
        }
        if (mWebSocket != null) {
            mWebSocket?.cancel()
            mWebSocket = null
        }
    }

    private fun sendMessageDetail(mPostPayload: String) {
        if (mWebSocket != null) {
            Log.d(TAG, "发送的消息:$mPostPayload")
            mWebSocket?.send(mPostPayload)
        }
    }
}

我们每隔三秒发送一个字符串,这个字符串为当前时间戳.toString,将发送的字符串记录下来,将接受到的字符串记录下来

在下一次需要发送事件时判断上一次发送的数据和接受的数据是否相等,也就是一个发送和接受的流程是否已经走完,如果不等说明心跳数据没有返回或者有什么异常,我们尝试重新连接

WebSocketListener接口


class EchoHeartWebSocketListener : WebSocketListener() {
    //日志TAG
    val TAG: String = "WebSocket"
    
    //连接到长连接地址
    override fun onOpen(webSocket: WebSocket?, response: Response?) {
        super.onOpen(webSocket, response)
        //webSocket.close(1000, "再见");
        Log.d(TAG, "onOpen")
        Log.d(TAG, "发送消息:hello world")
        webSocket?.send("hello world")
        mWebSocket = webSocket
        sendRepeartHeartMessage()//开始发送心跳包
    }

    //接受String数据
    override fun onMessage(webSocket: WebSocket?, text: String?) {
        super.onMessage(webSocket, text)
        Log.d(TAG, "接受到消息:$text")
        mResponsePayLoad = text.toString()
    }

    //接受ByteString数据
    override fun onMessage(webSocket: WebSocket?, bytes: ByteString?) {
        super.onMessage(webSocket, bytes)
        Log.d(TAG, "onMessage byteString:" + bytes!!)
    }

    //长连接关闭中
    override fun onClosed(webSocket: WebSocket?, code: Int, reason: String?) {
        super.onClosed(webSocket, code, reason)
        Log.d(TAG, "onClosing:$code/$reason")
        connect()//重新连接
    }

    //长连接被断开
    override fun onClosing(webSocket: WebSocket?, code: Int, reason: String?) {
        super.onClosing(webSocket, code, reason)
        Log.d(TAG, "onClosed:$code/$reason")
        connect()//重新连接
    }

    //连接失败
    override fun onFailure(webSocket: WebSocket?, t: Throwable?, response: Response?) {
        super.onFailure(webSocket, t, response)
        Log.d(TAG, "onFailure:" + t!!.message)
        connect()//重新连接
    }
}

Activity中使用

class HeartActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_heart)
        mBtn_01.setOnClickListener {
            SocketListenerUtil.connect()
        }
    }
}

5.开源


1>资料

https://github.com/Rabtman/WsManager

https://blog.rabtman.com/2017/01/21/okhttp_ws_use/

https://blog.rabtman.com/2017/01/28/okhttp_ws_source/

start数虽不高,但是我们可以用来学习一下他的封装思想

2>依赖

compile 'com.rabtman.wsmanager:wsmanager:1.0.2'

3>xml

<?xml version="1.0" encoding="utf-8"?>
<LinearLayout
        xmlns:android="http://schemas.android.com/apk/res/android"
        xmlns:tools="http://schemas.android.com/tools"
        xmlns:app="http://schemas.android.com/apk/res-auto"
        android:layout_width="match_parent"
        android:layout_height="match_parent"
        android:orientation="vertical"
        tools:context=".library.WsManagerActivity">

    <Button
            android:id="@+id/mBtn_01"
            android:layout_width="match_parent"
            android:layout_height="wrap_content"
            android:text="开始连接"
            />

    <Button
            android:id="@+id/mBtn_02"
            android:layout_width="match_parent"
            android:layout_height="wrap_content"
            android:text="终止连接"
            />

    <Button
            android:id="@+id/mBtn_03"
            android:layout_width="match_parent"
            android:layout_height="wrap_content"
            android:text="发送消息"
            />
</LinearLayout>

4>Activity

/**
 * 开源WebSocket封装库
 */
class WsManagerActivity : AppCompatActivity() {

    //日志TAG
    val TAG: String = "WebSocket"

    val okHttpClient = OkHttpClient().newBuilder()
        .pingInterval(15, TimeUnit.SECONDS)
        .retryOnConnectionFailure(true)
        .build()

    val wsManager = WsManager.Builder(this)
        .wsUrl("ws://echo.websocket.org")
        .needReconnect(true)
        .client(okHttpClient)
        .build()

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(com.example.websocket.R.layout.activity_ws_manager)
        mBtn_01.setOnClickListener {
            wsManager.startConnect()
        }
        mBtn_02.setOnClickListener {
            wsManager.sendMessage("Hello")
        }
        wsManager.setWsStatusListener(object : WsStatusListener() {
            override fun onOpen(response: Response?) {
                super.onOpen(response)
                Log.d(TAG, "onOpen")
                wsManager.sendMessage("Hi")
            }

            override fun onMessage(bytes: ByteString?) {
                super.onMessage(bytes)
                Log.d(TAG, "onMessage$bytes")
            }

            override fun onMessage(text: String?) {
                super.onMessage(text)
                Log.d(TAG, "onMessage:$text")
            }

            override fun onReconnect() {
                super.onReconnect()
                Log.d(TAG, "onReconnect")
            }

            override fun onClosed(code: Int, reason: String?) {
                super.onClosed(code, reason)
                Log.d(TAG, "onClosed")
            }

            override fun onClosing(code: Int, reason: String?) {
                super.onClosing(code, reason)
                Log.d(TAG, "onClosing")
            }

            override fun onFailure(t: Throwable?, response: Response?) {
                super.onFailure(t, response)
                Log.d(TAG, "onFailure")
            }
        })
    }
}

我们构建了一个OkHttpClient对象和一个WsManager对象,然后将OkHttpClient对象传入WsManager中,通过按钮1的 wsManager.startConnect()来进行连接

5>查看日志

点击开始按钮观看日志

D/WebSocket: onOpen
D/WebSocket: onMessage:Hi

然后我们将手机断开网络,查看日志

D/WebSocket: onOpen
D/WebSocket: onMessage:Hi
D/WebSocket: onReconnect
D/WebSocket: onFailure
D/WebSocket: onFailure
D/WebSocket: onReconnect
D/WebSocket: onFailure

这个库自动帮我们完成了重连的操作,而且之后如果网络不恢复会一直持续尝试重连

上一篇下一篇

猜你喜欢

热点阅读