Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Periodic Bootstraps #229

Closed
wants to merge 12 commits into from
3 changes: 1 addition & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
.*
aterentic-ethernal marked this conversation as resolved.
Show resolved Hide resolved
/target
**/*.rs.bk
.env
.DS_Store
config.yaml
avail_light_store
18 changes: 16 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ base64 = "0.21.0"
mockall = "0.11.3"
async-trait = "0.1.66"
hex-literal = "0.4.0"
uuid = { version = "1.3.4", features = ["v4", "fast-rng", "macro-diagnostics"] }
pcap = "1.1.0"

[features]
Expand All @@ -85,4 +86,3 @@ panic = "abort"

[profile.release]
panic = "abort"

44 changes: 23 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,29 +81,31 @@ http_server_port = "7000"
# If set to seed, keypair will be generated from that seed.
# If set to key, a valid ed25519 private key must be provided, else the client will fail
# If `secret_key` is not set, random seed will be used.
secret_key = { key = "1498b5467a63dffa2dc9d9e069caf075d16fc33fdd4c3b01bfadae6433767d93" }
# Libp2p service port range (port, range) (default: 37000).
libp2p_port = "37000"
# Configures LibP2P TCP port reuse for local sockets, which implies reuse of listening ports for outgoing connections to enhance NAT traversal capabilities (default: false)
libp2p_tcp_port_reuse = false
# Configures LibP2P AutoNAT behaviour to reject probes as a server for clients that are observed at a non-global ip address (default: false)
libp2p_autonat_only_global_ips = false
# Libp2p AutoNat throttle period for re-using a peer as server for a dial-request. (default: 1 sec)
libp2p_autonat_throttle = 1
secret_key = { seed="1" }
aterentic-ethernal marked this conversation as resolved.
Show resolved Hide resolved
# P2P service port (default: 37000).
port = 3700
# Configures TCP port reuse for local sockets, which implies reuse of listening ports for outgoing connections to enhance NAT traversal capabilities (default: false)
tcp_port_reuse = bool
# Configures AutoNAT behaviour to reject probes as a server for clients that are observed at a non-global ip address (default: false)
autonat_only_global_ips = false
# AutoNat throttle period for re-using a peer as server for a dial-request. (default: 1 sec)
autonat_throttle = 2
# Interval in which the NAT status should be re-tried if it is currently unknown or max confidence was not reached yet. (default: 10 sec)
libp2p_autonat_retry_interval = 10
autonat_retry_interval = 10
# Interval in which the NAT should be tested again if max confidence was reached in a status. (default: 30 sec)
libp2p_autonat_refresh_interval = 30
# Libp2p AutoNat on init delay before starting the fist probe. (default: 5 sec)
libp2p_autonat_boot_delay = 5
# Sets libp2p application-specific version of the protocol family used by the peer. (default: "/avail_kad/id/1.0.0")
libp2p_identify_protocol = "/avail_kad/id/1.0.0"
# Sets libp2p agent version that is sent to peers. (default: "avail-light-client/rust-client")
libp2p_identify_agent = "avail-light-client/rust-client"
# Vector of Relay nodes, which are used for hole punching (default: empty)
relays = [["12D3KooWMm1c4pzeLPGkkCJMAgFbsfQ8xmVDusg272icWsaNHWzN", "/ip4/127.0.0.1/tcp/37000"]]
# Vector of IPFS bootstrap nodes, used to bootstrap DHT. If not set, light client acts as a bootstrap node, waiting for first peer to connect for DHT bootstrap (default: empty).
bootstraps = [["12D3KooWMm1c4pzeLPGkkCJMAgFbsfQ8xmVDusg272icWsaNHWzN", "/ip4/127.0.0.1/tcp/37000"]]
autonat_refresh_interval = 30
# AutoNat on init delay before starting the fist probe. (default: 5 sec)
autonat_boot_delay = 10
# Sets application-specific version of the protocol family used by the peer. (default: "/avail_kad/id/1.0.0")
identify_protocol = "/avail_kad/id/1.0.0"
# Sets agent version that is sent to peers. (default: "avail-light-client/rust-client")
identify_agent = "avail-light-client/rust-client"
# Vector of Light Client bootstrap nodes, used to bootstrap DHT. If not set, light client acts as a bootstrap node, waiting for first peer to connect for DHT bootstrap (default: empty).
bootstraps = [["12D3KooWE2xXc6C2JzeaCaEg7jvZLogWyjLsB5dA3iw5o3KcF9ds", "/ip4/13.51.79.255/udp/39000/quic-v1"]]
# Vector of Relay nodes, which are used for hole punching
relays = [["12D3KooWBETtE42fN7DZ5QsGgi7qfrN3jeYdXmBPL4peVTDmgG9b", "/ip4/13.49.44.246/udp/39111/quic-v1"]]
# Defines a period of time in which periodic bootstraps will be repeated. (default: 300 sec)
bootstrap_period = 300,
# WebSocket endpoint of a full node for subscribing to the latest header, etc (default: ws://127.0.0.1:9944).
full_node_ws = ["ws://127.0.0.1:9944"]
# ID of application used to start application client. If app_id is not set, or set to 0, application client is not started (default: 0).
Expand Down
10 changes: 2 additions & 8 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use clap::Parser;
use consts::STATE_CF;
use libp2p::{metrics::Metrics as LibP2PMetrics, multiaddr::Protocol, Multiaddr, PeerId};
use prometheus_client::registry::Registry;
use rand::{thread_rng, Rng};
use rocksdb::{ColumnFamilyDescriptor, Options, DB};
use tokio::sync::mpsc::{channel, Sender};
use tracing::{error, info, metadata::ParseLevelError, trace, warn, Level};
Expand Down Expand Up @@ -191,13 +190,8 @@ async fn run(error_sender: Sender<anyhow::Error>) -> Result<()> {
tokio::spawn(network_event_loop.run());

// Start listening on provided port
let port = if cfg.libp2p_port.1 > 0 {
let port: u16 = thread_rng().gen_range(cfg.libp2p_port.0..=cfg.libp2p_port.1);
info!("Using random port: {port}");
port
} else {
cfg.libp2p_port.0
};
let port = cfg.port;
info!("Using random port: {port}");

// always listen on UDP to prioritize QUIC
network_client
Expand Down
44 changes: 22 additions & 22 deletions src/network/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub struct Client {
dht_parallelization_limit: usize,
/// Cell time to live in DHT (in seconds)
ttl: u64,
/// Number of records to be put in DHT simultaneuosly
/// Number of records to be put in DHT simultaneously
put_batch_size: usize,
}

Expand Down Expand Up @@ -144,29 +144,29 @@ impl Client {
}

async fn put_kad_record_batch(&self, records: Vec<Record>, quorum: Quorum) -> NumSuccPut {
let mut num_success: usize = 0;
for records in records.chunks(self.put_batch_size).map(Into::into) {
let (sender, receiver) = oneshot::channel();
if self
.sender
.send(Command::PutKadRecordBatch {
records,
let (tx, mut rx) = mpsc::channel::<NumSuccPut>(100);
let commands =
records
.chunks(self.put_batch_size)
.map(|records| Command::PutKadRecordBatch {
records: records.into(),
quorum,
sender,
})
.await
.context("Command receiver should not be dropped.")
.is_err()
{
return NumSuccPut(num_success);
sender: tx.clone(),
});

for cmd in commands {
if self.sender.send(cmd).await.is_err() {
return NumSuccPut(0);
}
}
Comment on lines +157 to +161
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this block if there are more than 100 batches, since its not in separate thread, receive will not happen until everything is sent?


num_success +=
if let Ok(NumSuccPut(num)) = receiver.await.context("Sender not to be dropped.") {
num
} else {
num_success
};
// drop tx manually,
// ensure that only senders in spawned threads are still in use
drop(tx);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess if we move first reference into separate thread this won't be needed.


let mut num_success: usize = 0;
while let Some(NumSuccPut(num)) = rx.recv().await {
num_success += num;
}
NumSuccPut(num_success)
}
Expand Down Expand Up @@ -371,7 +371,7 @@ pub enum Command {
PutKadRecordBatch {
records: Arc<[Record]>,
quorum: Quorum,
sender: oneshot::Sender<NumSuccPut>,
sender: mpsc::Sender<NumSuccPut>,
},
ReduceKademliaMapSize,
NetworkObservabilityDump,
Expand Down
Loading