Skip to content

Commit

Permalink
Update to transport 1.0.25
Browse files Browse the repository at this point in the history
  • Loading branch information
kalabukdima committed Oct 2, 2024
1 parent 6a9192d commit d91ff27
Show file tree
Hide file tree
Showing 13 changed files with 202 additions and 171 deletions.
321 changes: 173 additions & 148 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@ tracing-futures = { version = "0.2.5", features = ["tokio", "futures-03"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] }
uuid = { version = "1", features = ["v4", "fast-rng"] }

contract-client = { git = "https://github.com/subsquid/subsquid-network.git", version = "1.0.4" }
subsquid-messages = { git = "https://github.com/subsquid/subsquid-network.git", version = "1.1.0", features = ["semver"] }
subsquid-network-transport = { git = "https://github.com/subsquid/subsquid-network.git", version = "1.0.12", features = ["gateway", "metrics"] }
sqd-contract-client = { git = "https://github.com/subsquid/sqd-network.git", version = "1.0.7" }
sqd-messages = { git = "https://github.com/subsquid/sqd-network.git", version = "1.1.4", features = ["semver"] }
sqd-network-transport = { git = "https://github.com/subsquid/sqd-network.git", version = "1.0.25", features = ["gateway", "metrics"] }
2 changes: 1 addition & 1 deletion src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use serde::Deserialize;
use serde_with::{serde_as, DurationSeconds};
use std::time::Duration;
use std::{collections::HashMap, net::SocketAddr};
use subsquid_network_transport::{PeerId, TransportArgs};
use sqd_network_transport::{PeerId, TransportArgs};

use crate::types::DatasetId;

Expand Down
2 changes: 1 addition & 1 deletion src/controller/stream.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{future::Future, sync::Arc};

use futures::FutureExt;
use subsquid_messages::{query_result, Range};
use sqd_messages::{query_result, Range};
use tokio::task::JoinSet;
use tracing::{instrument, Instrument};

Expand Down
4 changes: 2 additions & 2 deletions src/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ use axum::{
routing::{get, post},
Extension, RequestExt, Router,
};
use contract_client::PeerId;
use sqd_contract_client::PeerId;
use futures::StreamExt;
use itertools::Itertools;
use prometheus_client::registry::Registry;
use subsquid_messages::query_result;
use sqd_messages::query_result;

use crate::{
cli::Config,
Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ async fn main() -> anyhow::Result<()> {
let config = Arc::new(args.config);
let mut metrics_registry = Default::default();
metrics::register_metrics(&mut metrics_registry);
subsquid_network_transport::metrics::register_metrics(&mut metrics_registry);
sqd_network_transport::metrics::register_metrics(&mut metrics_registry);
let cancellation_token = CancellationToken::new();

let network_client =
Expand Down
2 changes: 1 addition & 1 deletion src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use prometheus_client::{
metrics::{counter::Counter, family::Family, gauge::Gauge},
registry::Registry,
};
use subsquid_messages::query_result;
use sqd_messages::query_result;

use crate::types::DatasetId;

Expand Down
22 changes: 14 additions & 8 deletions src/network/client.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use std::{collections::HashMap, sync::Arc, time::Duration};

use contract_client::PeerId;
use futures::{Stream, StreamExt};
use parking_lot::Mutex;
use serde::Serialize;
use subsquid_messages::{query_result, Ping, Query, QueryResult};
use subsquid_network_transport::{
GatewayConfig, GatewayEvent, GatewayTransportHandle, P2PTransportBuilder, QueueFull,
TransportArgs,
use sqd_contract_client::{Client as ContractClient, PeerId};
use sqd_messages::{query_result, Ping, Query, QueryResult};
use sqd_network_transport::{
get_agent_info, AgentInfo, GatewayConfig, GatewayEvent, GatewayTransportHandle, P2PTransportBuilder, QueueFull, TransportArgs
};
use tokio::{sync::oneshot, time::Instant};
use tokio_util::sync::CancellationToken;
Expand All @@ -30,7 +29,7 @@ pub struct NetworkClient {
incoming_events: UseOnce<Box<dyn Stream<Item = GatewayEvent> + Send + Unpin + 'static>>,
transport_handle: GatewayTransportHandle,
network_state: Mutex<NetworkState>,
contract_client: Box<dyn contract_client::Client>,
contract_client: Box<dyn ContractClient>,
tasks: Mutex<HashMap<QueryId, QueryTask>>,
dataset_storage: StorageClient,
dataset_update_interval: Duration,
Expand All @@ -49,7 +48,8 @@ impl NetworkClient {
config: Arc<Config>,
) -> anyhow::Result<NetworkClient> {
let dataset_storage = StorageClient::new(args.rpc.network)?;
let transport_builder = P2PTransportBuilder::from_cli(args).await?;
let agent_into = get_agent_info!();
let transport_builder = P2PTransportBuilder::from_cli(args, agent_into).await?;
let contract_client = transport_builder.contract_client();
let mut gateway_config = GatewayConfig::new(logs_collector);
gateway_config.query_config.request_timeout = config.transport_timeout;
Expand Down Expand Up @@ -137,9 +137,15 @@ impl NetworkClient {
GatewayEvent::QueryResult { peer_id, result } => {
self.handle_query_result(peer_id, result)
.unwrap_or_else(|e| {
tracing::error!("Error handling query: {e:?}");
tracing::error!("Error handling query result: {e:?}");
});
}
GatewayEvent::QueryDropped { query_id } => {
// No good way to handle this yet, just drop the response sender
if self.tasks.lock().remove_entry(&query_id).is_none() {
tracing::error!("Not expecting response for query {query_id}");
}
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/network/priorities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
sync::Arc,
};

use contract_client::PeerId;
use sqd_contract_client::PeerId;
use static_assertions::const_assert;

use crate::cli::Config;
Expand Down
4 changes: 2 additions & 2 deletions src/network/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use crate::cli::Config;
use crate::metrics;
use crate::types::DatasetId;
use serde::Serialize;
use subsquid_messages::RangeSet;
use subsquid_network_transport::PeerId;
use sqd_messages::RangeSet;
use sqd_network_transport::PeerId;

use super::priorities::WorkersPool;

Expand Down
2 changes: 1 addition & 1 deletion src/network/storage.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::HashMap;

use contract_client::Network;
use sqd_contract_client::Network;
use parking_lot::{Mutex, RwLock};
use serde::{de::DeserializeOwned, Deserialize};

Expand Down
2 changes: 1 addition & 1 deletion src/types/request.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{str::FromStr, time::Duration};

use subsquid_messages::Range;
use sqd_messages::Range;

use super::DatasetId;

Expand Down
2 changes: 1 addition & 1 deletion src/types/request_error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use axum::http::StatusCode;
use subsquid_messages::query_result;
use sqd_messages::query_result;

#[derive(thiserror::Error, Debug)]
pub enum RequestError {
Expand Down

0 comments on commit d91ff27

Please sign in to comment.