Skip to content

Commit

Permalink
Merge branch 'expired-jobs-devnet' into 'main'
Browse files Browse the repository at this point in the history
keep running expired jobs as temp solution for wrong devnet timestamp

See merge request nosana-ci/apps/platform/nosana-node!74
  • Loading branch information
jeisses committed Feb 14, 2024
2 parents b96e9c7 + 6bee9e8 commit 4300578
Showing 1 changed file with 54 additions and 49 deletions.
103 changes: 54 additions & 49 deletions src/nosana_node/nosana.clj
Original file line number Diff line number Diff line change
Expand Up @@ -613,56 +613,61 @@

(defn start-flow-for-run!
"Start running a new Nostromo flow and return its flow ID."
[[run-addr run] conf {:nos/keys [store flow-chan]}]
(try
(let [job (get-job conf (:job run))
job-addr (-> run :job .toString)
job-info (download-job-ipfs (:ipfsJob job) conf)
flow (cond-> (create-flow job-info run-addr run conf)
([run conf sys] (start-flow-for-run! run conf sys true))
([[run-addr run] conf {:nos/keys [store flow-chan]} check-expired?]
(try
(let [job (get-job conf (:job run))
job-addr (-> run :job .toString)
job-info (download-job-ipfs (:ipfsJob job) conf)
flow (cond-> (create-flow job-info run-addr run conf)
(int? (:job-timeout conf))
(assoc :expires (+ (:time run) (:job-timeout conf))))

[_flow-valid? error-msg] (validate-flow-ops flow conf)

expired? (and
check-expired?
(int? (:job-timeout conf))
(assoc :expires (+ (:time run) (:job-timeout conf))))

[_flow-valid? error-msg] (validate-flow-ops flow conf)
expired? (and (int? (:job-timeout conf))
(> (flow/current-time)
(+ (:time run) (:job-timeout conf))))
flow-id (:id flow)]
(cond
error-msg
(go
(log :info "Finishing job because of unsupported OP. Waiting for finish transaction.")
(let [results-ipfs (finish-flow (assoc-in flow [:state :nosana/error] error-msg) conf)
sig (finish-job conf
(PublicKey. job-addr)
(PublicKey. run-addr)
(:market job)
results-ipfs)
_ (log :trace "Finish tx id " sig)
tx (<! (sol/await-tx< sig (:network conf)))]
(log :info "Submitted finish job tx " sig tx))
nil)

expired?
(throw (ex-info "Run has expired" {:run-time (:time run)
:job-timeout (:job-timeout conf)}))

:else
(do
(log :info "Starting job" job-addr)
(log :info "Processing flow" flow-id)
(<!! (kv/assoc store [:job->flow job-addr] flow-id))
(go
(<! (flow/save-flow flow store))
(>! flow-chan [:trigger flow-id])
flow-id))))
(catch Exception e
(log :error "Error starting flow")
(log :debug e)
(go
(log :info "Quit run because of error" (.toString run-addr))
(let [sig (quit-job conf (sol/public-key run-addr))]
(<! (sol/await-tx< sig (:network conf)))
nil)))))
(> (flow/current-time)
(+ (:time run) (:job-timeout conf))))

flow-id (:id flow)]
(cond
error-msg
(go
(log :info "Finishing job because of unsupported OP. Waiting for finish transaction.")
(let [results-ipfs (finish-flow (assoc-in flow [:state :nosana/error] error-msg) conf)
sig (finish-job conf
(PublicKey. job-addr)
(PublicKey. run-addr)
(:market job)
results-ipfs)
_ (log :trace "Finish tx id " sig)
tx (<! (sol/await-tx< sig (:network conf)))]
(log :info "Submitted finish job tx " sig tx))
nil)

expired?
(throw (ex-info "Run has expired" {:run-time (:time run)
:job-timeout (:job-timeout conf)}))

:else
(do
(log :info "Starting job" job-addr)
(log :info "Processing flow" flow-id)
(<!! (kv/assoc store [:job->flow job-addr] flow-id))
(go
(<! (flow/save-flow flow store))
(>! flow-chan [:trigger flow-id])
flow-id))))
(catch Exception e
(log :error "Error starting flow")
(log :debug e)
(go
(log :info "Quit run because of error" (.toString run-addr))
(let [sig (quit-job conf (sol/public-key run-addr))]
(<! (sol/await-tx< sig (:network conf)))
nil))))))

(defn exit-work-loop!
"Stop the main work loop for the system"
Expand Down

0 comments on commit 4300578

Please sign in to comment.