iOS学习笔记产品与运营直播

iOS Starscream实现Websocket通讯

2020-05-12  本文已影响0人  玉思盈蝶

封装WebSocket:

import Starscream
import SwiftyJSON

// MARK: - WebSocketManagerDelegate
protocol WebSocketManagerDelegate: class {
    /// 已建立连接,包括正常连接和自动重新连接两种情况。
    func webSocketManagerDidConnect(manager: WebSocketManager)
    /// 已断开连接,包括正常和非正常断开连接两种情况。参数 `isReconnecting` 表示是否处于等待重新连接状态。
    func webSocketManagerDidDisconnect(manager: WebSocketManager, error: Error?, isReconnecting: Bool)
}

// MARK: - WebSocketManager
class WebSocketManager {

    /// 收到新的打印异常
    var hasNewPrintException = false

    /// 尚未收到过打印异常
    var hasNotReceivedPrintException = true

    weak var delegate: WebSocketManagerDelegate?

    var enableLog = false

    private var isHeartbeatTimeout = false
    private var heartbeatInterval: TimeInterval = 5
    private var heartbeatTimeout: TimeInterval = 10
    private var heartbeatTimestamp: TimeInterval = 0 // 毫秒

    private var reconnectRetryCount = 0
    private var maxReconnectRetryCount = 3
    private var reconnectOperation: DispatchWorkItem?
    private var reconnectRetryInterval: TimeInterval = 5

    private var error: Error?

    private(set) var state = State.closed {
        didSet {
            stateChanged(oldState: oldValue)
        }
    }

    private var isHeartbeatTimerSuspended = true

    private lazy var heartbeatTimer: DispatchSourceTimer = {
        let timer = DispatchSource.makeTimerSource(queue: .main)
        timer.schedule(deadline: .now(), repeating: heartbeatInterval, leeway: .milliseconds(100))
        timer.setEventHandler { [weak self] in
            guard let `self` = self else { return }
            if !self.isHeartbeatTimerSuspended {
                self.sendHeartbeat()
            }
        }
        return timer
    }()

    private let url: URL

    private lazy var socket: WebSocket = {
        let socket = WebSocket(url: url)
        socket.delegate = self
        return socket
    }()

    deinit {
        disconnect()
        destroyHeartbeatTimer()
    }

    init(url: URL) {
        self.url = url
    }

}

// MARK: - 连接相关
extension WebSocketManager {

    enum State {
        /// 等待重连中
        case reconnecting
        /// 正在连接中
        case connecting
        /// 已连接
        case connected
        /// 断开连接中
        case closing
        /// 已断开
        case closed
    }

    /// 只在 `state` 值为 `closed` 时有效果。
    func connect() {
        if state == .closed {
            state = .connecting
        }
    }

    /// 只在 `state` 值为 `reconnecting` 或 `connecting` 或 `connected` 时有效果。
    func disconnect() {
        switch state {
        case .reconnecting, .connecting, .connected:
            state = .closing
        case .closing, .closed:
            break
        }
    }

    private func reconnect() {
        guard state == .reconnecting else { return }

        guard reconnectRetryCount < maxReconnectRetryCount else {
            state = .closed
            return
        }

        delegate?.webSocketManagerDidDisconnect(manager: self, error: error, isReconnecting: true)

        // webSocketManagerDidDisconnect 方法中可能会进行 disconnect 操作
        guard state == .reconnecting else { return }

        reconnectOperation = DispatchWorkItem { [weak self] in
            guard let `self` = self else { return }

            self.increaseReconnectRetryCount()
            self.clearReconnectOperation()
            self.state = .connecting
        }

        if reconnectRetryCount == 0 {
            reconnectOperation?.perform()
        } else {
            DispatchQueue.main.asyncAfter(deadline: .now() + reconnectRetryInterval, execute: reconnectOperation!)
        }
    }

    private func clearReconnectOperation() {
        reconnectOperation = nil
    }

    private func cancelAndClearReconnectOperation() {
        reconnectOperation?.cancel()
        clearReconnectOperation()
    }

    private func increaseReconnectRetryCount() {
        reconnectRetryCount = min(reconnectRetryCount + 1, maxReconnectRetryCount)
    }

    private func resetReconnectRetryCount() {
        reconnectRetryCount = 0
    }

    private func stateChanged(oldState: State) {
        switch state {

        case .connecting:
            handleConnectingState()

        case .connected:
            handleConnectedState()

        case .reconnecting:
            handleReconnectingState()

        case .closing:
            handleClosingState(oldState: oldState)

        case .closed:
            handleClosedState(oldState: oldState)
        }
    }

    private func handleConnectingState() {
        socket.connect()
    }

    private func handleConnectedState() {
        resumeHeartbeatTimer()
        resetReconnectRetryCount()
        delegate?.webSocketManagerDidConnect(manager: self)
    }

    private func handleReconnectingState() {
        suspendHeartbeatTimer()
        reconnect()
    }

    private func handleClosingState(oldState: State) {
        switch oldState {

        case .connecting, .connected:
            suspendHeartbeatTimer()
            socket.disconnect()

        case .reconnecting:
            state = .closed

        case .closing, .closed:
            fatalError()
        }
    }

    private func handleClosedState(oldState: State) {
        switch oldState {

        case .closing: // 主动断开
            resetReconnectRetryCount()
            cancelAndClearReconnectOperation()

            if isHeartbeatTimeout {
                isHeartbeatTimeout = false
                state = .reconnecting // 心跳超时重连
            } else {
                delegate?.webSocketManagerDidDisconnect(
                    manager: self,
                    error: error,
                    isReconnecting: false)
            }

        case .reconnecting: // 重连次数上限
            resetReconnectRetryCount()
            delegate?.webSocketManagerDidDisconnect(
                manager: self,
                error: error,
                isReconnecting: false)

        case .connected, .connecting, .closed:
            fatalError()
        }
    }

}

// MARK: - 心跳相关
private extension WebSocketManager {

    func sendHeartbeat() {
        if socket.isConnected {
            updateHeartbeatTimestamp()
            socket.write(string: "\(heartbeatTimestamp)")
        }
    }

    func updateHeartbeatTimestamp() {
        heartbeatTimestamp = Date().timeIntervalSince1970 * 1_000
    }

    private func handleHeartbeatTimeout() {
        isHeartbeatTimeout = true
        disconnect()
    }

    func resumeHeartbeatTimer() {
        if isHeartbeatTimerSuspended {
            isHeartbeatTimerSuspended = false
            heartbeatTimer.resume()
        }
    }

    func suspendHeartbeatTimer() {
        if !isHeartbeatTimerSuspended {
            isHeartbeatTimerSuspended = true
            heartbeatTimer.suspend()
        }
    }

    func destroyHeartbeatTimer() {
        heartbeatTimer.cancel()
        resumeHeartbeatTimer()
    }

}

// MARK: - 消息处理
extension WebSocketManager {

    enum MessageType: String {

        case beat
        case print

        fileprivate func messageHandler(_ manager: WebSocketManager) -> (JSON) -> Void {
            switch self {
            case .beat:
                return WebSocketManager.handleBeatMessage(manager)
            case .print:
                return WebSocketManager.handlePrintMessage(manager)
            }
        }

    }

    private func handleBeatMessage(_ msg: JSON) {
        // {"msgType":"beat","ts":"1536134255660.87"}
        guard let ts = Double(msg["ts"].stringValue) else {
            return
        }

        // 超时重连
        if ts - heartbeatTimestamp > heartbeatTimeout {
            handleHeartbeatTimeout()
        }
    }

    private func handlePrintMessage(_ msg: JSON) {
        hasNewPrintException = true
        hasNotReceivedPrintException = false
        NotificationCenter.default.post(name: .PrintExceptionDidReceive, object: nil)
    }

}

// MARK: - WebSocketDelegate
extension WebSocketManager: WebSocketDelegate {
    
    func websocketDidConnect(socket: WebSocketClient) {
        log("websocketDidConnect")
        state = .connected
    }

    func websocketDidDisconnect(socket: WebSocketClient, error: Error?) {
        log("websocketDidDisconnect error: \(error as Any)")

        self.error = error
        
        switch state {

        case .connecting, .connected:
            state = .reconnecting // 断线重连

        case .closing:
            state = .closed // 主动关闭

        case .closed, .reconnecting:
            fatalError()
        }
    }

    func websocketDidReceiveMessage(socket: WebSocketClient, text: String) {
        log("websocketDidReceiveMessage text: \(text)")

        if state == .connected {
            let message = JSON(parseJSON: text)
            MessageType(rawValue: message["msgType"].stringValue)?.messageHandler(self)(message)
        }
    }

    func websocketDidReceiveData(socket: WebSocketClient, data: Data) {
        log("websocketDidReceiveData \n \(String(data: data, encoding: .utf8)!)")
    }

}

extension WebSocketManager {

    func log(
        _ log: @autoclosure () -> String = "",
        file: String = #file,
        line: Int = #line,
        function: String = #function)
    {
        if enableLog {
            print("\(function) at \((file as NSString).lastPathComponent)[\(line)]", log())
        }
    }
}

参考链接:

https://www.jianshu.com/p/7879bd578db7

上一篇下一篇

猜你喜欢

热点阅读