Skip to content

Commit

Permalink
Add & integrate open-telemetry
Browse files Browse the repository at this point in the history
  • Loading branch information
madis committed Apr 3, 2024
1 parent 9fecc59 commit 64fbf27
Show file tree
Hide file tree
Showing 8 changed files with 709 additions and 24 deletions.
6 changes: 6 additions & 0 deletions server/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
{
"dependencies": {
"@sentry/node": "7.31.1",
"@opentelemetry/context-zone": "^1.22.0",
"@opentelemetry/exporter-trace-otlp-http": "^0.49.1",
"@opentelemetry/resources": "^1.22.0",
"@opentelemetry/sdk-node": "^0.49.1",
"@opentelemetry/sdk-trace-base": "^1.22.0",
"@opentelemetry/semantic-conventions": "^1.22.0",
"apollo-server-express": "2.12.0",
"axios": "1.3.3",
"better-sqlite3": "7.6.2",
Expand Down
1 change: 1 addition & 0 deletions server/src/ethlance/server/core.cljs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
[district.server.db.honeysql-extensions]
[district.server.logging]
[district.server.smart-contracts]
[ethlance.server.tracing.core]
[district.server.web3]
[district.server.web3-events]
[district.shared.async-helpers :as async-helpers :refer [safe-go]]
Expand Down
75 changes: 52 additions & 23 deletions server/src/ethlance/server/syncer.cljs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
[ethlance.server.db :as ethlance-db]
[ethlance.server.event-replay-queue :as replay-queue]
[ethlance.server.ipfs :as ipfs]
[ethlance.server.tracing.api :as t-api]
[ethlance.server.tracing.macros :refer-macros [go! safe-go!]]
[ethlance.server.utils :as server-utils]
[ethlance.shared.contract-constants :refer [offered-vec->flat-map
enum-val->token-type]]
Expand Down Expand Up @@ -64,28 +66,41 @@

(defn handle-job-created
[conn _ {:keys [args] :as event}]
(safe-go
(log/info (str ">>> Handling event job-created" args))
(safe-go!
(log/info (str ">>> Handling event job-created" args " | " (t-api/get-active-span)))
(println ">>> ipfs-data | type ipfs-data" {:ipfs-data (:ipfs-data args) :event event})
(let [ipfs-hash (shared-utils/hex->base58 (:ipfs-data args))
(let [span (t-api/get-active-span)
ipfs-hash (shared-utils/hex->base58 (:ipfs-data args))
ipfs-job-content (<? (server-utils/get-ipfs-meta @ipfs/ipfs ipfs-hash))
offered-value (offered-vec->flat-map (first (:offered-values args)))
token-address (:token-address offered-value)
token-type (enum-val->token-type (:token-type offered-value))
token-amount (:token-amount offered-value)
for-the-db (merge {:job/id (:job args)
:job/status "active" ; draft -> active -> finished hiring -> closed
:job/creator (:creator args)
:job/date-created (get-timestamp event)
:job/date-updated (get-timestamp event)

:job/token-type token-type
:job/token-amount (:token-amount offered-value)
:job/token-amount token-amount
:job/token-address token-address
:job/token-id (:token-id offered-value)
:invited-arbiters (get args :invited-arbiters [])}
(build-ethlance-job-data-from-ipfs-object ipfs-job-content))]
(<? (ensure-db-token-details token-type token-address conn))
(<? (ethlance-db/add-job conn for-the-db)))))
(t-api/add-event! span "job-details" {:ipfs-hash ipfs-hash :job-id (:job args) :token-type token-type :token-amount token-amount})
(t-api/start-active-span
"ensure-token-details"
(fn [span]
(go!
(<? (ensure-db-token-details token-type token-address conn))
(t-api/end-span! span))))
(t-api/start-active-span
"db.add-job"
(fn [span]
(go!
(<? (ethlance-db/add-job conn for-the-db))
(t-api/end-span! span)))))))


(defn handle-invoice-created
Expand Down Expand Up @@ -279,25 +294,34 @@

(defn handle-job-funds-change
[movement-sign-fn conn _ {:keys [args] :as event}]
(safe-go
(log/info (str "handle-job-funds-change outflow?" (quote movement-sign-fn) " | event: " event))
(let [funds (:funds args)
(safe-go!
(log/info (str "handle-job-funds-change"))
(let [span (t-api/get-active-span)
funds (:funds args)
funds-map (map offered-vec->flat-map funds)
job-id (:job args)
funding-base {:tx (:transaction-hash event)
:job/id job-id
:job-funding/created-at (get-timestamp)}
extract-token-info (fn [funds]
extract-token-info (fn [funds]
[(-> funds :token-type enum-val->token-type)
(:token-address funds)])
tokens-info (map extract-token-info funds-map)
funding-updates (map (fn [tv]
(merge funding-base {:job-funding/amount (movement-sign-fn (:token-amount tv))
:token-detail/id (:token-address tv)}))
(merge
funding-base
{:job-funding/amount (movement-sign-fn (:token-amount tv))
:token-detail/id (:token-address tv)}))
funds-map)]
(doseq [[token-type token-address] tokens-info] (ensure-db-token-details token-type token-address conn))
(doseq [funding funding-updates]
(ethlance-db/insert-row! conn :JobFunding funding :ignore-conflict-on [:tx])))))
(t-api/add-event! span "funding-details" funding)
(t-api/start-active-span
"add-funding-to-db"
(fn [span]
(go!
(<? (ethlance-db/insert-row! conn :JobFunding funding :ignore-conflict-on [:tx]))
(t-api/end-span! span))))))))


(defn handle-test-event
Expand Down Expand Up @@ -334,34 +358,39 @@
{}
web3-events-map)]
(fn [err {:keys [:block-number] :as event}]
(safe-go
(safe-go!
(let [contract-key (-> event :contract :contract-key)
event-key (-> event :event)
handler (get contract-ev->handler [contract-key event-key])
span (t-api/start-span (str (name contract-key) "." (name event-key)))
conn (<? (db/get-connection))]
(println ">>> syncer DISPATCHER handling" {:contract-key contract-key :event-key event-key :handler handler :event event})
(try
(let [block-timestamp (<? (block-timestamp block-number))
event (-> event
(update :event camel-snake-kebab/->kebab-case)
(update-in [:args :version] bn/number)
(update-in [:args :timestamp] (fn [timestamp]
(if timestamp
(bn/number timestamp)
block-timestamp))))
(update ,,, :event camel-snake-kebab/->kebab-case)
(update-in ,,, [:args :version] bn/number)
(update-in ,,, [:args :timestamp] (fn [timestamp]
(if timestamp
(bn/number timestamp)
block-timestamp))))
_ (db/begin-tx conn)
res (handler conn err event)
res (t-api/with-span-context span #(handler conn err event))
_ (db/commit-tx conn)]
(t-api/set-span-ok! span)
;; Calling a handler can throw or return a go block (when using safe-go)
;; in the case of async ones, the go block will return the js/Error.
;; In either cases push the event to the queue, so it can be replayed later
(when (satisfies? ReadPort res)
(let [r (<! res)]
(when (instance? js/Error r)
(throw r))))
(throw r))
(t-api/set-span-ok! span)
(t-api/end-span! span)))
res)
(catch js/Error error
(replay-queue/push-event conn event)
(t-api/set-span-error! span error)
(t-api/end-span! span)
(db/rollback-tx conn)
(throw error))
(finally
Expand Down
66 changes: 66 additions & 0 deletions server/src/ethlance/server/tracing/api.cljs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
(ns ethlance.server.tracing.api
(:require
["@opentelemetry/api" :refer [trace context SpanStatusCode ROOT_CONTEXT]]))

(def SPAN_STATUS_OK (.-OK SpanStatusCode))
(def SPAN_STATUS_ERROR (.-ERROR SpanStatusCode))

(defn set-span-context! [span]
(.setSpan trace (.active context) span))

(defn active-context []
(.active context))

(defn set-span-attributes!
[span attributes]
(doseq [[k v] attributes] (.setAttribute span k v))
span)

(defonce tracer (.getTracer trace "ethl-dev" "0.0.1"))

(defn start-span
([span-name]
(start-span span-name nil (active-context)))
([span-name attributes]
(start-span span-name attributes (active-context)))
([span-name attributes context]
(.startSpan tracer span-name (clj->js {"attributes" attributes}) context)))

(defn start-nested-span [parent span-name & [attributes]]
(let [ctx (set-span-context! parent)
span (start-span span-name attributes ctx)]
(set-span-attributes! span attributes)))

(defn end-span! [span]
(.end span))

(defn add-event! [span event-name k-v-map]
(.addEvent span event-name (clj->js k-v-map)))

(defn start-active-span [span-name callback]
(.startActiveSpan tracer span-name callback))

(defn with-active-span [span-name callback]
(-> (start-active-span span-name callback)
(end-span! ,,,)))

(defn get-active-span
([]
(.getActiveSpan trace)))

(defn set-span-error!
[span error & [message]]
(doto span
(.setStatus ,,, (clj->js {"code" SPAN_STATUS_ERROR "message" message}))
(.recordException ,,, error)))

(defn set-span-ok!
[span]
(.setStatus span (clj->js {"code" SPAN_STATUS_OK})))

(defn with-context
[provided-context fn-to-call]
(.with context provided-context fn-to-call js/undefined))

(defn with-span-context [span fn-to-call]
(with-context (set-span-context! span) fn-to-call))
28 changes: 28 additions & 0 deletions server/src/ethlance/server/tracing/macros.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
(ns ethlance.server.tracing.macros
(:require
[taoensso.timbre]
[cljs.core.async.impl.ioc-macros :as ioc]
[district.shared.error-handling]))

(defmacro go!
"just like go, just executes immediately until the first put/take"
[& body]
`(let [c# (cljs.core.async/chan 1)
f# ~(ioc/state-machine body 1 &env ioc/async-custom-terminators)
state# (-> (f#)
(ioc/aset-all! cljs.core.async.impl.ioc-helpers/USER-START-IDX c#))]
(cljs.core.async.impl.ioc-helpers/run-state-machine state#)
c#))

(defmacro safe-go! [& body]
`(go!
(try
~@body
(catch :default e#
(when-let [span# (ethlance.server.tracing.api/get-active-span)]
(ethlance.server.tracing.api/set-span-error! span# e#))
(taoensso.timbre/error "Go block exception"
(merge {:error e#}
(ex-data e#)
~(district.shared.error-handling/compiletime-info &env &form *ns*)))
e#))))
2 changes: 2 additions & 0 deletions server/src/ethlance/server/tracing/macros.cljs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
(ns ethlance.server.tracing.macros
(:require-macros [ethlance.server.tracing.macros]))
57 changes: 57 additions & 0 deletions server/src/ethlance/server/tracing/setup.cljs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
(ns ethlance.server.tracing.setup
(:require
["@opentelemetry/sdk-node" :refer [NodeSDK]]
["@opentelemetry/sdk-trace-node" :refer [ConsoleSpanExporter]]
["@opentelemetry/exporter-trace-otlp-http" :refer [OTLPTraceExporter]]

["@opentelemetry/sdk-metrics" :refer [PeriodicExportingMetricReader
ConsoleMetricExporter]]
["@opentelemetry/resources" :refer [Resource]]
["@opentelemetry/semantic-conventions" :refer [SEMRESATTRS_SERVICE_NAME
SEMRESATTRS_SERVICE_VERSION]]
["@opentelemetry/api" :refer [trace]]))

;; TODO: pass in config via mount
; (def signoz-config
; {:url "https://ingest.us.signoz.cloud:443/v1/traces"
; :headers {"signoz-access-token" "b7d50281-ea48-41fe-be44-48274a3b14c9"}})

; (defn get-http-trace-exporter []
; (new OTLPTraceExporter (clj->js signoz-config)))

(def exporters
{:OTLPTraceExporter (fn [config] (new OTLPTraceExporter (clj->js config)))
:ConsoleSpanExporter (fn [& [config]] (new ConsoleSpanExporter))})

(defn get-exporter [{:keys [name config]}]
((get exporters name) config))

(defn init-sdk [config]
(let [service-name (get-in config [:sdk :name])
service-version (get-in config [:sdk :version])
resource (new Resource (clj->js {SEMRESATTRS_SERVICE_NAME service-name
SEMRESATTRS_SERVICE_VERSION service-version}))
trace-exporter (get-exporter (get-in config [:trace-exporter]))
metric-exporter (new ConsoleMetricExporter)
params {:resource resource
:traceExporter trace-exporter
:metricReader (new PeriodicExportingMetricReader (clj->js {:exporter metric-exporter}))}]
(new NodeSDK (clj->js params))))

(def sdk (atom nil))

(defn start [config]
(let [instance (init-sdk config)]
(.start instance)
(reset! sdk instance)))

(defn stop []
(.shutdown @sdk))

;; It’s generally recommended to call getTracer in your app when you need it
;; rather than exporting the tracer instance to the rest of your app.
;; This helps avoid trickier application load issues when other required dependencies are involved.
(defn get-tracer [scope-name scope-version]
(.getTracer trace scope-name scope-version))

(defonce tracer (get-tracer "syncer" "0.0.1"))
Loading

0 comments on commit 64fbf27

Please sign in to comment.