From 79f0ca6a56e5bf21ec1279fabcb9a1374ff955c6 Mon Sep 17 00:00:00 2001 From: stefan-mysten <135084671+stefan-mysten@users.noreply.github.com> Date: Mon, 30 Sep 2024 18:10:00 -0700 Subject: [PATCH] Add more stream functions --- crates/sui-graphql-client/src/lib.rs | 172 +++++++++++++++++- .../src/query_types/events.rs | 2 +- .../src/query_types/object.rs | 8 +- .../src/query_types/transaction.rs | 2 +- 4 files changed, 170 insertions(+), 14 deletions(-) diff --git a/crates/sui-graphql-client/src/lib.rs b/crates/sui-graphql-client/src/lib.rs index 07cc610b6..1d3f65d4b 100644 --- a/crates/sui-graphql-client/src/lib.rs +++ b/crates/sui-graphql-client/src/lib.rs @@ -6,6 +6,7 @@ pub mod faucet; pub mod query_types; +use async_stream::try_stream; use base64ct::Encoding; use query_types::ActiveValidatorsArgs; use query_types::ActiveValidatorsQuery; @@ -322,8 +323,8 @@ impl Client { pub async fn coins( &self, owner: Address, - after: Option<&str>, - before: Option<&str>, + after: Option, + before: Option, first: Option, last: Option, coin_type: Option<&str>, @@ -359,16 +360,17 @@ impl Client { pub fn coins_stream<'a>( &'a self, owner: Address, - coin_type: Option<&'a str>, + coin_type: Option, ) -> Pin> + 'a>> { - Box::pin(async_stream::try_stream! { + let coin_type = coin_type.unwrap_or_else(|| "0x2::coin::Coin".to_string()); + Box::pin(try_stream! { let mut after = None; loop { let response = self.objects( - after.as_deref(), + after, None, Some(ObjectFilter { - type_: Some(coin_type.unwrap_or("0x2::coin::Coin")), + type_: Some(&coin_type), owner: Some(owner), object_ids: None, object_keys: None, @@ -554,6 +556,35 @@ impl Client { } } + pub async fn events_stream<'a>( + &'a self, + after: Option, + before: Option, + filter: Option, + first: Option, + last: Option, + ) -> Pin> + 'a>> { + Box::pin(try_stream! { + let mut after = after; + loop { + let response = self.events(filter.clone(), after, before.clone(), first, last).await?; + if let Some(page) = response { + for event in page.data { + yield event; + } + + if let Some(end_cursor) = page.page_info.end_cursor { + after = Some(end_cursor); + } else { + break; + } + } else { + break; + } + } + }) + } + // =========================================================================== // Objects API // =========================================================================== @@ -615,8 +646,8 @@ impl Client { /// ``` pub async fn objects( &self, - after: Option<&str>, - before: Option<&str>, + after: Option, + before: Option, filter: Option>, first: Option, last: Option, @@ -659,6 +690,36 @@ impl Client { } } + /// Stream objects. + pub async fn objects_stream<'a>( + &'a self, + after: Option, + before: Option, + filter: Option>, + first: Option, + last: Option, + ) -> Pin> + 'a>> { + Box::pin(try_stream! { + let mut after = after; + loop { + let response = self.objects(after, before.clone(), filter.clone(), first, last).await?; + if let Some(page) = response { + for object in page.data { + yield object; + } + + if let Some(end_cursor) = page.page_info.end_cursor { + after = Some(end_cursor); + } else { + break; + } + } else { + break; + } + } + }) + } + /// Return the object's bcs content [`Vec`] based on the provided [`Address`]. pub async fn object_bcs(&self, object_id: Address) -> Result>, Error> { let operation = ObjectQuery::build(ObjectQueryArgs { @@ -789,12 +850,43 @@ impl Client { Ok(None) } } + + /// Stream of transactions based on the provided filters. + pub async fn transactions_stream<'a>( + &'a self, + after: Option, + before: Option, + first: Option, + last: Option, + filter: Option, + ) -> Pin> + 'a>> { + Box::pin(try_stream! { + let mut after = after; + loop { + let response = self.transactions(after, before.clone(), first, last, filter.clone()).await?; + if let Some(page) = response { + for tx in page.data { + yield tx; + } + + if let Some(end_cursor) = page.page_info.end_cursor { + after = Some(end_cursor); + } else { + break; + } + } else { + break; + } + } + }) + } } #[cfg(test)] mod tests { use futures::StreamExt; + use crate::query_types::{EventFilter, ObjectFilter, TransactionsFilter}; use crate::Client; use crate::DEVNET_HOST; use crate::LOCAL_HOST; @@ -1072,4 +1164,68 @@ mod tests { ); } } + + // TODO remove ignore after PR #20 is merged + #[tokio::test] + #[ignore] + async fn test_events_stream() { + let client = Client::new_testnet(); + let ef = EventFilter { + emitting_module: None, + event_type: None, + sender: None, + transaction_digest: None, + }; + + let mut stream = client + .events_stream(None, None, Some(ef), None, Some(10)) + .await; + + while let Some(result) = stream.next().await { + assert!(result.is_ok()) + } + } + + #[tokio::test] + async fn test_objects_stream() { + let client = Client::new_testnet(); + let obj_filter = ObjectFilter { + type_: Some("0x2::sui::SUI"), + owner: None, + object_ids: None, + object_keys: None, + }; + + let mut stream = client + .objects_stream(None, None, Some(obj_filter), None, Some(10)) + .await; + + while let Some(result) = stream.next().await { + assert!(result.is_ok()) + } + } + + // TODO: remove the ignore after we fix tx bcs stuff + #[tokio::test] + #[ignore] + async fn test_transactions_stream() { + let client = Client::new_testnet(); + let tx_filter = TransactionsFilter { + function: None, + kind: None, + at_checkpoint: None, + before_checkpoint: None, + changed_object: None, + input_object: None, + recv_address: None, + }; + + let mut stream = client + .transactions_stream(None, None, None, Some(10), Some(tx_filter)) + .await; + + while let Some(result) = stream.next().await { + assert!(result.is_ok()) + } + } } diff --git a/crates/sui-graphql-client/src/query_types/events.rs b/crates/sui-graphql-client/src/query_types/events.rs index 4bb70b6f6..3006b9b9a 100644 --- a/crates/sui-graphql-client/src/query_types/events.rs +++ b/crates/sui-graphql-client/src/query_types/events.rs @@ -47,7 +47,7 @@ pub struct EventConnection { pub nodes: Vec, } -#[derive(cynic::InputObject, Debug)] +#[derive(cynic::InputObject, Debug, Clone)] #[cynic(schema = "rpc", graphql_type = "EventFilter")] pub struct EventFilter { pub emitting_module: Option, diff --git a/crates/sui-graphql-client/src/query_types/object.rs b/crates/sui-graphql-client/src/query_types/object.rs index 6f8b99559..c7952738c 100644 --- a/crates/sui-graphql-client/src/query_types/object.rs +++ b/crates/sui-graphql-client/src/query_types/object.rs @@ -37,8 +37,8 @@ pub struct ObjectQueryArgs { #[derive(cynic::QueryVariables, Debug)] pub struct ObjectsQueryArgs<'a> { - pub after: Option<&'a str>, - pub before: Option<&'a str>, + pub after: Option, + pub before: Option, pub filter: Option>, pub first: Option, pub last: Option, @@ -54,7 +54,7 @@ pub struct Object { pub bcs: Option, } -#[derive(cynic::InputObject, Debug)] +#[derive(cynic::InputObject, Debug, Clone)] #[cynic(schema = "rpc", graphql_type = "ObjectFilter")] pub struct ObjectFilter<'a> { #[cynic(rename = "type")] @@ -64,7 +64,7 @@ pub struct ObjectFilter<'a> { pub object_keys: Option>, } -#[derive(cynic::InputObject, Debug)] +#[derive(cynic::InputObject, Debug, Clone)] #[cynic(schema = "rpc", graphql_type = "ObjectKey")] pub struct ObjectKey { pub object_id: Address, diff --git a/crates/sui-graphql-client/src/query_types/transaction.rs b/crates/sui-graphql-client/src/query_types/transaction.rs index 2e71d253d..66d4cbdae 100644 --- a/crates/sui-graphql-client/src/query_types/transaction.rs +++ b/crates/sui-graphql-client/src/query_types/transaction.rs @@ -72,7 +72,7 @@ pub enum TransactionBlockKindInput { ProgrammableTx, } -#[derive(cynic::InputObject, Debug)] +#[derive(cynic::InputObject, Debug, Clone)] #[cynic(schema = "rpc", graphql_type = "TransactionBlockFilter")] pub struct TransactionsFilter { pub function: Option,