后台的luminus模板的clojure项目和前端reagent clojurescript项目,各自搭建websocket通信
服务端
服务端配置websocket相对来说挺简单,因为我们项目当初new的时候没有加websocket,现在是参考luminusweb websocket往里增加关键代码
- 创建ws-route
创建如下route,并跟普通http api route一样加入handler序列中。
(ns alk-wxapi.routes.websockets
(:require
[alk-wxapi.routes.service.ws-service :refer [ws-handler countdown]]))
(defn websocket-routes []
["/ws"
{:swagger {:tags ["websocket"]}}
[""
{:get {:summary "websocket 入口"
:parameters {}
:handler (fn [request]
(ws-handler request))}}]
])
- 创建ws-service
即上面require的ws-service,内容如下:
(ns alk-wxapi.routes.service.ws-service
(:require
[clojure.tools.logging :as log]
[immutant.web.async :as async]
[immutant.web.sse :as sse]))
(defonce channels (atom #{}))
(defn broadcast-msg
"广播消息"
[data]
(log/info "广播消息:" data)
(doseq [channel @channels]
(async/send! channel (str data))))
(defn notify-clients! [channel data]
(log/info (java.util.Date.) "收到客户端发送的message:" data)
(async/send! channel (str {:message data
:type "reply"})))
(defn connect! [channel]
(log/info "channel open")
(swap! channels conj channel))
(defn disconnect! [channel {:keys [code reason]}]
(log/info "close code:" code "reason:" reason)
(swap! channels #(remove #{channel} %)))
(def websocket-callbacks
"WebSocket callback functions"
{:on-open connect!
:on-close disconnect!
:on-message notify-clients!})
(defn ws-handler [request]
(async/as-channel request websocket-callbacks))
$ wscat -c ws://localhost:3000/api/ws
Connected (press CTRL+C to quit)
>
客户端
在任意一个页面找个契机开始创建连接,参考multi-client-ws-immutant创建时指定消息处理回调函数。
代码如下:
- 根目录或者任意目录创建ws连接工具类
(defn- receive-transit-msg!
[update-fn]
(fn [msg]
(js/clearTimeout clear-time)
(update-fn (cljs.reader/read-string (.-data msg)))))
(defn- send-transit-msg!
[msg]
(if @ws-chan
(.send @ws-chan msg)
(throw (js/Error. "Websocket is not available!"))))
(defn make-websocket! [url receive-handler]
(println "attempting to connect websocket")
(if-let [chan (js/WebSocket. url)]
(do
(set! (.-onmessage chan) (receive-transit-msg! receive-handler))
(reset! ws-chan chan)
(println "Websocket connection established with: " url))
(throw (js/Error. "Websocket connection failed!"))))
- 在某个页面触发websocket连接
(require ' [alk-doc.ws :as ws :refer [webSocketUrl click-one]])
(defn handler-ws-msg
“可以根据消息内容或者自定义的类型,进行相应的业务处理”
[msg]
(js/console.info "收到广播消息:" msg))
(ws/make-websocket! webSocketUrl handler-ws-msg)
这样就建立了前后端ws通信。
生产环境上的特殊处理
但是在生产环境(https+nginx)上要做点特殊处理,这里的处理,服务端和客户端都有。
- 生产环境多是https的,这就要求页面里面的websocket必须要是
wss
的,而不能是ws
的
因此websocket的连接变成了wss://www.abc.com/api/ws/
- 服务端nginx配置支持websocket
生产环境服务端的接口使用nginx做了反向代理,不出意外的话,上面的连接即使改成wss也连接不成功,需要nginx的配置如下:
http {
map $http_upgrade $connection_upgrade {
default upgrade;
'' close;
}
server {
...
location /chat/ {
proxy_pass http://backend;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection $connection_upgrade;
proxy_read_timeout 600s;
}
}
- 最重要的,客户端加入心跳检测,端口后自动重连
我们的程序是一个jar直接java启动的,这样在连接上后不会自己断开的,但是上面说了,生产环境使用nginx代理的,默认情况下,利用nginx代理websocket的时候,发现客户端和服务器握手成功后,如果在60s时间内没有数据交互,连接就会自动断开。因此上面配置了proxy_read_timeout 600s
,也就是10分钟没有通信再断开。
这个时间可以长,但我没试过是不是可以无限长。
所以最好是在客户端加上心跳检测,断开后有能力自己重连,及时服务器不设置10分钟,60秒钟断开也可以自己重连。
终极方案是心跳检测+延长read-timeout时间,客户端的代码:
(require '[reagent.core :as reagent :refer [atom]])
(defonce ws-chan (atom nil))
(def lockReconnect (atom false))
(def clear-time (atom nil))
(def click-one (atom true))
(def webSocketUrl "ws://182.61.51.177:3055/api/ws")
(defn receive-transit-msg!
[update-fn]
(fn [msg]
(js/clearTimeout clear-time)
(update-fn (cljs.reader/read-string (.-data msg)))))
(defn send-transit-msg!
[msg]
(if @ws-chan
(.send @ws-chan msg)
(throw (js/Error. "Websocket is not available!"))))
(set! (.-onbeforeunload js/window) (fn []
(.close (js/WebSocket. webSocketUrl))))
(defn initEventHandle [url chan receive-handler]
(do
(set! (.-onmessage chan) (receive-transit-msg! receive-handler))
(reset! ws-chan chan)
(js/console.log "Websocket connection established with: " url)
(set! (.-onopen chan) (fn []
(js/console.log "连接成功")
(js/clearTimeout clear-time)))
(set! (.-onclose chan) (fn []
(js/console.log "连接断开>>>")
(reset! lockReconnect true)
(if (true? @lockReconnect)
(do
;;没连接上会一直重连,设置延迟避免请求过多
(reset! clear-time
(js/setTimeout (fn []
(initEventHandle webSocketUrl
(js/WebSocket. url)
receive-handler)
(reset! lockReconnect false)) 2000))
(reset! lockReconnect false))
(do
(js/clearTimeout clear-time)))))
(set! (.-onerror chan) (fn []
(js/console.Error "连接错误")
(reset! lockReconnect true)
(if (true? @lockReconnect)
(do
;;没连接上会一直重连,设置延迟避免请求过多
(reset! clear-time
(js/setTimeout (fn []
(initEventHandle webSocketUrl
(js/WebSocket. url)
receive-handler)
(reset! lockReconnect false)) 2000))
(reset! lockReconnect false))
(do
(js/clearTimeout clear-time)))))))
(defn make-websocket! [url receive-handler]
(js/console.log "attempting to connect websocket")
(if-let [chan (js/WebSocket. url)]
(initEventHandle url chan receive-handler)
(throw (js/Error. "Websocket connection failed!"))))
参考官方demo的实现后,只是成功进行了连接,并不能保证真正的长连,因此上面代码里对当前channel的onclose
和onerror
事件的处理是我们前端的姑娘参考WebSocket加入心跳包防止自动断开连接js版本用cljs做的实现,给她点个赞👍。
到这里,clojure和clojurescript里搭建websocket通信就算是可用了。
client testing
(require '[reagent.core :as reagent :refer [atom]])
(defonce messages (reagent/atom []))
(defn message-list []
[:ul
(for [[i message] (map-indexed vector @messages)]
^{:key i}
[:li message])])
(defn message-input []
(reagent/with-let [value (reagent/atom nil)]
[:input
{:type :text
:placeholder "输入&回车发送"
:value @value
:on-change #(reset! value (-> % .-target .-value))
:on-key-down #(when (= (.-keyCode %) 13)
(send-transit-msg!
{:message @value})
(reset! value nil))}]))
(defn update-messages! [{:keys [message]}]
(js/console.info "收到服务端返回的消息")
(swap! messages #(vec (take 10 (conj % message)))))
(make-websocket! webSocketUrl update-messages!)
(defn home-page []
[:div.container
[:div.row
[:div.col-md-12
[:h2 "Welcome to chat"]
[:h4 "服务端将你发送的内容加了个key做了回答"]]]
[:div.row
[:div.col-sm-6
[message-list]]]
[:div.row
[:div.col-sm-6
[message-input]]]])
[home-page]