diff --git a/src/nosana_node/nosana.clj b/src/nosana_node/nosana.clj index 672b73b..7af7b66 100644 --- a/src/nosana_node/nosana.clj +++ b/src/nosana_node/nosana.clj @@ -613,56 +613,61 @@ (defn start-flow-for-run! "Start running a new Nostromo flow and return its flow ID." - [[run-addr run] conf {:nos/keys [store flow-chan]}] - (try - (let [job (get-job conf (:job run)) - job-addr (-> run :job .toString) - job-info (download-job-ipfs (:ipfsJob job) conf) - flow (cond-> (create-flow job-info run-addr run conf) + ([run conf sys] (start-flow-for-run! run conf sys true)) + ([[run-addr run] conf {:nos/keys [store flow-chan]} check-expired?] + (try + (let [job (get-job conf (:job run)) + job-addr (-> run :job .toString) + job-info (download-job-ipfs (:ipfsJob job) conf) + flow (cond-> (create-flow job-info run-addr run conf) + (int? (:job-timeout conf)) + (assoc :expires (+ (:time run) (:job-timeout conf)))) + + [_flow-valid? error-msg] (validate-flow-ops flow conf) + + expired? (and + check-expired? (int? (:job-timeout conf)) - (assoc :expires (+ (:time run) (:job-timeout conf)))) - - [_flow-valid? error-msg] (validate-flow-ops flow conf) - expired? (and (int? (:job-timeout conf)) - (> (flow/current-time) - (+ (:time run) (:job-timeout conf)))) - flow-id (:id flow)] - (cond - error-msg - (go - (log :info "Finishing job because of unsupported OP. Waiting for finish transaction.") - (let [results-ipfs (finish-flow (assoc-in flow [:state :nosana/error] error-msg) conf) - sig (finish-job conf - (PublicKey. job-addr) - (PublicKey. run-addr) - (:market job) - results-ipfs) - _ (log :trace "Finish tx id " sig) - tx (flow job-addr] flow-id)) - (go - (! flow-chan [:trigger flow-id]) - flow-id)))) - (catch Exception e - (log :error "Error starting flow") - (log :debug e) - (go - (log :info "Quit run because of error" (.toString run-addr)) - (let [sig (quit-job conf (sol/public-key run-addr))] - ( (flow/current-time) + (+ (:time run) (:job-timeout conf)))) + + flow-id (:id flow)] + (cond + error-msg + (go + (log :info "Finishing job because of unsupported OP. Waiting for finish transaction.") + (let [results-ipfs (finish-flow (assoc-in flow [:state :nosana/error] error-msg) conf) + sig (finish-job conf + (PublicKey. job-addr) + (PublicKey. run-addr) + (:market job) + results-ipfs) + _ (log :trace "Finish tx id " sig) + tx (flow job-addr] flow-id)) + (go + (! flow-chan [:trigger flow-id]) + flow-id)))) + (catch Exception e + (log :error "Error starting flow") + (log :debug e) + (go + (log :info "Quit run because of error" (.toString run-addr)) + (let [sig (quit-job conf (sol/public-key run-addr))] + (