Skip to content

Commit

Permalink
feat(rust): add connect/disconnect logs to portal worker
Browse files Browse the repository at this point in the history
  • Loading branch information
adrianbenavides committed Feb 20, 2025
1 parent 922ef85 commit c460690
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 8 deletions.
11 changes: 6 additions & 5 deletions implementations/rust/ockam/ockam_api/src/session/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ impl Session {
async fn run_loop(
ctx: Context,
key: String,
initial_connect_was_called: bool,
mut initial_connect_was_called: bool,
collector_address: Address,
shared_state: SharedState,
ping_interval: Duration,
Expand Down Expand Up @@ -479,6 +479,7 @@ impl Session {
// The session is down, or we reached the maximum number of failures
_ => {
let mut replacer = shared_state.replacer.lock().await;
debug!(key = %key, %initial_connect_was_called, %first_creation, pings = %pings.len(), "session state");

if first_creation && !initial_connect_was_called {
debug!(key = %key, "session is down. starting");
Expand All @@ -487,7 +488,7 @@ impl Session {
warn!(key = %key, "session unresponsive. replacing");
}

if !first_creation && pings.len() > 0 {
if initial_connect_was_called && pings.len() > 0 {
replacer.on_session_down().await;
}

Expand All @@ -502,9 +503,9 @@ impl Session {
match replacer.create().await {
Ok(replacer_outcome) => {
info!(key = %key, ping_route = %replacer_outcome.ping_route, "replacement is up");
if !first_creation {
replacer.on_session_replaced().await;
}

initial_connect_was_called = true;
replacer.on_session_replaced().await;

shared_state.status.set_up(replacer_outcome.ping_route);
shared_state
Expand Down
2 changes: 1 addition & 1 deletion implementations/rust/ockam/ockam_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ hex = { version = "0.4", default-features = false, optional = true }
miette = { version = "7.2.0", features = ["fancy-no-backtrace"], optional = true }
minicbor = { version = "0.25.1", default-features = false, features = ["derive"] }
ockam_macros = { path = "../ockam_macros", version = "^0.37.0", default-features = false }
once_cell = { version = "1", optional = true, default-features = false }
once_cell = { version = "1.19.0", optional = true, default-features = false }
opentelemetry = { version = "0.27", features = ["logs", "metrics", "trace"], optional = true }
rand = { version = "0.8", default-features = false }
rand_pcg = { version = "0.3.1", default-features = false, optional = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,17 @@ impl PortalType {
PortalType::PrivilegedInlet | PortalType::PrivilegedOutlet => true,
}
}

pub fn is_inlet(&self) -> bool {
match self {
PortalType::Inlet | PortalType::PrivilegedInlet => true,
PortalType::Outlet | PortalType::PrivilegedOutlet => false,
}
}

pub fn is_outlet(&self) -> bool {
!self.is_inlet()
}
}

impl Display for PortalType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,27 @@ impl Worker for TcpPortalWorker {
self.registry
.add_portal_worker(&self.addresses.sender_remote);

info!(portal_type = %self.portal_type, sender_internal = %self.addresses.sender_internal,
if self.portal_type.is_inlet() {
info!(
"The TCP Inlet at {} connected to {}",
&self.hostname_port,
self.their_identifier
.as_ref()
.map(|i| i.to_string())
.unwrap_or_else(|| "unknown".to_string()),
);
} else if self.portal_type.is_outlet() {
info!(
"The TCP Outlet at {} received connection from {}",
&self.hostname_port,
self.their_identifier
.as_ref()
.map(|i| i.to_string())
.unwrap_or_else(|| "unknown".to_string()),
);
}

debug!(portal_type = %self.portal_type, sender_internal = %self.addresses.sender_internal,
"tcp portal worker initialized"
);

Expand Down Expand Up @@ -647,7 +667,21 @@ impl TcpPortalWorker {

#[instrument(skip_all)]
async fn handle_disconnect(&mut self, ctx: &Context) -> Result<()> {
info!(portal_type = %self.portal_type, sender_internal = %self.addresses.sender_internal,
let portal_type_str = if self.portal_type.is_inlet() {
"TCP Inlet"
} else {
"TCP Outlet"
};
info!(
"The {} at {} was disconnected from to {}",
portal_type_str,
&self.hostname_port,
self.their_identifier
.as_ref()
.map(|i| i.to_string())
.unwrap_or_else(|| "unknown".to_string()),
);
debug!(portal_type = %self.portal_type, sender_internal = %self.addresses.sender_internal,
"tcp stream was dropped");
self.start_disconnection(ctx, DisconnectionReason::FailedRx)
.await
Expand Down

0 comments on commit c460690

Please sign in to comment.