Remove blocking async from overlay service loop #373

Merged
merged 3 commits on Jul 20, 2022
1 change: 1 addition & 0 deletions newsfragments/373.fixed.md
@@ -0,0 +1 @@
Remove blocking async from overlay service loop via spawning new tasks for utp & content validation.
4 changes: 2 additions & 2 deletions trin-core/src/portalnet/overlay.rs
@@ -121,7 +121,7 @@ pub struct OverlayProtocol<TContentKey, TMetric, TValidator> {
}

impl<
TContentKey: OverlayContentKey + Send + Sync,
TContentKey: 'static + OverlayContentKey + Send + Sync,
TMetric: Metric + Send + Sync,
TValidator: 'static + Validator<TContentKey> + Send + Sync,
> OverlayProtocol<TContentKey, TMetric, TValidator>
@@ -135,7 +135,7 @@ where
storage: Arc<RwLock<PortalStorage>>,
data_radius: U256,
protocol: ProtocolId,
validator: TValidator,
validator: Arc<TValidator>,
) -> Self {
let kbuckets = Arc::new(RwLock::new(KBucketsTable::new(
discovery.local_enr().node_id().into(),
80 changes: 42 additions & 38 deletions trin-core/src/portalnet/overlay_service.rs
@@ -317,11 +317,11 @@ pub struct OverlayService<TContentKey, TMetric, TValidator> {
/// Metrics reporting component
metrics: Option<OverlayMetrics>,
/// Validator for overlay network content.
validator: TValidator,
validator: Arc<TValidator>,
}

impl<
TContentKey: OverlayContentKey + Send + Sync,
TContentKey: 'static + OverlayContentKey + Send + Sync,
TMetric: Metric + Send + Sync,
TValidator: 'static + Validator<TContentKey> + Send + Sync,
> OverlayService<TContentKey, TMetric, TValidator>
@@ -343,7 +343,7 @@ where
protocol: ProtocolId,
utp_listener_sender: UnboundedSender<UtpListenerRequest>,
enable_metrics: bool,
validator: TValidator,
validator: Arc<TValidator>,
query_timeout: Duration,
query_peer_timeout: Duration,
query_parallelism: usize,
@@ -486,7 +486,7 @@ where

// Perform background processing.
match response.response {
Ok(response) => self.process_response(response, active_request.destination, active_request.request, active_request.query_id).await,
Ok(response) => self.process_response(response, active_request.destination, active_request.request, active_request.query_id),
Member:

Is it possible to put self.process_response in a tokio::spawn block here and keep the await? I think this way it would be clearer that the whole task runs on a different thread.

@njgheorghita (Collaborator Author), Jul 11, 2022:

Not really, not without majorly complicating things. I spent probably too much time trying this approach. The problem is that since we have a loop continually spawning async tasks in parallel, each of these tasks requires a mutable reference to self. That means introducing a 'static lifetime on self and wrapping self in an Arc/Mutex, and even then I wasn't able to satisfy the compiler when we spawn an overlay service. @jacobkaufmann pointed out that if we only spawn what is strictly necessary (which only requires cloning two Arc<RwLock<>>s) in each new task, then all that complexity disappears.

> So I will move those lines after conn.recv(&mut buf).await.unwrap():

I see that it's unnecessary work. But again, we run into the problem of requiring a mutable ref to self if we want to use it inside the async task. I think the cost of the unnecessary work is worth it in this case, rather than taking on the complexity of a mutable ref to self as described above. But please lmk if you feel differently, or if you see an approach that I'm missing.
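
A minimal sketch of that approach, using illustrative stand-in types rather than the actual trin code — the spawned task receives clones of only the Arc handles it needs, so self never has to be 'static:

use std::sync::Arc;
use tokio::sync::RwLock;

// Illustrative stand-ins, not the actual trin types.
struct Storage(Vec<u8>);

struct Service {
    storage: Arc<RwLock<Storage>>,
}

impl Service {
    fn handle(&mut self, content: Vec<u8>) -> tokio::task::JoinHandle<()> {
        // Moving `self` into the task would force `self` to be 'static
        // (i.e. wrapping the whole service in an Arc/Mutex). Cloning only
        // the shared handles the task actually needs avoids all of that.
        let storage = Arc::clone(&self.storage);
        tokio::spawn(async move {
            // Only the cloned Arc is moved in; `self` is never borrowed here.
            let mut guard = storage.write().await;
            guard.0 = content;
        })
    }
}

#[tokio::main]
async fn main() {
    let mut service = Service {
        storage: Arc::new(RwLock::new(Storage(Vec::new()))),
    };
    service.handle(vec![1, 2, 3]).await.unwrap();
}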

Collaborator:

At first glance, it looks like you could implement provide_requested_content where the storage is passed as an argument. I'm not sure to what extent it's an anti-pattern to pass Arc around whenever you run into an issue with self though.

Collaborator Author:

I'm not sure I understand... For sure, provide_requested_content could be decoupled from self and accept an Arc for storage. Whether or not that's an anti-pattern... I don't have any strong instinct, but it seems to be the best option from what I've come across. I actually also use it in #376 to achieve the same goal (decoupling from self).

But I'm not sure I understand the benefit here. Simply decoupling provide_requested_content à la your suggestion wouldn't allow us to move the tokio::spawn block up to self.process_response as @ogenev requested. Assuming I'm understanding correctly, something like this will not fly, for the reasons explained above.

// line 489ish
match response.response {
    Ok(response) => {
        tokio::spawn( async move {
            self.process_response(response, active_request.destination, active_request.request, active_request.query_id).await
        });
    },
    Err(error) => self.process_request_failure(response.request_id, active_request.destination, error),
}

Within process_response() we access multiple properties on self, so we'd need to Arc<RwLock<>>-ify all of those, which imo is overkill.

Another option would be to move the tokio::spawn() down a level, something like:

            // line 1170-ish
            Response::Content(content) => {
                let find_content_request = match request {
                    Request::FindContent(find_content) => find_content,
                    _ => {
                        error!("Unable to process received content: Invalid request message.");
                        return;
                    }
                };
                // need to clone some arcs here
                tokio::spawn( async move {
                    OverlayService::<TContentKey, TMetric, TValidator>::process_content(content, source, find_content_request, query_id)
                });
            }
            Response::Accept(accept) => {
                let offer_request = match request {
                    Request::Offer(offer) => offer,
                    _ => {
                        error!("Unable to process received content: Invalid request message.");
                        return;
                    }
                };
                // need to clone some arcs here
                tokio::spawn( async move {
                    OverlayService::<TContentKey, TMetric, TValidator>::process_accept(accept, source, offer_request.content_keys)
                });
            }

Now, as @jacobkaufmann suggests, process_accept() appears fairly minimal in terms of the arc-lock-clones it would need (self.protocol, self.utp_listener_tx & self.storage for self.provide_requested_content). However, process_content() needs to access...

  • self.process_received_content()
  • self.advance_find_content_query_with_enrs()
  • self.advance_find_content_query_with_content()
  • self.process_discovered_enrs()

This seems prohibitive to me: the complexity it introduces doesn't justify moving the tokio::spawn to a slightly more readable location. And the compromise of spawning tasks at different levels for process_accept() and process_content() doesn't seem ideal either, and could lead to more confusion, since someone reading the code might not recognize that one async task is spawned at a deeper level than the other.

Idk, that's kinda what I'm thinking here, but definitely open to pushback or something I'm misunderstanding.

Collaborator:

@njgheorghita sorry for not clarifying. My comment was isolated w.r.t. provide_requested_content. I was not pushing for the call to process_response to move into a tokio task.

Collaborator Author:

I'm not sure I understand your suggestion here... the call to self.provide_requested_content (on line 1227) is not async, and it seems to me like there's no reason to move it into the tokio::spawn({}) block on line 1232

Collaborator:

If there is a concern about the possibility of hitting the database despite a failed connection, then the suggestion would be to move that call into the tokio task after the connection is established. This would require that we refactor provide_requested_content.

Member:

Yeah, spawning self with tokio::spawn seems to add a lot of complexity. I'm not sure what pattern Rust uses to minimize that kind of complexity, but my feeling is that we are probably missing something.

Anyways, we can try Jacob's suggestion above: pass the storage as an argument to provide_requested_content and remove the self. This way we can call it inside tokio::spawn.
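
Roughly the shape that suggestion ends up taking (a sketch with stand-in types, not the exact trin signatures): once provide_requested_content takes the storage handle as an argument, the database read can happen inside the spawned task, after the uTP connection is set up:

use std::sync::Arc;
use tokio::sync::RwLock;

// Stand-in for PortalStorage; the names here are illustrative only.
struct Storage;

impl Storage {
    fn get(&self, _key: &str) -> Option<Vec<u8>> {
        Some(b"content".to_vec())
    }
}

// Takes the storage handle as an argument instead of borrowing `self`,
// so it can be called from inside a spawned task.
fn provide_requested_content(storage: &Storage, keys: &[String]) -> Vec<Vec<u8>> {
    keys.iter().filter_map(|key| storage.get(key)).collect()
}

#[tokio::main]
async fn main() {
    let storage = Arc::new(RwLock::new(Storage));
    let keys = vec!["offered-key".to_string()];

    let storage = Arc::clone(&storage);
    let handle = tokio::spawn(async move {
        // ... the uTP connection would be established here first ...
        // Only then do we read the requested content out of the database.
        provide_requested_content(&*storage.read().await, &keys)
    });
    let _content_items = handle.await.unwrap();
}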

Err(error) => self.process_request_failure(response.request_id, active_request.destination, error),
}

@@ -1108,7 +1108,7 @@ where
}

/// Processes a response to an outgoing request from some source node.
async fn process_response(
fn process_response(
&mut self,
response: Response,
source: Enr,
@@ -1171,7 +1171,6 @@ where
}
};
self.process_content(content, source, find_content_request, query_id)
.await
}
Response::Accept(accept) => {
let offer_request = match request {
@@ -1182,18 +1181,15 @@
}
};

if let Err(err) = self
.process_accept(accept, source, offer_request.content_keys)
.await
{
if let Err(err) = self.process_accept(accept, source, offer_request.content_keys) {
error!("Error processing ACCEPT response in overlay service: {err}")
}
}
}
}

/// Process ACCEPT response
pub async fn process_accept(
pub fn process_accept(
&self,
response: Accept,
enr: Enr,
@@ -1227,17 +1223,22 @@ where
self.utp_listener_tx
.send(utp_request).map_err(|err| anyhow!("Unable to send Connect request to UtpListener when processing ACCEPT message: {err}"))?;

let mut conn = rx.await?;
// Handle STATE packet for SYN
let mut buf = [0; BUF_SIZE];
conn.recv(&mut buf).await?;
let storage = Arc::clone(&self.storage);
let response_clone = response.clone();

tokio::spawn(async move {
let mut conn = rx.await.unwrap();
// Handle STATE packet for SYN
let mut buf = [0; BUF_SIZE];
conn.recv(&mut buf).await.unwrap();
Collaborator Author:

@ogenev Are there any consequences I'm missing from moving this logic into the spawned task?

Member:

It should be fine but I think we have to call provide_requested_content after we initiate the uTP handshake. If we can't establish an uTP connection with the remote node, it doesn't make sense to try to get the requested content from the db.

So I will move those lines after conn.recv(&mut buf).await.unwrap():

let content_items = self.provide_requested_content(&response, content_keys_offered)?;

let content_payload = ContentPayloadList::new(content_items)
     .map_err(|err| anyhow!("Unable to build content payload: {err:?}"))?;


let content_items = self.provide_requested_content(&response, content_keys_offered)?;
let content_items =
Self::provide_requested_content(storage, &response_clone, content_keys_offered)
.expect("Unable to provide requested content for acceptor: {msg:?}");

let content_payload = ContentPayloadList::new(content_items)
.map_err(|err| anyhow!("Unable to build content payload: {err:?}"))?;
let content_payload = ContentPayloadList::new(content_items)
.expect("Unable to build content payload: {msg:?}");

tokio::spawn(async move {
// send the content to the acceptor over a uTP stream
if let Err(err) = conn.send_to(&content_payload.as_ssz_bytes()).await {
warn!("Error sending content {err}");
@@ -1297,7 +1298,7 @@ where
}

/// Processes a Content response.
async fn process_content(
fn process_content(
&mut self,
content: Content,
source: Enr,
@@ -1315,8 +1316,7 @@
self.protocol, id
),
Content::Content(content) => {
self.process_received_content(content.clone(), request)
.await;
self.process_received_content(content.clone(), request);
// TODO: Should we only advance the query if the content has been validated?
if let Some(query_id) = query_id {
self.advance_find_content_query_with_content(&query_id, source, content.into());
@@ -1332,7 +1332,7 @@
}
}

async fn process_received_content(&mut self, content: ByteList, request: FindContent) {
fn process_received_content(&mut self, content: ByteList, request: FindContent) {
let content_key = match TContentKey::try_from(request.content_key) {
Ok(val) => val,
Err(msg) => {
@@ -1344,19 +1344,23 @@
match should_store {
Ok(val) => {
if val {
// validate content before storing
if let Err(err) = self
.validator
.validate_content(&content_key, &content.to_vec())
.await
{
error!("Unable to validate received content: {err:?}");
return;
};
let validator = Arc::clone(&self.validator);
let storage = Arc::clone(&self.storage);
// Spawn task that validates content before storing.
// Allows for non-blocking requests to this/other overlay services.
tokio::spawn(async move {
if let Err(err) = validator
.validate_content(&content_key, &content.to_vec())
.await
{
warn!("Unable to validate received content: {err:?}");
return;
};

if let Err(err) = self.storage.write().store(&content_key, &content.into()) {
error!("Content received, but not stored: {err}")
}
if let Err(err) = storage.write().store(&content_key, &content.into()) {
error!("Content received, but not stored: {err}")
}
});
} else {
debug!(
"Content received, but not stored: Content is already stored or its distance falls outside current radius."
@@ -1456,7 +1460,7 @@ where

/// Provide the requested content key and content value for the acceptor
fn provide_requested_content(
&self,
storage: Arc<RwLock<PortalStorage>>,
accept_message: &Accept,
content_keys_offered: Vec<TContentKey>,
) -> anyhow::Result<Vec<ByteList>> {
@@ -1469,7 +1473,7 @@
.zip(content_keys_offered.iter())
{
if i == true {
match self.storage.read().get(key) {
match storage.read().get(key) {
Ok(content) => match content {
Some(content) => content_items.push(content.into()),
None => return Err(anyhow!("Unable to read offered content!")),
@@ -1993,7 +1997,7 @@ mod tests {
let (request_tx, request_rx) = mpsc::unbounded_channel();
let (response_tx, response_rx) = mpsc::unbounded_channel();
let metrics = None;
let validator = MockValidator {};
let validator = Arc::new(MockValidator {});

let service = OverlayService {
discovery,
8 changes: 4 additions & 4 deletions trin-core/src/types/validation.rs
@@ -42,7 +42,7 @@ impl Default for HeaderOracle {

impl HeaderOracle {
// Currently falls back to infura, to be updated to use canonical block indices network.
pub fn get_hash_at_height(&mut self, block_number: u64) -> anyhow::Result<String> {
pub fn get_hash_at_height(&self, block_number: u64) -> anyhow::Result<String> {
let hex_number = format!("0x:{:02X}", block_number);
let request = JsonRequest {
jsonrpc: "2.0".to_string(),
Expand Down Expand Up @@ -70,7 +70,7 @@ impl HeaderOracle {
Ok(infura_hash.to_owned())
}

pub fn get_header_by_hash(&mut self, block_hash: H256) -> anyhow::Result<Header> {
pub fn get_header_by_hash(&self, block_hash: H256) -> anyhow::Result<Header> {
let block_hash = format!("0x{:02X}", block_hash);
let request = JsonRequest {
jsonrpc: "2.0".to_string(),
Expand Down Expand Up @@ -101,7 +101,7 @@ impl HeaderOracle {
#[async_trait]
pub trait Validator<TContentKey> {
async fn validate_content(
&mut self,
&self,
content_key: &TContentKey,
content: &[u8],
) -> anyhow::Result<()>
@@ -115,7 +115,7 @@ pub struct MockValidator {}
#[async_trait]
impl Validator<IdentityContentKey> for MockValidator {
async fn validate_content(
&mut self,
&self,
_content_key: &IdentityContentKey,
_content: &[u8],
) -> anyhow::Result<()>
2 changes: 1 addition & 1 deletion trin-core/tests/overlay.rs
@@ -40,7 +40,7 @@ async fn init_overlay(
let overlay_config = OverlayConfig::default();
// Ignore all uTP events
let (utp_listener_tx, _) = unbounded_channel::<UtpListenerRequest>();
let validator = MockValidator {};
let validator = Arc::new(MockValidator {});

OverlayProtocol::new(
overlay_config,
2 changes: 1 addition & 1 deletion trin-history/src/network.rs
@@ -41,7 +41,7 @@ impl HistoryNetwork {
..Default::default()
};
let storage = Arc::new(RwLock::new(PortalStorage::new(storage_config).unwrap()));
let validator = ChainHistoryValidator { header_oracle };
let validator = Arc::new(ChainHistoryValidator { header_oracle });
let overlay = OverlayProtocol::new(
config,
discovery,
16 changes: 8 additions & 8 deletions trin-history/src/validation.rs
@@ -22,7 +22,7 @@ pub struct ChainHistoryValidator {
#[async_trait]
impl Validator<HistoryContentKey> for ChainHistoryValidator {
async fn validate_content(
&mut self,
&self,
content_key: &HistoryContentKey,
content: &[u8],
) -> anyhow::Result<()>
@@ -212,7 +212,7 @@ mod tests {
infura_url,
..HeaderOracle::default()
}));
let mut chain_history_validator = ChainHistoryValidator { header_oracle };
let chain_history_validator = ChainHistoryValidator { header_oracle };
let content_key = HistoryContentKey::BlockHeader(BlockHeader {
chain_id: 1,
block_hash: header.hash().0,
@@ -240,7 +240,7 @@ mod tests {
infura_url,
..HeaderOracle::default()
}));
let mut chain_history_validator = ChainHistoryValidator { header_oracle };
let chain_history_validator = ChainHistoryValidator { header_oracle };
let content_key = HistoryContentKey::BlockHeader(BlockHeader {
chain_id: 1,
block_hash: header.hash().0,
@@ -269,7 +269,7 @@ mod tests {
infura_url,
..HeaderOracle::default()
}));
let mut chain_history_validator = ChainHistoryValidator { header_oracle };
let chain_history_validator = ChainHistoryValidator { header_oracle };
let content_key = HistoryContentKey::BlockHeader(BlockHeader {
chain_id: 1,
block_hash: header.hash().0,
@@ -294,7 +294,7 @@
infura_url,
..HeaderOracle::default()
}));
let mut chain_history_validator = ChainHistoryValidator { header_oracle };
let chain_history_validator = ChainHistoryValidator { header_oracle };
let content_key = block_14764013_body_key();

chain_history_validator
@@ -327,7 +327,7 @@ mod tests {
infura_url,
..HeaderOracle::default()
}));
let mut chain_history_validator = ChainHistoryValidator { header_oracle };
let chain_history_validator = ChainHistoryValidator { header_oracle };
let content_key = block_14764013_body_key();

chain_history_validator
@@ -348,7 +348,7 @@
infura_url,
..HeaderOracle::default()
}));
let mut chain_history_validator = ChainHistoryValidator { header_oracle };
let chain_history_validator = ChainHistoryValidator { header_oracle };
let content_key = block_14764013_receipts_key();

chain_history_validator
@@ -379,7 +379,7 @@ mod tests {
infura_url,
..HeaderOracle::default()
}));
let mut chain_history_validator = ChainHistoryValidator { header_oracle };
let chain_history_validator = ChainHistoryValidator { header_oracle };
let content_key = block_14764013_receipts_key();

chain_history_validator
4 changes: 2 additions & 2 deletions trin-state/src/network.rs
@@ -42,9 +42,9 @@ impl StateNetwork {
let trie = EthTrie::new(Arc::new(triedb));

let storage = Arc::new(RwLock::new(PortalStorage::new(storage_config).unwrap()));
let validator = StateValidator {
let validator = Arc::new(StateValidator {
header_oracle: HeaderOracle::default(),
};
});
let config = OverlayConfig {
bootnode_enrs: portal_config.bootnode_enrs.clone(),
enable_metrics: portal_config.enable_metrics,
2 changes: 1 addition & 1 deletion trin-state/src/validation.rs
@@ -12,7 +12,7 @@ pub struct StateValidator {
#[async_trait]
impl Validator<StateContentKey> for StateValidator {
async fn validate_content(
&mut self,
&self,
_content_key: &StateContentKey,
_content: &[u8],
) -> anyhow::Result<()>