Skip to content
This repository was archived by the owner on Feb 3, 2023. It is now read-only.

Tracing dm #2107

Open
wants to merge 8 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 100 additions & 103 deletions Cargo.lock

Large diffs are not rendered by default.

19 changes: 14 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,17 @@ exclude = [
]

[patch.crates-io]
#lib3h = "=0.0.38"
#lib3h_protocol = "=0.0.38"
#lib3h_crypto_api = "=0.0.38"
#lib3h_sodium = "=0.0.38"
#lib3h_zombie_actor = "=0.0.38"
lib3h = { git = "https://github.com/holochain/lib3h", branch ="bump_tracing" }
lib3h_protocol = { git = "https://github.com/holochain/lib3h", branch ="bump_tracing" }
lib3h_crypto_api = { git = "https://github.com/holochain/lib3h", branch ="bump_tracing" }
lib3h_sodium = { git = "https://github.com/holochain/lib3h", branch ="bump_tracing" }
lib3h_zombie_actor = { git = "https://github.com/holochain/lib3h", branch ="bump_tracing" }
#lib3h = "=0.0.37"
#lib3h_protocol = "=0.0.37"
#lib3h_crypto_api = "=0.0.37"
#lib3h_sodium = "=0.0.37"
#lib3h_zombie_actor = "=0.0.37"

#holochain_tracing = { path = "../holochain-tracing/crates/tracing" }
#holochain_tracing_macros = { path = "../holochain-tracing/crates/tracing_macros" }

4 changes: 2 additions & 2 deletions crates/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ holochain_conductor_lib = { version = "=0.0.43-alpha3", path = "../conductor_lib
holochain_net = { version = "=0.0.43-alpha3", path = "../net" }
holochain_dpki = { version = "=0.0.43-alpha3", path = "../dpki" }
holochain_locksmith = { version = "=0.0.43-alpha3", path = "../locksmith" }
holochain_tracing = "=0.0.19"
holochain_tracing_macros = "=0.0.19"
holochain_tracing = "=0.0.20"
holochain_tracing_macros = "=0.0.20"
newrelic="0.2"
sim2h = { version = "=0.0.43-alpha3", path = "../sim2h" }
lib3h_crypto_api = "=0.0.38"
Expand Down
2 changes: 1 addition & 1 deletion crates/conductor_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ edition = "2018"
holochain_core_types = { version = "=0.0.43-alpha3", path = "../core_types" }
holochain_wasm_utils = { version = "=0.0.43-alpha3", path = "../wasm_utils" }
holochain_locksmith = { version = "=0.0.43-alpha3", path = "../locksmith" }
holochain_tracing_macros = "=0.0.19"
holochain_tracing_macros = "=0.0.20"
holochain_common = { version = "=0.0.43-alpha3", path = "../common" }
newrelic="0.2"
jsonrpc-core = "=14.0.1"
Expand Down
4 changes: 2 additions & 2 deletions crates/conductor_lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ holochain_persistence_lmdb = "=0.0.17"
holochain_common = { version = "=0.0.43-alpha3", path = "../common" }
holochain_dpki = { version = "=0.0.43-alpha3", path = "../dpki" }
holochain_net = { version = "=0.0.43-alpha3", path = "../net" }
holochain_tracing = "=0.0.19"
holochain_tracing_macros = "=0.0.19"
holochain_tracing = "=0.0.20"
holochain_tracing_macros = "=0.0.20"
lib3h = "=0.0.38"
lib3h_sodium = "=0.0.38"
holochain_metrics = { version = "=0.0.43-alpha3", path = "../metrics" }
Expand Down
4 changes: 2 additions & 2 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ holochain_core_types = { version = "=0.0.43-alpha3", path = "../core_types" }
holochain_dpki = { version = "=0.0.43-alpha3", path = "../dpki" }
holochain_locksmith = { version = "=0.0.43-alpha3", path = "../locksmith" }
holochain_metrics = { version = "=0.0.43-alpha3", path = "../metrics" }
holochain_tracing = "=0.0.19"
holochain_tracing_macros = "=0.0.19"
holochain_tracing = "=0.0.20"
holochain_tracing_macros = "=0.0.20"
log = "=0.4.8"
holochain_logging = "=0.0.7"
boolinator = "=2.4.0"
Expand Down
2 changes: 1 addition & 1 deletion crates/dpki/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ edition = "2018"
lazy_static = "=1.4.0"
base64 = "=0.10.1"
holochain_core_types = { version = "=0.0.43-alpha3", path = "../core_types" }
holochain_tracing_macros = "=0.0.19"
holochain_tracing_macros = "=0.0.20"
newrelic="0.2"
lib3h_sodium = "=0.0.38"
holochain_persistence_api = "=0.0.17"
Expand Down
4 changes: 2 additions & 2 deletions crates/holochain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ holochain_conductor_lib = { version = "=0.0.43-alpha3", path = "../conductor_lib
lib3h_sodium = "=0.0.38"
holochain_common = { version = "=0.0.43-alpha3", path = "../common" }
holochain_locksmith = { version = "=0.0.43-alpha3", path = "../locksmith" }
holochain_tracing = "=0.0.19"
holochain_tracing_macros = "=0.0.19"
holochain_tracing = "=0.0.20"
holochain_tracing_macros = "=0.0.20"
structopt = "=0.2.15"
tiny_http = "=0.6.2"
lazy_static = "=1.4.0"
Expand Down
2 changes: 1 addition & 1 deletion crates/metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ shrinkwraprs = "=0.2.1"
serde = { version = "=1.0.104", features = ["rc"] }
serde_derive = "=1.0.104"
holochain_locksmith = { version = "=0.0.43-alpha3", path = "../locksmith" }
holochain_tracing_macros = "=0.0.19"
holochain_tracing_macros = "=0.0.20"
holochain_common = { version = "=0.0.43-alpha3", path = "../common" }
lazy_static = "=1.4.0"
num-traits = "=0.2.6"
Expand Down
4 changes: 2 additions & 2 deletions crates/net/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ lib3h_protocol = "=0.0.38"
lib3h = "=0.0.38"
lib3h_zombie_actor = "=0.0.38"
detach = "=0.0.19"
holochain_tracing = "=0.0.19"
holochain_tracing = "=0.0.20"
holochain_metrics = { version = "=0.0.43-alpha3", path = "../metrics" }
holochain_conductor_lib_api = { version = "=0.0.43-alpha3", path = "../conductor_api" }
holochain_core_types = { version = "=0.0.43-alpha3", path = "../core_types" }
holochain_locksmith = { version = "=0.0.43-alpha3", path = "../locksmith" }
holochain_json_derive = "=0.0.23"
holochain_json_api = "=0.0.23"
holochain_tracing_macros = "=0.0.19"
holochain_tracing_macros = "=0.0.20"
holochain_persistence_api = "=0.0.17"
holochain_common = { version = "=0.0.43-alpha3", path = "../common" }
in_stream = { version = "=0.0.43-alpha3", path = "../in_stream" }
Expand Down
4 changes: 2 additions & 2 deletions crates/sim2h/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ lib3h_crypto_api = "=0.0.38"
lib3h_sodium = "=0.0.38"
lib3h_protocol = "=0.0.38"
lib3h_zombie_actor = "=0.0.38"
holochain_tracing = "=0.0.19"
holochain_tracing = "=0.0.20"
holochain_core_types = { version = "=0.0.43-alpha3", path = "../core_types" }
holochain_locksmith = { version = "=0.0.43-alpha3", path = "../locksmith" }
holochain_metrics = { version = "=0.0.43-alpha3", path = "../metrics" }
holochain_common = { version = "=0.0.43-alpha3", path = "../common" }
holochain_tracing_macros = "=0.0.19"
holochain_tracing_macros = "=0.0.20"
im = { version = "=14.0.0", features = ["serde"] }
in_stream = { version = "=0.0.43-alpha3", path = "../in_stream" }
uuid = { version = "0.4", features = ["v4"] }
Expand Down
31 changes: 24 additions & 7 deletions crates/sim2h/src/connection_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub enum ConMgrEvent {
#[derive(Debug)]
enum ConMgrCommand {
Connect(Lib3hUri, TcpWss),
SendData(Lib3hUri, WsFrame),
SendData(Lib3hUri, ht::SpanWrap<WsFrame>),
Disconnect(Lib3hUri),
}

Expand All @@ -38,7 +38,13 @@ type CmdRecv = tokio::sync::mpsc::UnboundedReceiver<ConMgrCommand>;
pub type ConnectionMgrEventRecv = EvtRecv;

/// internal websocket polling loop
async fn wss_task(uri: Lib3hUri, mut wss: TcpWss, evt_send: EvtSend, mut cmd_recv: CmdRecv) {
async fn wss_task(
uri: Lib3hUri,
mut wss: TcpWss,
evt_send: EvtSend,
mut cmd_recv: CmdRecv,
tracer: Option<ht::Tracer>,
) {
let mut frame = None;

// TODO - this should be done with tokio tcp streams && selecting
Expand All @@ -59,7 +65,8 @@ async fn wss_task(uri: Lib3hUri, mut wss: TcpWss, evt_send: EvtSend, mut cmd_rec
did_work = true;
match cmd {
ConMgrCommand::SendData(_uri, frame) => {
if let Err(e) = wss.write(frame) {
let _spanguard = ht::follow(&tracer, &frame, here!(()));
if let Err(e) = wss.write(frame.data) {
error!("socket write error {} {:?}", uri, e);
let _ = evt_send
.send(ConMgrEvent::Disconnect(uri.clone(), Some(e.into())));
Expand Down Expand Up @@ -134,9 +141,14 @@ async fn wss_task(uri: Lib3hUri, mut wss: TcpWss, evt_send: EvtSend, mut cmd_rec
}

/// internal actually spawn the above wss_task into the tokio runtime
fn spawn_wss_task(uri: Lib3hUri, wss: TcpWss, evt_send: EvtSend) -> CmdSend {
fn spawn_wss_task(
uri: Lib3hUri,
wss: TcpWss,
evt_send: EvtSend,
tracer: Option<ht::Tracer>,
) -> CmdSend {
let (cmd_send, cmd_recv) = tokio::sync::mpsc::unbounded_channel();
tokio::task::spawn(wss_task(uri, wss, evt_send, cmd_recv));
tokio::task::spawn(wss_task(uri, wss, evt_send, cmd_recv, tracer));
cmd_send
}

Expand Down Expand Up @@ -175,12 +187,15 @@ pub struct ConnectionMgr {
evt_recv_from_children: EvtRecv,
connection_count: ConnectionCount,
wss_map: std::collections::HashMap<Lib3hUri, CmdSend>,
tracer: Option<ht::Tracer>,
}

impl ConnectionMgr {
/// spawn a new connection manager task, returning a handle for controlling it
/// and a receiving channel for any incoming data
pub fn new() -> (ConnectionMgrHandle, ConnectionMgrEventRecv, ConnectionCount) {
pub fn new(
tracer: Option<ht::Tracer>,
) -> (ConnectionMgrHandle, ConnectionMgrEventRecv, ConnectionCount) {
let (evt_p_send, evt_p_recv) = tokio::sync::mpsc::unbounded_channel();
let (evt_c_send, evt_c_recv) = tokio::sync::mpsc::unbounded_channel();
let (cmd_send, cmd_recv) = tokio::sync::mpsc::unbounded_channel();
Expand All @@ -198,6 +213,7 @@ impl ConnectionMgr {
evt_recv_from_children: evt_c_recv,
connection_count: connection_count.clone(),
wss_map: std::collections::HashMap::new(),
tracer,
};

tokio::task::spawn(con_mgr_task(con_mgr, weak_ref_dummy));
Expand Down Expand Up @@ -250,6 +266,7 @@ impl ConnectionMgr {
uri.clone(),
wss,
self.evt_send_from_children.clone(),
self.tracer.clone(),
);
if let Some(old) = self.wss_map.insert(uri.clone(), cmd_send) {
error!("REPLACING ACTIVE CONNECTION: {}", uri);
Expand Down Expand Up @@ -353,7 +370,7 @@ impl ConnectionMgrHandle {
}

/// send data to a managed websocket connection
pub fn send_data(&self, uri: Lib3hUri, frame: WsFrame) {
pub fn send_data(&self, uri: Lib3hUri, frame: ht::SpanWrap<WsFrame>) {
if let Err(e) = self.send_cmd.send(ConMgrCommand::SendData(uri, frame)) {
error!("failed to send on channel - shutting down? {:?}", e);
}
Expand Down
32 changes: 28 additions & 4 deletions crates/sim2h/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#![feature(vec_remove_item)]
#![feature(proc_macro_hygiene)]
#![feature(label_break_value)]
#![allow(clippy::redundant_clone)]

Expand Down Expand Up @@ -258,15 +259,20 @@ impl Sim2hHandle {
&self.connection_mgr
}

#[autotrace]
/// send a message to another connected agent
pub fn send(&self, agent: AgentId, uri: Lib3hUri, msg: &WireMessage) {
debug!(">>OUT>> {} to {}", msg.message_type(), uri);
MESSAGE_LOGGER
.lock()
.log_out(agent, uri.clone(), msg.clone());
let payload: Opaque = msg.clone().into();
self.connection_mgr
.send_data(uri, payload.as_bytes().into());
let payload: ht::SpanWrap<WsFrame> = ht::wrap_with_tag(
payload.as_bytes().into(),
here!(()),
ht::debug_tag("Message", msg),
);
self.connection_mgr.send_data(uri, payload);
}

/// get access to our im_state object
Expand All @@ -275,6 +281,7 @@ impl Sim2hHandle {
}

/// forward a message to be handled
#[autotrace]
pub fn handle_message(&self, uri: Lib3hUri, message: WireMessage, signer: AgentId) {
// dispatch to correct handler
let sim2h_handle = self.clone();
Expand All @@ -300,6 +307,14 @@ impl Sim2hHandle {
// you have to be in a space to proceed further
let tracer = self.tracer.clone().unwrap_or_else(|| ht::null_tracer());
tokio::task::spawn(async move {
let _spanguard = message.try_get_span().and_then(|msg| {
ht::follow_encoded_tag(
&Some(tracer.clone()),
msg,
here!(()),
ht::debug_tag("HandleMessageTask", message.clone()),
)
});
// -- right now each agent can only be part of a single space :/ --

let (agent_id, space_hash) = 'got_info: {
Expand Down Expand Up @@ -1046,7 +1061,7 @@ pub struct Sim2h {
}

#[autotrace]
#[holochain_tracing_macros::newrelic_autotrace(SIM2H)]
//#[holochain_tracing_macros::newrelic_autotrace(SIM2H)]
impl Sim2h {
/// create a new Sim2h server instance
pub fn new(
Expand All @@ -1060,7 +1075,8 @@ impl Sim2h {

let (metric_gen, metric_task) = MetricsTimerGenerator::new();

let (connection_mgr, connection_mgr_evt_recv, connection_count) = ConnectionMgr::new();
let (connection_mgr, connection_mgr_evt_recv, connection_count) =
ConnectionMgr::new(tracer.clone());

let (wss_send, wss_recv) = crossbeam_channel::unbounded();
let sim2h_handle = Sim2hHandle::new(
Expand Down Expand Up @@ -1230,6 +1246,14 @@ impl Sim2h {
Ok((signed_message.provenance.source().into(), wire_message))
})() {
Ok((source, wire_message)) => {
let _spanguard = wire_message.try_get_span().and_then(|msg| {
ht::follow_encoded_tag(
&sim2h_handle.tracer,
msg,
here!(()),
ht::debug_tag("HandlePayload", wire_message.clone()),
)
});
sim2h_handle.handle_message(url, wire_message, source)
}
Err(error) => {
Expand Down
10 changes: 10 additions & 0 deletions crates/sim2h/src/wire_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,16 @@ impl WireMessage {
WireMessage::Err(_) => "[Error] {:?}",
})
}

pub fn try_get_span(&self) -> Option<&ht::EncodedSpanContext> {
match self {
WireMessage::ClientToLib3h(s) => s.span_context.as_ref(),
WireMessage::ClientToLib3hResponse(s) => s.span_context.as_ref(),
WireMessage::Lib3hToClient(s) => s.span_context.as_ref(),
WireMessage::Lib3hToClientResponse(s) => s.span_context.as_ref(),
_ => None,
}
}
}

fn get_multi_type(list: Vec<&Lib3hToClient>) -> &str {
Expand Down
4 changes: 2 additions & 2 deletions crates/sim2h_server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ repository = "https://github.com/holochain/sim2h"
crossbeam-channel = "=0.3.8"
sim2h = { version = "=0.0.43-alpha3", path = "../sim2h" }
holochain_common = { version = "=0.0.43-alpha3", path = "../common" }
holochain_tracing = "=0.0.19"
holochain_tracing_macros = "=0.0.19"
holochain_tracing = "=0.0.20"
holochain_tracing_macros = "=0.0.20"
newrelic="0.2"
detach = "=0.0.19"
futures = "=0.3.2"
Expand Down
2 changes: 1 addition & 1 deletion crates/stress/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ hcid = "=0.0.6"
lib3h_crypto_api = "=0.0.38"
lib3h_protocol = "=0.0.38"
lib3h_sodium = "=0.0.38"
holochain_tracing = "=0.0.19"
holochain_tracing = "=0.0.20"
log = "=0.4.8"
nanoid = "=0.2.0"
native-tls = "=0.2.3"
Expand Down