Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle ZK connection state and show in the Dashboard #63

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

; onyx deps
^{:voom {:repo "[email protected]:onyx-platform/onyx.git" :branch "master"}}
[org.onyxplatform/onyx "0.9.12"]
[org.onyxplatform/onyx "0.9.12" :exclusions [org.slf4j/slf4j-nop]]
[org.onyxplatform/lib-onyx "0.9.10.0" :exclusions [ring-jetty-component org.onyxplatform/onyx]]
[org.onyxplatform/onyx-visualization "0.4.0"]

Expand Down
25 changes: 25 additions & 0 deletions src/clj/onyx_dashboard/channels.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
(ns onyx-dashboard.channels
(:require [clojure.core.async :refer [close! chan]]
[com.stuartsierra.component :as component]))

; Responsibilities
; - hold channels
; Useable to make duplex comunication between components
; CompA -> chan1 -> CompB
; CompB <- chan2 <- CompB
(defrecord Channels []
component/Lifecycle
(start [component]
(println "Starting Channels")
(let [cmds-deployments-ch (chan 100)]

(assoc component
:cmds-deployments-ch cmds-deployments-ch)))

(stop [{:keys [cmds-deployments-ch] :as component}]
(println "Stopping Channels")
(when cmds-deployments-ch (close! cmds-deployments-ch))
(assoc component :cmds-deployments-ch nil)))

(defn new-channels []
(map->Channels {}))
91 changes: 91 additions & 0 deletions src/clj/onyx_dashboard/http/deployments.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
(ns onyx-dashboard.http.deployments
(:require [clojure.core.async :refer [close! chan go-loop <!]]
[com.stuartsierra.component :as component]
[onyx.log.curator :as zk]
[onyx.log.zookeeper :as zk-onyx]))

(defn zk-deployment-entry-stat [zk-client entry]
(:stat (zk/data zk-client (zk-onyx/prefix-path entry))))

(defn distribute-deployment-listing [send-all-fn! listing]
(send-all-fn! [:deployment/listing listing]))

; stops when ZK unreachable
(defn deployments-watch [send-all-fn! zk-client deployments]
(try
(loop []
(if-not (Thread/interrupted)
(try
(if-let [children (zk/children zk-client zk-onyx/root-path)]
(do (->> children
(map (juxt identity
(partial zk-deployment-entry-stat zk-client)))
(map (fn [[child stat]]
(vector child
{:created-at (java.util.Date. (:ctime stat))
:modified-at (java.util.Date. (:mtime stat))})))
(into {})
(reset! deployments)
(distribute-deployment-listing send-all-fn!))
(Thread/sleep 1000))
(do
(println (format "Could not find deployments at %s. Retrying in 1s." zk-onyx/root-path))
(Thread/sleep 1000)))
; if we connected to early try again
(catch org.apache.zookeeper.KeeperException$NoNodeException e
(do
(println (format "Could not find deployments at %s. Retrying in 1s." zk-onyx/root-path))
(Thread/sleep 1000)))))
(recur))
(catch java.lang.IllegalStateException e)
(catch org.apache.zookeeper.KeeperException$ConnectionLossException e
(println (format "ZK connection lost at %s. Deployments watch stopped." zk-onyx/root-path)))
(catch Throwable t
(println "Error :" t))
)
)

(defn start-deployments-watch [sente zk deployments]
(future (deployments-watch (-> sente :send-mult)
(-> zk :zk-client)
deployments)))
; Responsibilities
; - watch for deployments
; - restart watch after zk reconnected
(defrecord Deployments []
component/Lifecycle
(start [{:keys [channels sente zk] :as component}]
(println "Starting Deployments")
(let [into-br (-> sente :chsk-send!)
cmds-ch (-> channels :cmds-deployments-ch)
deployments (atom {})
deployments-watch (start-deployments-watch sente zk deployments)
cmds (go-loop []
(when-let [[cmd data] (<! cmds-ch)]
(do (case cmd
:deployment/listing
(let [user-id (-> data :user-id)]
(into-br user-id [:deployment/listing @deployments]))

:restart
(do (when deployments-watch (future-cancel deployments-watch))
(assoc component
:deployments-watch (start-deployments-watch sente zk deployments))))
(recur))))]
(assoc component
:deployments deployments
:deployments-watch deployments-watch
:cmds cmds)))

(stop [{:keys [deployments-watch cmds] :as component}]
(println "Stopping Deployments")

(when cmds (close! cmds))
(when deployments-watch (future-cancel deployments-watch))

(assoc component :cmds nil
:deployments nil
:deployments-watch nil)))

(defn new-deployments []
(map->Deployments {}))
32 changes: 25 additions & 7 deletions src/clj/onyx_dashboard/http/sente.clj
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,41 @@

(def packer (sente-transit/get-flexi-packer :edn))

(defn send-mult-fn [send-fn! connected-uids msg]
(doseq [uid (:any @connected-uids)]
(send-fn! uid msg)))

; Responsibilities
; create websocket for backend
; create functions for send/receive data via websocket
(defrecord Sente []
component/Lifecycle
(start [component]
(println "Starting Sente")
(let [x (make-channel-socket!
taoensso.sente.server-adapters.http-kit/http-kit-adapter
{:user-id-fn user-id-fn :packer packer})]
{:user-id-fn user-id-fn :packer packer})

ring-ajax-post (-> x :ajax-post-fn)
ring-ajax-get-or-ws-handshake (-> x :ajax-get-or-ws-handshake-fn)
ch-chsk (-> x :ch-recv)
chsk-send! (-> x :send-fn)
connected-uids (-> x :connected-uids)

send-mult (partial send-mult-fn
chsk-send!
connected-uids)]
(assoc component
:ring-ajax-post (:ajax-post-fn x)
:ring-ajax-get-or-ws-handshake (:ajax-get-or-ws-handshake-fn x)
:ch-chsk (:ch-recv x)
:chsk-send! (:send-fn x)
:connected-uids (:connected-uids x))))
:ring-ajax-post ring-ajax-post
:ring-ajax-get-or-ws-handshake ring-ajax-get-or-ws-handshake
:ch-chsk ch-chsk
:chsk-send! chsk-send!
:connected-uids connected-uids
:send-mult send-mult)))

(stop [component]
(println "Stopping Sente")
(close! (:ch-chsk component))
(close! (-> component :ch-chsk))
(assoc component :server nil)))

(defn sente []
Expand Down
88 changes: 51 additions & 37 deletions src/clj/onyx_dashboard/http/server.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
(ns onyx-dashboard.http.server
(:require [clojure.core.async :refer [chan timeout thread <!! alts!!]]
(:require [clojure.core.async :refer [chan timeout thread <!! >!! alts!! go-loop <! >! close! go]]
[onyx-dashboard.dev :refer [is-dev? inject-devmode-html #_browser-repl start-figwheel]]
[org.httpkit.server :as http-kit-server]
[com.stuartsierra.component :as component]
Expand All @@ -12,7 +12,6 @@
[ring.util.response :refer [resource-response response content-type]]
[compojure.core :as comp :refer (defroutes GET POST)]
[compojure.route :as route]
[com.stuartsierra.component :as component]
[onyx-dashboard.tenancy :as tenancy]
[onyx.log.curator :as zk]
[taoensso.timbre :as timbre :refer [info error spy]]))
Expand All @@ -28,22 +27,38 @@
(defn event->uid [event]
(get-in event [:ring-req :cookies "ring-session" :value]))

(defn start-event-handler [sente peer-config deployments tracking]
(future
(loop []
; WebSockets events send from browser
; Responsibilities
; - receive events from browser WS and call other components to handle it
(defn events-from-browser [sente peer-config channels-comp tracking zk-comp]
(future
(loop []
(when-let [event (<!! (:ch-chsk sente))]
;; TODO: more sophisticated tracking,
;; should track by cluster id rather than user
;; and count the number of users tracking it. When the user count drops to 0
;; then stop the future.
(let [user-id (event->uid event)]
(case (:id event)
(let [user-id (event->uid event)
{:keys [zk-client notify-zc-ch]} zk-comp
{:keys [cmds-deployments-ch]} channels-comp]

(case (:id event)
:chsk/uidport-open (go (>! notify-zc-ch [:attach-browser-notify {:user-id user-id}]))
:chsk/uidport-close (do (swap! tracking tenancy/stop-tracking! user-id)
(go (>! notify-zc-ch [:remove-browser-notify {:user-id user-id}])))

:deployment/track (tenancy/start-tracking! (:chsk-send! sente)
peer-config
tracking
(:?data event)
user-id)
:deployment/get-listing ((:chsk-send! sente) user-id [:deployment/listing @deployments])
user-id
zk-client)

:deployment/get-listing (do
;((:chsk-send! sente) user-id [:deployment/listing @deployments])
(println "Send listing into channel:" cmds-deployments-ch)
(go (>! cmds-deployments-ch [:deployment/listing {:user-id user-id}]))
(go (>! notify-zc-ch [:browser-refresh-zk-conn {:user-id user-id}])))
:job/kill (tenancy/kill-job peer-config
(:deployment-id (:?data event))
(:job (:?data event)))
Expand All @@ -53,22 +68,15 @@
:job/restart (tenancy/restart-job peer-config
(:deployment-id (:?data event))
(:job (:?data event)))
:chsk/uidport-close (swap! tracking tenancy/stop-tracking! user-id)
:chsk/ws-ping nil
nil #_(println "Dunno what to do with: " event)))
(recur)))))

(defn send-mult-fn [send-fn! connected-uids msg]
(doseq [uid (:any @connected-uids)]
(send-fn! uid msg)))

(defrecord HttpServer [peer-config]
component/Lifecycle
(start [{:keys [sente] :as component}]
(start [{:keys [channels sente zk deployments] :as component}]
(println "Starting HTTP Server")
(let [send-f (partial send-mult-fn
(:chsk-send! sente)
(:connected-uids sente))]
(let []
(defroutes routes
(GET "/" [] (page))
(GET "/chsk" req ((:ring-ajax-get-or-ws-handshake sente) req))
Expand All @@ -77,37 +85,43 @@
(resources "/react" {:root "react"})
(route/not-found "Page not found"))

(let [deployments (atom {})
tracking (atom {})
event-handler-fut (start-event-handler sente peer-config deployments tracking)
(let [tracking (atom {})

; server
handler (ring.middleware.defaults/wrap-defaults routes ring-defaults-config)
port (Integer. (or (System/getenv "PORT") "3000"))
server (http-kit-server/run-server handler {:port port})
uri (format "http://localhost:%s/" (:local-port (meta server)))
conn (zk/connect (:zookeeper/address peer-config))
refresh-fut (future (tenancy/refresh-deployments-watch send-f conn deployments))]

zk-client (-> zk :zk-client)
; futures
events-from-browser (events-from-browser sente peer-config channels tracking zk)
]
(println "Http-kit server is running at" uri)
(assoc component
:conn conn
:server server
:refresh-fut refresh-fut
:event-handler-fut event-handler-fut
:deployments deployments
:tracking tracking))))
(stop [{:keys [server tracking deployments conn] :as component}]
:zk-client zk-client
:server server
:events-from-browser events-from-browser
:deployments deployments
:tracking tracking))))

(stop [{:keys [server tracking deployments zk-client events-from-browser deployments-watch] :as component}]
(println "Stopping HTTP Server")
(try
(try
(server :timeout 100)
(finally
(try
(swap! tracking tenancy/stop-all-tracking!)
(finally
(try
(future-cancel (:event-handler-fut component))
(future-cancel (:refresh-fut component))
(finally
(if (.. conn isStarted) (zk/close conn))))))))
(assoc component :server nil :event-handler-fut nil :deployments nil :tracking nil)))
(try
(when events-from-browser (future-cancel events-from-browser)))))))
(assoc component
:zk-client nil
:server nil
:events-from-browser nil
:deployments-watch nil
:deployments nil
:tracking nil)))

(defn new-http-server [peer-config]
(map->HttpServer {:peer-config peer-config}))
Loading