diff --git a/.github/workflows/network-docker.yml b/.github/workflows/network-docker.yml index 90b67bf..41980b2 100644 --- a/.github/workflows/network-docker.yml +++ b/.github/workflows/network-docker.yml @@ -103,3 +103,29 @@ jobs: subsquid/pings-collector:${{ inputs.tag }} cache-from: type=gha cache-to: type=gha,mode=max + + - name: Build peer checker + uses: docker/build-push-action@v5 + with: + context: . + target: peer-checker + load: true + tags: subsquid/peer-checker:test + cache-from: type=gha + cache-to: type=gha,mode=max + + - name: Get peer checker version + run: echo "PEER_CHECKER_VERSION=$(docker run --rm subsquid/peer-checker:test peer-checker --version | cut -d ' ' -f2)" >> $GITHUB_ENV + + - name: Build & publish peer checker + uses: docker/build-push-action@v5 + with: + context: . + platforms: linux/amd64,linux/arm/v7,linux/arm64/v8,linux/386 + target: peer-checker + push: true + tags: | + subsquid/peer-checker:${{ env.PEER_CHECKER_VERSION }} + subsquid/peer-checker:${{ inputs.tag }} + cache-from: type=gha + cache-to: type=gha,mode=max diff --git a/Cargo.lock b/Cargo.lock index 93c0d90..0801051 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1689,8 +1689,8 @@ dependencies = [ "serde", "serde_bytes", "serde_repr", - "subsquid-messages", - "subsquid-network-transport", + "sqd-messages", + "sqd-network-transport", "test-with", "tokio", ] @@ -1735,23 +1735,6 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" -[[package]] -name = "contract-client" -version = "1.0.5" -source = "git+https://github.com/subsquid/subsquid-network.git#69c00a42ddcfe0f35b4352c773cbef1228dfcb89" -dependencies = [ - "async-trait", - "clap", - "ethers", - "libp2p", - "log", - "serde", - "thiserror", - "tokio", - "tokio-stream", - "url", -] - [[package]] name = "core-foundation" version = "0.9.4" @@ -4361,13 +4344,13 @@ dependencies = [ "anyhow", "clap", "collector-utils", - "contract-client", "env_logger", "futures", "log", "serde_json", - "subsquid-messages", - "subsquid-network-transport", + "sqd-contract-client", + "sqd-messages", + "sqd-network-transport", "tikv-jemallocator", "tokio", ] @@ -4636,7 +4619,6 @@ dependencies = [ "axum 0.7.5", "base64 0.22.1", "clap", - "contract-client", "dashmap", "derive-enum-from-into", "env_logger", @@ -4662,8 +4644,9 @@ dependencies = [ "serde_yaml", "sha2", "sha3", - "subsquid-messages", - "subsquid-network-transport", + "sqd-contract-client", + "sqd-messages", + "sqd-network-transport", "tikv-jemallocator", "tokio", "url", @@ -5043,6 +5026,23 @@ dependencies = [ "hmac", ] +[[package]] +name = "peer-checker" +version = "1.0.0" +dependencies = [ + "anyhow", + "axum 0.7.5", + "clap", + "env_logger", + "log", + "serde_json", + "sqd-contract-client", + "sqd-messages", + "sqd-network-transport", + "tikv-jemallocator", + "tokio", +] + [[package]] name = "pem" version = "1.1.1" @@ -5192,7 +5192,6 @@ dependencies = [ "clap", "clickhouse", "collector-utils", - "contract-client", "env_logger", "futures", "lazy_static", @@ -5203,8 +5202,9 @@ dependencies = [ "serde_bytes", "serde_json", "serde_repr", - "subsquid-messages", - "subsquid-network-transport", + "sqd-contract-client", + "sqd-messages", + "sqd-network-transport", "tikv-jemallocator", "tokio", "yaque", @@ -5469,7 +5469,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0f5d036824e4761737860779c906171497f6d55681139d8312388f8fe398922" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.12.4", +] + +[[package]] +name = "prost" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b2ecbe40f08db5c006b5764a2645f7f3f141ce756412ac9e1dd6087e6d32995" +dependencies = [ + "bytes", + "prost-derive 0.13.2", ] [[package]] @@ -5486,7 +5496,7 @@ dependencies = [ "once_cell", "petgraph", "prettyplease", - "prost", + "prost 0.12.4", "prost-types", "regex", "syn 2.0.68", @@ -5506,13 +5516,26 @@ dependencies = [ "syn 2.0.68", ] +[[package]] +name = "prost-derive" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acf0c195eebb4af52c752bec4f52f645da98b6e92077a04110c7f349477ae5ac" +dependencies = [ + "anyhow", + "itertools 0.11.0", + "proc-macro2", + "quote", + "syn 2.0.68", +] + [[package]] name = "prost-types" version = "0.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3235c33eb02c1f1e212abdbe34c78b264b038fb58ca612664343271e36e55ffe" dependencies = [ - "prost", + "prost 0.12.4", ] [[package]] @@ -6026,7 +6049,7 @@ dependencies = [ "libc", "prometheus", "router-controller", - "subsquid-messages", + "sqd-messages", "tokio", "tracing", "tracing-subscriber", @@ -6042,7 +6065,7 @@ dependencies = [ "parking_lot", "rand 0.8.5", "serde", - "subsquid-messages", + "sqd-messages", ] [[package]] @@ -6763,6 +6786,69 @@ dependencies = [ "der 0.7.9", ] +[[package]] +name = "sqd-contract-client" +version = "1.0.7" +source = "git+https://github.com/subsquid/subsquid-network.git#f75a811817fb017f34ab4e82c56e95a3eaa610a7" +dependencies = [ + "async-trait", + "clap", + "ethers", + "libp2p", + "log", + "serde", + "thiserror", + "tokio", + "tokio-stream", + "url", +] + +[[package]] +name = "sqd-messages" +version = "1.1.3" +source = "git+https://github.com/subsquid/subsquid-network.git#f75a811817fb017f34ab4e82c56e95a3eaa610a7" +dependencies = [ + "anyhow", + "hex", + "libp2p", + "prost 0.13.2", + "prost-build", + "semver", + "serde", + "sha3", +] + +[[package]] +name = "sqd-network-transport" +version = "1.0.20" +source = "git+https://github.com/subsquid/subsquid-network.git#f75a811817fb017f34ab4e82c56e95a3eaa610a7" +dependencies = [ + "anyhow", + "async-trait", + "bimap", + "clap", + "derivative", + "futures", + "futures-bounded", + "futures-core", + "lazy_static", + "libp2p", + "libp2p-connection-limits", + "libp2p-swarm-derive", + "log", + "lru", + "parking_lot", + "prometheus-client", + "prost 0.13.2", + "serde", + "serde_with", + "sqd-contract-client", + "sqd-messages", + "thiserror", + "tokio", + "tokio-util", +] + [[package]] name = "static_assertions" version = "1.1.0" @@ -6816,53 +6902,6 @@ dependencies = [ "syn 2.0.68", ] -[[package]] -name = "subsquid-messages" -version = "1.1.2" -source = "git+https://github.com/subsquid/subsquid-network.git#69c00a42ddcfe0f35b4352c773cbef1228dfcb89" -dependencies = [ - "anyhow", - "hex", - "libp2p", - "prost", - "prost-build", - "semver", - "serde", - "sha3", -] - -[[package]] -name = "subsquid-network-transport" -version = "1.0.18" -source = "git+https://github.com/subsquid/subsquid-network.git#69c00a42ddcfe0f35b4352c773cbef1228dfcb89" -dependencies = [ - "anyhow", - "async-trait", - "bimap", - "clap", - "contract-client", - "derivative", - "env_logger", - "futures", - "futures-bounded", - "futures-core", - "lazy_static", - "libp2p", - "libp2p-connection-limits", - "libp2p-swarm-derive", - "log", - "lru", - "parking_lot", - "prometheus-client", - "prost", - "serde", - "serde_with", - "subsquid-messages", - "thiserror", - "tokio", - "tokio-util", -] - [[package]] name = "subtle" version = "2.5.0" diff --git a/Cargo.toml b/Cargo.toml index 71d55ec..74db30b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,6 @@ members = ["crates/*"] resolver = "2" [workspace.dependencies] -contract-client = { git = "https://github.com/subsquid/subsquid-network.git", version = "1.0.5" } -subsquid-messages = { git = "https://github.com/subsquid/subsquid-network.git", version = "1.1.2" } -subsquid-network-transport = { git = "https://github.com/subsquid/subsquid-network.git", version = "1.0.17" } +sqd-contract-client = { git = "https://github.com/subsquid/subsquid-network.git", version = "1.0.7" } +sqd-messages = { git = "https://github.com/subsquid/subsquid-network.git", version = "1.0.3" } +sqd-network-transport = { git = "https://github.com/subsquid/subsquid-network.git", version = "1.0.20" } diff --git a/Dockerfile b/Dockerfile index 1466985..b0ed043 100644 --- a/Dockerfile +++ b/Dockerfile @@ -95,3 +95,16 @@ CMD ["pings-collector"] COPY crates/logs-collector/healthcheck.sh . RUN chmod +x ./healthcheck.sh HEALTHCHECK --interval=5s CMD ./healthcheck.sh + +FROM --platform=$BUILDPLATFORM network-base AS peer-checker + +COPY --from=network-builder /app/target/release/peer-checker /usr/local/bin/peer-checker + +ENV P2P_LISTEN_ADDRS="/ip4/0.0.0.0/udp/12345/quic-v1" +ENV BUFFER_DIR="/run" + +CMD ["peer-checker"] + +COPY crates/logs-collector/healthcheck.sh . +RUN chmod +x ./healthcheck.sh +HEALTHCHECK --interval=5s CMD ./healthcheck.sh diff --git a/crates/collector-utils/Cargo.toml b/crates/collector-utils/Cargo.toml index 2f48d22..7a2a3b8 100644 --- a/crates/collector-utils/Cargo.toml +++ b/crates/collector-utils/Cargo.toml @@ -14,9 +14,9 @@ serde = { version = "1.0.188", features = ["derive"] } serde_bytes = "0.11" serde_repr = "0.1" -subsquid-messages = { workspace = true } +sqd-messages = { workspace = true } [dev-dependencies] test-with = "0.13" -subsquid-network-transport = { workspace = true } +sqd-network-transport = { workspace = true } tokio = { version = "1", features = ["full"] } diff --git a/crates/collector-utils/src/storage.rs b/crates/collector-utils/src/storage.rs index d70c453..5a2f2ae 100644 --- a/crates/collector-utils/src/storage.rs +++ b/crates/collector-utils/src/storage.rs @@ -6,7 +6,7 @@ use lazy_static::lazy_static; use serde::{Deserialize, Serialize}; use serde_repr::{Deserialize_repr, Serialize_repr}; -use subsquid_messages::{query_executed, InputAndOutput, Ping, Query, QueryExecuted, SizeAndHash}; +use sqd_messages::{query_executed, InputAndOutput, Ping, Query, QueryExecuted, SizeAndHash}; use crate::cli::ClickhouseArgs; use crate::timestamp_now_ms; @@ -303,9 +303,9 @@ impl Storage for ClickhouseStorage { #[cfg(test)] mod tests { - use subsquid_messages::signatures::SignedMessage; - use subsquid_messages::{InputAndOutput, Query, SizeAndHash}; - use subsquid_network_transport::{Keypair, PeerId}; + use sqd_messages::signatures::SignedMessage; + use sqd_messages::{InputAndOutput, Query, SizeAndHash}; + use sqd_network_transport::{Keypair, PeerId}; use super::*; diff --git a/crates/logs-collector/Cargo.toml b/crates/logs-collector/Cargo.toml index 63ea56f..261e262 100644 --- a/crates/logs-collector/Cargo.toml +++ b/crates/logs-collector/Cargo.toml @@ -12,9 +12,9 @@ log = "0.4" serde_json = "1" tokio = { version = "1", features = ["full"] } -contract-client = { workspace = true } -subsquid-messages = { workspace = true } -subsquid-network-transport = { workspace = true, features = ["logs-collector"] } +sqd-contract-client = { workspace = true } +sqd-messages = { workspace = true } +sqd-network-transport = { workspace = true, features = ["logs-collector"] } collector-utils = { path = "../collector-utils" } diff --git a/crates/logs-collector/src/cli.rs b/crates/logs-collector/src/cli.rs index 5261e4f..a0d4d39 100644 --- a/crates/logs-collector/src/cli.rs +++ b/crates/logs-collector/src/cli.rs @@ -1,6 +1,6 @@ use clap::Parser; use collector_utils::ClickhouseArgs; -use subsquid_network_transport::TransportArgs; +use sqd_network_transport::TransportArgs; #[derive(Parser)] #[command(version)] diff --git a/crates/logs-collector/src/collector.rs b/crates/logs-collector/src/collector.rs index c52b352..5b33559 100644 --- a/crates/logs-collector/src/collector.rs +++ b/crates/logs-collector/src/collector.rs @@ -2,9 +2,9 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use subsquid_messages::QueryExecuted; -use subsquid_network_transport::protocol::EPOCH_SEAL_TIMEOUT; -use subsquid_network_transport::PeerId; +use sqd_messages::QueryExecuted; +use sqd_network_transport::protocol::EPOCH_SEAL_TIMEOUT; +use sqd_network_transport::PeerId; use collector_utils::{timestamp_now_ms, QueryExecutedRow, Storage}; @@ -12,13 +12,13 @@ type SeqNo = u64; pub struct LogsCollector { storage: T, - contract_client: Arc, + contract_client: Arc, last_stored: HashMap, // Last sequence number & timestamp saved in storage for each worker buffered_logs: HashMap>, // Local buffer, persisted periodically } impl LogsCollector { - pub fn new(storage: T, contract_client: Arc) -> Self { + pub fn new(storage: T, contract_client: Arc) -> Self { Self { storage, contract_client, diff --git a/crates/logs-collector/src/main.rs b/crates/logs-collector/src/main.rs index 70ff585..2cf63d5 100644 --- a/crates/logs-collector/src/main.rs +++ b/crates/logs-collector/src/main.rs @@ -3,7 +3,7 @@ use std::time::Duration; use clap::Parser; use env_logger::Env; -use subsquid_network_transport::P2PTransportBuilder; +use sqd_network_transport::P2PTransportBuilder; use collector_utils::ClickhouseStorage; @@ -33,7 +33,7 @@ async fn main() -> anyhow::Result<()> { // Build P2P transport let transport_builder = P2PTransportBuilder::from_cli(args.transport).await?; - let contract_client: Arc = + let contract_client: Arc = transport_builder.contract_client().into(); let (incoming_messages, transport_handle) = transport_builder.build_logs_collector(Default::default())?; diff --git a/crates/logs-collector/src/server.rs b/crates/logs-collector/src/server.rs index 3172a48..df4fe0a 100644 --- a/crates/logs-collector/src/server.rs +++ b/crates/logs-collector/src/server.rs @@ -6,11 +6,11 @@ use futures::{Stream, StreamExt}; use tokio::signal::unix::{signal, SignalKind}; use tokio::sync::RwLock; -use contract_client::Client as ContractClient; -use subsquid_messages::{LogsCollected, QueryExecuted}; -use subsquid_network_transport::util::TaskManager; -use subsquid_network_transport::PeerId; -use subsquid_network_transport::{LogsCollectorEvent, LogsCollectorTransportHandle}; +use sqd_contract_client::Client as ContractClient; +use sqd_messages::{LogsCollected, QueryExecuted}; +use sqd_network_transport::util::TaskManager; +use sqd_network_transport::PeerId; +use sqd_network_transport::{LogsCollectorEvent, LogsCollectorTransportHandle}; use collector_utils::Storage; diff --git a/crates/network-scheduler/Cargo.toml b/crates/network-scheduler/Cargo.toml index 36a6fc2..4f92518 100644 --- a/crates/network-scheduler/Cargo.toml +++ b/crates/network-scheduler/Cargo.toml @@ -39,9 +39,9 @@ sha3 = "0.10" tokio = { version = "1", features = ["full"] } url = "2.5.0" -contract-client = { workspace = true } -subsquid-messages = { workspace = true, features = ["semver"] } -subsquid-network-transport = { workspace = true, features = ["scheduler", "metrics"] } +sqd-contract-client = { workspace = true } +sqd-messages = { workspace = true, features = ["semver"] } +sqd-network-transport = { workspace = true, features = ["scheduler", "metrics"] } [target.'cfg(not(target_env = "msvc"))'.dependencies] tikv-jemallocator = "0.6" diff --git a/crates/network-scheduler/src/cli.rs b/crates/network-scheduler/src/cli.rs index f3909af..c3ffab5 100644 --- a/crates/network-scheduler/src/cli.rs +++ b/crates/network-scheduler/src/cli.rs @@ -8,7 +8,7 @@ use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DurationSeconds}; use tokio::sync::OnceCell; -use subsquid_network_transport::TransportArgs; +use sqd_network_transport::TransportArgs; static CONFIG: OnceCell = OnceCell::const_new(); diff --git a/crates/network-scheduler/src/data_chunk.rs b/crates/network-scheduler/src/data_chunk.rs index 7d8c2b8..5d0cd9e 100644 --- a/crates/network-scheduler/src/data_chunk.rs +++ b/crates/network-scheduler/src/data_chunk.rs @@ -8,7 +8,7 @@ use serde::{Deserialize, Serialize}; use serde_with::{hex::Hex, serde_as}; use sha3::{Digest, Sha3_256}; -use subsquid_messages::{AssignedChunk, DatasetChunks, Range, WorkerAssignment, WorkerState}; +use sqd_messages::{AssignedChunk, DatasetChunks, Range, WorkerAssignment, WorkerState}; use crate::cli::Config; @@ -54,7 +54,7 @@ impl DataChunk { size_bytes: u64, filenames: Vec, ) -> anyhow::Result { - let chunk = subsquid_messages::data_chunk::DataChunk::from_str(chunk_str) + let chunk = sqd_messages::data_chunk::DataChunk::from_str(chunk_str) .map_err(|_| anyhow::anyhow!("Invalid chunk: {chunk_str}"))?; Ok(Self { dataset_id: format!("s3://{bucket}"), @@ -127,7 +127,7 @@ pub fn chunks_to_assignment(chunks: impl Iterator) -> WorkerAs #[cfg(test)] mod tests { - use subsquid_messages::range::RangeSet; + use sqd_messages::range::RangeSet; use super::*; diff --git a/crates/network-scheduler/src/main.rs b/crates/network-scheduler/src/main.rs index 6e9f607..8cb610a 100644 --- a/crates/network-scheduler/src/main.rs +++ b/crates/network-scheduler/src/main.rs @@ -2,7 +2,7 @@ use clap::Parser; use env_logger::Env; use prometheus_client::registry::Registry; -use subsquid_network_transport::P2PTransportBuilder; +use sqd_network_transport::P2PTransportBuilder; use crate::cli::Cli; use crate::server::Server; @@ -38,12 +38,12 @@ async fn main() -> anyhow::Result<()> { // Open file for writing metrics let mut metrics_registry = Registry::default(); - subsquid_network_transport::metrics::register_metrics(&mut metrics_registry); + sqd_network_transport::metrics::register_metrics(&mut metrics_registry); prometheus_metrics::register_metrics(&mut metrics_registry); // Build P2P transport let transport_builder = P2PTransportBuilder::from_cli(args.transport).await?; - let contract_client: Box = transport_builder.contract_client(); + let contract_client: Box = transport_builder.contract_client(); let local_peer_id = transport_builder.local_peer_id(); let (incoming_messages, transport_handle) = transport_builder.build_scheduler(Default::default())?; diff --git a/crates/network-scheduler/src/metrics_server.rs b/crates/network-scheduler/src/metrics_server.rs index a45e34d..30b599f 100644 --- a/crates/network-scheduler/src/metrics_server.rs +++ b/crates/network-scheduler/src/metrics_server.rs @@ -8,7 +8,7 @@ use axum::routing::get; use axum::{Extension, Json, Router}; use prometheus_client::registry::Registry; use serde_partial::{Field, SerializePartial}; -use subsquid_network_transport::util::CancellationToken; +use sqd_network_transport::util::CancellationToken; use tokio::sync::RwLock; use crate::cli::Config; diff --git a/crates/network-scheduler/src/scheduler.rs b/crates/network-scheduler/src/scheduler.rs index 0b6cccd..e07ddf5 100644 --- a/crates/network-scheduler/src/scheduler.rs +++ b/crates/network-scheduler/src/scheduler.rs @@ -15,10 +15,10 @@ use random_choice::random_choice; use serde::{Deserialize, Serialize}; use tokio::time::Instant; -use contract_client::Worker; -use subsquid_messages::HttpHeader; -use subsquid_messages::{pong::Status as WorkerStatus, Ping}; -use subsquid_network_transport::PeerId; +use sqd_contract_client::Worker; +use sqd_messages::HttpHeader; +use sqd_messages::{pong::Status as WorkerStatus, Ping}; +use sqd_network_transport::PeerId; use crate::cli::Config; use crate::data_chunk::chunks_to_assignment; @@ -539,7 +539,7 @@ impl Scheduler { } fn add_signature_headers( - assignment: &mut subsquid_messages::WorkerAssignment, + assignment: &mut sqd_messages::WorkerAssignment, worker_id: &PeerId, worker_state: &WorkerState, ) { diff --git a/crates/network-scheduler/src/scheduling_unit.rs b/crates/network-scheduler/src/scheduling_unit.rs index c95038c..1e8e7d2 100644 --- a/crates/network-scheduler/src/scheduling_unit.rs +++ b/crates/network-scheduler/src/scheduling_unit.rs @@ -2,7 +2,7 @@ use std::fmt::{Display, Formatter}; use nonempty::NonEmpty; use serde::{Deserialize, Serialize}; -use subsquid_network_transport::util::CancellationToken; +use sqd_network_transport::util::CancellationToken; use tokio::sync::mpsc::{Receiver, Sender}; use crate::data_chunk::{ChunkId, DataChunk}; diff --git a/crates/network-scheduler/src/server.rs b/crates/network-scheduler/src/server.rs index 8859d92..3bfd68c 100644 --- a/crates/network-scheduler/src/server.rs +++ b/crates/network-scheduler/src/server.rs @@ -12,10 +12,10 @@ use tokio::signal::unix::{signal, SignalKind}; use tokio::sync::mpsc::Receiver; use tokio::time::Instant; -use subsquid_messages::signatures::msg_hash; -use subsquid_messages::{Pong, RangeSet}; -use subsquid_network_transport::util::{CancellationToken, TaskManager}; -use subsquid_network_transport::{SchedulerEvent, SchedulerTransportHandle}; +use sqd_messages::signatures::msg_hash; +use sqd_messages::{Pong, RangeSet}; +use sqd_network_transport::util::{CancellationToken, TaskManager}; +use sqd_network_transport::{SchedulerEvent, SchedulerTransportHandle}; use crate::cli::Config; use crate::data_chunk::{chunks_to_worker_state, DataChunk}; @@ -51,7 +51,7 @@ impl Server { pub async fn run + Send + Unpin + 'static>( mut self, - contract_client: Box, + contract_client: Box, storage_client: S3Storage, metrics_listen_addr: SocketAddr, metrics_registry: Registry, @@ -136,13 +136,13 @@ impl Server { async fn spawn_scheduling_task( &mut self, - contract_client: Box, + contract_client: Box, storage_client: S3Storage, ) -> anyhow::Result<()> { log::info!("Starting scheduling task"); let scheduler = self.scheduler.clone(); let last_epoch = Arc::new(Mutex::new(contract_client.current_epoch().await?)); - let contract_client: Arc = contract_client.into(); + let contract_client: Arc = contract_client.into(); let schedule_interval = Config::get().schedule_interval_epochs; let task = move |_| { diff --git a/crates/network-scheduler/src/storage.rs b/crates/network-scheduler/src/storage.rs index 7f1c566..33bd4eb 100644 --- a/crates/network-scheduler/src/storage.rs +++ b/crates/network-scheduler/src/storage.rs @@ -11,7 +11,7 @@ use nonempty::NonEmpty; use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::sync::Mutex; -use subsquid_network_transport::util::{CancellationToken, TaskManager}; +use sqd_network_transport::util::{CancellationToken, TaskManager}; use crate::cli::Config; use crate::data_chunk::DataChunk; diff --git a/crates/network-scheduler/src/worker_state.rs b/crates/network-scheduler/src/worker_state.rs index b35a3f1..0c5fc82 100644 --- a/crates/network-scheduler/src/worker_state.rs +++ b/crates/network-scheduler/src/worker_state.rs @@ -6,10 +6,10 @@ use serde::{Deserialize, Serialize}; use serde_partial::SerializePartial; use serde_with::{serde_as, TimestampMilliSeconds}; -use contract_client::Address; use dashmap::DashMap; -use subsquid_messages::{Ping, RangeSet}; -use subsquid_network_transport::PeerId; +use sqd_contract_client::Address; +use sqd_messages::{Ping, RangeSet}; +use sqd_network_transport::PeerId; use crate::cli::Config; use crate::data_chunk::DataChunk; diff --git a/crates/peer-checker/Cargo.toml b/crates/peer-checker/Cargo.toml new file mode 100644 index 0000000..1b8b3dc --- /dev/null +++ b/crates/peer-checker/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "peer-checker" +version = "1.0.0" +edition = "2021" + +[dependencies] +anyhow = "1" +axum = { version = "0.7", features = ["json"] } +clap = { version = "4", features = ["derive", "env"] } +env_logger = "0.11" +log = "0.4" +serde_json = "1" +tokio = { version = "1", features = ["full"] } + +sqd-contract-client = { workspace = true } +sqd-messages = { workspace = true } +sqd-network-transport = { workspace = true, features = ["peer-checker"] } + +[target.'cfg(not(target_env = "msvc"))'.dependencies] +tikv-jemallocator = "0.6" diff --git a/crates/peer-checker/src/main.rs b/crates/peer-checker/src/main.rs new file mode 100644 index 0000000..875ce81 --- /dev/null +++ b/crates/peer-checker/src/main.rs @@ -0,0 +1,131 @@ +use std::net::SocketAddr; +use std::time::Duration; + +use axum::http::StatusCode; +use axum::response::{IntoResponse, Response}; +use axum::routing::post; +use axum::{Extension, Json, Router}; +use clap::Parser; +use env_logger::Env; +use tokio::signal::unix::{signal, SignalKind}; + +use sqd_network_transport::{ + P2PTransportBuilder, PeerCheckerConfig, PeerCheckerTransportHandle, ProbeRequest, ProbeResult, + TransportArgs, +}; + +#[cfg(not(target_env = "msvc"))] +use tikv_jemallocator::Jemalloc; + +#[cfg(not(target_env = "msvc"))] +#[global_allocator] +static GLOBAL: Jemalloc = Jemalloc; + +#[derive(Parser)] +#[command(version)] +struct Cli { + #[command(flatten)] + pub transport: TransportArgs, + + #[arg( + long, + env, + help = "HTTP server listen addr", + default_value = "0.0.0.0:8000" + )] + http_listen_addr: SocketAddr, + + #[arg( + long, + env, + help = "Maximum number of probes that could run in parallel", + default_value = "1024" + )] + max_concurrent_probes: usize, + + #[arg( + long, + env, + help = "Size of the queue for probes waiting to be started", + default_value = "1024" + )] + probe_queue_size: usize, + + #[arg( + long, + env, + help = "Timeout for a single peer probe (in seconds)", + default_value = "20" + )] + probe_timeout_sec: u64, +} + +async fn probe_peer( + Extension(transport_handle): Extension, + Json(request): Json, +) -> Response { + match transport_handle.probe_peer(request).await { + ProbeResult::Ok(res) => (StatusCode::OK, Json(res)).into_response(), + ProbeResult::Failure { error } => (StatusCode::NOT_FOUND, error).into_response(), + ProbeResult::Ongoing => ( + StatusCode::TOO_MANY_REQUESTS, + "Probe for peer already ongoing", + ) + .into_response(), + ProbeResult::TooManyProbes => ( + StatusCode::SERVICE_UNAVAILABLE, + "Too many active/scheduled probes", + ) + .into_response(), + ProbeResult::ServerError => { + (StatusCode::INTERNAL_SERVER_ERROR, "Internal server error").into_response() + } + } +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + // Init logger and parse arguments + env_logger::Builder::from_env(Env::default().default_filter_or("info, ethers_providers=warn")) + .init(); + let args: Cli = Cli::parse(); + + // Build P2P transport + let transport_builder = P2PTransportBuilder::from_cli(args.transport) + .await? + .with_base_config(|mut config| { + config.max_concurrent_probes = args.max_concurrent_probes; + config.probe_timeout = Duration::from_secs(args.probe_timeout_sec); + config + }); + let config = PeerCheckerConfig { + probe_queue_size: args.probe_queue_size, + ..Default::default() + }; + let transport_handle = transport_builder.build_peer_checker(config)?; + + // Start HTTP server + let addr = args.http_listen_addr; + log::info!("Starting HTTP server listening on {addr}"); + let app = Router::new() + .route("/probe", post(probe_peer)) + .layer(Extension(transport_handle)); + + let mut sigint = signal(SignalKind::interrupt())?; + let mut sigterm = signal(SignalKind::terminate())?; + let shutdown = async move { + tokio::select! { + _ = sigint.recv() => (), + _ = sigterm.recv() =>(), + } + }; + + let listener = tokio::net::TcpListener::bind(addr).await?; + axum::serve(listener, app) + .with_graceful_shutdown(shutdown) + .await?; + + log::info!("HTTP server stopped"); + + Ok(()) +} diff --git a/crates/pings-collector/Cargo.toml b/crates/pings-collector/Cargo.toml index fc1990c..9fedf62 100644 --- a/crates/pings-collector/Cargo.toml +++ b/crates/pings-collector/Cargo.toml @@ -22,9 +22,9 @@ serde_repr = "0.1" tokio = { version = "1", features = ["full"] } yaque = "0.6" -contract-client = { workspace = true } -subsquid-messages = { workspace = true } -subsquid-network-transport = { workspace = true, features = ["pings-collector"] } +sqd-contract-client = { workspace = true } +sqd-messages = { workspace = true } +sqd-network-transport = { workspace = true, features = ["pings-collector"] } collector-utils = { path = "../collector-utils" } diff --git a/crates/pings-collector/src/cli.rs b/crates/pings-collector/src/cli.rs index 36af29a..6d3d459 100644 --- a/crates/pings-collector/src/cli.rs +++ b/crates/pings-collector/src/cli.rs @@ -1,7 +1,7 @@ use std::path::PathBuf; use clap::Parser; -use subsquid_network_transport::TransportArgs; +use sqd_network_transport::TransportArgs; use collector_utils::ClickhouseArgs; diff --git a/crates/pings-collector/src/main.rs b/crates/pings-collector/src/main.rs index 56c1271..d0e3b15 100644 --- a/crates/pings-collector/src/main.rs +++ b/crates/pings-collector/src/main.rs @@ -3,7 +3,7 @@ use std::time::Duration; use clap::Parser; use env_logger::Env; -use subsquid_network_transport::P2PTransportBuilder; +use sqd_network_transport::P2PTransportBuilder; use collector_utils::ClickhouseStorage; @@ -31,7 +31,7 @@ async fn main() -> anyhow::Result<()> { // Build P2P transport let transport_builder = P2PTransportBuilder::from_cli(args.transport).await?; - let contract_client: Arc = + let contract_client: Arc = transport_builder.contract_client().into(); let (incoming_pings, transport_handle) = transport_builder.build_pings_collector(Default::default())?; diff --git a/crates/pings-collector/src/server.rs b/crates/pings-collector/src/server.rs index 89d08d7..ce71128 100644 --- a/crates/pings-collector/src/server.rs +++ b/crates/pings-collector/src/server.rs @@ -11,10 +11,10 @@ use parking_lot::RwLock; use tokio::signal::unix::{signal, SignalKind}; use collector_utils::{PingRow, Storage}; -use contract_client::Client as ContractClient; use semver::VersionReq; -use subsquid_network_transport::util::{CancellationToken, TaskManager}; -use subsquid_network_transport::{PeerId, Ping, PingsCollectorTransportHandle}; +use sqd_contract_client::Client as ContractClient; +use sqd_network_transport::util::{CancellationToken, TaskManager}; +use sqd_network_transport::{PeerId, Ping, PingsCollectorTransportHandle}; lazy_static! { static ref BINCODE_CONFIG: bincode::config::Configuration = Default::default(); diff --git a/crates/router-controller/Cargo.toml b/crates/router-controller/Cargo.toml index 26902c0..b7e0cb0 100644 --- a/crates/router-controller/Cargo.toml +++ b/crates/router-controller/Cargo.toml @@ -10,4 +10,4 @@ parking_lot = "0.12" rand = "0.8" serde = "1" -subsquid-messages = { workspace = true } # FIXME: Remove this dependency +sqd-messages = { workspace = true } # FIXME: Remove this dependency diff --git a/crates/router-controller/src/controller.rs b/crates/router-controller/src/controller.rs index b9e1eac..b9e89d8 100644 --- a/crates/router-controller/src/controller.rs +++ b/crates/router-controller/src/controller.rs @@ -10,7 +10,7 @@ use base64::Engine; use rand::prelude::SliceRandom; use serde::{Deserialize, Serialize}; -use subsquid_messages::{data_chunk::DataChunk, Range, RangeSet, WorkerState}; +use sqd_messages::{data_chunk::DataChunk, Range, RangeSet, WorkerState}; use crate::atom::Atom; @@ -516,7 +516,7 @@ mod tests { use super::Ping; use crate::controller::ControllerBuilder; - use subsquid_messages::data_chunk::DataChunk; + use sqd_messages::data_chunk::DataChunk; #[test] fn basic() { diff --git a/crates/router/Cargo.toml b/crates/router/Cargo.toml index ee02f8e..35ef753 100644 --- a/crates/router/Cargo.toml +++ b/crates/router/Cargo.toml @@ -19,4 +19,4 @@ lazy_static = "1.4.0" prometheus = { version = "0.13.3", features = ["process"] } router-controller = { version = "0.1", path = "../router-controller" } -subsquid-messages = { workspace = true } # FIXME: Remove this dependency +sqd-messages = { workspace = true } # FIXME: Remove this dependency diff --git a/crates/router/src/dataset.rs b/crates/router/src/dataset.rs index 5abf333..e560790 100644 --- a/crates/router/src/dataset.rs +++ b/crates/router/src/dataset.rs @@ -3,7 +3,7 @@ use std::str::FromStr; use aws_sdk_s3::Client; use tokio::runtime::Handle; -use subsquid_messages::data_chunk::DataChunk; +use sqd_messages::data_chunk::DataChunk; pub trait Storage { /// Get data chunks in the dataset. diff --git a/crates/router/src/http_server.rs b/crates/router/src/http_server.rs index 92ab5d2..096a319 100644 --- a/crates/router/src/http_server.rs +++ b/crates/router/src/http_server.rs @@ -14,7 +14,7 @@ use prometheus::{gather, Encoder, TextEncoder}; use tracing::info; use router_controller::controller::{Controller, Ping}; -use subsquid_messages::WorkerState; +use sqd_messages::WorkerState; mod middleware;