Skip to content

Commit

Permalink
Avoid resetting the sidecar session on fork
Browse files Browse the repository at this point in the history
And add a few more log lines
Also fix some ordering assumptions around unrwap().

Signed-off-by: Bob Weinand <[email protected]>
  • Loading branch information
bwoebi committed Feb 5, 2025
1 parent b6c6615 commit d7acab5
Show file tree
Hide file tree
Showing 10 changed files with 87 additions and 58 deletions.
83 changes: 43 additions & 40 deletions remote-config/src/fetch/multitarget.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,44 +167,46 @@ where
// "goto" like handling to drop the known_service borrow and be able to change services
'service_handling: {
'drop_service: {
let known_service = services.get_mut(target).unwrap();
known_service.refcount = if known_service.refcount == 1 {
known_service.runtimes.remove(runtime_id);
let mut status = known_service.status.lock().unwrap();
*status = match *status {
KnownTargetStatus::Pending => KnownTargetStatus::Alive, // not really
KnownTargetStatus::Alive => {
KnownTargetStatus::RemoveAt(Instant::now() + Duration::from_secs(3666))
}
KnownTargetStatus::RemoveAt(_) | KnownTargetStatus::Removing(_) => {
unreachable!()
if let Some(known_service) = services.get_mut(target) {
known_service.refcount = if known_service.refcount == 1 {
known_service.runtimes.remove(runtime_id);
let mut status = known_service.status.lock().unwrap();
*status = match *status {
KnownTargetStatus::Pending => KnownTargetStatus::Alive, // not really
KnownTargetStatus::Alive => KnownTargetStatus::RemoveAt(
Instant::now() + Duration::from_secs(3666),
),
KnownTargetStatus::RemoveAt(_) | KnownTargetStatus::Removing(_) => {
unreachable!()
}
};
// We've marked it Alive so that the Pending check in start_fetcher() will
// fail
if matches!(*status, KnownTargetStatus::Alive) {
break 'drop_service;
}
};
// We've marked it Alive so that the Pending check in start_fetcher() will fail
if matches!(*status, KnownTargetStatus::Alive) {
break 'drop_service;
}
0
} else {
if *known_service.fetcher.runtime_id.lock().unwrap() == runtime_id {
'changed_rt_id: {
for (id, runtime) in self.runtimes.lock().unwrap().iter() {
if runtime.targets.len() == 1
&& runtime.targets.contains_key(target)
{
*known_service.fetcher.runtime_id.lock().unwrap() =
id.to_string();
break 'changed_rt_id;
0
} else {
if *known_service.fetcher.runtime_id.lock().unwrap() == runtime_id {
'changed_rt_id: {
for (id, runtime) in self.runtimes.lock().unwrap().iter() {
if runtime.targets.len() == 1
&& runtime.targets.contains_key(target)
{
*known_service.fetcher.runtime_id.lock().unwrap() =
id.to_string();
break 'changed_rt_id;
}
}
known_service.synthetic_id = true;
*known_service.fetcher.runtime_id.lock().unwrap() =
Self::generate_synthetic_id();
}
known_service.synthetic_id = true;
*known_service.fetcher.runtime_id.lock().unwrap() =
Self::generate_synthetic_id();
}
}
known_service.refcount - 1
};
break 'service_handling;
known_service.refcount - 1
};
break 'service_handling;
}
}
trace!("Remove {target:?} from services map while in pending state");
services.remove(target);
Expand Down Expand Up @@ -309,14 +311,15 @@ where
match info.targets.entry(target.clone()) {
Entry::Occupied(mut e) => *e.get_mut() += 1,
Entry::Vacant(e) => {
// it's the second usage here
if let Some(primary_target) = primary_target {
let mut services = self.services.lock().unwrap();
let known_target = services.get_mut(&primary_target).unwrap();
if !known_target.synthetic_id {
known_target.synthetic_id = true;
*known_target.fetcher.runtime_id.lock().unwrap() =
Self::generate_synthetic_id();
if let Some(known_target) = services.get_mut(&primary_target) {
// it's the second usage here
if !known_target.synthetic_id {
known_target.synthetic_id = true;
*known_target.fetcher.runtime_id.lock().unwrap() =
Self::generate_synthetic_id();
}
}
}
e.insert(1);
Expand Down
2 changes: 2 additions & 0 deletions sidecar-ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,7 @@ pub unsafe extern "C" fn ddog_sidecar_session_set_config(
remote_config_products_count: usize,
remote_config_capabilities: *const RemoteConfigCapabilities,
remote_config_capabilities_count: usize,
is_fork: bool,
) -> MaybeError {
#[cfg(unix)]
let remote_config_notify_target = libc::getpid();
Expand Down Expand Up @@ -580,6 +581,7 @@ pub unsafe extern "C" fn ddog_sidecar_session_set_config(
.as_slice()
.to_vec(),
},
is_fork
));

MaybeError::None
Expand Down
2 changes: 2 additions & 0 deletions sidecar-ffi/tests/sidecar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ fn test_ddog_sidecar_register_app() {
0,
null(),
0,
false,
)
.unwrap_none();

Expand Down Expand Up @@ -160,6 +161,7 @@ fn test_ddog_sidecar_register_app() {
0,
null(),
0,
false,
)
.unwrap_none();

Expand Down
19 changes: 10 additions & 9 deletions sidecar/src/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,16 @@ where
#[cfg(unix)]
tokio::spawn(async move {
let socket_path = crashtracker_unix_socket_path();
match datadog_crashtracker::get_receiver_unix_socket(socket_path.to_str().unwrap_or_default()) {
Ok(listener) =>
loop {
if let Err(e) =
datadog_crashtracker::async_receiver_entry_point_unix_listener(&listener).await
{
tracing::warn!("Got error while receiving crash report: {e}");
}
},
match datadog_crashtracker::get_receiver_unix_socket(
socket_path.to_str().unwrap_or_default(),
) {
Ok(listener) => loop {
if let Err(e) =
datadog_crashtracker::async_receiver_entry_point_unix_listener(&listener).await
{
tracing::warn!("Got error while receiving crash report: {e}");
}
},
Err(e) => tracing::error!("Failed setting up the crashtracker listener: {e}"),
}
});
Expand Down
9 changes: 4 additions & 5 deletions sidecar/src/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::collections::HashMap;
use std::hash::Hash;
use std::ops::{DerefMut, Sub};
use std::path::PathBuf;
use std::sync::{Mutex, RwLock};
use std::sync::{Mutex, OnceLock, RwLock};
use std::time::{Duration, Instant, SystemTime};
use std::{env, io};
use tracing::level_filters::LevelFilter;
Expand Down Expand Up @@ -365,7 +365,8 @@ lazy_static! {
pub static ref MULTI_LOG_FILTER: MultiEnvFilter = MultiEnvFilter::default();
pub static ref MULTI_LOG_WRITER: MultiWriter = MultiWriter::default();
}
static mut PERMANENT_MIN_LOG_LEVEL: Option<TemporarilyRetainedMapGuard<String, EnvFilter>> = None;
static PERMANENT_MIN_LOG_LEVEL: OnceLock<TemporarilyRetainedMapGuard<String, EnvFilter>> =
OnceLock::new();

pub(crate) fn enable_logging() -> anyhow::Result<()> {
tracing_subscriber::registry()
Expand All @@ -385,9 +386,7 @@ pub(crate) fn enable_logging() -> anyhow::Result<()> {
let config = config::Config::get();
if !config.log_level.is_empty() {
let filter = MULTI_LOG_FILTER.add(config.log_level.clone());
unsafe {
PERMANENT_MIN_LOG_LEVEL.replace(filter);
} // SAFETY: initialized once
_ = PERMANENT_MIN_LOG_LEVEL.set(filter);
}
MULTI_LOG_WRITER.add(config.log_method); // same than MULTI_LOG_FILTER

Expand Down
2 changes: 2 additions & 0 deletions sidecar/src/service/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ pub fn set_session_config(
#[cfg(windows)] remote_config_notify_function: *mut libc::c_void,
session_id: String,
config: &SessionConfig,
is_fork: bool,
) -> io::Result<()> {
#[cfg(unix)]
let remote_config_notify_target = pid;
Expand All @@ -219,6 +220,7 @@ pub fn set_session_config(
session_id,
remote_config_notify_target,
config: config.clone(),
is_fork,
})
}

Expand Down
1 change: 1 addition & 0 deletions sidecar/src/service/sidecar_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub trait SidecarInterface {
session_id: String,
remote_config_notify_target: RemoteConfigNotifyTarget,
config: SessionConfig,
is_fork: bool,
);

/// Shuts down a runtime.
Expand Down
16 changes: 14 additions & 2 deletions sidecar/src/service/sidecar_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::Duration;
use tracing::{debug, error, info, warn};
use tracing::{debug, error, info, trace, warn};

use futures::FutureExt;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -274,6 +274,13 @@ impl SidecarServer {
}
};

debug!(
"Received {} bytes of data for {:?} with headers {:?}",
data.len(),
target,
headers
);

let mut size = 0;
let mut processor = tracer_payload::DefaultTraceChunkProcessor;
let mut payload_params = tracer_payload::TracerPayloadParams::new(
Expand All @@ -286,6 +293,7 @@ impl SidecarServer {
payload_params.measure_size(&mut size);
match payload_params.try_into() {
Ok(payload) => {
trace!("Parsed the trace payload and enqueuing it for sending: {payload:?}");
let data = SendData::new(size, payload, headers, target);
self.trace_flusher.enqueue(data);
}
Expand Down Expand Up @@ -670,6 +678,7 @@ impl SidecarInterface for SidecarServer {
#[cfg(windows)]
remote_config_notify_function: crate::service::remote_configs::RemoteConfigNotifyFunction,
config: SessionConfig,
is_fork: bool,
) -> Self::SetSessionConfigFut {
debug!("Set session config for {session_id} to {config:?}");

Expand Down Expand Up @@ -762,7 +771,9 @@ impl SidecarInterface for SidecarServer {
}

Box::pin(async move {
session.shutdown_running_instances().await;
if !is_fork {
session.shutdown_running_instances().await;
}
no_response().await
})
}
Expand Down Expand Up @@ -1000,6 +1011,7 @@ impl SidecarInterface for SidecarServer {
} else {
Some(Cow::Owned(token))
};
debug!("Update test token of session {session_id} to {token:?}");
fn update_cfg<F: FnOnce(Endpoint) -> anyhow::Result<()>>(
endpoint: Option<Endpoint>,
set: F,
Expand Down
2 changes: 1 addition & 1 deletion sidecar/src/service/tracing/trace_flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ impl TraceFlusher {
Err(e) => error!("Error receiving agent configuration: {e:?}"),
}
}
info!("Successfully flushed traces to {}", endpoint.url);
info!("Successfully flushed traces to {endpoint:?}");
}
Err(e) => {
error!("Error sending trace: {e:?}");
Expand Down
9 changes: 8 additions & 1 deletion tinybytes/src/bytes_string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
use crate::Bytes;
#[cfg(feature = "serde")]
use serde::ser::{Serialize, Serializer};
use std::fmt::{Debug, Formatter};
use std::{borrow::Borrow, hash, str::Utf8Error};

#[derive(Clone, Debug, Eq, PartialEq)]
#[derive(Clone, Eq, PartialEq)]
pub struct BytesString {
bytes: Bytes,
}
Expand Down Expand Up @@ -157,6 +158,12 @@ impl hash::Hash for BytesString {
}
}

impl Debug for BytesString {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.serialize_str(self.as_str())
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit d7acab5

Please sign in to comment.