diff --git a/CHANGES.md b/CHANGES.md index 47497ac..366ee90 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -29,6 +29,11 @@ silently dead connection and triggers a reconnect. It is off by default and enabled via the `:heartbeat-interval` option; it never disrupts a server that doesn't answer pings. +* The server no longer rejects a second client. Several clients may be connected + at once; evaluations go to the most recently connected one (so a new client + takes over the REPL) while the others stay connected and their output still + reaches the REPL. An evaluation whose target client disconnects mid-flight now + reports an error instead of hanging the REPL. * Ship a `deps.edn` so the library can be consumed via the Clojure CLI / tools.deps. * Add a GitHub Actions CI pipeline and a basic test suite, including a Node round-trip integration test that exercises the full eval cycle over a real diff --git a/README.md b/README.md index 5cf51c4..aa764e2 100644 --- a/README.md +++ b/README.md @@ -141,9 +141,11 @@ java.io.IOException: No client connected to Websocket nil ``` -Only a single client can be connected to the REPL at once. Attempting -to connect to an occupied REPL server will throw an exception in the -client. +More than one client may be connected at once. Evaluations are sent to the +most recently connected client, so a newly connected client takes over the +REPL; the others stay connected and their printed output still reaches the +REPL. This pairs naturally with auto-reconnect - a client that drops and +comes back simply becomes the active one again. ## Example diff --git a/dev/weasel/integration.clj b/dev/weasel/integration.clj index b4d8509..dcb5709 100644 --- a/dev/weasel/integration.clj +++ b/dev/weasel/integration.clj @@ -6,6 +6,7 @@ * the heartbeat (the client keeps pinging, the server pongs, and the connection is not torn down) * auto-reconnect after the server is bounced + * takeover: a second client becomes the active eval target Run with: @@ -26,7 +27,7 @@ (defn- fresh-round! [] (reset! signals {:ready (promise) :printed (promise) :result (promise)})) -(defn- handle [data] +(defn- handle [channel data] (let [msg (edn/read-string data) {:keys [ready printed result]} @signals] (case (:op msg) @@ -34,7 +35,8 @@ :print (deliver printed (:value msg)) :result (deliver result (:value msg)) :ping (do (swap! ping-count inc) - (server/send! (pr-str {:op :pong}))) + ;; pong back to the client that pinged, like the real server + (server/send-to! channel (pr-str {:op :pong}))) nil))) (defn- await! [what p] @@ -46,20 +48,24 @@ (defn- wait-for-client! [what] ;; like server/wait-for-client, but bounded so a broken reconnect fails the ;; run instead of hanging forever - (when (= ::timeout (deref (server/channel) 10000 ::timeout)) + (when (= ::timeout (deref (server/ready) 10000 ::timeout)) (throw (ex-info (str "timed out waiting for " what) {})))) (defn- eval! [code] (server/send! (pr-str {:op :eval-js :code code}))) +(defn- ^Process launch-client [id] + (let [^"[Ljava.lang.String;" cmd (into-array String + ["node" client-js (str "ws://127.0.0.1:" port) + (str heartbeat-ms) id])] + (-> (ProcessBuilder. cmd) (.inheritIO) (.start)))) + (defn -main [& _] (let [ok? (atom false)] (fresh-round!) (server/start handle :ip "127.0.0.1" :port port) - (let [^"[Ljava.lang.String;" cmd (into-array String - ["node" client-js (str "ws://127.0.0.1:" port) - (str heartbeat-ms)]) - proc (-> (ProcessBuilder. cmd) (.inheritIO) (.start))] + (let [proc (launch-client "A") + proc-b (atom nil)] (try ;; round 1 - result and print travel back over the socket (wait-for-client! "client to connect") @@ -90,13 +96,30 @@ (assert (= "42" (:value (await! ":result (after reconnect)" (:result @signals)))) "reconnected eval returned the wrong value") + ;; round 3 - a second client connects and takes over as the eval target; + ;; client A stays connected (passive) + (fresh-round!) + (reset! proc-b (launch-client "B")) + (await! ":ready (client B)" (:ready @signals)) + (let [ready-at-takeover @ready-count] + (eval! "globalThis.CLIENT_ID") + (assert (= "B" (:value (await! ":result (takeover)" (:result @signals)))) + "eval did not target the most recently connected client") + ;; both clients keep pinging; with pongs routed to the pinger neither + ;; is torn down, so no client reconnects (which would bump ready-count) + (Thread/sleep (long (* 5 heartbeat-ms))) + (assert (= ready-at-takeover @ready-count) + (str "a client reconnected during coexistence (pong misrouting?); " + ":ready count went " ready-at-takeover " -> " @ready-count))) + (reset! ok? true) (println (str "PASS - eval, print, heartbeat (" @ping-count - " pings) and reconnect all verified")) + " pings), reconnect and takeover all verified")) (catch Throwable e (println "FAIL:" (.getMessage e))) (finally (.destroy proc) + (when-let [^Process p @proc-b] (.destroy p)) (server/stop)))) (when-not @ok? (System/exit 1)))) diff --git a/src/clj/weasel/repl/server.clj b/src/clj/weasel/repl/server.clj index bf43c4e..fdc02cb 100644 --- a/src/clj/weasel/repl/server.clj +++ b/src/clj/weasel/repl/server.clj @@ -3,53 +3,125 @@ (:import [java.io IOException])) (defonce state (atom {:server nil - :channel nil ; when the server starts, a - ; promise that derefs to a - ; channel when a client - ; connects - :response-fn nil})) + :clients [] ; connected channels, oldest first + :ready nil ; a promise, realized while at least one + ; client is connected and replaced with a + ; fresh one once the last client leaves; + ; nil while the server is stopped + :response-fn nil ; (fn [channel data]) + :on-disconnect nil})) ; (fn [channel]) + +;; Guards the :clients/:ready invariant (":ready is a realized promise iff +;; :clients is non-empty, and nil iff the server is stopped") so add/remove, +;; start and stop stay consistent. +(def ^:private lock (Object.)) + +(defn- add-client! [channel] + (locking lock + ;; ignore a connection that races server shutdown (:ready is nil when stopped) + (when (:ready @state) + (swap! state update :clients conj channel) + ;; idempotent: only the first client of an empty server realizes the promise + (deliver (:ready @state) true)))) + +(defn- remove-client! [channel] + (let [removed? + (locking lock + (when (some #(identical? % channel) (:clients @state)) + (swap! state + (fn [s] + (let [clients (vec (remove #(identical? % channel) (:clients s)))] + (assoc s :clients clients + ;; re-arm a fresh promise only while the server runs and + ;; the last client just left, so the next eval blocks + ;; until a client reconnects + :ready (if (seq clients) (:ready s) (promise)))))) + true))] + (when removed? + (when-let [f (:on-disconnect @state)] + (f channel))))) (defn handler [request] (if-not (:websocket? request) {:status 200 :body "Please connect with a websocket!"} (with-channel request channel - (if (realized? (:channel @state)) - (do - (http/send! channel (pr-str {:op :error, :type :occupied})) - (http/close channel)) - (do - (deliver (:channel @state) channel) - (on-close channel (fn [_] (swap! state assoc :channel (promise)))) - (on-receive channel (:response-fn @state))))))) + ;; multiple clients may connect; the most recent one wins evaluations, + ;; while the others stay connected so their prints still reach the REPL + (on-receive channel (fn [data] + (when-let [f (:response-fn @state)] + (f channel data)))) + (add-client! channel) + ;; register the close handler only after adding the client: http-kit + ;; invokes it synchronously if the socket is already closed, so this + ;; removes a socket that dropped mid-handshake instead of leaving it as a + ;; zombie in :clients + (on-close channel (fn [_] (remove-client! channel)))))) + +(defn active-channel + "Blocks until at least one client is connected, then returns the channel that + should receive evaluations: the most recently connected one. Throws once the + server is stopped." + [] + (loop [] + (if-let [ready (:ready @state)] + (do + (deref ready) + (or (peek (:clients @state)) (recur))) + (throw (IOException. "WebSocket server not started!"))))) + +(defn send-to! + "Sends `msg` to a specific `channel`. Returns false when the channel is + already closed." + [channel msg] + (http/send! channel msg)) (defn send! + "Sends `msg` to the active (most recently connected) client, blocking until a + client is connected." [msg] - (if-let [channel (:channel @state)] - (http/send! (deref channel) msg) - (throw (IOException. "WebSocket server not started!")))) + (send-to! (active-channel) msg)) + +(defn ready + "Returns the promise that is realized while at least one client is connected." + [] + (:ready @state)) -(defn channel [] - (:channel @state)) +(defn on-disconnect! + "Registers a one-arg function invoked with a channel whenever a client + disconnects." + [f] + (swap! state assoc :on-disconnect f)) (defn start [f & {:keys [ip port] :as opts}] {:pre [(ifn? f)]} - (swap! state - assoc :server (http/run-server #'handler opts) - :channel (promise) - :response-fn f)) + (locking lock + (swap! state assoc + :server (http/run-server #'handler opts) + :clients [] + :ready (promise) + :response-fn f + :on-disconnect nil))) (defn stop [] - (let [stop-server (:server @state)] - (when-not (nil? stop-server) - (stop-server) - (reset! state {:server nil - :channel nil - :response-fn nil}) - @state))) + (locking lock + (let [stop-server (:server @state) + ready (:ready @state)] + (when-not (nil? stop-server) + (stop-server) + ;; wake anything blocked in active-channel/wait-for-client so it can + ;; observe the stopped server and bail out cleanly + (when ready (deliver ready true)) + (reset! state {:server nil + :clients [] + :ready nil + :response-fn nil + :on-disconnect nil}) + @state)))) (defn wait-for-client [] - (deref (:channel @state)) + (when-let [ready (:ready @state)] + (deref ready)) nil) (defn restart [] diff --git a/src/clj/weasel/repl/websocket.clj b/src/clj/weasel/repl/websocket.clj index 2fbc02c..bc6b75d 100644 --- a/src/clj/weasel/repl/websocket.clj +++ b/src/clj/weasel/repl/websocket.clj @@ -10,16 +10,18 @@ "stores the value of *out* when the server is started" (atom nil)) -(def ^:private client-response - "stores a promise fulfilled by a client's eval response" +(def ^:private pending-eval + "the outstanding evaluation as {:channel ch :promise p}, or nil. Keeping the + target channel and its result promise in one atom means they are always read + and written together, so only the client an eval was sent to can satisfy it." (atom nil)) (declare - send-for-eval! websocket-setup-env websocket-eval load-javascript websocket-tear-down-env + on-client-disconnect transitive-deps) (defrecord WebsocketEnv [] @@ -37,14 +39,25 @@ :port 9001} opts)) +(def ^:private disconnect-result + {:status :exception + :value "Weasel client disconnected before returning a result" + :stacktrace "No stacktrace available."}) + +(defn- deliver-if-active! + "Delivers `value` to the outstanding evaluation, but only if it was sent to + `channel` - so a stale or foreign message can't satisfy the wrong eval." + [channel value] + (when-let [{:keys [promise] ch :channel} @pending-eval] + (when (= channel ch) + (deliver promise value)))) + (defmulti ^:private process-message (fn [_ msg] (:op msg))) (defmethod process-message :result - [_ message] - (let [result (:value message)] - (when-not (nil? @client-response) - (deliver @client-response result)))) + [channel message] + (deliver-if-active! channel (:value message))) (defmethod process-message :print @@ -59,20 +72,28 @@ (defmethod process-message :ping - [_ _] - (server/send! (pr-str {:op :pong}))) + [channel _] + ;; pong back to the client that pinged, not the active one + (server/send-to! channel (pr-str {:op :pong}))) (defmethod process-message :default [_ _]) +(defn- on-client-disconnect + "Unblocks an outstanding evaluation when the client it was sent to drops, so + the REPL reports an error instead of hanging forever." + [channel] + (deliver-if-active! channel disconnect-result)) + (defn- websocket-setup-env [this opts] (reset! repl-out *out*) (server/start - (fn [data] (process-message this (read-string data))) + (fn [channel data] (process-message channel (read-string data))) :ip (:ip this) :port (:port this)) + (server/on-disconnect! on-client-disconnect) (let [{:keys [ip pre-connect]} this] (let [port (-> @server/state :server meta :local-port)] (println (str "<< started Weasel server on ws://" ip ":" port " >>"))) @@ -90,16 +111,18 @@ (defn- websocket-eval [js] - (reset! client-response (promise)) - (send-for-eval! js) - (let [ret @@client-response] - (reset! client-response nil) - ret)) + (let [channel (server/active-channel) + p (promise)] + (reset! pending-eval {:channel channel :promise p}) + ;; if the channel is already closed the message is silently dropped, so + ;; surface that immediately rather than waiting for a result that won't come + (when (false? (server/send-to! channel (pr-str {:op :eval-js, :code js}))) + (deliver p disconnect-result)) + (let [ret @p] + (reset! pending-eval nil) + ret))) (defn- load-javascript [_ provides _] (websocket-eval (str "goog.require('" (cmp/munge (first provides)) "')"))) - -(defn- send-for-eval! [js] - (server/send! (pr-str {:op :eval-js, :code js}))) diff --git a/test/clj/weasel/repl/server_test.clj b/test/clj/weasel/repl/server_test.clj index 9fa2623..3de1892 100644 --- a/test/clj/weasel/repl/server_test.clj +++ b/test/clj/weasel/repl/server_test.clj @@ -5,19 +5,51 @@ (deftest start-and-stop (testing "starting the server populates the shared state" (try - (server/start (fn [_]) :ip "127.0.0.1" :port 0) + (server/start (fn [& _]) :ip "127.0.0.1" :port 0) (is (some? (:server @server/state)) "a stop fn is stored") - (is (instance? clojure.lang.IPending (:channel @server/state)) + (is (instance? clojure.lang.IPending (:ready @server/state)) "a pending client promise is stored") (finally (server/stop)))) (testing "stopping the server clears the shared state" (is (nil? (:server @server/state))) - (is (nil? (:channel @server/state))) - (is (nil? (:response-fn @server/state))))) + (is (nil? (:ready @server/state))) + (is (nil? (:response-fn @server/state))) + (is (empty? (:clients @server/state))))) (deftest send-without-server-throws (testing "sending with no running server raises an IOException" (server/stop) (is (thrown? java.io.IOException (server/send! "anything"))))) + +(deftest most-recent-client-wins + (testing "the active channel is the most recently connected client" + (server/start (fn [& _]) :ip "127.0.0.1" :port 0) + (try + (#'server/add-client! :client-a) + (is (= :client-a (server/active-channel))) + (#'server/add-client! :client-b) + (is (= :client-b (server/active-channel)) "newest client takes over") + (#'server/remove-client! :client-b) + (is (= :client-a (server/active-channel)) "falls back to the remaining client") + (finally + (server/stop))))) + +(deftest stop-wakes-blocked-waiter + (testing "stopping the server unblocks a thread waiting for a client" + (server/start (fn [& _]) :ip "127.0.0.1" :port 0) + (let [waiter (future (server/wait-for-client) :woke)] + (Thread/sleep 100) + (is (not (realized? waiter)) "blocks while no client is connected") + (server/stop) + (is (= :woke (deref waiter 1000 ::timeout)) "stop wakes the waiter")))) + +(deftest stale-disconnect-after-stop-leaves-server-stopped + (testing "a late client close after stop does not re-arm the readiness promise" + (server/start (fn [& _]) :ip "127.0.0.1" :port 0) + (#'server/add-client! :client-a) + (server/stop) + (#'server/remove-client! :client-a) ; the channel's on-close firing late + (is (nil? (:ready @server/state)) "the server stays stopped") + (is (thrown? java.io.IOException (server/active-channel))))) diff --git a/test/clj/weasel/repl/websocket_test.clj b/test/clj/weasel/repl/websocket_test.clj index 4619a36..e6ae7fd 100644 --- a/test/clj/weasel/repl/websocket_test.clj +++ b/test/clj/weasel/repl/websocket_test.clj @@ -1,12 +1,43 @@ (ns weasel.repl.websocket-test - (:require [clojure.test :refer [deftest is testing]] + (:require [clojure.test :refer [deftest is testing use-fixtures]] [clojure.edn :as edn] [weasel.repl.server :as server] [weasel.repl.websocket :as websocket])) +(defn- reset-eval-state! [] + (reset! @#'websocket/pending-eval nil)) + +(use-fixtures :each (fn [t] (reset-eval-state!) (t) (reset-eval-state!))) + (deftest ping-is-answered-with-pong - (testing "a :ping message makes the server send a :pong back" + (testing "a :ping is answered with a :pong sent back to the pinging client" (let [sent (atom nil)] - (with-redefs [server/send! (fn [msg] (reset! sent msg))] - (#'websocket/process-message ::ignored {:op :ping})) - (is (= {:op :pong} (edn/read-string @sent)))))) + (with-redefs [server/send-to! (fn [channel msg] (reset! sent {:channel channel :msg msg}))] + (#'websocket/process-message :the-channel {:op :ping})) + (is (= :the-channel (:channel @sent)) "pong goes to the client that pinged") + (is (= {:op :pong} (edn/read-string (:msg @sent))))))) + +(deftest result-correlation + (testing "only the channel an eval was sent to may answer it" + (let [response (promise)] + (reset! @#'websocket/pending-eval {:channel :ch-a :promise response}) + (#'websocket/process-message :ch-b {:op :result :value "wrong"}) + (is (not (realized? response)) "a foreign client's result is ignored") + (#'websocket/process-message :ch-a {:op :result :value "right"}) + (is (= "right" (deref response 100 ::timeout)) "the eval's client answers it")))) + +(deftest disconnect-unblocks-pending-eval + (testing "the eval's client disconnecting unblocks the pending eval" + (let [response (promise)] + (reset! @#'websocket/pending-eval {:channel :ch-a :promise response}) + (#'websocket/on-client-disconnect :ch-b) + (is (not (realized? response)) "an unrelated disconnect leaves the eval pending") + (#'websocket/on-client-disconnect :ch-a) + (is (= :exception (:status (deref response 100 ::timeout))) + "the eval reports an exception instead of hanging")))) + +(deftest eval-to-closed-channel-errors + (testing "evaluating against an already-closed client returns an error, not a hang" + (with-redefs [server/active-channel (fn [] :closed-channel) + server/send-to! (fn [_ _] false)] + (is (= :exception (:status (#'websocket/websocket-eval "1 + 1"))))))) diff --git a/test/cljs/weasel/node_client.cljs b/test/cljs/weasel/node_client.cljs index 1d7cbb6..90be099 100644 --- a/test/cljs/weasel/node_client.cljs +++ b/test/cljs/weasel/node_client.cljs @@ -2,12 +2,18 @@ "A tiny Node entry point used by the integration test to prove that the REPL client works outside the browser, on a native `WebSocket`. - Args: [heartbeat-interval-ms]" + Args: [heartbeat-interval-ms] [client-id] + + When a client-id is given it is stashed on the global object so the test can + evaluate `globalThis.CLIENT_ID` and confirm which client handled the eval." (:require [weasel.repl :as repl])) (defn -main [& args] (let [url (or (first args) "ws://127.0.0.1:9001") - hb (some-> (second args) (js/parseInt 10))] + hb (some-> (second args) (js/parseInt 10)) + id (nth args 2 nil)] + (when id + (set! (.-CLIENT_ID js/globalThis) id)) (apply repl/connect url :verbose false (if hb [:heartbeat-interval hb] []))))