Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix interleaved non-deferred spans #78

Merged
merged 4 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions examples/concurrent_eager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

use futures::{pin_mut, FutureExt};
use tracing::Instrument;
use tracing_subscriber::{layer::SubscriberExt, registry::Registry};
use tracing_tree::HierarchicalLayer;

fn main() {
let layer = HierarchicalLayer::default()
.with_writer(std::io::stdout)
.with_indent_lines(true)
.with_indent_amount(4)
.with_thread_names(true)
.with_thread_ids(true)
.with_span_retrace(true)
.with_deferred_spans(false)
.with_verbose_entry(true)
.with_targets(true);

let subscriber = Registry::default().with(layer);
tracing::subscriber::set_global_default(subscriber).unwrap();
#[cfg(feature = "tracing-log")]
tracing_log::LogTracer::init().unwrap();

let fut_a = spawn_fut("a", a);
pin_mut!(fut_a);

let waker = futures::task::noop_waker();
let mut cx = Context::from_waker(&waker);
assert!(fut_a.poll_unpin(&mut cx).is_pending());

let fut_b = spawn_fut("b", b);
pin_mut!(fut_b);

assert!(fut_b.poll_unpin(&mut cx).is_pending());

assert!(fut_a.poll_unpin(&mut cx).is_pending());
assert!(fut_b.poll_unpin(&mut cx).is_pending());

assert!(fut_a.poll_unpin(&mut cx).is_ready());
assert!(fut_b.poll_unpin(&mut cx).is_ready());
}

fn spawn_fut<F: Fn() -> Fut, Fut: Future<Output = ()>>(
key: &'static str,
inner: F,
) -> impl Future<Output = ()> {
let span = tracing::info_span!("spawn_fut", key);

async move {
countdown(1).await;

inner().await;
}
.instrument(span)
}

fn a() -> impl Future<Output = ()> {
let span = tracing::info_span!("a");

async move {
countdown(1).await;
tracing::info!("a");
}
.instrument(span)
}

fn b() -> impl Future<Output = ()> {
let span = tracing::info_span!("b");

async move {
countdown(1).await;
tracing::info!("b");
}
.instrument(span)
}

fn countdown(count: u32) -> impl Future<Output = ()> {
CountdownFuture { count }
}

struct CountdownFuture {
count: u32,
}

impl Future for CountdownFuture {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.count == 0 {
Poll::Ready(())
} else {
self.count -= 1;
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
20 changes: 20 additions & 0 deletions examples/concurrent_eager.stdout
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
1:main┐concurrent_eager::spawn_fut key="a"
1:main┐concurrent_eager::spawn_fut key="b"
1:main┐concurrent_eager::spawn_fut key="a"
1:main├─┐concurrent_eager::spawn_fut key="a"
1:main│ └─┐concurrent_eager::a
1:main┐concurrent_eager::spawn_fut key="b"
1:main├─┐concurrent_eager::spawn_fut key="b"
1:main│ └─┐concurrent_eager::b
1:main┐concurrent_eager::spawn_fut key="a"
1:main├─┐concurrent_eager::spawn_fut key="a"
1:main│ └─┐concurrent_eager::a
1:main│ ├─── Xms INFO concurrent_eager a
1:main├───┘
1:main┐concurrent_eager::spawn_fut key="b"
1:main├─┐concurrent_eager::spawn_fut key="b"
1:main│ └─┐concurrent_eager::b
1:main│ ├─── Xms INFO concurrent_eager b
1:main├───┘
1:main┘
1:main┘
21 changes: 9 additions & 12 deletions examples/concurrent_verbose.stdout
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
1:main┐concurrent_verbose::hierarchical-example version=0.1
1:main├───┐concurrent_verbose::server host="localhost", port=8080
1:main├─┐concurrent_verbose::hierarchical-example version=0.1
1:main│ └─┐concurrent_verbose::server host="localhost", port=8080
1:main│ ├─── Xms INFO concurrent_verbose starting
1:main│ ├─── Xs INFO concurrent_verbose listening
1:main│ ├─── Xs DEBUG concurrent_verbose starting countdowns
1:main│ ├─┐concurrent_verbose::server host="localhost", port=8080
1:main│ │ └─┐concurrent_verbose::countdowns
1:main│ │ ├───┐concurrent_verbose::countdown_a
1:main│ │ │ ├─── Xms DEBUG concurrent_verbose polling countdown, label="a", count=3
1:main│ ├───┐concurrent_verbose::countdowns
1:main│ │ ├─┐concurrent_verbose::countdowns
1:main│ │ │ └─┐concurrent_verbose::countdown_b
1:main│ │ │ └─┐concurrent_verbose::countdown_a
1:main│ │ │ ├─── Xms DEBUG concurrent_verbose polling countdown, label="a", count=3
1:main│ │ ├───┐concurrent_verbose::countdown_b
1:main│ │ │ ├─── Xms DEBUG concurrent_verbose polling countdown, label="b", count=5
1:main│ │ │ ├─── Xms DEBUG concurrent_verbose polling countdown, label="b", count=4
1:main│ │ ├─┐concurrent_verbose::countdowns
1:main│ │ │ └─┐concurrent_verbose::countdown_a
1:main│ │ ├───┐concurrent_verbose::countdown_a
1:main│ │ │ ├─── Xms DEBUG concurrent_verbose polling countdown, label="a", count=2
1:main│ ├─┐concurrent_verbose::server host="localhost", port=8080
1:main│ │ └─┐concurrent_verbose::conn peer_addr="82.9.9.9", port=42381
1:main│ ├───┐concurrent_verbose::conn peer_addr="82.9.9.9", port=42381
1:main│ │ ├─── Xms WARN concurrent_verbose peer1 warning
1:main│ ├─┐concurrent_verbose::server host="localhost", port=8080
1:main│ │ └─┐concurrent_verbose::countdowns
1:main│ ├───┐concurrent_verbose::countdowns
1:main│ │ ├─── Xms INFO concurrent_verbose finished polling countdowns
1:main│ │ │ ┌─┘concurrent_verbose::countdown_b
1:main│ │ ├─┘concurrent_verbose::countdowns
Expand Down
23 changes: 10 additions & 13 deletions examples/deferred.stdout
Original file line number Diff line number Diff line change
@@ -1,35 +1,32 @@
-> This prints before the span open message
1:main┐open(v): deferred::hierarchical-example version=0.1
1:main├─┐open: deferred::server host="localhost", port=8080
1:main┐open: deferred::hierarchical-example version=0.1
1:main├┐pre_open: deferred::hierarchical-example version=0.1
1:main│└┐open(v): deferred::server host="localhost", port=8080
1:main│ ├─ Xms INFO deferred starting
1:main│ ├─ Xs INFO deferred listening
-> Deferring two levels of spans
1:main│ ├┐pre_open: deferred::server host="localhost", port=8080
1:main│ │└┐open(v): deferred::connections
1:main│ │ ├─┐open: deferred::conn peer_addr="82.9.9.9", port=42381
1:main│ ├─┐open: deferred::connections
1:main│ │ ├┐pre_open: deferred::connections
1:main│ │ │└┐open(v): deferred::conn peer_addr="82.9.9.9", port=42381
1:main│ │ │ ├─ Xms DEBUG deferred connected, peer="peer1"
1:main│ │ │ ├─ Xms DEBUG deferred message received, length=2
1:main│ │ │┌┘close(v): deferred::conn peer_addr="82.9.9.9", port=42381
1:main│ │ ├┘post_close: deferred::connections
1:main│ │ ├┐pre_open: deferred::connections
1:main│ │ │└┐open(v): deferred::conn peer_addr="8.8.8.8", port=18230
1:main│ │ ├─┐open: deferred::conn peer_addr="8.8.8.8", port=18230
1:main│ │ │ ├─ Xms DEBUG deferred connected, peer="peer3"
1:main│ │ │┌┘close(v): deferred::conn peer_addr="8.8.8.8", port=18230
1:main│ │ ├┘post_close: deferred::connections
1:main│ │ ├┐pre_open: deferred::connections
1:main│ │ │└┐open(v): deferred::foomp 42 <- format string, normal_var=43
1:main│ │ ├─┐open: deferred::foomp 42 <- format string, normal_var=43
1:main│ │ │ ├─ Xms ERROR deferred hello
1:main│ │ │┌┘close(v): deferred::foomp 42 <- format string, normal_var=43
1:main│ │ ├┘post_close: deferred::connections
1:main│ │ ├┐pre_open: deferred::connections
1:main│ │ │└┐open(v): deferred::conn peer_addr="82.9.9.9", port=42381
1:main│ │ ├─┐open: deferred::conn peer_addr="82.9.9.9", port=42381
1:main│ │ │ ├─ Xms WARN deferred weak encryption requested, algo="xor"
1:main│ │ │ ├─ Xms DEBUG deferred response sent, length=8
1:main│ │ │ ├─ Xms DEBUG deferred disconnected
1:main│ │ │┌┘close(v): deferred::conn peer_addr="82.9.9.9", port=42381
1:main│ │ ├┘post_close: deferred::connections
1:main│ │ ├┐pre_open: deferred::connections
1:main│ │ │└┐open(v): deferred::conn peer_addr="8.8.8.8", port=18230
1:main│ │ ├─┐open: deferred::conn peer_addr="8.8.8.8", port=18230
1:main│ │ │ ├─ Xms DEBUG deferred message received, length=5
1:main│ │ │ ├─ Xms DEBUG deferred response sent, length=8
1:main│ │ │ ├─ Xms DEBUG deferred disconnected
Expand Down
132 changes: 65 additions & 67 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use format::{write_span_mode, Buffers, ColorLevel, Config, FmtEvent, SpanMode};

use nu_ansi_term::{Color, Style};
use std::{
fmt::{self, Write as _},
fmt::{self, Write},
io::{self, IsTerminal},
iter::Fuse,
mem,
Expand Down Expand Up @@ -267,69 +267,63 @@ where
Ok(())
}

/// If `span_retrace` ensures that `new_span` is properly printed before an event
/// Ensures that `new_span` and all its ancestors are properly printed before an event
fn write_retrace_span<'a, S>(
&self,
new_span: &SpanRef<'a, S>,
bufs: &mut Buffers,
ctx: &'a Context<S>,
pre_open: bool,
) where
S: Subscriber + for<'new_span> LookupSpan<'new_span>,
{
let should_write = if self.config.deferred_spans {
if let Some(data) = new_span.extensions_mut().get_mut::<Data>() {
!data.written
} else {
false
}
} else {
false
};

// Also handle deferred spans along with retrace since deferred spans may need to print
// multiple spans at once as a whole tree can be deferred
if self.config.span_retrace || should_write {
let old_span_id = bufs.current_span.replace((new_span.id()).clone());
let old_span_id = old_span_id.as_ref();

if Some(&new_span.id()) != old_span_id {
let old_span = old_span_id.as_ref().and_then(|v| ctx.span(v));
let old_path = old_span.as_ref().map(scope_path).into_iter().flatten();

let new_path = scope_path(new_span);

// Print the path from the common base of the two spans
let new_path = DifferenceIter::new(old_path, new_path, |v| v.id());

for (i, span) in new_path.enumerate() {
// Mark traversed spans as *written*
let was_written = if let Some(data) = span.extensions_mut().get_mut::<Data>() {
mem::replace(&mut data.written, true)
} else {
// `on_new_span` was not called, before
// Consider if this should panic instead, which is *technically* correct but is
// bad behavior for a logging layer in production.
false
};

// Print the previous span before entering a new deferred or retraced span
if i == 0 && self.config.verbose_entry {
if let Some(parent) = &span.parent() {
self.write_span_info(parent, bufs, SpanMode::PreOpen);
}
//
// If a another event occurs right after a previous event in the same span, this will
// simply print nothing since the path to the common lowest ancestor is empty
// if self.config.span_retrace || self.config.deferred_spans {
let old_span_id = bufs.current_span.replace((new_span.id()).clone());
let old_span_id = old_span_id.as_ref();
let new_span_id = new_span.id();

if Some(&new_span_id) != old_span_id {
let old_span = old_span_id.as_ref().and_then(|v| ctx.span(v));
let old_path = old_span.as_ref().map(scope_path).into_iter().flatten();

let new_path = scope_path(new_span);

// Print the path from the common base of the two spans
let new_path = DifferenceIter::new(old_path, new_path, |v| v.id());

for (i, span) in new_path.enumerate() {
// Mark traversed spans as *written*
let was_written = if let Some(data) = span.extensions_mut().get_mut::<Data>() {
mem::replace(&mut data.written, true)
} else {
// `on_new_span` was not called, before
// Consider if this should panic instead, which is *technically* correct but is
// bad behavior for a logging layer in production.
false
};

let verbose = i == 1 && pre_open && span.id() == new_span_id;
// Print the parent of the new span if `pre_open==true`
if verbose {
if let Some(span) = span.parent() {
self.write_span_info(&span, bufs, SpanMode::PreOpen);
}
let verbose = self.config.verbose_entry && i == 0;

self.write_span_info(
&span,
bufs,
if was_written {
SpanMode::Retrace { verbose }
} else {
SpanMode::Open { verbose }
},
)
}

self.write_span_info(
&span,
bufs,
if was_written {
SpanMode::Retrace { verbose }
} else {
SpanMode::Open { verbose }
},
)
}
}
}
Expand Down Expand Up @@ -491,22 +485,24 @@ where

let bufs = &mut *self.bufs.lock().unwrap();

// Store the most recently entered span
bufs.current_span = Some(span.id());

if self.config.verbose_entry {
if let Some(span) = span.parent() {
self.write_span_info(&span, bufs, SpanMode::PreOpen);
if self.config.span_retrace {
self.write_retrace_span(&span, bufs, &ctx, self.config.verbose_entry);
} else {
if self.config.verbose_entry {
if let Some(span) = span.parent() {
self.write_span_info(&span, bufs, SpanMode::PreOpen);
}
}
// Store the most recently entered span
bufs.current_span = Some(span.id());
self.write_span_info(
&span,
bufs,
SpanMode::Open {
verbose: self.config.verbose_entry,
},
);
}

self.write_span_info(
&span,
bufs,
SpanMode::Open {
verbose: self.config.verbose_entry,
},
);
}

fn on_event(&self, event: &Event<'_>, ctx: Context<S>) {
Expand All @@ -518,7 +514,9 @@ where
let bufs = &mut *guard;

if let Some(new_span) = &span {
self.write_retrace_span(new_span, bufs, &ctx);
if self.config.span_retrace || self.config.deferred_spans {
self.write_retrace_span(new_span, bufs, &ctx, self.config.verbose_entry);
}
}

let mut event_buf = &mut bufs.current_buf;
Expand Down
Loading