-
For me this is very much a learning exercise; I think you are further ahead than me. My initial goal is a very lightweight WeeWX install whose only purpose is to capture and then publish the data, so my focus is on the archive data and reliability over timeliness. My first go-around was a REST API and a custom uploader; then I started tinkering with MQTT.

On the publish side I have landed pretty much where you have. My current thought is that on a WeeWX new archive record, I add the record to a persistent (i.e. on-disk, probably SQLite) queue. This ensures that any MQTT failures do not interfere with the data capture, and I think it makes any restart/catch-up easier... The separate application would publish the data and check the is_published flag. I might even put a little delay/recheck logic in there (since I'm only publishing every 5 minutes). I'm not sure what I would do if I never get a successful publish acknowledgment, or maybe more accurately, how long I should wait before saying the publish was not successful... until a later message is successful? My reading of the documentation is that on_publish() means the broker has received the message. When you said on_request() did you mean on_publish()? I would only be using QOS=2.

On the subscribing side I don't think there is anything I need to do. I plan to experiment with that first. I know there are options for the broker on how many messages to keep, etc., and I want to get an idea of what impact this will have on the broker. I too rely on loop_start() and have not delved into the inner workings of the callbacks.

MQTTSubscribe only derives from the WeeWX StdService so that it can be notified of new archive or loop packets; I tried to encapsulate the MQTT logic. Like everything in software, looking back on it, I would make some different design decisions. I did fork Matthew's weewx-mqtt, which does rely on the WeeWX restx infrastructure; I wanted a single connection, multiple topics, additional payload formats...
As noted above, my current plan is for the actual publish to be outside of the WeeWX infrastructure. I'll be very interested in how your development progresses.
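That persistent queue could be as simple as this sketch (the table layout and the PublishQueue name are just illustrative stand-ins, nothing WeeWX provides):

```python
import json
import sqlite3
import time

class PublishQueue:
    """Disk-backed queue of archive records awaiting a successful publish."""

    def __init__(self, path=":memory:"):
        self.db = sqlite3.connect(path)
        self.db.execute(
            "CREATE TABLE IF NOT EXISTS queue ("
            " id INTEGER PRIMARY KEY,"
            " ts INTEGER,"
            " payload TEXT,"
            " is_published INTEGER DEFAULT 0)")

    def enqueue(self, record):
        # capture first; publishing happens later, in the separate application
        self.db.execute(
            "INSERT INTO queue (ts, payload) VALUES (?, ?)",
            (record.get("dateTime", int(time.time())), json.dumps(record)))
        self.db.commit()

    def pending(self):
        """Oldest-first records not yet acknowledged by the broker."""
        return [(rowid, json.loads(p)) for rowid, p in self.db.execute(
            "SELECT id, payload FROM queue WHERE is_published = 0 ORDER BY ts")]

    def mark_published(self, rowid):
        # called once the broker has acknowledged the message (e.g. on_publish)
        self.db.execute(
            "UPDATE queue SET is_published = 1 WHERE id = ?", (rowid,))
        self.db.commit()
```

Because the rows survive restarts, catch-up is just walking pending() again on startup.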
-
this is the code i have in production, in Publisher just after calling publish(), with placeholders for persisting pending requests with qos>0. note that in testing i stuck each pending request (waiting in the client-side queue) in a dict here and removed it in on_publish(), and the dict always shrank back to empty, so they did match up. so far as i can tell, we don't do anything else with these persisted requests until we resume a session and need to re-publish them all. can you think of anything else that needs to be done to them?
-
i recall one of these reported in the forum, and i think you indicated MQTTSubscribe now has a tolerance for how much 'out of frame' a datum can be without being dropped. i haven't even tested for this situation; instead i work on the age of the datum independent of which frame it arrives in. my assumption - now i realise it is an assumption - is that the datum is (or can be) meaningful for a period longer than the sampling period (archive interval), and though one or two samples (archive packets) might be a little skewed, in the overall context they would cancel each other out - in the nature of sampling. (spikes, if valid data, are always a problem for sampling.) where the assumption weakens is when the data can be held back for a long time - quite possible in an mqtt framework - what do we do then? which is where you have introduced a tolerance level. for a 'last good' value like temperature, the assumption holds fine, assuming it has a tiny effect on averaging(*). for a 'cumulative' value, it does not hold - we need to account for that missing bit even if it is late!
(*) averaging is skewed anyway if we receive multiple values in one interval, queued up and then averaged to produce one value for the archive packet. for example, one record contains the average of [10] = 10, the next has the average of [1000, 1010, 990, 1000] = 1000, so the apparent average via records is (10 + 1000)/2 = 505, not the true (10 + 1000 + 1010 + 990 + 1000)/5 = 802. but again, we assume that over time this averages out correctly (and bunching and spiking still screw things up)
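the arithmetic in (*), spelled out as a quick sketch:

```python
# illustrative numbers from the example above: one interval sees a single
# sample, the next sees four bunched-up samples
record1 = [10]
record2 = [1000, 1010, 990, 1000]

def avg(xs):
    return sum(xs) / len(xs)

# averaging the per-record averages weights each *interval* equally...
per_record = avg([avg(record1), avg(record2)])   # (10 + 1000) / 2 = 505.0

# ...whereas averaging all raw samples weights each *sample* equally
overall = avg(record1 + record2)                 # 4010 / 5 = 802.0
```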
… On 6 Mar 2021, at 9:31 am, Rich Bell ***@***.***> wrote:
In general for MQTTSubscribe, 'order' issues come up if a message arrives with a time before the interval being processed, or if it is overly aggressive about enforcing that data must be within the interval... If the time is within the interval, the order doesn't matter.
-
using clean_session=False (i.e. a persistent broker-side session) has two relevant features. my latest gateway design, when using persistent broker-side sessions, is to largely ignore (but mitigate) persistent subscriptions, but it welcomes broker-side buffered inward messages with open arms. the flow is:
init()
start()
on_connect()
on_message()
note: this design also includes a configuration option to explicitly unsubscribe a topic, making transitions between configurations, with persistent sessions, a little cleaner to manage
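the explicit-unsubscribe option boils down to a set difference between the old and new topic lists; a sketch (the function name is mine):

```python
def subscription_changes(previous, current):
    """Diff the topic sets when the configuration changes under a
    persistent (clean_session=False) broker-side session: the broker
    remembers old subscriptions, so configured-out topics must be
    explicitly unsubscribed."""
    unsubscribe = sorted(set(previous) - set(current))  # configured out
    subscribe = sorted(set(current) - set(previous))    # newly configured
    return unsubscribe, subscribe
```

in on_connect() one would unsubscribe the first list and subscribe the second.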
-
client-side persistence: i am still tossing up whether pending publish requests with topics not in the current configuration should be dropped as 'no longer of interest' or sent anyway for the destinations to decide whether to use them (does our reliability contract extend to 'old' messages?). pending publish requests - re-published in on_connect() - i now store in a persistent dict using 'shelve'. instances of the request are simply stored once, retrieved once, deleted once, so writeback=False is fine
-
i expect doing the reconnect is just a call in on_disconnect() (maybe with a few seconds delay) if the return code shows an error. it is then up to on_connect() to re-establish everything (including subscriptions if clean_session). presumably, when using the persistent queue (perhaps several such), each item needs info.wait_for_publish() before looking at the next item; otherwise the queue has to have 'grey' items (published but not committed...) to guarantee the broker has taken responsibility - which is what on_publish() is all about
an advantage of using a persistent queue with wait_for_publish() (thus avoiding on_publish()) is being able to distinguish between QOS=0 and QOS>0 - wait_for_publish() only on the latter - whereas on_publish() does not discriminate, so you have to persist all requests; you can then choose at re-publish time whether to re-publish QOS=0 requests
i build all my objects before connect(), and 'activate' them in on_connect() (e.g. subscriber objects subscribe). this anticipates losing and regaining the connection. (i call loop_stop() in on_disconnect() when the return code is successful; the loop thread is supposed to call on_connect() for me when the return code was unsuccessful, but i haven't explicitly tested this.)
i am still pursuing event-driven publishes rather than queue-driven, using a persistent dict and on_publish(). the key has to allow for a publish request living across multiple mqtt client instances, so mid is not a sufficient distinguisher - i use the timestamp of creation of the mqtt client instance as a prefix for the keys, which has the advantage of letting me sort into time sequence when it comes to re-publishing on startup.
BUT the mqtt callbacks are driving me nuts - when are they called?? and paho's loop thread is a daemon, which plays havoc when trying to do a clean program exit (including closing the persistent dict) in error situations, so i have been thinking about using loop() and doing my own thread
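the key scheme looks something like this sketch (KeyMaker is a made-up name, and zero-padding the mid is an extra assumption here, so that plain string sort also orders correctly within one client instance):

```python
import time

class KeyMaker:
    """mid values restart with each mqtt client instance, so prefix the
    keys with the instance's creation timestamp; string-sorting the keys
    then recovers time sequence across restarts."""

    def __init__(self, prefix=None):
        # e.g. '0318172528' (MMDDHHMMSS), fixed for this client instance
        self.prefix = prefix if prefix is not None else time.strftime("%m%d%H%M%S")

    def key(self, mid):
        # zero-pad mid so lexicographic sort matches numeric order
        return f"{self.prefix}:{mid:08d}"
```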
… On 15 Mar 2021, at 7:16 am, Rich Bell ***@***.***> wrote:
Well, I did a 180 on this. I am now using a persistent queue for the data to be published. I landed back here because I really want WeeWX to just gather the data.
In a separate process (though currently a separate thread), the queue is read, processed as needed, and published. I no longer use loop_start(), just loop(), to allow the mqtt code to do its thing. It gives me a bit more control over when the callbacks are called; I'm not sure it is warranted. I now have to do the reconnect myself, so I'm researching that.
Currently "stress testing" by publishing to multiple topics on every loop.
I still need to digest your more recent posts.
-
On 17 Mar 2021, at 1:32 am, Rich Bell ***@***.***> wrote:
re: reconnect - That sounds about right. I haven't given much thought to what to do with in-flight messages. Currently I only check the rc and call reconnect(), which appears to call connect() under the covers. Next up is to add max retries and some sleep time.
re: wait_for_publish() - I looked into it, but decided against it because it appears to block forever. I currently have an in-memory dictionary that I use to determine what to do in on_publish() - persist or not.
i have a dict 'pending_pubreqs' which, as part of configuration, is set to either a persistent dict or an in-memory dict, and then i forget about persistence per se. i stick every publish request in it because i use on_publish(), which processes QOS=0 messages as well. on the first on_connect() after start-up, i walk pending_pubreqs (sorted into time sequence) to re-publish them all, skipping QOS=0 requests
In MQTTSubscribe, I have the same flow of building up objects and 'activating' in on_connect(). I have seen the automatic reconnect happen. I currently have no plans to change the flow of MQTTSubscribe away from loop_start()/loop_stop().
the documentation says that after a successful reconnect, the client automatically re-publishes all messages pending in the client-side queue. i found that is not quite true: it re-publishes only QOS>0 requests. so i modified my original algorithm so that my code does its re-publishing from pending_pubreqs only on the first on_connect(), and skips QOS=0 requests, to match that behaviour
i am having trouble getting it to reliably re-connect - still trying to understand what my test scenarios are telling me... i have built os.system('sudo systemctl stop mosquitto') and '.. start ..' into the test programs to time the connection up/down precisely
i did have to add code after publish() checking for environmental problems - this is where my algorithm realises it has no connection and just sticks the request in pending_pubreqs, to match up with on_publish() when the mqtt client automatically re-publishes on re-connect - a copy of that code is attached below
I too am thinking about event driven publishing. This is because I want to publish archive records which only 'appear' every 5 minutes.
So far I am fairly happy with using the 'loop() flow' when publishing. When shutting down, I call this routine to handle in-flight messages.
def wait_for_inflight_messages(self):
    """ Wait for acknowledgement that messages have been published. """
    # ToDo: make these configurable
    wait_count = 5   # maximum number of drain attempts
    sleepy = 1       # seconds to sleep between attempts
    counter = 0
    while len(self.mids) > 0 and counter < wait_count:
        logdbg("%s in-flight messages; loop count %s; mids %s"
               % (len(self.mids), counter, self.mids))
        time.sleep(sleepy)
        # let the client process any outstanding acknowledgements
        self.client.loop(timeout=0.1)
        counter += 1
i didn't even think of such a simple approach to drain the client-side queue! i was obviously over-thinking it. thanks for the tip
i have noticed during my testing that, provided i don't let the loop daemon thread get killed off by an abrupt process exit, when i disconnect() the client has been draining the client-side queue (by publishing the pending requests) before sending the disconnect request to the broker. this is not consistent with my understanding of the documentation, which on my reading says it drops the connection immediately, and even warns that pending requests (in the client-side queue) are the client program's responsibility - so i don't want to rely on it
The self.mids is the in-memory dictionary I mentioned above.
BUT, when using loop() and QOS=0, there appears to be a bug/feature where on_publish() is called before publish() has finished. This drove me crazy for a while. So, in on_publish(), I have to check whether an entry in self.mids even exists…
i haven't struck this. i did strike something similar-sounding: sometimes the publish request departs the queue before the publish() call completes, so on_publish() is never called for it, so i have to skip putting it in pending_pubreqs if info.is_published() is true just after the publish() call - also illustrated in the code below
once i sort out my current issue of not getting reliable re-connects, i will be able to move on to clean_session=False testing, which of course is really all about inward messages
--
publish code mentioned above:
# publish encoded dataset
info = self._mqtt_client.publish(
    self.topic,
    payload=payload,
    qos=self.qos,
    retain=self.retain
)
if tracing > 1:
    log.debug(f"{self}"
              f" publish result rc={info.rc} mid={info.mid}"
              f" is_pub={info.is_published()}")
if info.rc == mqtt.MQTT_ERR_SUCCESS:
    # request succeeded, but has not necessarily reached the broker yet
    if info.is_published():
        # broker has the message; we are done
        if tracing > 1:
            log.debug(f"{self} publish immediate")
        return  # successful exit
    else:
        # it is in the client-side queue
        pass  # fall through to persist the request;
              # it should be picked up by on_publish() soon
elif info.rc in (
    mqtt.MQTT_ERR_NO_CONN,
    mqtt.MQTT_ERR_QUEUE_SIZE
):
    # request failed for an environmental reason
    pass  # fall through to persist the request;
          # it will be re-published on the next on_connect()
else:
    # request failed irreparably - not queued
    raise RunError(mqtt.error_string(info.rc))

# persist the request
if True:  # self.qos > 0 - but on_publish() does not distinguish QOS,
          # so even QOS=0 requests must be persisted; the alternative is
          # to allow misses on the dict and *assume* they were QOS=0
    keyval = f'{self._persist_key_prefix}{info.mid}'
    self._pending_pubreqs[keyval] = PublishRequest(
        self.topic,
        payload,
        self.qos,
        self.retain
    )
    if tracing > 1:
        log.debug(f"{self} store request keyval={keyval}")
-
On 17 Mar 2021, at 3:15 am, Graham Eddy ***@***.***> wrote:
> once i sort out my current issue of not getting reliable re-connects
i am getting some clarity on what gets re-published and where reconnect fits in…
for the test program schedule:
schedule = [
( 5.0, msgs_1), # publish 3 msgs, one at each QOS
(10.0, broker_down),
(15.0, msgs_2), # publish 3 msgs, one at each QOS
(20.0, broker_up),
(25.0, msgs_3), # publish 3 msgs, one at each QOS
(30.0, finish),
]
i see the following behaviour:
* immediately after 'broker_down' it reports a lost connection. no reconnect is attempted
* at 'msgs_2' there are three quick publishes. mqtt, and my code, see the connection is down so store them for re-publishing
* at 'broker_up' nothing appears to happen
* at 'msgs_3' there are three quick publishes again. mqtt doesn't get a word in until that settles, meaning the connection is down during the publishes, and mqtt and my code both store the messages for re-publishing. *then* mqtt finally issues the reconnect
* once the connection is up, mqtt re-publishes the 'msgs_2' and 'msgs_3' it has stored - but only QOS>0 - and on_publish() is called for each, which clears my pending queue of QOS>0 messages, leaving the QOS=0 ones still in it
when i restart my test program, my code picks up those old pending QOS=0 messages on the first on_connect() but discards them
conclusion: on *every* on_connect() i need to scan the pending requests and discard any QOS=0 and, on only the first on_connect(), do the re-publish of QOS>0 requests (because that re-publish is done by mqtt itself after the first on_connect())
my brain hurts
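that conclusion can be sketched in one small function (the names and the PublishRequest shape here are illustrative stand-ins, not my actual code):

```python
from collections import namedtuple

# minimal stand-in for a stored publish request
PublishRequest = namedtuple("PublishRequest", "topic payload qos retain")

def reconcile_pending(pending, first_connect):
    """On every on_connect(): discard QOS=0 entries (the client never
    re-publishes those). Re-publish the QOS>0 entries ourselves only on
    the first on_connect() after start-up; on later reconnects within
    the same process the mqtt client re-publishes them itself."""
    keep = {k: req for k, req in pending.items() if req.qos > 0}
    # sorted keys recover time sequence (timestamp-prefixed keys)
    to_republish = [keep[k] for k in sorted(keep)] if first_connect else []
    return keep, to_republish
```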
-
On 17 Mar 2021, at 4:01 pm, Graham Eddy ***@***.***> wrote:
> * at 'broker_up' nothing appears to happen
> * at 'msgs_3' there are three quick publishes again. mqtt doesn't get a word in until that settles, meaning the connection is down during the publishes, and mqtt and my code both store the messages for re-publishing. *then* mqtt finally issues the reconnect
further testing shows that the auto-reconnect *does* take place, but not immediately, and it is not driven by other requests it notices. it is driven by the retry delay period (1 sec, then doubling on each try) - so any requests between the connection loss and when a retry delay expires will be stored away while awaiting the reconnect. be aware of this delay, potentially of 1-2 minutes, before the connection coming up is noticed! (the delay period is configurable using reconnect_delay_set(min_delay=1, max_delay=120))
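the retry schedule implied by those defaults is easy to sketch (a hypothetical helper, not part of paho):

```python
from itertools import islice

def reconnect_delays(min_delay=1, max_delay=120):
    """Successive retry delays: start at min_delay, double on each
    failed attempt, cap at max_delay (mirroring the documented
    reconnect_delay_set() defaults)."""
    delay = min_delay
    while True:
        yield delay
        delay = min(delay * 2, max_delay)

first_eight = list(islice(reconnect_delays(), 8))
# 1, 2, 4, 8, 16, 32, 64, 120 - the first seven sum to 127 secs,
# so a broker outage of about two minutes is only noticed on retry #8
```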
with test schedule
schedule = [
( 5.0, msgs_1), # publish 3 msgs at each QOS
(10.0, broker_down),
(15.0, msgs_2), # publish 3 msgs at each QOS
(77.0, broker_up),
(190.0, msgs_3), # publish 3 msgs at each QOS
(200.0, finish),
]
the reconnection happens 127 secs after 'broker_down' (exactly per the defaults), and 'msgs_2' were re-published immediately; then 'msgs_3' happened live, as expected, since the connection was in place
my previous conclusion still holds:
on *every* on_connect() i need to scan the pending requests and discard any QOS=0 and, on only the first on_connect(), do the re-publish of QOS>0 requests (because that re-publish is done by mqtt itself after the first on_connect())
-
On 17 Mar 2021, at 3:15 am, Graham Eddy ***@***.***> wrote:
> The self.mids is the in memory dictionary I mentioned above.
> BUT, when using loop() and QOS = 0, there appears to be a bug/feature where on_publish() is called before publish() is finished. This drove me crazy for a while. So, in on_publish(), I have to check if an entry in self.mids even exists…
>
i haven’t struck this
now i have struck this for the first time - damn! the following sequence clearly shows the publish request being made, the on_publish callback being called and clearing the pending_pubreqs entry (which has not been stored there yet), then the code following the publish request carrying on, where it decides not to put the entry in because is_published is true by then - so no harm done, in the sense that the outcome is that the entry is no longer there
2021-03-18 17:25:33 DEBUG mqttgw|[Publisher|test/foo0|0|False]>>on_dataset(dataset='1 qos=0 2021-03-18 17:25:33',encode=True)
2021-03-18 17:25:33 DEBUG mqttgw|[Publisher|test/foo0|0|False] encoded payload='"1 qos=0 2021-03-18 17:25:33"'
2021-03-18 17:25:33 DEBUG mqttgw|MQTT Sending PUBLISH (d0, q0, r0, m2), 'b'test/foo0'', ... (29 bytes)
2021-03-18 17:25:33 DEBUG mqttgw|>>_on_publish(client,userdata,mid=2)
2021-03-18 17:25:33 DEBUG mqttgw|forget publish keyval=0318172528:2
2021-03-18 17:25:33 WARNING mqttgw|forget publish request 'b'0318172528:2'' failed: not found
2021-03-18 17:25:33 DEBUG mqttgw|[Publisher|test/foo0|0|False] publish result rc=0 mid=2 is_pub=True
2021-03-18 17:25:33 DEBUG mqttgw|[Publisher|test/foo0|0|False] publish immediate
looks like the thread doing the publish() gave up the cpu after the command output and before returning after completion, to the loop thread doing the callbacks such as on_publish, which when finished with the callback gave the cpu back to the publish thread again to do is_published() - which correctly returned true, and we did the correct behaviour of *not* inserting the entry
if the loop thread had not called the on_publish callback at that time, or if the publish thread had not given up the cpu at that time, then when we reached is_published() it would have returned false. the correct behaviour then is to insert the entry, and eventually the loop thread will call on_publish and clear it
conclusion: after publish(), test is_published(). if true, do not persist the request, and in on_publish() allow for it being called early and the entry not being present (or remove it if present). if false, persist the request, and on_publish() *must* find and remove the entry. looks like this is what you already coded
[i had to sort out in my mind *why* it was correct to do that, and any consequences.] i will demote the log.warning in on_publish on missing the key to a 'tracing > 1' log.debug...
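that conclusion reduces to two small routines; a sketch with a plain dict standing in for pending_pubreqs (names are illustrative):

```python
def after_publish(pending, key, request, is_published):
    """Run just after publish(): persist the request only if the client
    has not already sent it (info.is_published() returned False)."""
    if not is_published:
        pending[key] = request

def on_publish_cleanup(pending, key):
    """on_publish() may fire before publish() returns, so a missing
    entry is not an error - just drop it if present."""
    pending.pop(key, None)
```

both interleavings of the two threads then converge on the same end state: an empty pending dict.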
-
(i should declare that i wrote my own gateway - and continue to tinker with it - as part of a learning exercise as i move to an mqtt backbone. weewx has been my starting point, the first large component. i'm at arm's length from your code, figuring i could learn more by hitting issues and asking questions than by parroting code.)
i was thinking about reliable publishing - and reliable reception - for rain readings. being cumulative, missing a message leads to an inaccurate total
my reading of the documentation first pointed to the time between issuing publish() and receiving on_publish() as when the outgoing message was queued in memory on the client side. then i found that in the 'info' returned by publish(), if info.is_published() is true then the message has already been sent, so the exposure interval was a little smaller. i was caching the publish request when info.is_published() was false and purging it from the cache on a matching on_request(), and this seemed reliable (caveat: no stress testing under heavy load attempted).
i was thinking of making the cache persistent so that it could be replayed on startup (if clean_session=False, obviously). this would be needful for qos=2, not for qos=0; i haven't thought about qos=1
without looking at the mosquitto source code, a quick 'od -c /var/lib/mosquitto.db' makes it obvious the broker stores clientid/request pairs in this way (presumably for retained values), using something like dbm, so i was thinking of doing the same for my persistent cache
a big uncertainty for me is the granularity of execution. i am using loop_start(), so i am relying on a thread to invoke all the paho callbacks (like on_message). do you know if these callbacks are executed as a whole unit - not interleaved with other callbacks? i'm concerned about the possibility of a callback's view of the state of things being out of sync (a bit later than the actual state), and what impact that might have
(i just saw a [weewx-users] question re MQTTSubscribe go past. from this i see you are using weewx's restx framework. i am avoiding weewx frameworks because i intend re-using my classes across non-weewx apps)