Skip to content

Commit

Permalink
fix: p2p timeout, ipfs settings, more
Browse files Browse the repository at this point in the history
  • Loading branch information
zeeshanlakhani committed Nov 8, 2023
1 parent ae5e26d commit e13b48d
Show file tree
Hide file tree
Showing 21 changed files with 701 additions and 370 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/tests_and_checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ jobs:

- name: Run Tests (no-default-features)
if: ${{ matrix.default-features == 'none' }}
run: cargo nextest run --workspace --profile ci --no-default-features --features "test-utils"
run: cargo nextest run --profile ci --no-default-features --features "test-utils"

- name: Run Doc Tests
if: ${{ matrix.default-features == 'all' }}
Expand Down Expand Up @@ -241,7 +241,7 @@ jobs:

- name: Run Tests (no-default-features)
if: ${{ matrix.default-features == 'none' }}
run: cargo nextest run --workspace --profile ci --no-default-features --features "test-utils"
run: cargo nextest run --profile ci --no-default-features --features "test-utils"

- name: Run Doc Tests
if: ${{ matrix.default-features == 'all' }}
Expand Down
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion DEVELOPMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ hooks. Please run this before every commit and/or push.
* **`nx-test`**, which translates to
`cargo nextest run --workspace && cargo test --workspace --doc`
* **`x-test`** for testing continuously as files change, translating to
`cargo watch -c -s "cargo nextest run --workspace --nocapture && cargo test --doc"`
`cargo watch -c -s "cargo nextest run --workspace --no-capture && cargo test --doc"`
* **`x-<build,check,run,clippy>`** for running a variety of `cargo watch`
execution stages
* **`nx-test-<all,0>`**, which is just like `nx-test`, but adds `all` or `0`
Expand Down
16 changes: 8 additions & 8 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@
xFuncNoDefault = cmd:
pkgs.writeScriptBin "x-${cmd}-0" ''
#!${pkgs.stdenv.shell}
cargo watch -c -s "cargo ${cmd} --workspace --no-default-features"
cargo watch -c -s "cargo ${cmd} --no-default-features"
'';

xFuncPackage = cmd: crate:
Expand All @@ -145,19 +145,19 @@

xFuncTest = pkgs.writeScriptBin "x-test" ''
#!${pkgs.stdenv.shell}
cargo watch -c -s "cargo nextest run --workspace --nocapture && cargo test --doc"
cargo watch -c -s "cargo nextest run --workspace --no-capture && cargo test --doc"
'';

xFuncTestAll = pkgs.writeScriptBin "x-test-all" ''
#!${pkgs.stdenv.shell}
cargo watch -c -s "cargo nextest run --workspace --all-features --nocapture \
cargo watch -c -s "cargo nextest run --workspace --all-features --no-capture \
&& cargo test --workspace --doc --all-features"
'';

xFuncTestNoDefault = pkgs.writeScriptBin "x-test-0" ''
#!${pkgs.stdenv.shell}
cargo watch -c -s "cargo nextest run --workspace --no-default-features --nocapture \
&& cargo test --workspace --doc --no-default-features"
cargo watch -c -s "cargo nextest run --no-default-features --no-capture \
&& cargo test --doc --no-default-features"
'';

xFuncTestPackage = crate:
Expand All @@ -175,14 +175,14 @@

nxTestAll = pkgs.writeScriptBin "nx-test-all" ''
#!${pkgs.stdenv.shell}
cargo nextest run --workspace --all-features --nocapture
cargo nextest run --workspace --all-features --no-capture
cargo test --workspace --doc --all-features
'';

nxTestNoDefault = pkgs.writeScriptBin "nx-test-0" ''
#!${pkgs.stdenv.shell}
cargo nextest run --workspace --no-default-features --nocapture
cargo test --workspace --doc --no-default-features
cargo nextest run --no-default-features --no-capture
cargo test --doc --no-default-features
'';

wasmTest = pkgs.writeScriptBin "wasm-ex-test" ''
Expand Down
2 changes: 2 additions & 0 deletions homestar-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ libp2p = { version = "0.52", default-features = false, features = [
"noise",
"cbor",
"yamux",
"serde",
] }
libsqlite3-sys = { version = "0.26", default-features = false, features = [
"bundled",
Expand Down Expand Up @@ -166,6 +167,7 @@ tower = { version = "0.4", default-features = false, features = [
] }
tower-http = { version = "0.4", default-features = false, features = [
"trace",
"sensitive-headers",
"catch-panic",
"cors",
] }
Expand Down
5 changes: 3 additions & 2 deletions homestar-runtime/src/event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub(crate) mod cache;
pub mod channel;
pub(crate) mod error;
pub(crate) mod event;
#[cfg(feature = "websocket-notify")]
pub(crate) mod notification;
pub(crate) mod swarm_event;
pub(crate) use cache::{setup_cache, CacheValue};
Expand Down Expand Up @@ -80,7 +81,7 @@ pub(crate) struct EventHandler<DB: Database> {
p2p_provider_timeout: Duration,
db: DB,
swarm: Swarm<ComposedBehaviour>,
cache: Cache<String, CacheValue>,
cache: Arc<Cache<String, CacheValue>>,
sender: Arc<channel::AsyncBoundedChannelSender<Event>>,
receiver: channel::AsyncBoundedChannelReceiver<Event>,
query_senders: FnvHashMap<QueryId, (RequestResponseKey, Option<P2PSender>)>,
Expand Down Expand Up @@ -228,7 +229,7 @@ where
#[cfg(not(feature = "ipfs"))]
pub(crate) async fn start(mut self) -> Result<()> {
let handle = Handle::current();
handle.spawn(poll_cache(self.cache.clone()));
handle.spawn(poll_cache(self.cache.clone(), self.poll_cache_interval));

loop {
select! {
Expand Down
83 changes: 69 additions & 14 deletions homestar-runtime/src/event_handler/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::event_handler::notification::emit_receipt;
use crate::network::IpfsCli;
use crate::{
db::Database,
event_handler::{Handler, P2PSender},
event_handler::{Handler, P2PSender, ResponseEvent},
network::{
pubsub,
swarm::{CapsuleTag, RequestResponseKey, TopicMessage},
Expand Down Expand Up @@ -127,8 +127,8 @@ impl Event {
event_handler.shutdown().await;
let _ = tx.send(());
}
Event::FindRecord(record) => record.find(event_handler),
Event::RemoveRecord(record) => record.remove(event_handler),
Event::FindRecord(record) => record.find(event_handler).await,
Event::RemoveRecord(record) => record.remove(event_handler).await,
Event::OutboundRequest(PeerRequest {
peer,
request,
Expand All @@ -144,7 +144,7 @@ impl Event {
.request_response_senders
.insert(request_id, (request, sender));
}
Event::GetProviders(record) => record.get_providers(event_handler),
Event::GetProviders(record) => record.get_providers(event_handler).await,
Event::ProvideRecord(cid, sender, capsule_tag) => {
let query_id = event_handler
.swarm
Expand Down Expand Up @@ -247,7 +247,7 @@ impl Captured {
{
emit_receipt(
event_handler.ws_workflow_sender(),
receipt.clone(),
&receipt,
self.metadata.to_owned(),
)
}
Expand All @@ -258,12 +258,15 @@ impl Captured {
TopicMessage::CapturedReceipt(receipt),
) {
Ok(msg_id) => info!(
"message {msg_id} published on {} topic for receipt with cid: {receipt_cid}",
cid = receipt_cid.to_string(),
"message {msg_id} published on {} topic for receipt",
pubsub::RECEIPTS_TOPIC
),
Err(_err) => {
error!(
"message not published on {} topic for receipt with cid: {receipt_cid}",
Err(err) => {
warn!(
err=?err,
cid = receipt_cid.to_string(),
"message not published on {} topic for receipt",
pubsub::RECEIPTS_TOPIC
)
}
Expand Down Expand Up @@ -328,7 +331,7 @@ impl Replay {
Self { pointers, metadata }
}

fn notify<DB>(self, event_handler: &EventHandler<DB>) -> Result<()>
fn notify<DB>(self, event_handler: &mut EventHandler<DB>) -> Result<()>
where
DB: Database,
{
Expand All @@ -350,14 +353,36 @@ impl Replay {
);

#[cfg(feature = "websocket-notify")]
receipts.into_iter().for_each(|receipt| {
receipts.iter().for_each(|receipt| {
emit_receipt(
event_handler.ws_workflow_sender(),
receipt,
self.metadata.to_owned(),
);
});

// gossiping replayed receipts
receipts.into_iter().for_each(|receipt| {
if event_handler.pubsub_enabled {
let receipt_cid = receipt.cid().to_string();
let _ = event_handler
.swarm
.behaviour_mut()
.gossip_publish(
pubsub::RECEIPTS_TOPIC,
TopicMessage::CapturedReceipt(receipt),
)
.map(|msg_id|
info!(cid=receipt_cid,
"message {msg_id} published on {} topic for receipt", pubsub::RECEIPTS_TOPIC))
.map_err(
|err|
warn!(err=?err, cid=receipt_cid,
"message not published on {} topic for receipt", pubsub::RECEIPTS_TOPIC),
);
}
});

Ok(())
}
}
Expand All @@ -372,10 +397,20 @@ impl QueryRecord {
}
}

fn find<DB>(self, event_handler: &mut EventHandler<DB>)
async fn find<DB>(self, event_handler: &mut EventHandler<DB>)
where
DB: Database,
{
if event_handler.connections.peers.is_empty() {
info!("no connections to send request to");

if let Some(sender) = self.sender {
let _ = sender.send_async(ResponseEvent::NoPeersAvailable).await;
}

return;
}

let id = event_handler
.swarm
.behaviour_mut()
Expand All @@ -386,10 +421,20 @@ impl QueryRecord {
event_handler.query_senders.insert(id, (key, self.sender));
}

fn remove<DB>(self, event_handler: &mut EventHandler<DB>)
async fn remove<DB>(self, event_handler: &mut EventHandler<DB>)
where
DB: Database,
{
if event_handler.connections.peers.is_empty() {
info!("no connections to send request to");

if let Some(sender) = self.sender {
let _ = sender.send_async(ResponseEvent::NoPeersAvailable).await;
}

return;
}

event_handler
.swarm
.behaviour_mut()
Expand All @@ -403,10 +448,20 @@ impl QueryRecord {
.stop_providing(&Key::new(&self.cid.to_bytes()));
}

fn get_providers<DB>(self, event_handler: &mut EventHandler<DB>)
async fn get_providers<DB>(self, event_handler: &mut EventHandler<DB>)
where
DB: Database,
{
if event_handler.connections.peers.is_empty() {
info!("no connections to send request to");

if let Some(sender) = self.sender {
let _ = sender.send_async(ResponseEvent::NoPeersAvailable).await;
}

return;
}

let id = event_handler
.swarm
.behaviour_mut()
Expand Down
4 changes: 2 additions & 2 deletions homestar-runtime/src/event_handler/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ const TIMESTAMP_KEY: &str = "timestamp";
/// Send receipt notification as bytes.
pub(crate) fn emit_receipt(
notifier: Notifier<notifier::Message>,
receipt: Receipt,
receipt: &Receipt,
metadata: Option<Ipld>,
) {
let invocation_receipt = InvocationReceipt::from(&receipt);
let invocation_receipt = InvocationReceipt::from(receipt);
let receipt_cid = receipt.cid();
let notification = ReceiptNotification::with(invocation_receipt, receipt_cid, metadata.clone());

Expand Down
8 changes: 6 additions & 2 deletions homestar-runtime/src/event_handler/swarm_event.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
//! Internal libp2p [SwarmEvent] handling and [Handler] implementation.
use super::EventHandler;
#[cfg(feature = "websocket-notify")]
use crate::event_handler::notification::{self, EventNotificationTyp, SwarmNotification};
#[cfg(feature = "ipfs")]
use crate::network::IpfsCli;
use crate::{
db::{Connection, Database},
event_handler::{
cache::{self, CacheData, CacheValue},
event::QueryRecord,
notification::{self, EventNotificationTyp, SwarmNotification},
Event, Handler, RequestResponseError,
},
libp2p::multiaddr::MultiaddrExt,
Expand Down Expand Up @@ -40,6 +41,7 @@ use libp2p::{
swarm::{dial_opts::DialOpts, SwarmEvent},
PeerId, StreamProtocol,
};
#[cfg(feature = "websocket-notify")]
use maplit::btreemap;
use std::{
collections::{HashMap, HashSet},
Expand All @@ -56,6 +58,8 @@ const RENDEZVOUS_NAMESPACE: &str = "homestar";
pub(crate) enum ResponseEvent {
/// Found [PeerRecord] on the DHT.
Found(Result<FoundEvent>),
/// TODO
NoPeersAvailable,
/// Found Providers/[PeerId]s on the DHT.
Providers(Result<HashSet<PeerId>>),
}
Expand Down Expand Up @@ -758,7 +762,7 @@ async fn handle_swarm_event<THandlerErr: fmt::Debug + Send, DB: Database>(
} => {
debug!(
peer_id = peer_id.to_string(),
"peer connection closed, cause: {cause:?}"
"peer connection closed, cause: {cause:#?}, endpoint: {endpoint:#?}"
);
event_handler.connections.peers.remove_entry(&peer_id);

Expand Down
Loading

0 comments on commit e13b48d

Please sign in to comment.