Skip to content
This repository has been archived by the owner on Jan 6, 2023. It is now read-only.

Resume point AssertionError: Assert failed: (= slot-migration :direct) for {:mode :initialize} #870

Closed
sundbry opened this issue Sep 13, 2018 · 0 comments

Comments

@sundbry
Copy link
Contributor

sundbry commented Sep 13, 2018

I'm getting assert failures when I try to submit a job with an initial resume point. I can't add {:slot-imgration :direct} to the resume point map or it will fail validation?

{:message Onyx lifecycle exception, :lifecycle #:lifecycle{:task :all, :calls :app.data.core/standard-lifecycle-calls, :doc Standard lifecycles including error handling}, :phase :lifecycle/recover-input}
java.lang.AssertionError: Assert failed: (= slot-migration :direct)
        at onyx.peer.resume_point$recover_input.invokeStatic(resume_point.clj:118)
        at onyx.peer.resume_point$recover_input.invoke(resume_point.clj:114)
        at onyx.peer.task_lifecycle$recover_input.invokeStatic(task_lifecycle.clj:447)
        at onyx.peer.task_lifecycle$recover_input.invoke(task_lifecycle.clj:442)
        at onyx.peer.task_lifecycle.TaskStateMachine.exec(task_lifecycle.clj:1075)
        at onyx.peer.task_lifecycle$run_task_lifecycle_BANG_.invokeStatic(task_lifecycle.clj:554)
        at onyx.peer.task_lifecycle$run_task_lifecycle_BANG_.invoke(task_lifecycle.clj:544)
        at onyx.peer.task_lifecycle$start_task_lifecycle_BANG_$fn__46871.invoke(task_lifecycle.clj:1162)
        at clojure.core.async$thread_call$fn__11025.invoke(async.clj:441)
        at clojure.lang.AFn.run(AFn.java:22)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

The job spec looks like this:

{:workflow [[:telemetry-events :telemetry-handler] ...]
 :catalog 
   [{:onyx/name :telemetry-events
     :onyx/plugin :onyx.plugin.kafka/read-messages
     :onyx/type :input
     :onyx/medium :kafka
     :kafka/topic "telemetry"
     :kafka/receive-buffer-bytes 65536
     :kafka/zookeeper "my.zookeeper.host:2181"
     :kafka/offset-reset :earliest
     :kafka/deserializer-fn :app.core/deserialize-json
     :kafka/wrap-with-metadata? false
     :onyx/batch-timeout 50
     :onyx/n-peers 2
     :onyx/batch-size 100
     :onyx/doc "Reads messages from a Kafka topic"}
    ...]
 :lifecycles ...
 :flow-conditions ...
 :task-scheduler :onyx.task-scheduler/balanced
 :resume-point {:telemetry-events {:input {:mode :initialize}}}}

I'm not sure what I did wrong? I think a resume point example in the onyx-examples would also be very helpful.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant