-
-
Notifications
You must be signed in to change notification settings - Fork 107
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Delayed messages #102
base: main
Are you sure you want to change the base?
Delayed messages #102
Conversation
Codecov Report
@@ Coverage Diff @@
## main #102 +/- ##
==========================================
- Coverage 94.59% 91.91% -2.69%
==========================================
Files 12 12
Lines 481 581 +100
==========================================
+ Hits 455 534 +79
- Misses 26 47 +21
Continue to review full report at Codecov.
|
2890e7d
to
29072ed
Compare
@jodal the PR is ready for the initial review now |
My initial thought upon reading the patch is that monotonic timers should be used. Right now it looks like a collision course for problems around daylight saving time changes and leap seconds. |
Isn't system timer always monotonic and not affected by timezones/daylight savings etc? |
If that was true there wouldn't be any point for |
My bad, I thought |
I should have looked this up initially: https://docs.python.org/3/library/time.html?highlight=monotonic#time.time
You definitely want |
29072ed
to
190998f
Compare
@djmattyg007 , updated the PR, thanks for pointing out this problem. It was pretty easy, there were just two places where |
190998f
to
7659f5c
Compare
ac7574e
to
b9c16d0
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some of these comments are more nits and open questions. I would not run off and try and address everything/anything I pointed out but perhaps see what @jodal thinks about these before spending time on them.
Perhaps doing a wider documentation pass explaining the architecture and how things fit together makes sense after this is merged?
return len(self.timestamps) == 0 | ||
|
||
def add(self, envelope): | ||
idx = bisect.bisect(self.timestamps, envelope.timestamp) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could this use heapq
for a priority queue instead of using bisect like this? If there is a reason for preferring it as I would add it as a comment to help future readers og the code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a good idea, I admit I wasn't aware of heapq
's existence.
if len(self.timestamps) == 0: | ||
return None | ||
else: | ||
return max(self.timestamps[0] - time.monotonic(), 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Being able to sub in a different time source could be useful for tests. Probably not required for this initial PR, but worth keeping in mind.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure what kind of API is needed for that, given that it has to match the call in _envelope.py
. Perhaps it will be easier to just mock time.monotonic
in tests?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You might want to read up on the clock pattern:
https://agilewarrior.wordpress.com/2017/03/03/clock-pattern/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So how is that different from mocking time.monotonic
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mocking time.monotonic() in particular may not be feasible. A dedicated clock object can provide more flexibility in this regard, as well as a nicer interface to manage the clock in tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess that's true. But there's still a problem of similarly affecting the timeouted methods of Queue
or Event
.
@@ -98,6 +124,11 @@ def _create_actor_inbox(): | |||
"""Internal method for implementors of new actor types.""" | |||
raise NotImplementedError("Use a subclass of Actor") | |||
|
|||
@staticmethod | |||
def _queue_empty_exception(): | |||
"""Internal method for implementors of new actor types.""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this return a type or an instance, could we document this?
# Take all the messages out of the inbox and put them | ||
# in our internal inbox where they're sorted by timestamps | ||
try: | ||
envelope = self.actor_inbox.get(timeout=next_event_in) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens when timeout=0
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The same as with any other timeout, really. It either returns an Envelope
(if there's one in the queue), or goes into the except self.__queue_empty_exception
branch.
next_event_in = self.__timed_inbox.next_event_in() | ||
|
||
# Check if there's something to be processed right now. | ||
if next_event_in > 0: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought next event in can be zero? Is the check here so that we skip any future messages that have delays left and only run those that are ready. Perhaps makes this more clear with a comment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rather, if there are no messages to execute right now (that is, we received some messages, but they were all delayed), we go into the waiting mode again. If there is something to execute, next_event_in
would be 0. I'm not sure how can I make it clearer in the comment; could you propose an alternative?
def __setattr__(self, name, value): | ||
if name == "actor_ref" or name.startswith("_"): | ||
return super().__setattr__(name, value) | ||
message_factory = self._message_builder.setattr(name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reading this line I almost assumed it was changing an attr on the builder, but I assume it's creating a builder that has the job of setting an attr? Could we make this more clear with better naming or comments?
@@ -263,9 +343,10 @@ class CallableProxy: | |||
proxy.do_work.defer() | |||
""" | |||
|
|||
def __init__(self, actor_ref, attr_path): | |||
def __init__(self, actor_ref, attr_path, delay=0): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Type, units and documentation for delay?
|
||
def ask(self, message, block=True, timeout=None): | ||
def ask(self, message, block=True, timeout=None, delay=0): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Document, type, units and semantics of delay in ask
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also should we allow this to be called with None
and is that treated the same as zero? Or a negative delay? Or a duration from datetime
?
Should we support an at=None
API passing in a time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also should we allow this to be called with
None
and is that treated the same as zero?
That's possible.
Or a negative delay?
That already works, although the resulting effect is that the message effectively gets a higher priority (because it will be put at the start of the TimedInbox
). Not sure if that's a useful thing to allow or not.
Or a duration from
datetime
?
Not sure what you mean here.
Should we support an
at=None
API passing in a time?
That's kinda dangerous in view of the earlier discussion about time.monotonic()
. We'll have to translate at
into the monotonic time, and then the user adjusts the clock, or daylight savings happen, and the execution time will change.
actor_ref.tell( | ||
{"command": "callback", "callback": lambda: lst.append(1)}, delay=0.5 | ||
) | ||
event_set = event.wait() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this have some largish timeout just in case, so the test exists even if this breaks at some point?
}, | ||
delay=delay, | ||
) | ||
event_set = event.wait() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this have some largish timeout just in case, so the test exists even if this breaks at some point?
This PR implements delayed message delivery, useful for creating event loops which can still be terminated at a moment's notice. Possibly fixes #44. Also fixes an unrelated bug in
eventlet.Event
that causedevent.wait()
to result in anAttributeError
.Supporting delay in
ask
/tell
is pretty straightforward, but doing it forproxy()
requires some additional indirection. It is in the commit by itself now, because I understand that the approach I used may be questionable; I also hasn't changed the docs/tests because there is a probability the interface will be changed. Currently one would use delayed proxies asAll the introspection is only done once when
proxy_base
is created.This PR has some intersection of functionality with PR #95, but is simpler and gives actors more control of the process (specifically, it is more convenient for event loop organization).