Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance logging and add fork handling for sessions #857

Merged
merged 5 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crashtracker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ pub use crash_info::*;

#[cfg(all(unix, feature = "receiver"))]
pub use receiver::{
async_receiver_entry_point_unix_socket, receiver_entry_point_stdin,
receiver_entry_point_unix_socket,
async_receiver_entry_point_unix_listener, async_receiver_entry_point_unix_socket,
get_receiver_unix_socket, receiver_entry_point_stdin, receiver_entry_point_unix_socket,
};

#[cfg(all(unix, any(feature = "collector", feature = "receiver")))]
Expand Down
20 changes: 11 additions & 9 deletions crashtracker/src/receiver/entry_points.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,21 @@ pub fn receiver_entry_point_stdin() -> anyhow::Result<()> {
Ok(())
}

pub async fn async_receiver_entry_point_unix_listener(
listener: &UnixListener,
) -> anyhow::Result<()> {
let (unix_stream, _) = listener.accept().await?;
let stream = BufReader::new(unix_stream);
receiver_entry_point(receiver_timeout(), stream).await
}

pub async fn async_receiver_entry_point_unix_socket(
socket_path: impl AsRef<str>,
one_shot: bool,
) -> anyhow::Result<()> {
let listener = get_unix_socket(socket_path)?;
let listener = get_receiver_unix_socket(socket_path)?;
loop {
let (unix_stream, _) = listener.accept().await?;
let stream = BufReader::new(unix_stream);
let res = receiver_entry_point(receiver_timeout(), stream).await;
let res = async_receiver_entry_point_unix_listener(&listener).await;
// TODO, should we log failures somewhere?
if one_shot {
return res;
Expand All @@ -48,11 +54,7 @@ pub fn receiver_entry_point_unix_socket(socket_path: impl AsRef<str>) -> anyhow:
// Dropping the stream closes it, allowing the collector to exit if it was waiting.
}

/*-----------------------------------------
| Helper Functions |
------------------------------------------*/

fn get_unix_socket(socket_path: impl AsRef<str>) -> anyhow::Result<UnixListener> {
pub fn get_receiver_unix_socket(socket_path: impl AsRef<str>) -> anyhow::Result<UnixListener> {
fn path_bind(socket_path: impl AsRef<str>) -> anyhow::Result<UnixListener> {
let socket_path = socket_path.as_ref();
if std::fs::metadata(socket_path).is_ok() {
Expand Down
4 changes: 2 additions & 2 deletions crashtracker/src/receiver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

mod entry_points;
pub use entry_points::{
async_receiver_entry_point_unix_socket, receiver_entry_point_stdin,
receiver_entry_point_unix_socket,
async_receiver_entry_point_unix_listener, async_receiver_entry_point_unix_socket,
get_receiver_unix_socket, receiver_entry_point_stdin, receiver_entry_point_unix_socket,
};
mod receive_report;

Expand Down
83 changes: 43 additions & 40 deletions remote-config/src/fetch/multitarget.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,44 +167,46 @@ where
// "goto" like handling to drop the known_service borrow and be able to change services
'service_handling: {
'drop_service: {
let known_service = services.get_mut(target).unwrap();
known_service.refcount = if known_service.refcount == 1 {
known_service.runtimes.remove(runtime_id);
let mut status = known_service.status.lock().unwrap();
*status = match *status {
KnownTargetStatus::Pending => KnownTargetStatus::Alive, // not really
KnownTargetStatus::Alive => {
KnownTargetStatus::RemoveAt(Instant::now() + Duration::from_secs(3666))
}
KnownTargetStatus::RemoveAt(_) | KnownTargetStatus::Removing(_) => {
unreachable!()
if let Some(known_service) = services.get_mut(target) {
known_service.refcount = if known_service.refcount == 1 {
known_service.runtimes.remove(runtime_id);
let mut status = known_service.status.lock().unwrap();
*status = match *status {
KnownTargetStatus::Pending => KnownTargetStatus::Alive, // not really
KnownTargetStatus::Alive => KnownTargetStatus::RemoveAt(
Instant::now() + Duration::from_secs(3666),
),
KnownTargetStatus::RemoveAt(_) | KnownTargetStatus::Removing(_) => {
unreachable!()
}
};
// We've marked it Alive so that the Pending check in start_fetcher() will
// fail
if matches!(*status, KnownTargetStatus::Alive) {
break 'drop_service;
}
};
// We've marked it Alive so that the Pending check in start_fetcher() will fail
if matches!(*status, KnownTargetStatus::Alive) {
break 'drop_service;
}
0
} else {
if *known_service.fetcher.runtime_id.lock().unwrap() == runtime_id {
'changed_rt_id: {
for (id, runtime) in self.runtimes.lock().unwrap().iter() {
if runtime.targets.len() == 1
&& runtime.targets.contains_key(target)
{
*known_service.fetcher.runtime_id.lock().unwrap() =
id.to_string();
break 'changed_rt_id;
0
} else {
if *known_service.fetcher.runtime_id.lock().unwrap() == runtime_id {
'changed_rt_id: {
for (id, runtime) in self.runtimes.lock().unwrap().iter() {
if runtime.targets.len() == 1
&& runtime.targets.contains_key(target)
{
*known_service.fetcher.runtime_id.lock().unwrap() =
id.to_string();
break 'changed_rt_id;
}
}
known_service.synthetic_id = true;
*known_service.fetcher.runtime_id.lock().unwrap() =
Self::generate_synthetic_id();
}
known_service.synthetic_id = true;
*known_service.fetcher.runtime_id.lock().unwrap() =
Self::generate_synthetic_id();
}
}
known_service.refcount - 1
};
break 'service_handling;
known_service.refcount - 1
};
break 'service_handling;
}
}
trace!("Remove {target:?} from services map while in pending state");
services.remove(target);
Expand Down Expand Up @@ -309,14 +311,15 @@ where
match info.targets.entry(target.clone()) {
Entry::Occupied(mut e) => *e.get_mut() += 1,
Entry::Vacant(e) => {
// it's the second usage here
if let Some(primary_target) = primary_target {
let mut services = self.services.lock().unwrap();
let known_target = services.get_mut(&primary_target).unwrap();
if !known_target.synthetic_id {
known_target.synthetic_id = true;
*known_target.fetcher.runtime_id.lock().unwrap() =
Self::generate_synthetic_id();
if let Some(known_target) = services.get_mut(&primary_target) {
// it's the second usage here
if !known_target.synthetic_id {
known_target.synthetic_id = true;
*known_target.fetcher.runtime_id.lock().unwrap() =
Self::generate_synthetic_id();
}
}
}
e.insert(1);
Expand Down
2 changes: 2 additions & 0 deletions sidecar-ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,7 @@ pub unsafe extern "C" fn ddog_sidecar_session_set_config(
remote_config_products_count: usize,
remote_config_capabilities: *const RemoteConfigCapabilities,
remote_config_capabilities_count: usize,
is_fork: bool,
) -> MaybeError {
#[cfg(unix)]
let remote_config_notify_target = libc::getpid();
Expand Down Expand Up @@ -580,6 +581,7 @@ pub unsafe extern "C" fn ddog_sidecar_session_set_config(
.as_slice()
.to_vec(),
},
is_fork
));

MaybeError::None
Expand Down
2 changes: 2 additions & 0 deletions sidecar-ffi/tests/sidecar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ fn test_ddog_sidecar_register_app() {
0,
null(),
0,
false,
)
.unwrap_none();

Expand Down Expand Up @@ -160,6 +161,7 @@ fn test_ddog_sidecar_register_app() {
0,
null(),
0,
false,
)
.unwrap_none();

Expand Down
15 changes: 11 additions & 4 deletions sidecar/src/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,18 @@ where
#[cfg(unix)]
tokio::spawn(async move {
let socket_path = crashtracker_unix_socket_path();
let _ = datadog_crashtracker::async_receiver_entry_point_unix_socket(
match datadog_crashtracker::get_receiver_unix_socket(
socket_path.to_str().unwrap_or_default(),
false,
)
.await;
) {
Ok(listener) => loop {
if let Err(e) =
datadog_crashtracker::async_receiver_entry_point_unix_listener(&listener).await
{
tracing::warn!("Got error while receiving crash report: {e}");
}
},
Err(e) => tracing::error!("Failed setting up the crashtracker listener: {e}"),
}
});

// Init. Early, before we start listening.
Expand Down
7 changes: 5 additions & 2 deletions sidecar/src/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::collections::HashMap;
use std::hash::Hash;
use std::ops::{DerefMut, Sub};
use std::path::PathBuf;
use std::sync::{Mutex, RwLock};
use std::sync::{Mutex, OnceLock, RwLock};
use std::time::{Duration, Instant, SystemTime};
use std::{env, io};
use tracing::level_filters::LevelFilter;
Expand Down Expand Up @@ -365,6 +365,8 @@ lazy_static! {
pub static ref MULTI_LOG_FILTER: MultiEnvFilter = MultiEnvFilter::default();
pub static ref MULTI_LOG_WRITER: MultiWriter = MultiWriter::default();
}
static PERMANENT_MIN_LOG_LEVEL: OnceLock<TemporarilyRetainedMapGuard<String, EnvFilter>> =
OnceLock::new();

pub(crate) fn enable_logging() -> anyhow::Result<()> {
tracing_subscriber::registry()
Expand All @@ -383,7 +385,8 @@ pub(crate) fn enable_logging() -> anyhow::Result<()> {
}
let config = config::Config::get();
if !config.log_level.is_empty() {
MULTI_LOG_FILTER.add(config.log_level.clone());
let filter = MULTI_LOG_FILTER.add(config.log_level.clone());
_ = PERMANENT_MIN_LOG_LEVEL.set(filter);
}
MULTI_LOG_WRITER.add(config.log_method); // same than MULTI_LOG_FILTER

Expand Down
2 changes: 2 additions & 0 deletions sidecar/src/service/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ pub fn set_session_config(
#[cfg(windows)] remote_config_notify_function: *mut libc::c_void,
session_id: String,
config: &SessionConfig,
is_fork: bool,
) -> io::Result<()> {
#[cfg(unix)]
let remote_config_notify_target = pid;
Expand All @@ -219,6 +220,7 @@ pub fn set_session_config(
session_id,
remote_config_notify_target,
config: config.clone(),
is_fork,
})
}

Expand Down
1 change: 1 addition & 0 deletions sidecar/src/service/sidecar_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub trait SidecarInterface {
session_id: String,
remote_config_notify_target: RemoteConfigNotifyTarget,
config: SessionConfig,
is_fork: bool,
);

/// Shuts down a runtime.
Expand Down
16 changes: 14 additions & 2 deletions sidecar/src/service/sidecar_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::Duration;
use tracing::{debug, error, info, warn};
use tracing::{debug, error, info, trace, warn};

use futures::FutureExt;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -274,6 +274,13 @@ impl SidecarServer {
}
};

debug!(
"Received {} bytes of data for {:?} with headers {:?}",
data.len(),
target,
headers
);

let mut size = 0;
let mut processor = tracer_payload::DefaultTraceChunkProcessor;
let mut payload_params = tracer_payload::TracerPayloadParams::new(
Expand All @@ -286,6 +293,7 @@ impl SidecarServer {
payload_params.measure_size(&mut size);
match payload_params.try_into() {
Ok(payload) => {
trace!("Parsed the trace payload and enqueuing it for sending: {payload:?}");
let data = SendData::new(size, payload, headers, target);
self.trace_flusher.enqueue(data);
}
Expand Down Expand Up @@ -670,6 +678,7 @@ impl SidecarInterface for SidecarServer {
#[cfg(windows)]
remote_config_notify_function: crate::service::remote_configs::RemoteConfigNotifyFunction,
config: SessionConfig,
is_fork: bool,
) -> Self::SetSessionConfigFut {
debug!("Set session config for {session_id} to {config:?}");

Expand Down Expand Up @@ -762,7 +771,9 @@ impl SidecarInterface for SidecarServer {
}

Box::pin(async move {
session.shutdown_running_instances().await;
if !is_fork {
session.shutdown_running_instances().await;
}
no_response().await
})
}
Expand Down Expand Up @@ -1000,6 +1011,7 @@ impl SidecarInterface for SidecarServer {
} else {
Some(Cow::Owned(token))
};
debug!("Update test token of session {session_id} to {token:?}");
fn update_cfg<F: FnOnce(Endpoint) -> anyhow::Result<()>>(
endpoint: Option<Endpoint>,
set: F,
Expand Down
2 changes: 1 addition & 1 deletion sidecar/src/service/tracing/trace_flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ impl TraceFlusher {
Err(e) => error!("Error receiving agent configuration: {e:?}"),
}
}
info!("Successfully flushed traces to {}", endpoint.url);
info!("Successfully flushed traces to {endpoint:?}");
}
Err(e) => {
error!("Error sending trace: {e:?}");
Expand Down
9 changes: 8 additions & 1 deletion tinybytes/src/bytes_string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
use crate::Bytes;
#[cfg(feature = "serde")]
use serde::ser::{Serialize, Serializer};
use std::fmt::{Debug, Formatter};
use std::{borrow::Borrow, hash, str::Utf8Error};

#[derive(Clone, Debug, Eq, PartialEq)]
#[derive(Clone, Eq, PartialEq)]
pub struct BytesString {
bytes: Bytes,
}
Expand Down Expand Up @@ -157,6 +158,12 @@ impl hash::Hash for BytesString {
}
}

impl Debug for BytesString {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.serialize_str(self.as_str())
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading