Skip to content

Commit

Permalink
Merge pull request #484 from ring-clojure/websockets
Browse files Browse the repository at this point in the history
Add experimental websocket support
  • Loading branch information
weavejester authored Aug 17, 2023
2 parents c62702a + 7169e59 commit 087886e
Show file tree
Hide file tree
Showing 4 changed files with 333 additions and 32 deletions.
87 changes: 87 additions & 0 deletions ring-core/src/ring/websocket.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
(ns ring.websocket
(:refer-clojure :exclude [send])
(:import [java.nio ByteBuffer]))

(defprotocol Listener
(on-open [listener socket])
(on-message [listener socket message])
(on-pong [listener socket data])
(on-error [listener socket throwable])
(on-close [listener socket code reason]))

(extend-protocol Listener
clojure.lang.IPersistentMap
(on-open [m socket]
(when-let [kv (find m :on-open)] ((val kv) socket)))
(on-message [m socket message]
(when-let [kv (find m :on-message)] ((val kv) socket message)))
(on-pong [m socket data]
(when-let [kv (find m :on-pong)] ((val kv) socket data)))
(on-error [m socket throwable]
(when-let [kv (find m :on-error)] ((val kv) socket throwable)))
(on-close [m socket code reason]
(when-let [kv (find m :on-close)] ((val kv) socket code reason))))

(defprotocol Socket
(-send [socket message])
(-ping [socket data])
(-pong [socket data])
(-close [socket status reason]))

(defprotocol AsyncSocket
(-send-async [socket message succeed fail]))

(defprotocol TextData
(->string [data]))

(defprotocol BinaryData
(->byte-buffer [data]))

(extend-protocol TextData
String
(->string [s] s))

(extend-protocol BinaryData
(Class/forName "[B")
(->byte-buffer [bs] (ByteBuffer/wrap bs))
ByteBuffer
(->byte-buffer [bb] bb))

(defn- encode-message [message]
(cond
(satisfies? TextData message) (->string message)
(satisfies? BinaryData message) (->byte-buffer message)
:else (throw (ex-info "message is not a valid text or binary data type"
{:message message}))))

(defn send
([socket message]
(-send socket (encode-message message)))
([socket message succeed fail]
(-send-async socket (encode-message message) succeed fail)))

(defn ping
([socket]
(-ping socket (ByteBuffer/allocate 0)))
([socket data]
(-ping socket (->byte-buffer data))))

(defn pong
([socket]
(-pong socket (ByteBuffer/allocate 0)))
([socket data]
(-pong socket (->byte-buffer data))))

(defn close
([socket]
(-close socket 1000 "Normal Closure"))
([socket code reason]
(-close socket code reason)))

(defn websocket-request? [request]
(let [headers (:headers request)]
(and (.equalsIgnoreCase "upgrade" (get headers "connection"))
(.equalsIgnoreCase "websocket" (get headers "upgrade")))))

(defn websocket-response? [response]
(contains? response ::listener))
7 changes: 5 additions & 2 deletions ring-jetty-adapter/project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@
:dependencies [[org.clojure/clojure "1.7.0"]
[ring/ring-core "1.10.0"]
[ring/ring-jakarta-servlet "1.10.0"]
[org.eclipse.jetty/jetty-server "11.0.15"]]
[org.eclipse.jetty/jetty-server "11.0.15"]
[org.eclipse.jetty.websocket/websocket-jetty-server "11.0.15"]]
:aliases {"test-all" ["with-profile" "default:+1.8:+1.9:+1.10:+1.11" "test"]}
:profiles
{:dev {:dependencies [[clj-http "3.12.3"]
[less-awful-ssl "1.0.6"]]
[less-awful-ssl "1.0.6"]
[hato "0.9.0"]
[org.slf4j/slf4j-simple "2.0.7"]]
:jvm-opts ["-Dorg.eclipse.jetty.server.HttpChannelState.DEFAULT_TIMEOUT=500"]}
:1.8 {:dependencies [[org.clojure/clojure "1.8.0"]]}
:1.9 {:dependencies [[org.clojure/clojure "1.9.0"]]}
Expand Down
139 changes: 111 additions & 28 deletions ring-jetty-adapter/src/ring/adapter/jetty.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
"A Ring adapter that uses the Jetty 9 embedded web server.
Adapters are used to convert Ring handlers into running web servers."
(:require [ring.util.jakarta.servlet :as servlet])
(:import [org.eclipse.jetty.server
(:require [ring.util.jakarta.servlet :as servlet]
[ring.websocket :as ws])
(:import [java.nio ByteBuffer]
[org.eclipse.jetty.server
Request
Server
ServerConnector
Expand All @@ -12,55 +14,136 @@
HttpConnectionFactory
SslConnectionFactory
SecureRequestCustomizer]
[org.eclipse.jetty.servlet ServletContextHandler ServletHandler]
[org.eclipse.jetty.server.handler AbstractHandler]
[org.eclipse.jetty.util BlockingArrayQueue]
[org.eclipse.jetty.util.thread ThreadPool QueuedThreadPool]
[org.eclipse.jetty.util.ssl SslContextFactory$Server KeyStoreScanner]
[org.eclipse.jetty.websocket.server
JettyWebSocketServerContainer
JettyWebSocketCreator]
[org.eclipse.jetty.websocket.api
Session
WebSocketConnectionListener
WebSocketListener
WebSocketPingPongListener
WriteCallback]
[org.eclipse.jetty.websocket.server.config
JettyWebSocketServletContainerInitializer]
[jakarta.servlet AsyncContext DispatcherType AsyncEvent AsyncListener]
[jakarta.servlet.http HttpServletRequest HttpServletResponse]))

(defn- ^AbstractHandler proxy-handler [handler]
(proxy [AbstractHandler] []
(handle [_ ^Request base-request ^HttpServletRequest request response]
(when-not (= (.getDispatcherType request) DispatcherType/ERROR)
(let [request-map (servlet/build-request-map request)
response-map (handler request-map)]
(servlet/update-servlet-response response response-map)
(.setHandled base-request true))))))
(defn- websocket-socket [^Session session]
(let [remote (.getRemote session)]
(reify
ws/Socket
(-send [_ message]
(if (string? message)
(.sendString remote message)
(.sendBytes remote message)))
(-ping [_ data]
(.sendPing remote data))
(-pong [_ data]
(.sendPong remote data))
(-close [_ status reason]
(.close session status reason))
ws/AsyncSocket
(-send-async [_ message succeed fail]
(let [callback (reify WriteCallback
(writeSuccess [_] (succeed))
(writeFailed [_ ex] (fail ex)))]
(if (string? message)
(.sendString remote message callback)
(.sendBytes remote message callback)))))))

(defn- websocket-listener [listener]
(let [socket (volatile! nil)]
(reify
WebSocketConnectionListener
(onWebSocketConnect [_ session]
(vreset! socket (websocket-socket session))
(ws/on-open listener @socket))
(onWebSocketClose [_ status reason]
(ws/on-close listener @socket status reason))
(onWebSocketError [_ throwable]
(ws/on-error listener @socket throwable))
WebSocketListener
(onWebSocketText [_ message]
(ws/on-message listener @socket message))
(onWebSocketBinary [_ payload offset length]
(let [buffer (ByteBuffer/wrap payload offset length)]
(ws/on-message listener @socket buffer)))
WebSocketPingPongListener
(onWebSocketPing [_ _])
(onWebSocketPong [_ payload]
(ws/on-pong listener @socket payload)))))

(defn- websocket-creator [{listener ::ws/listener}]
(reify JettyWebSocketCreator
(createWebSocket [_ _ _]
(websocket-listener listener))))

(defn- upgrade-to-websocket [^HttpServletRequest request response response-map]
(let [context (.getServletContext request)
container (JettyWebSocketServerContainer/getContainer context)
creator (websocket-creator response-map)]
(.upgrade container creator request response)))

(defn- ^ServletHandler proxy-handler [handler]
(proxy [ServletHandler] []
(doHandle [_ ^Request base-request request response]
(let [request-map (servlet/build-request-map request)
response-map (handler request-map)]
(try
(if (ws/websocket-response? response-map)
(upgrade-to-websocket request response response-map)
(servlet/update-servlet-response response response-map))
(finally
(.setHandled base-request true)))))))

(defn- async-jetty-raise [^AsyncContext context ^HttpServletResponse response]
(fn [^Throwable exception]
(.sendError response 500 (.getMessage exception))
(.complete context)))

(defn- async-jetty-respond [context response]
(defn- async-jetty-respond [context request response]
(fn [response-map]
(servlet/update-servlet-response response context response-map)))
(if (ws/websocket-response? response-map)
(upgrade-to-websocket request response response-map)
(servlet/update-servlet-response response context response-map))))

(defn- async-timeout-listener [request context response handler]
(proxy [AsyncListener] []
(onTimeout [^AsyncEvent _]
(handler (servlet/build-request-map request)
(async-jetty-respond context response)
(async-jetty-respond context request response)
(async-jetty-raise context response)))
(onComplete [^AsyncEvent _])
(onError [^AsyncEvent _])
(onStartAsync [^AsyncEvent _])))

(defn- ^AbstractHandler async-proxy-handler [handler timeout timeout-handler]
(proxy [AbstractHandler] []
(handle [_ ^Request base-request ^HttpServletRequest request ^HttpServletResponse response]
(defn- ^ServletHandler async-proxy-handler [handler timeout timeout-handler]
(proxy [ServletHandler] []
(doHandle [_ ^Request base-request request response]
(let [^AsyncContext context (.startAsync request)]
(.setTimeout context timeout)
(when timeout-handler
(.addListener
context
(async-timeout-listener request context response timeout-handler)))
(handler
(servlet/build-request-map request)
(async-jetty-respond context response)
(async-jetty-raise context response))
(.setHandled base-request true)))))
(try
(handler
(servlet/build-request-map request)
(async-jetty-respond context request response)
(async-jetty-raise context response))
(finally
(.setHandled base-request true)))))))

(defn- ^ServletContextHandler context-handler [proxy-handler]
(doto (ServletContextHandler.)
(.setServletHandler proxy-handler)
(.setAllowNullPathInfo true)
(JettyWebSocketServletContainerInitializer/configure nil)))

(defn- ^ServerConnector server-connector [^Server server & factories]
(ServerConnector. server #^"[Lorg.eclipse.jetty.server.ConnectionFactory;" (into-array ConnectionFactory factories)))
Expand Down Expand Up @@ -213,13 +296,13 @@
:response-header-size - the maximum size of a response header (default 8192)
:send-server-version? - add Server header to HTTP response (default true)"
[handler options]
(let [server (create-server (dissoc options :configurator))]
(if (:async? options)
(.setHandler server
(async-proxy-handler handler
(:async-timeout options 0)
(:async-timeout-handler options)))
(.setHandler server (proxy-handler handler)))
(let [server (create-server (dissoc options :configurator))
proxy (if (:async? options)
(async-proxy-handler handler
(:async-timeout options 0)
(:async-timeout-handler options))
(proxy-handler handler))]
(.setHandler server (context-handler proxy))
(when-let [configurator (:configurator options)]
(configurator server))
(try
Expand Down
Loading

0 comments on commit 087886e

Please sign in to comment.