Skip to content

Commit

Permalink
Merge pull request #184 from Concordium/lma/fix/retry
Browse files Browse the repository at this point in the history
Restart client
  • Loading branch information
lassemand authored Aug 17, 2024
2 parents 814be40 + be7a413 commit f6bfdf0
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 26 deletions.
2 changes: 1 addition & 1 deletion notification-server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion notification-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
authors = ["Concordium AG [email protected]"]
edition = "2021"
name = "notification-server"
version = "0.1.7"
version = "0.1.8"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

Expand Down
141 changes: 117 additions & 24 deletions notification-server/src/bin/service.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
use anyhow::Context;
use backoff::ExponentialBackoff;
use clap::Parser;
use concordium_rust_sdk::v2::{Client, Endpoint};
use concordium_rust_sdk::v2::{Client, Endpoint, FinalizedBlockInfo};
use dotenv::dotenv;
use futures::Stream;
use gcp_auth::CustomServiceAccount;
use log::{error, info};
use notification_server::{
database::DatabaseConnection, google_cloud::GoogleCloud, processor::process,
database::DatabaseConnection,
google_cloud::{GoogleCloud, NotificationError},
processor::process,
};
use std::{path::PathBuf, time::Duration};
use tokio::time::sleep;
use tonic::{
codegen::{http, tokio_stream::StreamExt},
transport::ClientTlsConfig,
};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

#[derive(Debug, Parser)]
struct Args {
Expand Down Expand Up @@ -65,12 +71,104 @@ struct Args {
default_value_t = 180 // 3 minutes
)]
google_client_max_interval_time_secs: u64,
#[clap(
long = "log-level",
default_value = "info",
help = "The maximum log level. Possible values are: `trace`, `debug`, `info`, `warn`, and \
`error`.",
env = "LOG_LEVEL"
)]
log_level: tracing_subscriber::filter::LevelFilter,
}

const DATABASE_RETRY_DELAY: Duration = Duration::from_secs(1);

async fn traverse_chain(
database_connection: &DatabaseConnection,
concordium_client: &mut Client,
gcloud: &GoogleCloud<CustomServiceAccount>,
mut receiver: impl Stream<Item = Result<FinalizedBlockInfo, tonic::Status>> + Unpin,
) {
while let Some(v) = receiver.next().await {
let finalized_block = match v {
Ok(v) => v,
Err(e) => {
error!("Error while reading block: {:?}", e);
continue;
}
};
info!(
"Processed block {} at height {}",
finalized_block.block_hash, finalized_block.height
);
let block_hash = finalized_block.block_hash;
let transactions = match concordium_client
.get_block_transaction_events(block_hash)
.await
{
Ok(transactions) => transactions.response,
Err(err) => {
error!("Error occurred while reading transactions: {:?}", err);
continue;
}
};
for result in process(transactions).await.iter() {
info!(
"Sending notification to account {} with type {:?}",
result.address, result.transaction_type
);

let devices = loop {
match database_connection
.prepared
.get_devices_from_account(result.address)
.await
{
Ok(devices) => break devices,
Err(err) => {
error!(
"Error retrieving devices for account {}: {:?}. Retrying...",
result.address, err
);
sleep(DATABASE_RETRY_DELAY).await;
}
}
};

for device in devices
.iter()
.filter(|device| device.preferences.contains(&result.transaction_type))
{
if let Err(err) = gcloud
.send_push_notification(&device.device_token, result.to_owned())
.await
{
if err == NotificationError::UnregisteredError {
info!("Device {} is unregistered", device.device_token);
} else {
error!("Error occurred while sending notification: {:?}", err);
}
}
}
}
}
}

#[tokio::main(flavor = "multi_thread")]
async fn main() -> anyhow::Result<()> {
dotenv().ok();
let args = Args::parse();

let log_filter = tracing_subscriber::filter::Targets::new()
.with_target(module_path!(), args.log_level)
.with_target("tower_http", args.log_level)
.with_target("tokio_postgres", args.log_level);

tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer())
.with(log_filter)
.init();

let endpoint = if args
.endpoint
.uri()
Expand Down Expand Up @@ -112,28 +210,23 @@ async fn main() -> anyhow::Result<()> {
let database_connection = DatabaseConnection::create(args.db_connection).await?;

let mut concordium_client = Client::new(endpoint).await?;
let mut receiver = concordium_client.get_finalized_blocks().await?;
while let Some(v) = receiver.next().await {
let block_hash = v?.block_hash;
println!("Blockhash: {:?}", block_hash);
let transactions = concordium_client
.get_block_transaction_events(block_hash)
.await?
.response;
for result in process(transactions).await.iter() {
println!("address: {}, amount: {}", result.address, result.amount);
for device in database_connection
.prepared
.get_devices_from_account(result.address)
.await?
.iter()
.filter(|device| device.preferences.contains(&result.transaction_type))
{
gcloud
.send_push_notification(&device.device_token, result.to_owned())
.await?;

loop {
info!("Establishing stream of finalized blocks");
let receiver = match concordium_client.get_finalized_blocks().await {
Ok(receiver) => receiver,
Err(err) => {
info!("Error occurred while reading finalized blocks: {:?}", err);
tokio::time::sleep(Duration::from_secs(10)).await;
continue;
}
}
};
traverse_chain(
&database_connection,
&mut concordium_client,
&gcloud,
receiver,
)
.await;
}
Ok(())
}

0 comments on commit f6bfdf0

Please sign in to comment.