clojure&clojurescript前后端实现websoc

2019-10-31  本文已影响0人  小马将过河

服务端

服务端配置websocket相对来说挺简单,因为我们项目当初new的时候没有加websocket,现在是参考luminusweb websocket往里增加关键代码

(ns alk-wxapi.routes.websockets
  (:require
   [alk-wxapi.routes.service.ws-service :refer [ws-handler countdown]]))

(defn websocket-routes []
  ["/ws"
   {:swagger {:tags ["websocket"]}}

   [""
    {:get {:summary    "websocket 入口"
           :parameters {}
           :handler    (fn [request]
                         (ws-handler request))}}]
   ])
(ns alk-wxapi.routes.service.ws-service
  (:require
   [clojure.tools.logging :as log]
   [immutant.web.async :as async]
   [immutant.web.sse :as sse]))

(defonce channels (atom #{}))

(defn broadcast-msg
  "广播消息"
  [data]
  (log/info "广播消息:" data)
  (doseq [channel @channels]
    (async/send! channel (str data))))

(defn notify-clients! [channel data]
  (log/info (java.util.Date.) "收到客户端发送的message:" data)
  (async/send! channel (str {:message data
                             :type "reply"})))

(defn connect! [channel]
  (log/info "channel open")
  (swap! channels conj channel))

(defn disconnect! [channel {:keys [code reason]}]
  (log/info "close code:" code "reason:" reason)
  (swap! channels #(remove #{channel} %)))

(def websocket-callbacks
  "WebSocket callback functions"
  {:on-open connect!
   :on-close disconnect!
   :on-message notify-clients!})

(defn ws-handler [request]
  (async/as-channel request websocket-callbacks))

$ wscat -c ws://localhost:3000/api/ws
Connected (press CTRL+C to quit)
>

客户端

在任意一个页面找个契机开始创建连接,参考multi-client-ws-immutant创建时指定消息处理回调函数。
代码如下:

(defn- receive-transit-msg!
  [update-fn]
  (fn [msg]
    (js/clearTimeout clear-time)
    (update-fn (cljs.reader/read-string (.-data msg)))))

(defn- send-transit-msg!
  [msg]
  (if @ws-chan
    (.send @ws-chan msg)
    (throw (js/Error. "Websocket is not available!"))))
(defn make-websocket! [url receive-handler]
 (println "attempting to connect websocket")
 (if-let [chan (js/WebSocket. url)]
   (do
     (set! (.-onmessage chan) (receive-transit-msg! receive-handler))
     (reset! ws-chan chan)
     (println "Websocket connection established with: " url))
   (throw (js/Error. "Websocket connection failed!"))))
(require ' [alk-doc.ws :as ws :refer [webSocketUrl click-one]])

(defn handler-ws-msg
 “可以根据消息内容或者自定义的类型,进行相应的业务处理”
 [msg]
 (js/console.info "收到广播消息:" msg))

(ws/make-websocket! webSocketUrl handler-ws-msg)

这样就建立了前后端ws通信。

生产环境上的特殊处理

但是在生产环境(https+nginx)上要做点特殊处理,这里的处理,服务端和客户端都有。

因此websocket的连接变成了wss://www.abc.com/api/ws/

生产环境服务端的接口使用nginx做了反向代理,不出意外的话,上面的连接即使改成wss也连接不成功,需要nginx的配置如下:

http {
    map $http_upgrade $connection_upgrade {
        default upgrade;
        ''      close;
    }

    server {
        ...

        location /chat/ {
            proxy_pass http://backend;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection $connection_upgrade;
            proxy_read_timeout 600s; 
        }
  }

我们的程序是一个jar直接java启动的,这样在连接上后不会自己断开的,但是上面说了,生产环境使用nginx代理的,默认情况下,利用nginx代理websocket的时候,发现客户端和服务器握手成功后,如果在60s时间内没有数据交互,连接就会自动断开。因此上面配置了proxy_read_timeout 600s,也就是10分钟没有通信再断开。
这个时间可以长,但我没试过是不是可以无限长。
所以最好是在客户端加上心跳检测,断开后有能力自己重连,及时服务器不设置10分钟,60秒钟断开也可以自己重连。
终极方案是心跳检测+延长read-timeout时间,客户端的代码:

(require '[reagent.core :as reagent :refer [atom]])
(defonce ws-chan (atom nil))
(def lockReconnect (atom false))
(def clear-time (atom nil))
(def click-one (atom true))
(def webSocketUrl "ws://182.61.51.177:3055/api/ws")

(defn receive-transit-msg!
  [update-fn]
  (fn [msg]
    (js/clearTimeout clear-time)
    (update-fn (cljs.reader/read-string (.-data msg)))))

(defn send-transit-msg!
  [msg]
  (if @ws-chan
    (.send @ws-chan msg)
    (throw (js/Error. "Websocket is not available!"))))

(set! (.-onbeforeunload js/window) (fn []
                                     (.close (js/WebSocket. webSocketUrl))))

(defn initEventHandle [url chan receive-handler]
  (do
    (set! (.-onmessage chan) (receive-transit-msg! receive-handler))
    (reset! ws-chan chan)
    (js/console.log "Websocket connection established with: " url)
    (set! (.-onopen chan) (fn []
                            (js/console.log "连接成功")
                            (js/clearTimeout clear-time)))
    (set! (.-onclose chan) (fn []
                             (js/console.log "连接断开>>>")
                             (reset! lockReconnect true)
                             (if (true? @lockReconnect)
                               (do                                 
                                 ;;没连接上会一直重连,设置延迟避免请求过多
                                 (reset! clear-time
                                         (js/setTimeout (fn []
                                                          (initEventHandle webSocketUrl
                                                                           (js/WebSocket. url)
                                                                           receive-handler)
                                                          (reset! lockReconnect false)) 2000))
                                 (reset! lockReconnect false))
                               (do                                 
                                 (js/clearTimeout clear-time)))))
    (set! (.-onerror chan) (fn []
                             (js/console.Error "连接错误")
                             (reset! lockReconnect true)
                             (if (true? @lockReconnect)
                               (do                                 
                                 ;;没连接上会一直重连,设置延迟避免请求过多
                                 (reset! clear-time
                                         (js/setTimeout (fn []
                                                          (initEventHandle webSocketUrl
                                                                           (js/WebSocket. url)
                                                                           receive-handler)
                                                          (reset! lockReconnect false)) 2000))
                                 (reset! lockReconnect false))
                               (do                                 
                                 (js/clearTimeout clear-time)))))))

(defn make-websocket! [url receive-handler]
  (js/console.log "attempting to connect websocket")
  (if-let [chan (js/WebSocket. url)]
    (initEventHandle url chan receive-handler)
    (throw (js/Error. "Websocket connection failed!"))))

参考官方demo的实现后,只是成功进行了连接,并不能保证真正的长连,因此上面代码里对当前channel的oncloseonerror事件的处理是我们前端的姑娘参考WebSocket加入心跳包防止自动断开连接js版本用cljs做的实现,给她点个赞👍。

到这里,clojure和clojurescript里搭建websocket通信就算是可用了。

client testing

(require '[reagent.core :as reagent :refer [atom]])

(defonce messages (reagent/atom []))

(defn message-list []
  [:ul
   (for [[i message] (map-indexed vector @messages)]
     ^{:key i}
     [:li message])])

(defn message-input []
  (reagent/with-let [value (reagent/atom nil)]
    [:input
     {:type        :text
      :placeholder "输入&回车发送"
      :value       @value
      :on-change   #(reset! value (-> % .-target .-value))
      :on-key-down #(when (= (.-keyCode %) 13)
                      (send-transit-msg!
                       {:message @value})
                      (reset! value nil))}]))

(defn update-messages! [{:keys [message]}]
  (js/console.info "收到服务端返回的消息")
  (swap! messages #(vec (take 10 (conj % message)))))

(make-websocket! webSocketUrl update-messages!)
(defn home-page []
 [:div.container
  [:div.row
   [:div.col-md-12
    [:h2 "Welcome to chat"]
    [:h4 "服务端将你发送的内容加了个key做了回答"]]]
  [:div.row
   [:div.col-sm-6
    [message-list]]]
  [:div.row
   [:div.col-sm-6
    [message-input]]]])

[home-page]

参考

immutant websockets sample

WebSocket proxying

WebSocket加入心跳包防止自动断开连接

上一篇下一篇

猜你喜欢

热点阅读