Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add datomic transact timeout #24

Open
wants to merge 5 commits into
base: 0.9.x
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 57 additions & 21 deletions src/onyx/plugin/datomic.clj
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@
read-ch retry-ch commit-ch]
p-ext/Pipeline
(write-batch
[this event]
[this event]
(function/write-batch event))

(read-batch
Expand Down Expand Up @@ -455,12 +455,20 @@

(write-batch
[_ event]
(let [messages (mapcat :leaves (:tree (:onyx.core/results event)))]
{:datomic/written @(d/transact conn
(map (fn [msg] (if (and partition (not (sequential? msg)))
(assoc msg :db/id (d/tempid partition))
msg))
(map :message messages)))
(let [messages (mapcat :leaves (:tree (:onyx.core/results event)))
timeout-ms 15000
written (-> conn
(d/transact (map (fn [msg] (if (and partition (not (sequential? msg)))
(assoc msg :db/id (d/tempid partition))
msg))
(map :message messages)))
(deref timeout-ms ::timed-out))]
(when (= ::timed-out written)
(throw (ex-info "Timed out transacting message to datomic. Rebooting task"
{:restartable? true
:datomic-plugin? true
:timeout timeout-ms})))
{:datomic/written written
:datomic/written? true}))

(seal-resource
Expand All @@ -481,11 +489,22 @@

(write-batch
[_ event]
{;; Transact each tx individually to avoid tempid conflicts.
:datomic/written (mapv (fn [tx]
@(d/transact conn (:tx (:message tx))))
(mapcat :leaves (:tree (:onyx.core/results event))))
:datomic/written? true})

(let [timeout-ms 15000
written (->> (mapcat :leaves (:tree (:onyx.core/results event)))
(map (fn [tx]
(d/transact conn (:tx (:message tx)))))
(doall)
(map #(deref % timeout-ms ::timed-out))
(doall))]
(when (some #(= ::timed-out written) written)
(throw (ex-info "Timed out transacting message to datomic. Rebooting task"
{:restartable? true
:datomic-plugin? true
:timeout timeout-ms})))
{;; Transact each tx individually to avoid tempid conflicts.
:datomic/written written
:datomic/written? true}))

(seal-resource
[_ _]
Expand All @@ -499,22 +518,37 @@
(defrecord DatomicWriteBulkDatomsAsync [conn]
p-ext/Pipeline
(read-batch
[_ event]
[_ event]
(function/read-batch event))

(write-batch
[_ event]
{;; Transact each tx individually to avoid tempid conflicts.
:datomic/written (->> (mapv (fn [tx]
(d/transact-async conn (:tx (:message tx))))
(mapcat :leaves (:tree (:onyx.core/results event))))
(into [] (comp (map deref))))
:datomic/written? true})
(let [timeout-ms 15000
written (->> (mapcat :leaves (:tree (:onyx.core/results event)))
(map (fn [tx]
(d/transact-async conn (:tx (:message tx)))))
(doall)
(map #(deref % timeout-ms ::timed-out))
(doall))]
(when (some #(= ::timed-out written) written)
(throw (ex-info "Timed out writing async message to datomic. Rebooting task"
{:restartable? true
:datomic-plugin? true
:timeout timeout-ms})))
{;; Transact each tx individually to avoid tempid conflicts.
:datomic/written written
:datomic/written? true}))

(seal-resource
[_ _]
{}))

(defn handle-timed-out-exception [event lifecycle lf-kw exception]
(if (and (:datomic-plugin? (ex-data exception))
(:restartable? (ex-data exception)))
:restartable
:kill))

(defn write-bulk-datoms-async [pipeline-data]
(let [task-map (:onyx.core/task-map pipeline-data)
conn (safe-connect task-map)]
Expand All @@ -532,10 +566,12 @@
{:lifecycle/before-task-start inject-write-tx-resources})

(def write-bulk-tx-calls
{:lifecycle/before-task-start inject-write-bulk-tx-resources})
{:lifecycle/before-task-start inject-write-bulk-tx-resources
:lifecycle/handle-exception handle-timed-out-exception})

(def write-bulk-tx-async-calls
{:lifecycle/before-task-start inject-write-bulk-tx-async-resources})
{:lifecycle/before-task-start inject-write-bulk-tx-async-resources
:lifecycle/handle-exception handle-timed-out-exception})

;;;;;;;;;
;;; params lifecycles
Expand Down