Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
renan061 committed Aug 16, 2024
1 parent 54c0204 commit 3f3830c
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 35 deletions.
41 changes: 24 additions & 17 deletions offchain/authority-claimer/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@ pub trait BrokerListener: Debug {
pub enum BrokerListenerError {
#[snafu(display("broker error"))]
BrokerError { source: BrokerError },

#[snafu(display("no applications configured"))]
NoApplicationsConfigured,
}

// ------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -121,9 +118,7 @@ impl MultidappBrokerListener {

// Gets the dapps from the broker.
let dapps = self.broker.get_dapps().await.context(BrokerSnafu)?;
if dapps.is_empty() {
return Err(BrokerListenerError::NoApplicationsConfigured);
}
assert!(!dapps.is_empty());
tracing::info!(
"Got the following dapps from key \"{}\": {:?}",
rollups_events::DAPPS_KEY,
Expand Down Expand Up @@ -161,13 +156,18 @@ impl MultidappBrokerListener {
Ok(())
}

async fn fill_buffer(&mut self) -> Result<(), BrokerListenerError> {
let streams_and_events: Vec<_> = self
// Returns true if it succeeded in filling the buffer and false otherwise.
async fn fill_buffer(&mut self) -> Result<bool, BrokerListenerError> {
let streams_and_events = self
.broker
.consume_blocking_from_multiple_streams(self.streams.clone())
.await
.context(BrokerSnafu)?;
.context(BrokerSnafu);
if let Err(BrokerError::FailedToConsume) = streams_and_events {
return false;
}

let streams_and_events = streams_and_events?;
for (stream, event) in streams_and_events {
// Updates the last-consumed-id from the stream.
let replaced = self.streams.insert(stream.clone(), event.id);
Expand All @@ -177,7 +177,7 @@ impl MultidappBrokerListener {
assert!(replaced.is_none());
}

Ok(())
Ok((true))
}
}

Expand All @@ -190,7 +190,13 @@ impl BrokerListener for MultidappBrokerListener {

tracing::trace!("Waiting for a claim");
if self.buffer.is_empty() {
self.fill_buffer().await?;
loop {
if self.fill_buffer().await? {
break;
} else {
self.update_streams().await?;
}
}
}

let buffer = self.buffer.clone();
Expand All @@ -217,7 +223,7 @@ mod tests {
RollupsClaim, RollupsClaimsStream, Url,
};

use crate::listener::{BrokerListener, BrokerListenerError};
use crate::listener::BrokerListener;

use super::{DefaultBrokerListener, MultidappBrokerListener};

Expand Down Expand Up @@ -494,12 +500,13 @@ mod tests {
let mut epochs = vec![0; dapps.len()];
let indexes = vec![0, 1, 2];
multidapp_produce_claims(&fixture, &mut epochs, &dapps, &indexes).await;
let result = listener.listen().await;

let thread = tokio::spawn(async move {
let _ = listener.listen().await;
unreachable!();
});
let result = tokio::time::timeout(Duration::from_secs(3), thread).await;
assert!(result.is_err());
assert_eq!(
BrokerListenerError::NoApplicationsConfigured.to_string(),
result.unwrap_err().to_string()
);
}

#[tokio::test]
Expand Down
51 changes: 33 additions & 18 deletions offchain/rollups-events/src/broker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,8 +338,8 @@ impl Broker {

/// Consume the next event from one of the streams.
///
/// This function blocks until a new event is available in one of the streams,
/// and retries whenever a timeout happens instead of returning an error.
/// This function blocks until a new event is available in one of the streams.
/// It timeouts with BrokerError::FailedToConsume.
///
/// To consume the first event for a stream, `last_consumed_id[...]` should be `INITIAL_ID`.
#[tracing::instrument(level = "trace", skip_all)]
Expand All @@ -352,26 +352,23 @@ impl Broker {
let (streams, last_consumed_ids): (Vec<_>, Vec<_>) =
streams.into_iter().map(identity).unzip();

loop {
let result = self
._consume_blocking_from_multiple_streams(
&streams,
&last_consumed_ids,
)
.await;
let result = self
._consume_blocking_from_multiple_streams(
&streams,
&last_consumed_ids,
)
.await;

if let Err(BrokerError::ConsumeTimeout) = result {
tracing::trace!("consume timed out, retrying");
} else {
return result;
}
if let Err(BrokerError::ConsumeTimeout) = result {
Err(BrokerError::FailedToConsume)
} else {
result
}
}

/// Gets the dapp addresses.
#[tracing::instrument(level = "trace", skip_all)]
pub async fn get_dapps(&mut self) -> Result<Vec<Address>, BrokerError> {
retry(self.backoff.clone(), || async {
pub async fn _get_dapps(&mut self) -> Result<Vec<Address>, BrokerError> {
let reply = retry(self.backoff.clone(), || async {
tracing::trace!(key = DAPPS_KEY, "getting key");
let reply: Vec<String> =
self.connection.clone().smembers(DAPPS_KEY).await?;
Expand All @@ -382,7 +379,25 @@ impl Broker {
Ok(dapp_addresses)
})
.await
.context(ConnectionSnafu)
.context(ConnectionSnafu)?;

if reply.is_empty() {
Err(BrokerError::ConsumeTimeout)
} else {
Ok(reply)
}
}

/// Gets the dapp addresses.
pub async fn get_dapps(&mut self) -> Result<Vec<Address>, BrokerError> {
loop {
let result = self._get_dapps().await;
if let Err(BrokerError::ConsumeTimeout) = result {
tracing::trace!("consume timed out, retrying");
} else {
return result;
}
}
}

/// Sets the dapp addresses.
Expand Down

0 comments on commit 3f3830c

Please sign in to comment.