Remove blocking async from overlay service loop #373
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Force-pushed from 324784d to 37f1bd7
```rust
let mut conn = rx.await.unwrap();
// Handle STATE packet for SYN
let mut buf = [0; BUF_SIZE];
conn.recv(&mut buf).await.unwrap();
```
@ogenev Are there any consequences I'm missing from moving this logic into the spawned task?
It should be fine, but I think we have to call `provide_requested_content` after we initiate the uTP handshake. If we can't establish an uTP connection with the remote node, it doesn't make sense to try to get the requested content from the db. So I will move those lines after `conn.recv(&mut buf).await.unwrap()`:

```rust
let content_items = self.provide_requested_content(&response, content_keys_offered)?;
let content_payload = ContentPayloadList::new(content_items)
    .map_err(|err| anyhow!("Unable to build content payload: {err:?}"))?;
```
```diff
@@ -486,7 +487,7 @@ where

     // Perform background processing.
     match response.response {
-        Ok(response) => self.process_response(response, active_request.destination, active_request.request, active_request.query_id).await,
+        Ok(response) => self.process_response(response, active_request.destination, active_request.request, active_request.query_id),
```
Is it possible to put `self.process_response` in a `tokio::spawn` block here and keep the `await`? I think this way it will be clearer that the whole task runs on a different thread.
Not really, without majorly complicating things. I spent probably too much time trying this approach. The problem is that since we have a loop continually spawning async tasks in parallel, these tasks each require a mutable reference to `self`. This requires introducing a `'static` lifetime on `self` and wrapping `self` in an Arc/Mutex, and even then I wasn't able to satisfy the compiler when we spawn an overlay service. @jacobkaufmann pointed out that if we only spawn what is strictly necessary (which only requires cloning two `Arc<RwLock<>>`s) in each new task, then all that complexity disappears.

> So I will move those lines after `conn.recv(&mut buf).await.unwrap()`:

I see that it's unnecessary work. But again, we run into the problem of requiring a mutable ref to `self` if we want to use it inside the async task. I think the cost of the unnecessary work is worth it in this case, rather than taking on the complexity of a mutable ref to `self` as described above. But please lmk if you feel differently, or see an approach that I'm missing.
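The pattern described above (cloning only the handles a task actually needs, rather than moving `self` into it) can be sketched with hypothetical types. None of these names come from trin, and `std::thread::spawn` stands in for `tokio::spawn` here, since both impose the same `'static + Send` bound on the task closure:

```rust
use std::sync::{Arc, RwLock};
use std::thread;

// Hypothetical stand-in for an overlay service: only `storage` and
// `kbuckets` are shared state the background task needs.
struct OverlayService {
    storage: Arc<RwLock<Vec<String>>>,
    kbuckets: Arc<RwLock<Vec<u64>>>,
    request_count: usize, // owned field, not needed by the task
}

impl OverlayService {
    // A task capturing `&mut self` could not satisfy the `'static` bound.
    // Instead, clone only the two Arc handles the task actually uses.
    fn process_in_background(&mut self, item: String) -> thread::JoinHandle<usize> {
        self.request_count += 1;
        let storage = Arc::clone(&self.storage);
        let kbuckets = Arc::clone(&self.kbuckets);
        // The cloned Arcs are 'static + Send, so the spawn compiles
        // without wrapping the whole service in a lock.
        thread::spawn(move || {
            storage.write().unwrap().push(item);
            kbuckets.read().unwrap().len()
        })
    }
}
```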
At first glance, it looks like you could implement `provide_requested_content` where the storage is passed as an argument. I'm not sure to what extent it's an anti-pattern to pass `Arc`s around whenever you run into an issue with `self`, though.
I'm not sure I understand... For sure `provide_requested_content` could be decoupled from `self` and accept an `Arc` for `storage`. Whether or not that's an anti-pattern... I don't have any strong instinct, but it seems to be the best option from what I have come across. I actually also use it in #376 to achieve the same goal (decoupling from `self`).

But I'm not sure I understand the benefit here. Simply decoupling `provide_requested_content` à la your suggestion wouldn't allow us to move the `tokio::spawn` block up to `self.process_response` as @ogenev requested. Assuming I'm understanding correctly, something like this will not fly, for the reasons explained above.

```rust
// line 489ish
match response.response {
    Ok(response) => {
        tokio::spawn(async move {
            self.process_response(response, active_request.destination, active_request.request, active_request.query_id).await
        });
    }
    Err(error) => self.process_request_failure(response.request_id, active_request.destination, error),
}
```
Within `process_response()` we access multiple properties on `self`, so we'd need to `Arc<RwLock<>>`-ify all of those, which imo is overkill.

Another option would be to move the `tokio::spawn()` down a level, something like ...
```rust
// line 1170-ish
Response::Content(content) => {
    let find_content_request = match request {
        Request::FindContent(find_content) => find_content,
        _ => {
            error!("Unable to process received content: Invalid request message.");
            return;
        }
    };
    // need to clone some arcs here
    tokio::spawn(async move {
        OverlayService::<TContentKey, TMetric, TValidator>::process_content(content, source, find_content_request, query_id)
    });
}
Response::Accept(accept) => {
    let offer_request = match request {
        Request::Offer(offer) => offer,
        _ => {
            error!("Unable to process received content: Invalid request message.");
            return;
        }
    };
    // need to clone some arcs here
    tokio::spawn(async move {
        OverlayService::<TContentKey, TMetric, TValidator>::process_accept(accept, source, offer_request.content_keys)
    });
}
```
Now, as @jacobkaufmann suggests, `process_accept()` appears fairly minimal in terms of the arc-lock-clones it would need (`self.protocol`, `self.utp_listener_tx`, and `self.storage` for `self.provide_requested_content`). However, `process_content()` needs to access...

- `self.process_received_content()`
- `self.advance_find_content_query_with_enrs()`
- `self.advance_find_content_query_with_content()`
- `self.process_discovered_enrs()`

The complexity this introduces seems prohibitive to me, just to move the `tokio::spawn` to a slightly more readable location. And the compromise of spawning tasks at different levels for `process_accept()` and `process_content()` doesn't seem ideal either: it could lead to more confusion, since someone reading the code might not recognize that one async task is spawned at a deeper level than the other.

Idk, that's kinda what I'm thinking here, but definitely open to pushback or something I'm misunderstanding.
@njgheorghita sorry for not clarifying. My comment was isolated w.r.t. `provide_requested_content`. I was not pushing for the call to `process_response` to move into a tokio task.
I'm not sure I understand your suggestion here... the call to `self.provide_requested_content` (on line 1227) is not async, and it seems to me like there's no reason to move it into the `tokio::spawn({})` block on line 1232.
If there is a concern about the possibility of hitting the database despite a failed connection, then the suggestion would be to move that call into the tokio task, after the connection is established. This would require that we refactor `provide_requested_content`.
Yeah, spawning `self` with `tokio::spawn` seems to add a lot of complexity. I'm not sure what pattern is used in Rust to minimize such complexities, but my feeling is that we are probably missing something.

Anyways, we can try Jacob's suggestion above: pass the storage as an argument to `provide_requested_content` and remove the `self`. This way we can call it inside `tokio::spawn`.
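A minimal sketch of that refactor, with hypothetical types (the real storage and content-key types in trin differ): `provide_requested_content` takes the storage handle as an argument instead of borrowing `self`, so the call can move inside a spawned task and run only after the connection is established.

```rust
use std::sync::{Arc, RwLock};

// Hypothetical storage: a list of (content_key, content) pairs.
type Storage = RwLock<Vec<(String, Vec<u8>)>>;

// Free function instead of a method on the service: the Arc handle is
// cheap to clone into a `tokio::spawn`ed task, unlike `&mut self`.
fn provide_requested_content(
    storage: &Arc<Storage>,
    content_keys: &[String],
) -> Vec<Vec<u8>> {
    let store = storage.read().unwrap();
    content_keys
        .iter()
        .filter_map(|key| {
            store.iter().find(|(k, _)| k == key).map(|(_, v)| v.clone())
        })
        .collect()
}
```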
Force-pushed from 584e88a to 66e9f6e
Force-pushed from 66e9f6e to 9d8438c
Requesting some minor changes here, mainly the use of a synchronous locking mechanism. I'm also against the CI change that removes checksum checks. I think we should resolve this independently.
```diff
@@ -68,6 +68,7 @@
 use ssz::Encode;
 use ssz_types::{BitList, VariableList};
 use thiserror::Error;
 use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
+use tokio::sync::Mutex;
```
I'm in favor of a synchronous mutex here (`parking_lot::Mutex`). An asynchronous mutex would only be necessary if we need to hold a lock across `await` points, which does not appear to be the case. We can acquire the validator lock and call validate in a single statement.

Further, I guess the validate method on the validator requires `&mut self`, so a `RwLock` does not make sense here, but I'm not sure why that method takes `&mut self` in the first place. That's a separate issue though.
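A sketch of the single-statement locking pattern, using `std::sync::Mutex` as a stand-in for `parking_lot::Mutex` and a hypothetical validator type: the guard is acquired and dropped within one statement, so it is never held across an `await` point and a synchronous mutex suffices.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical validator; the real validate method in trin is async and
// takes different arguments, but the locking pattern is the same.
struct Validator {
    validated: usize,
}

impl Validator {
    fn validate_content(&mut self, content: &[u8]) -> Result<(), String> {
        if content.is_empty() {
            return Err("empty content".to_string());
        }
        self.validated += 1;
        Ok(())
    }
}

fn validate(validator: &Arc<Mutex<Validator>>, content: &[u8]) -> Result<(), String> {
    // Lock acquired and released within one statement: the temporary guard
    // is dropped at the end of the expression, so it cannot be held across
    // an `.await`.
    validator.lock().unwrap().validate_content(content)
}
```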
I might be wrong here, but re: "an asynchronous mutex would only be necessary if we need to hold a lock across `await` points"... isn't that exactly what we're doing here?

> I'm not sure why that method takes `&mut self` in the first place

Agreed, but updating the mutable `self` ref on the validator and changing the `Mutex` to a `RwLock` doesn't change that, since `validator.validate_content().await` is async, and a future created by an async block is not `Send`, which is a trait required by `tokio::spawn`. lmk if I'm missing something about this situation.
Ah yes, I was wrong above on that. But on the second point: if we change the validator to require `&self` instead of `&mut self`, then we can just wrap the validator in an `Arc` (without a lock), which is `Send`.

Looking at the `ChainHistoryValidator`, I don't see why we need a `&mut self` receiver.
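A sketch of the `&self` alternative, with hypothetical types: if validation only reads shared state, the validator can be shared through a plain `Arc` with no lock at all. `std::thread::spawn` stands in for `tokio::spawn` here; both require the captured `Arc<Validator>` to be `Send + 'static`, which it is.

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical validator whose state is read-only during validation,
// so the method can take `&self`.
struct Validator {
    trusted_prefix: Vec<u8>,
}

impl Validator {
    fn validate_content(&self, content: &[u8]) -> bool {
        content.starts_with(&self.trusted_prefix)
    }
}

fn spawn_validation(validator: Arc<Validator>, content: Vec<u8>) -> thread::JoinHandle<bool> {
    // No Mutex or RwLock: a bare Arc is enough for shared read-only access.
    thread::spawn(move || validator.validate_content(&content))
}
```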
```rust
Ok(val) => val,
Err(msg) => {
    warn!("Unable to provide requested content for acceptor: {msg:?}");
    return;
}
```
nit: Given that we are in a tokio task context here, we can call `expect`, because tokio catches panics within a spawned task. This would clean up the `match` logic here. Although I'm not sure whether this goes against idiomatic Rust. Ultimately I'm okay with whichever you prefer, but I thought I would call that out.
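A sketch of the panic-containment behavior being relied on, using `std::thread::spawn` as a stand-in: a panic inside the spawned task (e.g. from a failing `expect`) does not take down the spawning thread, and surfaces as an `Err` from `join`, analogous to the `JoinError` that a `tokio::spawn` handle returns when the task panics.

```rust
use std::thread;

// Simulates a task whose `.expect(...)` may fire: the panic stays inside
// the spawned task and is reported through the join handle.
fn run_task(should_panic: bool) -> thread::Result<&'static str> {
    thread::spawn(move || {
        if should_panic {
            panic!("Unable to provide requested content for acceptor");
        }
        "ok"
    })
    .join()
}
```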
```rust
if let Err(err) =
    lock.validate_content(&content_key, &content.to_vec()).await
{
    error!("Unable to validate received content: {err:?}");
```
nit: I think we can go with `warn!` here, because this is not necessarily erroneous behavior from trin. It might indicate that some peer is giving us invalid data.
.circleci/config.yml (outdated)

```yaml
# todo: Remove --ignore-checksums flag
command: choco install rustup.install --ignore-checksums
```
I would prefer that we do not push this to the main branch. If we do this at all, it should be an isolated PR.
Force-pushed from 9d8438c to 57d734a
What was wrong?
Fixes #342. `async` expressions in the handlers of the main `OverlayService` loop block the loop from continuing while they are awaited.
How was it fixed?
Spawn new tasks for the `async` expressions.
To-Do