Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mount and core.async #84

Open
fcabouat opened this issue May 5, 2017 · 10 comments
Open

Mount and core.async #84

fcabouat opened this issue May 5, 2017 · 10 comments

Comments

@fcabouat
Copy link

fcabouat commented May 5, 2017

Hi,

Sorry to post here instead of Slack (blocked by proxy), and it might be a bit of a noob question, but I've got trouble finding documentation on Mount regarding the stopping process / running concurrent processes.

When I have a starting order : state1 -> state2 -> state3
And the stopping order : state3 -> state2 -> state1

Can I block in the state3 function, waiting to clean up some running concurrent process ?

(For more context :

I've got quite a few channels and go-loops running, with dependencies between them (1 go loop giving tempo -> 3 thread I/O workers -> 1 test go-loop downstream-consumer).

I have the following states in my namespace :

(defstate workers-nbr :start 3)
(defstate close-chan  :start (a/chan))

(...)

(defstate heartbeat-chan :start (heartbeat-start) ; starts a go-loop alt!-ing between a timeout and close-chan, returns a chan
                         :stop  #(a/close! close-chan))

(...)

(defstate work-chan :start (workers-start) ; starts 3 long running threads doing IO while heartbeat-chan isn't closed, returns a chan
                    :stop #(a/close! heartbeat-chan))

(...)

(defstate work-test :start (work-test-start) ; starts a go-loop and returns the go-loop chan
                    :stop  #(a/close! work-chan))

And my "main" :

  (m/start)
  (.addShutdownHook (Runtime/getRuntime)
                    (Thread.
                      (fn []
                        (a/close! work-test)
                        (m/stop))))
  (<!! work-test))

I get these kinds of exceptions when I exit :

Exception in thread "async-thread-macro-3" java.lang.IllegalArgumentException: No implementation of method: :take! of protocol: #'clojure.core.async.impl.protocols/ReadPort found for class: mount.core.NotStartedState

I guess the states get stopped while my go-loops are still running meaning work-chan is stopped right after #(a/close! work-chan) got fired, but before the go-loop had time to pick up the closed signal, hence the go-loop trying a <! on an already closed state.

I'm thinking of blocking inside the stop functions on the go-loop / threads like :

(defstate work-test :start (work-test-start)
                    :stop  #(do (a/close! work-chan)
                                (<!!      work-test)

Will it work properly ? Are mount and core.async meant to be combined in this way ? Or is it bad design ?

Cheers,

François

@tolitius
Copy link
Owner

tolitius commented May 8, 2017

a couple observations:

configuration over public constants

(defstate workers-nbr :start 3)

this would usually come from configuration / program parameters rather than being defined as a state

stop functions

is there any reason your stop functions are anonymous functions:

:stop #(a/close! heartbeat-chan)

rather than just:

:stop (a/close! heartbeat-chan)

not stopping state on stop

most likely the root of your problem is that a single state is started and stopped in different states, whereas it should be started and stopped in/as the same state:

(defstate a :start (foo)
            :stop  b)

(defstate b :start (bar)
            :stop  a)

vs.

(defstate a :start (foo)
            :stop  a)

(defstate b :start (bar)
            :stop  b)

@thurn
Copy link

thurn commented Jun 17, 2017

Hi, I'm having a very similar problem. In general, I think mount is not currently a good tool for managing channel state. My initial instinct was to do something like this:

(mount/defstate requests-channel
  :start (async/chan)
  :stop (async/close! requests-channel))

But this doesn't actually work. The basic problem is that asynchronous goroutines expect to see a channel in the "closed" state in order to terminate themselves, but mount replaces the channel object with a NotStartedState instance and causes them to crash. Eg. if you have standard goroutine code like this:

(async/go-loop []
    (when-let [request (async/<! requests/requests-channel)]
      ;; handle request
      (recur)))

Your program will randomly crash because requests-channel is gone.

Personally, I'm not really a fan of the whole (undocumented) NotStartedState thing. If it were up to me, stopped states would just get assigned the value you return from :stop, similar to how component works. That was actually how I thought it worked from reading the documentation.

@tolitius
Copy link
Owner

Personally, I'm not really a fan of the whole (undocumented) NotStartedState thing

why NotStartedState

The initial idea was to follow this succession of values:

NotStartedState => (start it) => (stop it) => NotStartedState

Reasoning:

  1. The state, once stopped returns to its initial value of NotStartedState which gives an immediate feedback in case it is used while it should not be: i.e. rather than chasing NullPointers, etc.

  2. There are states that do not require a :stop function, in which case, if they are stopped, should "no longer be started", and also should not be used: NotStartedState clearly communicates this (in case they are stopped).

stop value has "value"

I do not disagree it creates confusion in cases where other parts of an application rely on a stop value to control their flow. However I also think controlling flow based on mutation is a smell.

core.async made certain design choices, and returning nil from a closed channel is one of them. It works well in some cases, does not work well in others: for example when you depend on it to get out of a tight go-loop.

I usually do something like (pseudo code):

(async/go-loop []
    (when-let [request (alt!
                         request-channel ([msg] msg)
                         stop-channel ([_] nil))]
      ;; handle request
      (recur)))

and then in a :stop function(s) I send a message to a stop-channel which:

  • explicitly communicates a stop action
  • go loop reads without an implicit knowledge that it will stop ones "something" returns nil.

I am not saying my way is better, it is just something I feel more comfortable with.

However 47% of me agrees with you that it would be more expected (less surprising) if a state ended up bound to a "stop value" once a :stop function is called. Another 4% and I'll just do it, so I am more than willing to have a discussion about it.

@thurn
Copy link

thurn commented Jun 19, 2017

Yeah, I see your point. NotStartedState obviously does have some error-detection benefits. The interaction with channels is just unfortunate, since the "loop until it returns nil" idiom is so widespread and convenient, especially for things like correctly cleaning up child channels created via sub or tap. Wish there was a good compromise solution.

@tolitius
Copy link
Owner

tolitius commented Jun 19, 2017

I don't think mount has limitations when it comes to core.async. Besides the solution above with a stop-channel you can do (pseudo code):

(defn listen-to-events []
  (let [req-chan (chan)]
    (async/go-loop []
       (when-let [request (<! req-chan)]
         ;; handle request
         (recur)))
    req-chan))

(defstate listener :start (listen-to-events)
                   :stop (async/close! listener))

It also conveys a notion of state a bit better since it is not just a channel that is stateful, but a listener as well.

@fcabouat
Copy link
Author

Hi, sorry for my low participation after asking a question, changed a bit of focus at work and went on holidays.

You were obviously right for your first two remarks (external conf, using anonymous function instead of a simple function call block). Your third remark was a bit confusing to me since you display a cyclic dependency with a and b. My go-loop pipeline is acyclic, with a-fetching -> b-accumulating -> c-output. And I had in mind I would be able to manage dependant states/the dependency order with mount, starting automatically c, then b, then a, and stopping a, then b, then c.

I'll have to wrap my head around this a bit more, since it's still not clear to me at this point what could/should be handled by mount as state (the channels, the go-loops, should I have just one restartable state for the whole pipeline ?, etc.). I might still be doing pretty basic mistakes.

Anyway, I'm glad thurn stepped in and fueled the discussion a bit further.

Cheers,

@tolitius
Copy link
Owner

it's still not clear to me at this point what could/should be handled by mount as state (the channels, the go-loops, should I have just one restartable state for the whole pipeline ?, etc.) I might still be doing pretty basic mistakes.

These are not basic mistakes, different developers would come up with different designs and will stand their ground to prove their way is the best :)

It would depend on the application design and usage of course, but when it has to do with core.async I tend to wrap go-loops in mount states. Since a go-loop should be started and should be stopped and it is stateful.

Sometimes it makes sense to deem the whole pipeline as a stateful component: https://stackoverflow.com/a/40435030/211277

Things to consider when deciding when create a mount state and what should be "in it":

  • can I easily test it?
  • would some state within "this state" be useful/needed by itself? then split into two states.
  • can I avoid creating a mount state by simply using a locally scoped state (i.e. (let [a 42] ..))?
  • does this state really have / need a lifecycle: i.e. start / stop
  • etc. :)

@fcabouat
Copy link
Author

fcabouat commented Jul 5, 2017

This SO answer is awesome / many food for thoughts, thanx !

@fcabouat
Copy link
Author

fcabouat commented Jul 7, 2017

Little update : I made sure states are responsible for stopping only themselves, but still since I have running go-loops /thread-loops and dependencies (state B loop depending on state A), I keep having some kind of race condition.

I tried with a stop-channel, it seems like state B gets his stop called, which would close its loop in the next cycle (when alt!!-ing on the stop channel)... and then A gets stopped...

But during this last cycle, the B loop is still running and calls A -> IllegalArgumentException: No implementation of method: :my-method! of protocol: #'my-ns/StateAProtocol found for class: mount.core.NotStartedState.

So as I get it, there are only two options here :

  • have a way to make the stop process block... with B finishing its whole loop cycle before closing A (not so easy to block on go-loops everywhere) ?
  • have a stop value, so my-method! on the stopped state A would result in a no-op (could be easier) ?

Regards,

@buzzdan
Copy link

buzzdan commented May 19, 2019

I don't think mount has limitations when it comes to core.async. Besides the solution above with a stop-channel you can do (pseudo code):

(defn listen-to-events []
  (let [req-chan (chan)]
    (async/go-loop []
       (when-let [request (<! req-chan)]
         ;; handle request
         (recur)))
    req-chan))

(defstate listener :start (listen-to-events)
                   :stop (async/close! listener))

It also conveys a notion of state a bit better since it is not just a channel that is stateful, but a listener as well.

i just wanna put it here for the ones to come -
this is working:

(defstate listener
          :start (listen-to-events)
          :stop (async/close! ^clojure.core.async.impl.protocols/Channel listener))

notice the type hint above --> ^clojure.core.async.impl.protocols/Channel
this is what actually making the difference

Why ?

before using the type hint we get this message:

Execution error (IllegalArgumentException) at clojure.core.async.impl.protocols/eval5371$fn$G (protocols.clj:21).
No implementation of method: :close! of protocol: #'clojure.core.async.impl.protocols/Channel found for class: mount.core.DerefableState

it means that it gets mount.core.DerefableState as the type instead of a channel
so helping it by hinting it to ^Channel helps!

Enjoy 😉

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants