-
Notifications
You must be signed in to change notification settings - Fork 146
Remove blocking async from overlay service loop #373
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Remove blocking async from overlay service loop via spawning new tasks for utp & content validation. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -317,11 +317,11 @@ pub struct OverlayService<TContentKey, TMetric, TValidator> { | |
/// Metrics reporting component | ||
metrics: Option<OverlayMetrics>, | ||
/// Validator for overlay network content. | ||
validator: TValidator, | ||
validator: Arc<TValidator>, | ||
} | ||
|
||
impl< | ||
TContentKey: OverlayContentKey + Send + Sync, | ||
TContentKey: 'static + OverlayContentKey + Send + Sync, | ||
TMetric: Metric + Send + Sync, | ||
TValidator: 'static + Validator<TContentKey> + Send + Sync, | ||
> OverlayService<TContentKey, TMetric, TValidator> | ||
|
@@ -343,7 +343,7 @@ where | |
protocol: ProtocolId, | ||
utp_listener_sender: UnboundedSender<UtpListenerRequest>, | ||
enable_metrics: bool, | ||
validator: TValidator, | ||
validator: Arc<TValidator>, | ||
query_timeout: Duration, | ||
query_peer_timeout: Duration, | ||
query_parallelism: usize, | ||
|
@@ -486,7 +486,7 @@ where | |
|
||
// Perform background processing. | ||
match response.response { | ||
Ok(response) => self.process_response(response, active_request.destination, active_request.request, active_request.query_id).await, | ||
Ok(response) => self.process_response(response, active_request.destination, active_request.request, active_request.query_id), | ||
Err(error) => self.process_request_failure(response.request_id, active_request.destination, error), | ||
} | ||
|
||
|
@@ -1108,7 +1108,7 @@ where | |
} | ||
|
||
/// Processes a response to an outgoing request from some source node. | ||
async fn process_response( | ||
fn process_response( | ||
&mut self, | ||
response: Response, | ||
source: Enr, | ||
|
@@ -1171,7 +1171,6 @@ where | |
} | ||
}; | ||
self.process_content(content, source, find_content_request, query_id) | ||
.await | ||
} | ||
Response::Accept(accept) => { | ||
let offer_request = match request { | ||
|
@@ -1182,18 +1181,15 @@ where | |
} | ||
}; | ||
|
||
if let Err(err) = self | ||
.process_accept(accept, source, offer_request.content_keys) | ||
.await | ||
{ | ||
if let Err(err) = self.process_accept(accept, source, offer_request.content_keys) { | ||
error!("Error processing ACCEPT response in overlay service: {err}") | ||
} | ||
} | ||
} | ||
} | ||
|
||
/// Process ACCEPT response | ||
pub async fn process_accept( | ||
pub fn process_accept( | ||
&self, | ||
response: Accept, | ||
enr: Enr, | ||
|
@@ -1227,17 +1223,22 @@ where | |
self.utp_listener_tx | ||
.send(utp_request).map_err(|err| anyhow!("Unable to send Connect request to UtpListener when processing ACCEPT message: {err}"))?; | ||
|
||
let mut conn = rx.await?; | ||
// Handle STATE packet for SYN | ||
let mut buf = [0; BUF_SIZE]; | ||
conn.recv(&mut buf).await?; | ||
let storage = Arc::clone(&self.storage); | ||
let response_clone = response.clone(); | ||
|
||
tokio::spawn(async move { | ||
let mut conn = rx.await.unwrap(); | ||
// Handle STATE packet for SYN | ||
let mut buf = [0; BUF_SIZE]; | ||
conn.recv(&mut buf).await.unwrap(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @ogenev Are there any consequences I'm missing from moving this logic into the spawned task? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It should be fine but I think we have to call … So I will move those lines after: let content_items = self.provide_requested_content(&response, content_keys_offered)?;
let content_payload = ContentPayloadList::new(content_items)
.map_err(|err| anyhow!("Unable to build content payload: {err:?}"))?; |
||
|
||
let content_items = self.provide_requested_content(&response, content_keys_offered)?; | ||
let content_items = | ||
Self::provide_requested_content(storage, &response_clone, content_keys_offered) | ||
.expect("Unable to provide requested content for acceptor: {msg:?}"); | ||
|
||
let content_payload = ContentPayloadList::new(content_items) | ||
.map_err(|err| anyhow!("Unable to build content payload: {err:?}"))?; | ||
let content_payload = ContentPayloadList::new(content_items) | ||
.expect("Unable to build content payload: {msg:?}"); | ||
|
||
tokio::spawn(async move { | ||
// send the content to the acceptor over a uTP stream | ||
if let Err(err) = conn.send_to(&content_payload.as_ssz_bytes()).await { | ||
warn!("Error sending content {err}"); | ||
|
@@ -1297,7 +1298,7 @@ where | |
} | ||
|
||
/// Processes a Content response. | ||
async fn process_content( | ||
fn process_content( | ||
&mut self, | ||
content: Content, | ||
source: Enr, | ||
|
@@ -1315,8 +1316,7 @@ where | |
self.protocol, id | ||
), | ||
Content::Content(content) => { | ||
self.process_received_content(content.clone(), request) | ||
.await; | ||
self.process_received_content(content.clone(), request); | ||
// TODO: Should we only advance the query if the content has been validated? | ||
if let Some(query_id) = query_id { | ||
self.advance_find_content_query_with_content(&query_id, source, content.into()); | ||
|
@@ -1332,7 +1332,7 @@ where | |
} | ||
} | ||
|
||
async fn process_received_content(&mut self, content: ByteList, request: FindContent) { | ||
fn process_received_content(&mut self, content: ByteList, request: FindContent) { | ||
let content_key = match TContentKey::try_from(request.content_key) { | ||
Ok(val) => val, | ||
Err(msg) => { | ||
|
@@ -1344,19 +1344,23 @@ where | |
match should_store { | ||
Ok(val) => { | ||
if val { | ||
// validate content before storing | ||
if let Err(err) = self | ||
.validator | ||
.validate_content(&content_key, &content.to_vec()) | ||
.await | ||
{ | ||
error!("Unable to validate received content: {err:?}"); | ||
return; | ||
}; | ||
let validator = Arc::clone(&self.validator); | ||
let storage = Arc::clone(&self.storage); | ||
// Spawn task that validates content before storing. | ||
// Allows for non-blocking requests to this/other overlay services. | ||
tokio::spawn(async move { | ||
if let Err(err) = validator | ||
.validate_content(&content_key, &content.to_vec()) | ||
.await | ||
{ | ||
warn!("Unable to validate received content: {err:?}"); | ||
return; | ||
}; | ||
|
||
if let Err(err) = self.storage.write().store(&content_key, &content.into()) { | ||
error!("Content received, but not stored: {err}") | ||
} | ||
if let Err(err) = storage.write().store(&content_key, &content.into()) { | ||
error!("Content received, but not stored: {err}") | ||
} | ||
}); | ||
} else { | ||
debug!( | ||
"Content received, but not stored: Content is already stored or its distance falls outside current radius." | ||
|
@@ -1456,7 +1460,7 @@ where | |
|
||
/// Provide the requested content key and content value for the acceptor | ||
fn provide_requested_content( | ||
&self, | ||
storage: Arc<RwLock<PortalStorage>>, | ||
accept_message: &Accept, | ||
content_keys_offered: Vec<TContentKey>, | ||
) -> anyhow::Result<Vec<ByteList>> { | ||
|
@@ -1469,7 +1473,7 @@ where | |
.zip(content_keys_offered.iter()) | ||
{ | ||
if i == true { | ||
match self.storage.read().get(key) { | ||
match storage.read().get(key) { | ||
Ok(content) => match content { | ||
Some(content) => content_items.push(content.into()), | ||
None => return Err(anyhow!("Unable to read offered content!")), | ||
|
@@ -1993,7 +1997,7 @@ mod tests { | |
let (request_tx, request_rx) = mpsc::unbounded_channel(); | ||
let (response_tx, response_rx) = mpsc::unbounded_channel(); | ||
let metrics = None; | ||
let validator = MockValidator {}; | ||
let validator = Arc::new(MockValidator {}); | ||
|
||
let service = OverlayService { | ||
discovery, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to put `self.process_response` in a `tokio::spawn` block here and keep the `await`? I think this way it will be clearer that the whole task is on a different thread.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not really, without majorly complicating things. I spent probably too much time trying this approach. The problem is that since we have a loop continually spawning async tasks in parallel, these tasks each require a mutable reference to `self`. This requires introducing a `'static` lifetime on `self`, wrapping `self` in an Arc/Mutex, and even then I wasn't able to satisfy the compiler when we spawn an overlay service. @jacobkaufmann pointed out that if we only spawn what is strictly necessary (which only requires cloning two `Arc<RwLock<>>`s) in each new task, then all that complexity disappears.

I see that it's unnecessary work. But, again, we run into the problem of requiring a mutable ref to `self` if we want to use it inside the async task. I think that the cost of unnecessary work is worth it in this case rather than implementing the complexity of a mutable ref for `self` as described above. But please let me know if you feel differently, or see an approach that I'm missing.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At first glance, it looks like you could implement `provide_requested_content` where the storage is passed as an argument. I'm not sure to what extent it's an anti-pattern to pass `Arc` around whenever you run into an issue with `self`, though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I understand... For sure, `provide_requested_content` could decouple from `self` and accept an `Arc` for `storage`. Whether or not that's an anti-pattern... I don't have any strong instinct, but it seems to be the best option from what I have come across. I actually also use it in #376, to achieve the same goal (decoupling from `self`).

But, I'm not sure I understand the benefit here. Simply decoupling `provide_requested_content` à la your suggestion wouldn't allow us to move the `tokio::spawn` block up to `self.process_response` as @ogenev requested. Assuming I'm understanding correctly, something like this will not fly for the reasons explained above. Within `process_response()` we access multiple properties on `self`, so we'd need to `Arc<RwLock<>>`-ify all of those, which imo is overkill.

Another option would be to move the `tokio::spawn()` down a level, something like .... Now, as @jacobkaufmann suggests, `process_accept()` appears fairly minimal in terms of the arc-lock-clones it would need (`self.protocol`, `self.utp_listener_tx` & `self.storage` for `self.provide_requested_content`). However, `process_content()` needs to access...

- `self.process_received_content()`
- `self.advance_find_content_query_with_enrs()`
- `self.advance_find_content_query_with_content()`
- `self.process_discovered_enrs()`

This seems prohibitive to me (in terms of introduced complexity) in terms of justifying moving the `tokio::spawn` to a slightly more readable location. And the compromise of spawning tasks at different levels for `process_accept()` and `process_content()` doesn't seem ideal, and could lead to more confusion, as one who is reading the code might not recognize that there is another async task spawned at a deeper level than the other.

Idk, that's kinda what I'm thinking here, but definitely open to pushback or something I'm misunderstanding.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@njgheorghita Sorry for not clarifying. My comment was isolated w.r.t. `provide_requested_content`. I was not pushing for the call to `process_response` to move into a tokio task.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I understand your suggestion here... the call to `self.provide_requested_content` (on line 1227) is not async, and it seems to me like there's no reason to move it into the `tokio::spawn({})` block on line 1232.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If there is a concern about the possibility of hitting the database despite a failed connection, then the suggestion would be to move that call into the tokio task after the connection is established. This would require that we refactor `provide_requested_content`.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, spawning `self` with `tokio::spawn` seems to add a lot of complexity. I'm not sure what pattern is used in Rust to minimize such complexities, but my feeling is that we are probably missing something.

Anyways, we can try Jacob's suggestion above and pass the storage as an argument to `provide_requested_content` and remove the `self`. This way we can add it inside `tokio::spawn`.