diff --git a/src/onyx/plugin/datomic.clj b/src/onyx/plugin/datomic.clj index fc5a26d..96c7c57 100644 --- a/src/onyx/plugin/datomic.clj +++ b/src/onyx/plugin/datomic.clj @@ -150,7 +150,7 @@ read-ch retry-ch commit-ch] p-ext/Pipeline (write-batch - [this event] + [this event] (function/write-batch event)) (read-batch @@ -455,12 +455,20 @@ (write-batch [_ event] - (let [messages (mapcat :leaves (:tree (:onyx.core/results event)))] - {:datomic/written @(d/transact conn - (map (fn [msg] (if (and partition (not (sequential? msg))) - (assoc msg :db/id (d/tempid partition)) - msg)) - (map :message messages))) + (let [messages (mapcat :leaves (:tree (:onyx.core/results event))) + timeout-ms 15000 + written (-> conn + (d/transact (map (fn [msg] (if (and partition (not (sequential? msg))) + (assoc msg :db/id (d/tempid partition)) + msg)) + (map :message messages))) + (deref timeout-ms ::timed-out))] + (when (= ::timed-out written) + (throw (ex-info "Timed out transacting message to datomic. Rebooting task" + {:restartable? true + :datomic-plugin? true + :timeout timeout-ms}))) + {:datomic/written written :datomic/written? true})) (seal-resource @@ -481,11 +489,22 @@ (write-batch [_ event] - {;; Transact each tx individually to avoid tempid conflicts. - :datomic/written (mapv (fn [tx] - @(d/transact conn (:tx (:message tx)))) - (mapcat :leaves (:tree (:onyx.core/results event)))) - :datomic/written? true}) + + (let [timeout-ms 15000 + written (->> (mapcat :leaves (:tree (:onyx.core/results event))) + (map (fn [tx] + (d/transact conn (:tx (:message tx))))) + (doall) + (map #(deref % timeout-ms ::timed-out)) + (doall))] + (when (some #(= ::timed-out written) written) + (throw (ex-info "Timed out transacting message to datomic. Rebooting task" + {:restartable? true + :datomic-plugin? true + :timeout timeout-ms}))) + {;; Transact each tx individually to avoid tempid conflicts. + :datomic/written written + :datomic/written? true})) (seal-resource [_ _] @@ -499,22 +518,37 @@ (defrecord DatomicWriteBulkDatomsAsync [conn] p-ext/Pipeline (read-batch - [_ event] + [_ event] (function/read-batch event)) (write-batch [_ event] - {;; Transact each tx individually to avoid tempid conflicts. - :datomic/written (->> (mapv (fn [tx] - (d/transact-async conn (:tx (:message tx)))) - (mapcat :leaves (:tree (:onyx.core/results event)))) - (into [] (comp (map deref)))) - :datomic/written? true}) + (let [timeout-ms 15000 + written (->> (mapcat :leaves (:tree (:onyx.core/results event))) + (map (fn [tx] + (d/transact-async conn (:tx (:message tx))))) + (doall) + (map #(deref % timeout-ms ::timed-out)) + (doall))] + (when (some #(= ::timed-out written) written) + (throw (ex-info "Timed out writing async message to datomic. Rebooting task" + {:restartable? true + :datomic-plugin? true + :timeout timeout-ms}))) + {;; Transact each tx individually to avoid tempid conflicts. + :datomic/written written + :datomic/written? true})) (seal-resource [_ _] {})) +(defn handle-timed-out-exception [event lifecycle lf-kw exception] + (if (and (:datomic-plugin? (ex-data exception)) + (:restartable? (ex-data exception))) + :restartable + :kill)) + (defn write-bulk-datoms-async [pipeline-data] (let [task-map (:onyx.core/task-map pipeline-data) conn (safe-connect task-map)] @@ -532,10 +566,12 @@ {:lifecycle/before-task-start inject-write-tx-resources}) (def write-bulk-tx-calls - {:lifecycle/before-task-start inject-write-bulk-tx-resources}) + {:lifecycle/before-task-start inject-write-bulk-tx-resources + :lifecycle/handle-exception handle-timed-out-exception}) (def write-bulk-tx-async-calls - {:lifecycle/before-task-start inject-write-bulk-tx-async-resources}) + {:lifecycle/before-task-start inject-write-bulk-tx-async-resources + :lifecycle/handle-exception handle-timed-out-exception}) ;;;;;;;;; ;;; params lifecycles