Skip to content

Commit

Permalink
refactor: Wallet owns its update stream
Browse files Browse the repository at this point in the history
  • Loading branch information
tvolk131 committed Oct 3, 2024
1 parent 02875cc commit 95b8c44
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 20 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ nostr-sdk = "0.35.0"
palette = "0.7.6"
secp256k1 = { version = "0.29.1", features = ["global-context"] }
tokio = "1.40.0"
tokio-stream = "0.1.16"
tracing-subscriber = "0.3.18"

[dev-dependencies]
Expand Down
1 change: 1 addition & 0 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ impl App {
.map(Message::UpdateWalletView);

while let Some(msg) = stream.next().await {
println!("Wallet update!");
yield msg;
}
},
Expand Down
58 changes: 38 additions & 20 deletions src/fedimint.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::fmt::Display;
use std::pin::Pin;
use std::{
collections::{BTreeMap, HashMap},
path::PathBuf,
Expand Down Expand Up @@ -73,45 +72,64 @@ pub struct Wallet {
derivable_secret: DerivableSecret,
clients: Arc<Mutex<HashMap<FederationId, ClientHandle>>>,
fedimint_clients_data_dir: PathBuf,
view_update_receiver: tokio::sync::watch::Receiver<WalletView>,
view_update_task: tokio::task::JoinHandle<()>,
}

impl Drop for Wallet {
fn drop(&mut self) {
self.view_update_task.abort();
}
}

impl Wallet {
pub fn new(xprivkey: Xpriv, network: Network, project_dirs: &ProjectDirs) -> Self {
Self {
derivable_secret: get_derivable_secret(&xprivkey, network),
clients: Arc::new(Mutex::new(HashMap::new())),
fedimint_clients_data_dir: project_dirs.data_dir().join(FEDIMINT_CLIENTS_DATA_DIR_NAME),
}
}
let (sender, receiver) = tokio::sync::watch::channel(WalletView {
federations: BTreeMap::new(),
});

// TODO: Optimize this. Repeated polling is not ideal.
pub fn get_update_stream(
&self,
) -> Pin<Box<dyn iced::futures::Stream<Item = WalletView> + Send>> {
let clients = self.clients.clone();
Box::pin(async_stream::stream! {
let clients = Arc::new(Mutex::new(HashMap::new()));

let clients_clone = clients.clone();
let view_update_task = tokio::spawn(async move {
let mut last_state_or = None;

// TODO: Optimize this. Repeated polling is not ideal.
loop {
let current_state = Self::get_current_state(clients.lock().await).await;
let current_state = Self::get_current_state(clients_clone.lock().await).await;

// Ignoring clippy lint here since the `match` provides better clarity.
#[allow(clippy::option_if_let_else)]
let has_changed = match &last_state_or {
Some(last_state) => {
&current_state != last_state
}
Some(last_state) => &current_state != last_state,
// If there was no last state, the state has changed.
None => true,
};

if has_changed {
last_state_or = Some(current_state.clone());
yield current_state;

// If all receivers have been dropped, stop the task.
if sender.send(current_state).is_err() {
break;
}
}

tokio::time::sleep(std::time::Duration::from_secs(1)).await;
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
})
});

Self {
derivable_secret: get_derivable_secret(&xprivkey, network),
clients,
fedimint_clients_data_dir: project_dirs.data_dir().join(FEDIMINT_CLIENTS_DATA_DIR_NAME),
view_update_receiver: receiver,
view_update_task,
}
}

pub fn get_update_stream(&self) -> tokio_stream::wrappers::WatchStream<WalletView> {
tokio_stream::wrappers::WatchStream::new(self.view_update_receiver.clone())
}

pub async fn connect_to_joined_federations(&self) -> anyhow::Result<()> {
Expand Down

0 comments on commit 95b8c44

Please sign in to comment.