Skip to content

Commit 7137c44

Browse files
committed
console-api,console-subscriber-tokio-console: attempt to genericize over subscriber
This is currently a build failure. Not sure how to fix it.
1 parent bb94af6 commit 7137c44

File tree

16 files changed

+384
-312
lines changed

16 files changed

+384
-312
lines changed

Cargo.lock

Lines changed: 256 additions & 223 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

console-api/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ tonic = { version = "0.7", default-features = false, features = [
3636
] }
3737
prost = "0.10"
3838
prost-types = "0.10"
39-
tracing-core = "0.1.17"
39+
tracing-core = "0.1.29"
4040

4141
[dev-dependencies]
4242
tonic-build = { version = "0.7", default-features = false, features = [

console-api/proto/common.proto

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,6 @@ message Span {
7878
repeated Field fields = 3;
7979
// Timestamp for the span.
8080
google.protobuf.Timestamp at = 4;
81-
// An Id that uniquely identifies this span's parent (if any).
82-
optional SpanId parent_id = 5;
8381
}
8482

8583
// Any new metadata that was registered since the last update.

console-api/src/generated/google.protobuf.rs

Whitespace-only changes.

console-api/src/generated/rs.tokio.console.common.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,6 @@ pub struct Span {
106106
/// Timestamp for the span.
107107
#[prost(message, optional, tag="4")]
108108
pub at: ::core::option::Option<::prost_types::Timestamp>,
109-
/// An Id that uniquely identifies this span's parent (if any).
110-
#[prost(message, optional, tag="5")]
111-
pub parent_id: ::core::option::Option<SpanId>,
112109
}
113110
/// Any new metadata that was registered since the last update.
114111
#[derive(Clone, PartialEq, ::prost::Message)]

console-api/src/generated/rs.tokio.console.instrument.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ pub mod instrument_client {
8484
where
8585
T: tonic::client::GrpcService<tonic::body::BoxBody>,
8686
T::Error: Into<StdError>,
87-
T::ResponseBody: Default + Body<Data = Bytes> + Send + 'static,
87+
T::ResponseBody: Body<Data = Bytes> + Send + 'static,
8888
<T::ResponseBody as Body>::Error: Into<StdError> + Send,
8989
{
9090
pub fn new(inner: T) -> Self {
@@ -97,6 +97,7 @@ pub mod instrument_client {
9797
) -> InstrumentClient<InterceptedService<T, F>>
9898
where
9999
F: tonic::service::Interceptor,
100+
T::ResponseBody: Default,
100101
T: tonic::codegen::Service<
101102
http::Request<tonic::body::BoxBody>,
102103
Response = http::Response<
@@ -129,9 +130,9 @@ pub mod instrument_client {
129130
&mut self,
130131
request: impl tonic::IntoRequest<super::InstrumentRequest>,
131132
) -> Result<
132-
tonic::Response<tonic::codec::Streaming<super::Update>>,
133-
tonic::Status,
134-
> {
133+
tonic::Response<tonic::codec::Streaming<super::Update>>,
134+
tonic::Status,
135+
> {
135136
self.inner
136137
.ready()
137138
.await
@@ -152,11 +153,9 @@ pub mod instrument_client {
152153
&mut self,
153154
request: impl tonic::IntoRequest<super::TaskDetailsRequest>,
154155
) -> Result<
155-
tonic::Response<
156-
tonic::codec::Streaming<super::super::tasks::TaskDetails>,
157-
>,
158-
tonic::Status,
159-
> {
156+
tonic::Response<tonic::codec::Streaming<super::super::tasks::TaskDetails>>,
157+
tonic::Status,
158+
> {
160159
self.inner
161160
.ready()
162161
.await

console-api/src/generated/rs.tokio.console.trace.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ pub mod trace_client {
104104
where
105105
T: tonic::client::GrpcService<tonic::body::BoxBody>,
106106
T::Error: Into<StdError>,
107-
T::ResponseBody: Default + Body<Data = Bytes> + Send + 'static,
107+
T::ResponseBody: Body<Data = Bytes> + Send + 'static,
108108
<T::ResponseBody as Body>::Error: Into<StdError> + Send,
109109
{
110110
pub fn new(inner: T) -> Self {
@@ -117,6 +117,7 @@ pub mod trace_client {
117117
) -> TraceClient<InterceptedService<T, F>>
118118
where
119119
F: tonic::service::Interceptor,
120+
T::ResponseBody: Default,
120121
T: tonic::codegen::Service<
121122
http::Request<tonic::body::BoxBody>,
122123
Response = http::Response<
@@ -149,9 +150,9 @@ pub mod trace_client {
149150
&mut self,
150151
request: impl tonic::IntoRequest<super::WatchRequest>,
151152
) -> Result<
152-
tonic::Response<tonic::codec::Streaming<super::TraceEvent>>,
153-
tonic::Status,
154-
> {
153+
tonic::Response<tonic::codec::Streaming<super::TraceEvent>>,
154+
tonic::Status,
155+
> {
155156
self.inner
156157
.ready()
157158
.await

console-api/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
pub mod async_ops;
55
/// Represents unique id's and Rust source locations.
66
mod common;
7+
/// Represents updates to the causality tree.
8+
pub mod consequences;
79
/// Represents interactions between the console-subscriber and a console client observing it.
810
pub mod instrument;
911
/// Represents updates to the resources in an async runtime.
@@ -12,6 +14,4 @@ pub mod resources;
1214
pub mod tasks;
1315
/// Represents events on the tracing subsystem: thread registration and span activities.
1416
pub mod trace;
15-
/// Represents updates to the causality tree.
16-
pub mod consequences;
1717
pub use common::*;

console-subscriber/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,10 @@ tokio-stream = "0.1"
3737
thread_local = "1.1.3"
3838
console-api = { version = "0.2.0", path = "../console-api", features = ["transport"] }
3939
tonic = { version = "0.7", features = ["transport"] }
40-
tracing-core = "0.1.24"
40+
tracing-core = "0.1.29"
4141
tracing = "0.1.26"
4242
tracing-subscriber = { version = "0.3.11", default-features = false, features = ["fmt", "registry"] }
43-
tracing-causality = { path = "../../tracing-causality" }
43+
tracing-causality = { version = "0.1.0" }
4444
futures = { version = "0.3", default-features = false }
4545
hdrhistogram = { version = "7.3.0", default-features = false, features = ["serialization"] }
4646
# The parking_lot dependency is renamed, because we want our `parking_lot`

console-subscriber/examples/app.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::time::Duration;
1+
use std::{sync::Arc, time::Duration};
22

33
static HELP: &str = r#"
44
Example console-instrumented app
@@ -17,13 +17,16 @@ OPTIONS:
1717
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1818
use tracing_subscriber::prelude::*;
1919

20+
// initialize an underlying `Registry`
21+
let registry = Arc::new(tracing_subscriber::registry());
22+
2023
// spawn the console server in the background,
2124
// returning a `Layer`:
22-
let console_layer = console_subscriber::spawn();
25+
let console_layer = console_subscriber::spawn(registry.clone());
2326

24-
// build a `Subscriber` by combining layers with a
25-
// `tracing_subscriber::Registry`:
26-
tracing_subscriber::registry().with(console_layer).init();
27+
// build a `Subscriber` by combining layers with the
28+
// `registry`:
29+
registry.with(console_layer).init();
2730

2831
// spawn optional extras from CLI args
2932
// skip first which is command name

console-subscriber/examples/deep.rs renamed to console-subscriber/examples/nested.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,22 @@
1+
use std::sync::Arc;
2+
13
use tokio::task;
24

35
#[tokio::main]
46
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
57
use tracing_subscriber::prelude::*;
68

7-
let subscriber = tracing_subscriber::registry()
8-
.with(console_subscriber::spawn())
9-
.with(tracing_subscriber::filter::filter_fn(|_| true));
9+
// initialize an underlying `Registry`
10+
let registry = Arc::new(tracing_subscriber::registry());
11+
12+
// spawn the console server in the background,
13+
// returning a `Layer`:
14+
let console_layer = console_subscriber::spawn(registry.clone());
1015

11-
tracing::subscriber::set_global_default(subscriber).unwrap();
16+
registry
17+
.with(console_layer)
18+
.with(tracing_subscriber::filter::filter_fn(|_| true))
19+
.init();
1220

1321
task::Builder::default()
1422
.name("main-task")

console-subscriber/src/aggregator/mod.rs

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use super::{Command, Event, Shared, Watch};
22
use crate::{
3-
consequences,
43
stats::{self, Unsent},
54
ToProto, WatchRequest,
65
};
@@ -16,14 +15,16 @@ use std::{
1615
},
1716
time::{Duration, Instant},
1817
};
18+
use tracing::Subscriber;
1919
use tracing_core::{span::Id, Metadata};
20+
use tracing_subscriber::registry::LookupSpan;
2021

2122
mod id_data;
2223
mod shrink;
2324
use self::id_data::{IdData, Include};
2425
use self::shrink::{ShrinkMap, ShrinkVec};
2526

26-
pub(crate) struct Aggregator {
27+
pub(crate) struct Aggregator<S> {
2728
/// Channel of incoming events emitted by `TaskLayer`s.
2829
events: mpsc::Receiver<Event>,
2930

@@ -86,6 +87,9 @@ pub(crate) struct Aggregator {
8687
/// Used to anchor monotonic timestamps to a base `SystemTime`, to produce a
8788
/// timestamp that can be sent over the wire.
8889
base_time: stats::TimeAnchor,
90+
91+
/// The subscriber used by the application.
92+
subscriber: Arc<S>,
8993
}
9094

9195
#[derive(Debug, Default)]
@@ -129,13 +133,17 @@ struct AsyncOp {
129133
source: String,
130134
}
131135

132-
impl Aggregator {
136+
impl<S> Aggregator<S>
137+
where
138+
S: Subscriber + for<'s> LookupSpan<'s>,
139+
{
133140
pub(crate) fn new(
134141
events: mpsc::Receiver<Event>,
135142
rpcs: mpsc::Receiver<Command>,
136143
builder: &crate::Builder,
137144
shared: Arc<crate::Shared>,
138145
base_time: stats::TimeAnchor,
146+
subscriber: Arc<S>,
139147
) -> Self {
140148
Self {
141149
shared,
@@ -156,6 +164,7 @@ impl Aggregator {
156164
poll_ops: Default::default(),
157165
temporality: Temporality::Live,
158166
base_time,
167+
subscriber,
159168
}
160169
}
161170

@@ -324,19 +333,15 @@ impl Aggregator {
324333
let now = Some(self.base_time.to_timestamp(Instant::now()));
325334

326335
let span = id.clone();
327-
// XXX(jswrenn): A new thread is spawned solely for the sake of
328-
// getting access to the global subscriber.
329-
let (trace, updates) = std::thread::spawn(move || {
330-
// XXX(jswrenn): Need to gracefully handle the case where the
331-
// task span is already closed.
332-
consequences::trace(&span, buffer)
333-
.expect("the subscriber isn't a Registry")
334-
.expect("id is already closed")
335-
})
336-
.join()
337-
.unwrap();
338336

339-
let initial = crate::consequences::trace_into_proto(trace);
337+
let maybe_trace = tracing_causality::trace(&self.subscriber, &span, buffer);
338+
let (initial, updates) = if let Some((trace, updates)) = maybe_trace {
339+
(crate::consequences::trace_into_proto(trace), updates)
340+
} else {
341+
// a trace could not be produced, because the span is already closed
342+
//(vec![], tracing_causality::Updates::default())
343+
panic!();
344+
};
340345

341346
// Send back the stream receiver.
342347
// Then send the initial state --- if this fails, the subscription is already dead.

console-subscriber/src/builder.rs

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@ use super::{ConsoleLayer, Server};
22
use std::{
33
net::{SocketAddr, ToSocketAddrs},
44
path::PathBuf,
5+
sync::Arc,
56
thread,
67
time::Duration,
78
};
89
use tokio::runtime;
910
use tracing::Subscriber;
1011
use tracing_subscriber::{
11-
filter,
12+
filter::{self, FilterFn},
1213
layer::{Layer, SubscriberExt},
1314
prelude::*,
1415
registry::LookupSpan,
@@ -51,7 +52,7 @@ impl Default for Builder {
5152
client_buffer_capacity: ConsoleLayer::DEFAULT_CLIENT_BUFFER_CAPACITY,
5253
publish_interval: ConsoleLayer::DEFAULT_PUBLISH_INTERVAL,
5354
retention: ConsoleLayer::DEFAULT_RETENTION,
54-
server_addr: SocketAddr::new(Server::DEFAULT_IP, Server::DEFAULT_PORT),
55+
server_addr: SocketAddr::new(Server::<()>::DEFAULT_IP, Server::<()>::DEFAULT_PORT),
5556
recording_path: None,
5657
filter_env_var: "RUST_LOG".to_string(),
5758
self_trace: false,
@@ -190,8 +191,11 @@ impl Builder {
190191
}
191192

192193
/// Completes the builder, returning a [`ConsoleLayer`] and [`Server`] task.
193-
pub fn build(self) -> (ConsoleLayer, Server) {
194-
ConsoleLayer::build(self)
194+
pub fn build<S>(self, subscriber: Arc<S>) -> (ConsoleLayer, Server<S>)
195+
where
196+
S: Subscriber + for<'s> LookupSpan<'s>,
197+
{
198+
ConsoleLayer::build(self, subscriber)
195199
}
196200

197201
/// Configures this builder from a standard set of environment variables:
@@ -318,9 +322,11 @@ impl Builder {
318322
.expect("`error` filter should always parse successfully")
319323
});
320324

321-
let console_layer = self.spawn();
325+
let registry = Arc::new(tracing_subscriber::registry());
326+
327+
let console_layer = self.spawn(registry.clone());
322328

323-
tracing_subscriber::registry()
329+
registry
324330
.with(console_layer)
325331
.with(tracing_subscriber::fmt::layer().with_filter(fmt_filter))
326332
.init();
@@ -382,13 +388,28 @@ impl Builder {
382388
/// [`fmt::Layer`]: https://docs.rs/tracing-subscriber/latest/tracing-subscriber/fmt/struct.Layer.html
383389
/// [`console_subscriber::init`]: crate::init()
384390
#[must_use = "a `Layer` must be added to a `tracing::Subscriber` in order to be used"]
385-
pub fn spawn<S>(self) -> impl Layer<S>
391+
pub fn spawn<S>(self, subscriber: Arc<S>) -> impl Layer<Arc<S>>
386392
where
387-
S: Subscriber + for<'a> LookupSpan<'a>,
393+
S: Send + Sync + Subscriber + for<'a> LookupSpan<'a> + 'static,
388394
{
395+
fn console_filter(meta: &tracing::Metadata<'_>) -> bool {
396+
// events will have *targets* beginning with "runtime"
397+
if meta.is_event() {
398+
return meta.target().starts_with("runtime") || meta.target().starts_with("tokio");
399+
}
400+
401+
// spans will have *names* beginning with "runtime". for backwards
402+
// compatibility with older Tokio versions, enable anything with the `tokio`
403+
// target as well.
404+
meta.name().starts_with("runtime.") || meta.target().starts_with("tokio")
405+
}
406+
389407
let self_trace = self.self_trace;
390408

391-
let (layer, server) = self.build();
409+
let (layer, server) = self.build(subscriber);
410+
let filter =
411+
FilterFn::new(console_filter as for<'r, 's> fn(&'r tracing::Metadata<'s>) -> bool);
412+
let layer = layer.with_filter(filter);
392413

393414
thread::Builder::new()
394415
.name("console_subscriber".into())
@@ -553,11 +574,11 @@ pub fn init() {
553574
/// [`fmt::Layer`]: https://docs.rs/tracing-subscriber/latest/tracing-subscriber/fmt/struct.Layer.html
554575
/// [`console_subscriber::init`]: crate::init()
555576
#[must_use = "a `Layer` must be added to a `tracing::Subscriber`in order to be used"]
556-
pub fn spawn<S>() -> impl Layer<S>
577+
pub fn spawn<S>(subscriber: Arc<S>) -> impl Layer<Arc<S>>
557578
where
558-
S: Subscriber + for<'a> LookupSpan<'a>,
579+
S: Send + Sync + Subscriber + for<'a> LookupSpan<'a> + 'static,
559580
{
560-
ConsoleLayer::builder().with_default_env().spawn::<S>()
581+
ConsoleLayer::builder().with_default_env().spawn(subscriber)
561582
}
562583

563584
fn duration_from_env(var_name: &str) -> Option<Duration> {

0 commit comments

Comments
 (0)