Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@
silently dead connection and triggers a reconnect. It is off by default and
enabled via the `:heartbeat-interval` option; it never disrupts a server that
doesn't answer pings.
* The server no longer rejects a second client. Several clients may be connected
at once; evaluations go to the most recently connected one (so a new client
takes over the REPL) while the others stay connected and their output still
reaches the REPL. An evaluation whose target client disconnects mid-flight now
reports an error instead of hanging the REPL.
* Ship a `deps.edn` so the library can be consumed via the Clojure CLI / tools.deps.
* Add a GitHub Actions CI pipeline and a basic test suite, including a Node
round-trip integration test that exercises the full eval cycle over a real
Expand Down
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,11 @@ java.io.IOException: No client connected to Websocket
nil
```

Only a single client can be connected to the REPL at once. Attempting
to connect to an occupied REPL server will throw an exception in the
client.
More than one client may be connected at once. Evaluations are sent to the
most recently connected client, so a newly connected client takes over the
REPL; the others stay connected and their printed output still reaches the
REPL. This pairs naturally with auto-reconnect - a client that drops and
comes back simply becomes the active one again.

## Example

Expand Down
39 changes: 31 additions & 8 deletions dev/weasel/integration.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* the heartbeat (the client keeps pinging, the server pongs, and the
connection is not torn down)
* auto-reconnect after the server is bounced
* takeover: a second client becomes the active eval target

Run with:

Expand All @@ -26,15 +27,16 @@
(defn- fresh-round! []
(reset! signals {:ready (promise) :printed (promise) :result (promise)}))

(defn- handle [data]
(defn- handle [channel data]
(let [msg (edn/read-string data)
{:keys [ready printed result]} @signals]
(case (:op msg)
:ready (do (swap! ready-count inc) (deliver ready true))
:print (deliver printed (:value msg))
:result (deliver result (:value msg))
:ping (do (swap! ping-count inc)
(server/send! (pr-str {:op :pong})))
;; pong back to the client that pinged, like the real server
(server/send-to! channel (pr-str {:op :pong})))
nil)))

(defn- await! [what p]
Expand All @@ -46,20 +48,24 @@
(defn- wait-for-client! [what]
;; like server/wait-for-client, but bounded so a broken reconnect fails the
;; run instead of hanging forever
(when (= ::timeout (deref (server/channel) 10000 ::timeout))
(when (= ::timeout (deref (server/ready) 10000 ::timeout))
(throw (ex-info (str "timed out waiting for " what) {}))))

(defn- eval! [code]
(server/send! (pr-str {:op :eval-js :code code})))

(defn- ^Process launch-client [id]
(let [^"[Ljava.lang.String;" cmd (into-array String
["node" client-js (str "ws://127.0.0.1:" port)
(str heartbeat-ms) id])]
(-> (ProcessBuilder. cmd) (.inheritIO) (.start))))

(defn -main [& _]
(let [ok? (atom false)]
(fresh-round!)
(server/start handle :ip "127.0.0.1" :port port)
(let [^"[Ljava.lang.String;" cmd (into-array String
["node" client-js (str "ws://127.0.0.1:" port)
(str heartbeat-ms)])
proc (-> (ProcessBuilder. cmd) (.inheritIO) (.start))]
(let [proc (launch-client "A")
proc-b (atom nil)]
(try
;; round 1 - result and print travel back over the socket
(wait-for-client! "client to connect")
Expand Down Expand Up @@ -90,13 +96,30 @@
(assert (= "42" (:value (await! ":result (after reconnect)" (:result @signals))))
"reconnected eval returned the wrong value")

;; round 3 - a second client connects and takes over as the eval target;
;; client A stays connected (passive)
(fresh-round!)
(reset! proc-b (launch-client "B"))
(await! ":ready (client B)" (:ready @signals))
(let [ready-at-takeover @ready-count]
(eval! "globalThis.CLIENT_ID")
(assert (= "B" (:value (await! ":result (takeover)" (:result @signals))))
"eval did not target the most recently connected client")
;; both clients keep pinging; with pongs routed to the pinger neither
;; is torn down, so no client reconnects (which would bump ready-count)
(Thread/sleep (long (* 5 heartbeat-ms)))
(assert (= ready-at-takeover @ready-count)
(str "a client reconnected during coexistence (pong misrouting?); "
":ready count went " ready-at-takeover " -> " @ready-count)))

(reset! ok? true)
(println (str "PASS - eval, print, heartbeat (" @ping-count
" pings) and reconnect all verified"))
" pings), reconnect and takeover all verified"))
(catch Throwable e
(println "FAIL:" (.getMessage e)))
(finally
(.destroy proc)
(when-let [^Process p @proc-b] (.destroy p))
(server/stop))))
(when-not @ok?
(System/exit 1))))
132 changes: 102 additions & 30 deletions src/clj/weasel/repl/server.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,53 +3,125 @@
(:import [java.io IOException]))

(defonce state (atom {:server nil
:channel nil ; when the server starts, a
; promise that derefs to a
; channel when a client
; connects
:response-fn nil}))
:clients [] ; connected channels, oldest first
:ready nil ; a promise, realized while at least one
; client is connected and replaced with a
; fresh one once the last client leaves;
; nil while the server is stopped
:response-fn nil ; (fn [channel data])
:on-disconnect nil})) ; (fn [channel])

;; Guards the :clients/:ready invariant (":ready is a realized promise iff
;; :clients is non-empty, and nil iff the server is stopped") so add/remove,
;; start and stop stay consistent.
(def ^:private lock (Object.))

(defn- add-client! [channel]
(locking lock
;; ignore a connection that races server shutdown (:ready is nil when stopped)
(when (:ready @state)
(swap! state update :clients conj channel)
;; idempotent: only the first client of an empty server realizes the promise
(deliver (:ready @state) true))))

(defn- remove-client! [channel]
(let [removed?
(locking lock
(when (some #(identical? % channel) (:clients @state))
(swap! state
(fn [s]
(let [clients (vec (remove #(identical? % channel) (:clients s)))]
(assoc s :clients clients
;; re-arm a fresh promise only while the server runs and
;; the last client just left, so the next eval blocks
;; until a client reconnects
:ready (if (seq clients) (:ready s) (promise))))))
true))]
(when removed?
(when-let [f (:on-disconnect @state)]
(f channel)))))

(defn handler [request]
(if-not (:websocket? request)
{:status 200 :body "Please connect with a websocket!"}
(with-channel request channel
(if (realized? (:channel @state))
(do
(http/send! channel (pr-str {:op :error, :type :occupied}))
(http/close channel))
(do
(deliver (:channel @state) channel)
(on-close channel (fn [_] (swap! state assoc :channel (promise))))
(on-receive channel (:response-fn @state)))))))
;; multiple clients may connect; the most recent one wins evaluations,
;; while the others stay connected so their prints still reach the REPL
(on-receive channel (fn [data]
(when-let [f (:response-fn @state)]
(f channel data))))
(add-client! channel)
;; register the close handler only after adding the client: http-kit
;; invokes it synchronously if the socket is already closed, so this
;; removes a socket that dropped mid-handshake instead of leaving it as a
;; zombie in :clients
(on-close channel (fn [_] (remove-client! channel))))))

(defn active-channel
"Blocks until at least one client is connected, then returns the channel that
should receive evaluations: the most recently connected one. Throws once the
server is stopped."
[]
(loop []
(if-let [ready (:ready @state)]
(do
(deref ready)
(or (peek (:clients @state)) (recur)))
(throw (IOException. "WebSocket server not started!")))))

(defn send-to!
"Sends `msg` to a specific `channel`. Returns false when the channel is
already closed."
[channel msg]
(http/send! channel msg))

(defn send!
"Sends `msg` to the active (most recently connected) client, blocking until a
client is connected."
[msg]
(if-let [channel (:channel @state)]
(http/send! (deref channel) msg)
(throw (IOException. "WebSocket server not started!"))))
(send-to! (active-channel) msg))

(defn ready
"Returns the promise that is realized while at least one client is connected."
[]
(:ready @state))

(defn channel []
(:channel @state))
(defn on-disconnect!
"Registers a one-arg function invoked with a channel whenever a client
disconnects."
[f]
(swap! state assoc :on-disconnect f))

(defn start
[f & {:keys [ip port] :as opts}]
{:pre [(ifn? f)]}
(swap! state
assoc :server (http/run-server #'handler opts)
:channel (promise)
:response-fn f))
(locking lock
(swap! state assoc
:server (http/run-server #'handler opts)
:clients []
:ready (promise)
:response-fn f
:on-disconnect nil)))

(defn stop []
(let [stop-server (:server @state)]
(when-not (nil? stop-server)
(stop-server)
(reset! state {:server nil
:channel nil
:response-fn nil})
@state)))
(locking lock
(let [stop-server (:server @state)
ready (:ready @state)]
(when-not (nil? stop-server)
(stop-server)
;; wake anything blocked in active-channel/wait-for-client so it can
;; observe the stopped server and bail out cleanly
(when ready (deliver ready true))
(reset! state {:server nil
:clients []
:ready nil
:response-fn nil
:on-disconnect nil})
@state))))

(defn wait-for-client []
(deref (:channel @state))
(when-let [ready (:ready @state)]
(deref ready))
nil)

(defn restart []
Expand Down
59 changes: 41 additions & 18 deletions src/clj/weasel/repl/websocket.clj
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,18 @@
"stores the value of *out* when the server is started"
(atom nil))

(def ^:private client-response
"stores a promise fulfilled by a client's eval response"
(def ^:private pending-eval
"the outstanding evaluation as {:channel ch :promise p}, or nil. Keeping the
target channel and its result promise in one atom means they are always read
and written together, so only the client an eval was sent to can satisfy it."
(atom nil))

(declare
send-for-eval!
websocket-setup-env
websocket-eval
load-javascript
websocket-tear-down-env
on-client-disconnect
transitive-deps)

(defrecord WebsocketEnv []
Expand All @@ -37,14 +39,25 @@
:port 9001}
opts))

(def ^:private disconnect-result
{:status :exception
:value "Weasel client disconnected before returning a result"
:stacktrace "No stacktrace available."})

(defn- deliver-if-active!
"Delivers `value` to the outstanding evaluation, but only if it was sent to
`channel` - so a stale or foreign message can't satisfy the wrong eval."
[channel value]
(when-let [{:keys [promise] ch :channel} @pending-eval]
(when (= channel ch)
(deliver promise value))))

(defmulti ^:private process-message (fn [_ msg] (:op msg)))

(defmethod process-message
:result
[_ message]
(let [result (:value message)]
(when-not (nil? @client-response)
(deliver @client-response result))))
[channel message]
(deliver-if-active! channel (:value message)))

(defmethod process-message
:print
Expand All @@ -59,20 +72,28 @@

(defmethod process-message
:ping
[_ _]
(server/send! (pr-str {:op :pong})))
[channel _]
;; pong back to the client that pinged, not the active one
(server/send-to! channel (pr-str {:op :pong})))

(defmethod process-message
:default
[_ _])

(defn- on-client-disconnect
"Unblocks an outstanding evaluation when the client it was sent to drops, so
the REPL reports an error instead of hanging forever."
[channel]
(deliver-if-active! channel disconnect-result))

(defn- websocket-setup-env
[this opts]
(reset! repl-out *out*)
(server/start
(fn [data] (process-message this (read-string data)))
(fn [channel data] (process-message channel (read-string data)))
:ip (:ip this)
:port (:port this))
(server/on-disconnect! on-client-disconnect)
(let [{:keys [ip pre-connect]} this]
(let [port (-> @server/state :server meta :local-port)]
(println (str "<< started Weasel server on ws://" ip ":" port " >>")))
Expand All @@ -90,16 +111,18 @@

(defn- websocket-eval
[js]
(reset! client-response (promise))
(send-for-eval! js)
(let [ret @@client-response]
(reset! client-response nil)
ret))
(let [channel (server/active-channel)
p (promise)]
(reset! pending-eval {:channel channel :promise p})
;; if the channel is already closed the message is silently dropped, so
;; surface that immediately rather than waiting for a result that won't come
(when (false? (server/send-to! channel (pr-str {:op :eval-js, :code js})))
(deliver p disconnect-result))
(let [ret @p]
(reset! pending-eval nil)
ret)))

(defn- load-javascript
[_ provides _]
(websocket-eval
(str "goog.require('" (cmp/munge (first provides)) "')")))

(defn- send-for-eval! [js]
(server/send! (pr-str {:op :eval-js, :code js})))
Loading
Loading