diff --git a/crashtracker/src/lib.rs b/crashtracker/src/lib.rs index 5ce6fcc3d..4f4ad2cb1 100644 --- a/crashtracker/src/lib.rs +++ b/crashtracker/src/lib.rs @@ -64,8 +64,8 @@ pub use crash_info::*; #[cfg(all(unix, feature = "receiver"))] pub use receiver::{ - async_receiver_entry_point_unix_socket, receiver_entry_point_stdin, - receiver_entry_point_unix_socket, + async_receiver_entry_point_unix_listener, async_receiver_entry_point_unix_socket, + get_receiver_unix_socket, receiver_entry_point_stdin, receiver_entry_point_unix_socket, }; #[cfg(all(unix, any(feature = "collector", feature = "receiver")))] diff --git a/crashtracker/src/receiver/entry_points.rs b/crashtracker/src/receiver/entry_points.rs index 6d1472d65..0ceed1b57 100644 --- a/crashtracker/src/receiver/entry_points.rs +++ b/crashtracker/src/receiver/entry_points.rs @@ -23,15 +23,21 @@ pub fn receiver_entry_point_stdin() -> anyhow::Result<()> { Ok(()) } +pub async fn async_receiver_entry_point_unix_listener( + listener: &UnixListener, +) -> anyhow::Result<()> { + let (unix_stream, _) = listener.accept().await?; + let stream = BufReader::new(unix_stream); + receiver_entry_point(receiver_timeout(), stream).await +} + pub async fn async_receiver_entry_point_unix_socket( socket_path: impl AsRef<str>, one_shot: bool, ) -> anyhow::Result<()> { - let listener = get_unix_socket(socket_path)?; + let listener = get_receiver_unix_socket(socket_path)?; loop { - let (unix_stream, _) = listener.accept().await?; - let stream = BufReader::new(unix_stream); - let res = receiver_entry_point(receiver_timeout(), stream).await; + let res = async_receiver_entry_point_unix_listener(&listener).await; // TODO, should we log failures somewhere? if one_shot { return res; @@ -48,11 +54,7 @@ pub fn receiver_entry_point_unix_socket(socket_path: impl AsRef<str>) -> anyhow: // Dropping the stream closes it, allowing the collector to exit if it was waiting. } -/*----------------------------------------- -| Helper Functions | -------------------------------------------*/ - -fn get_unix_socket(socket_path: impl AsRef<str>) -> anyhow::Result<UnixListener> { +pub fn get_receiver_unix_socket(socket_path: impl AsRef<str>) -> anyhow::Result<UnixListener> { fn path_bind(socket_path: impl AsRef<str>) -> anyhow::Result<UnixListener> { let socket_path = socket_path.as_ref(); if std::fs::metadata(socket_path).is_ok() { diff --git a/crashtracker/src/receiver/mod.rs b/crashtracker/src/receiver/mod.rs index a30bd3c38..a4d6cb9da 100644 --- a/crashtracker/src/receiver/mod.rs +++ b/crashtracker/src/receiver/mod.rs @@ -4,8 +4,8 @@ mod entry_points; pub use entry_points::{ - async_receiver_entry_point_unix_socket, receiver_entry_point_stdin, - receiver_entry_point_unix_socket, + async_receiver_entry_point_unix_listener, async_receiver_entry_point_unix_socket, + get_receiver_unix_socket, receiver_entry_point_stdin, receiver_entry_point_unix_socket, }; mod receive_report; diff --git a/remote-config/src/fetch/multitarget.rs b/remote-config/src/fetch/multitarget.rs index 0a890d8dc..08ce9ad3d 100644 --- a/remote-config/src/fetch/multitarget.rs +++ b/remote-config/src/fetch/multitarget.rs @@ -167,44 +167,46 @@ where // "goto" like handling to drop the known_service borrow and be able to change services 'service_handling: { 'drop_service: { - let known_service = services.get_mut(target).unwrap(); - known_service.refcount = if known_service.refcount == 1 { - known_service.runtimes.remove(runtime_id); - let mut status = known_service.status.lock().unwrap(); - *status = match *status { - KnownTargetStatus::Pending => KnownTargetStatus::Alive, // not really - KnownTargetStatus::Alive => { - KnownTargetStatus::RemoveAt(Instant::now() + Duration::from_secs(3666)) - } - KnownTargetStatus::RemoveAt(_) | KnownTargetStatus::Removing(_) => { - unreachable!() + if let Some(known_service) = services.get_mut(target) { + known_service.refcount = if known_service.refcount == 1 { + known_service.runtimes.remove(runtime_id); + let mut status = known_service.status.lock().unwrap(); + *status = match *status { + KnownTargetStatus::Pending => KnownTargetStatus::Alive, // not really + KnownTargetStatus::Alive => KnownTargetStatus::RemoveAt( + Instant::now() + Duration::from_secs(3666), + ), + KnownTargetStatus::RemoveAt(_) | KnownTargetStatus::Removing(_) => { + unreachable!() + } + }; + // We've marked it Alive so that the Pending check in start_fetcher() will + // fail + if matches!(*status, KnownTargetStatus::Alive) { + break 'drop_service; } - }; - // We've marked it Alive so that the Pending check in start_fetcher() will fail - if matches!(*status, KnownTargetStatus::Alive) { - break 'drop_service; - } - 0 - } else { - if *known_service.fetcher.runtime_id.lock().unwrap() == runtime_id { - 'changed_rt_id: { - for (id, runtime) in self.runtimes.lock().unwrap().iter() { - if runtime.targets.len() == 1 - && runtime.targets.contains_key(target) - { - *known_service.fetcher.runtime_id.lock().unwrap() = - id.to_string(); - break 'changed_rt_id; + 0 + } else { + if *known_service.fetcher.runtime_id.lock().unwrap() == runtime_id { + 'changed_rt_id: { + for (id, runtime) in self.runtimes.lock().unwrap().iter() { + if runtime.targets.len() == 1 + && runtime.targets.contains_key(target) + { + *known_service.fetcher.runtime_id.lock().unwrap() = + id.to_string(); + break 'changed_rt_id; + } } + known_service.synthetic_id = true; + *known_service.fetcher.runtime_id.lock().unwrap() = + Self::generate_synthetic_id(); } - known_service.synthetic_id = true; - *known_service.fetcher.runtime_id.lock().unwrap() = - Self::generate_synthetic_id(); } - } - known_service.refcount - 1 - }; - break 'service_handling; + known_service.refcount - 1 + }; + break 'service_handling; + } } trace!("Remove {target:?} from services map while in pending state"); services.remove(target); @@ -309,14 +311,15 @@ where match info.targets.entry(target.clone()) { Entry::Occupied(mut e) => *e.get_mut() += 1, Entry::Vacant(e) => { - // it's the second usage here if let Some(primary_target) = primary_target { let mut services = self.services.lock().unwrap(); - let known_target = services.get_mut(&primary_target).unwrap(); - if !known_target.synthetic_id { - known_target.synthetic_id = true; - *known_target.fetcher.runtime_id.lock().unwrap() = - Self::generate_synthetic_id(); + if let Some(known_target) = services.get_mut(&primary_target) { + // it's the second usage here + if !known_target.synthetic_id { + known_target.synthetic_id = true; + *known_target.fetcher.runtime_id.lock().unwrap() = + Self::generate_synthetic_id(); + } } } e.insert(1); diff --git a/sidecar-ffi/src/lib.rs b/sidecar-ffi/src/lib.rs index ebf359e3b..2d20dc862 100644 --- a/sidecar-ffi/src/lib.rs +++ b/sidecar-ffi/src/lib.rs @@ -538,6 +538,7 @@ pub unsafe extern "C" fn ddog_sidecar_session_set_config( remote_config_products_count: usize, remote_config_capabilities: *const RemoteConfigCapabilities, remote_config_capabilities_count: usize, + is_fork: bool, ) -> MaybeError { #[cfg(unix)] let remote_config_notify_target = libc::getpid(); @@ -580,6 +581,7 @@ pub unsafe extern "C" fn ddog_sidecar_session_set_config( .as_slice() .to_vec(), }, + is_fork )); MaybeError::None diff --git a/sidecar-ffi/tests/sidecar.rs b/sidecar-ffi/tests/sidecar.rs index 50b55a803..7de38c0df 100644 --- a/sidecar-ffi/tests/sidecar.rs +++ b/sidecar-ffi/tests/sidecar.rs @@ -105,6 +105,7 @@ fn test_ddog_sidecar_register_app() { 0, null(), 0, + false, ) .unwrap_none(); @@ -160,6 +161,7 @@ fn test_ddog_sidecar_register_app() { 0, null(), 0, + false, ) .unwrap_none(); diff --git a/sidecar/src/entry.rs b/sidecar/src/entry.rs index fd473e5b4..7aaa7f8f3 100644 --- a/sidecar/src/entry.rs +++ b/sidecar/src/entry.rs @@ -73,11 +73,18 @@ where #[cfg(unix)] tokio::spawn(async move { let socket_path = crashtracker_unix_socket_path(); - let _ = datadog_crashtracker::async_receiver_entry_point_unix_socket( + match datadog_crashtracker::get_receiver_unix_socket( socket_path.to_str().unwrap_or_default(), - false, - ) - .await; + ) { + Ok(listener) => loop { + if let Err(e) = + datadog_crashtracker::async_receiver_entry_point_unix_listener(&listener).await + { + tracing::warn!("Got error while receiving crash report: {e}"); + } + }, + Err(e) => tracing::error!("Failed setting up the crashtracker listener: {e}"), + } }); // Init. Early, before we start listening. diff --git a/sidecar/src/log.rs b/sidecar/src/log.rs index 3fc6ef27c..6e38d8093 100644 --- a/sidecar/src/log.rs +++ b/sidecar/src/log.rs @@ -11,7 +11,7 @@ use std::collections::HashMap; use std::hash::Hash; use std::ops::{DerefMut, Sub}; use std::path::PathBuf; -use std::sync::{Mutex, RwLock}; +use std::sync::{Mutex, OnceLock, RwLock}; use std::time::{Duration, Instant, SystemTime}; use std::{env, io}; use tracing::level_filters::LevelFilter; @@ -365,6 +365,8 @@ lazy_static! { pub static ref MULTI_LOG_FILTER: MultiEnvFilter = MultiEnvFilter::default(); pub static ref MULTI_LOG_WRITER: MultiWriter = MultiWriter::default(); } +static PERMANENT_MIN_LOG_LEVEL: OnceLock<TemporarilyRetainedMapGuard<String, EnvFilter>> = + OnceLock::new(); pub(crate) fn enable_logging() -> anyhow::Result<()> { tracing_subscriber::registry() @@ -383,7 +385,8 @@ pub(crate) fn enable_logging() -> anyhow::Result<()> { } let config = config::Config::get(); if !config.log_level.is_empty() { - MULTI_LOG_FILTER.add(config.log_level.clone()); + let filter = MULTI_LOG_FILTER.add(config.log_level.clone()); + _ = PERMANENT_MIN_LOG_LEVEL.set(filter); } MULTI_LOG_WRITER.add(config.log_method); // same than MULTI_LOG_FILTER diff --git a/sidecar/src/service/blocking.rs b/sidecar/src/service/blocking.rs index 70f3782eb..ee10c6124 100644 --- a/sidecar/src/service/blocking.rs +++ b/sidecar/src/service/blocking.rs @@ -209,6 +209,7 @@ pub fn set_session_config( #[cfg(windows)] remote_config_notify_function: *mut libc::c_void, session_id: String, config: &SessionConfig, + is_fork: bool, ) -> io::Result<()> { #[cfg(unix)] let remote_config_notify_target = pid; @@ -219,6 +220,7 @@ pub fn set_session_config( session_id, remote_config_notify_target, config: config.clone(), + is_fork, }) } diff --git a/sidecar/src/service/sidecar_interface.rs b/sidecar/src/service/sidecar_interface.rs index 90a56dcf0..de17670eb 100644 --- a/sidecar/src/service/sidecar_interface.rs +++ b/sidecar/src/service/sidecar_interface.rs @@ -72,6 +72,7 @@ pub trait SidecarInterface { session_id: String, remote_config_notify_target: RemoteConfigNotifyTarget, config: SessionConfig, + is_fork: bool, ); /// Shuts down a runtime. diff --git a/sidecar/src/service/sidecar_server.rs b/sidecar/src/service/sidecar_server.rs index 364584083..91afd3d6f 100644 --- a/sidecar/src/service/sidecar_server.rs +++ b/sidecar/src/service/sidecar_server.rs @@ -32,7 +32,7 @@ use std::pin::Pin; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex, MutexGuard}; use std::time::Duration; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info, trace, warn}; use futures::FutureExt; use serde::{Deserialize, Serialize}; @@ -274,6 +274,13 @@ impl SidecarServer { } }; + debug!( + "Received {} bytes of data for {:?} with headers {:?}", + data.len(), + target, + headers + ); + let mut size = 0; let mut processor = tracer_payload::DefaultTraceChunkProcessor; let mut payload_params = tracer_payload::TracerPayloadParams::new( @@ -286,6 +293,7 @@ impl SidecarServer { payload_params.measure_size(&mut size); match payload_params.try_into() { Ok(payload) => { + trace!("Parsed the trace payload and enqueuing it for sending: {payload:?}"); let data = SendData::new(size, payload, headers, target); self.trace_flusher.enqueue(data); } @@ -670,6 +678,7 @@ impl SidecarInterface for SidecarServer { #[cfg(windows)] remote_config_notify_function: crate::service::remote_configs::RemoteConfigNotifyFunction, config: SessionConfig, + is_fork: bool, ) -> Self::SetSessionConfigFut { debug!("Set session config for {session_id} to {config:?}"); @@ -762,7 +771,9 @@ impl SidecarInterface for SidecarServer { } Box::pin(async move { - session.shutdown_running_instances().await; + if !is_fork { + session.shutdown_running_instances().await; + } no_response().await }) } @@ -1000,6 +1011,7 @@ impl SidecarInterface for SidecarServer { } else { Some(Cow::Owned(token)) }; + debug!("Update test token of session {session_id} to {token:?}"); fn update_cfg<F: FnOnce(Endpoint) -> anyhow::Result<()>>( endpoint: Option<Endpoint>, set: F, diff --git a/sidecar/src/service/tracing/trace_flusher.rs b/sidecar/src/service/tracing/trace_flusher.rs index 1121ac8c1..197fcdb0a 100644 --- a/sidecar/src/service/tracing/trace_flusher.rs +++ b/sidecar/src/service/tracing/trace_flusher.rs @@ -255,7 +255,7 @@ impl TraceFlusher { Err(e) => error!("Error receiving agent configuration: {e:?}"), } } - info!("Successfully flushed traces to {}", endpoint.url); + info!("Successfully flushed traces to {endpoint:?}"); } Err(e) => { error!("Error sending trace: {e:?}"); diff --git a/tinybytes/src/bytes_string.rs b/tinybytes/src/bytes_string.rs index 0fe0d22eb..67b9ea9c3 100644 --- a/tinybytes/src/bytes_string.rs +++ b/tinybytes/src/bytes_string.rs @@ -4,9 +4,10 @@ use crate::Bytes; #[cfg(feature = "serde")] use serde::ser::{Serialize, Serializer}; +use std::fmt::{Debug, Formatter}; use std::{borrow::Borrow, hash, str::Utf8Error}; -#[derive(Clone, Debug, Eq, PartialEq)] +#[derive(Clone, Eq, PartialEq)] pub struct BytesString { bytes: Bytes, } @@ -157,6 +158,12 @@ impl hash::Hash for BytesString { } } +impl Debug for BytesString { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.serialize_str(self.as_str()) + } +} + #[cfg(test)] mod tests { use super::*;