Skip to content

Commit 2255c7d

Browse files
authored
optional traces for vortex-layout (#2570)
1 parent 6cd0ece commit 2255c7d

File tree

9 files changed

+68
-23
lines changed

9 files changed

+68
-23
lines changed

Cargo.lock

+3
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vortex-datafusion/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ vortex-layout = { workspace = true, features = ["tokio"] }
5151
vortex-metrics = { workspace = true }
5252

5353
[features]
54-
tracing = ["dep:tracing", "dep:tracing-futures", "vortex-io/tracing"]
54+
tracing = ["dep:tracing", "dep:tracing-futures", "vortex-io/tracing", "vortex-layout/tracing", "vortex-file/tracing"]
5555

5656
[dev-dependencies]
5757
anyhow = { workspace = true }

vortex-file/Cargo.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ oneshot = { workspace = true, features = ["async"] }
2727
rustc-hash = { workspace = true }
2828
tokio = { workspace = true, features = ["rt"], optional = true }
2929
tracing = { workspace = true, optional = true }
30+
tracing-futures = { workspace = true, features = ["futures-03"], optional = true }
3031
# Needed to pickup the "js" feature for wasm targets from the workspace configuration
3132
uuid = { workspace = true }
3233
vortex-alp = { workspace = true }
@@ -60,4 +61,4 @@ workspace = true
6061

6162
[features]
6263
object_store = ["vortex-error/object_store", "vortex-io/object_store"]
63-
tracing = ["dep:tracing", "vortex-io/tracing"]
64+
tracing = ["dep:tracing", "dep:tracing-futures", "vortex-io/tracing", "vortex-layout/tracing"]

vortex-file/src/generic.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use moka::future::CacheBuilder;
99
use vortex_buffer::{Alignment, ByteBuffer};
1010
use vortex_error::{VortexExpect, VortexResult, vortex_err, vortex_panic};
1111
use vortex_io::VortexReadAt;
12+
use vortex_layout::instrument;
1213
use vortex_layout::scan::ScanDriver;
1314
use vortex_layout::segments::{AsyncSegmentReader, SegmentId};
1415
use vortex_metrics::{Counter, VortexMetrics};
@@ -186,7 +187,10 @@ impl<R: VortexReadAt> ScanDriver for GenericScanDriver<R> {
186187
});
187188

188189
// Buffer some number of concurrent I/O operations.
189-
io_stream.buffer_unordered(self.options.io_concurrency)
190+
instrument!(
191+
"io_stream",
192+
io_stream.buffer_unordered(self.options.io_concurrency)
193+
)
190194
}
191195
}
192196

vortex-layout/Cargo.toml

+3
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ itertools = { workspace = true }
2828
log = { workspace = true }
2929
pin-project-lite = { workspace = true }
3030
tokio = { workspace = true, features = ["sync"], optional = true }
31+
tracing = { workspace = true, optional = true }
32+
tracing-futures = { workspace = true, features = ["futures-03"], optional = true }
3133
vortex-array = { workspace = true }
3234
vortex-buffer = { workspace = true }
3335
vortex-dtype = { workspace = true }
@@ -43,6 +45,7 @@ rstest = { workspace = true }
4345

4446
[features]
4547
tokio = ["dep:tokio", "vortex-error/tokio"]
48+
tracing = ["dep:tracing", "dep:tracing-futures"]
4649

4750
[lints]
4851
workspace = true

vortex-layout/src/layouts/flat/reader.rs

+25-17
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use vortex_error::{VortexExpect, VortexResult, vortex_err, vortex_panic};
88
use crate::layouts::flat::FlatLayout;
99
use crate::reader::LayoutReader;
1010
use crate::segments::AsyncSegmentReader;
11-
use crate::{Layout, LayoutReaderExt, LayoutVTable};
11+
use crate::{Layout, LayoutReaderExt, LayoutVTable, instrument};
1212

1313
pub struct FlatReader {
1414
layout: Layout,
@@ -43,25 +43,33 @@ impl FlatReader {
4343

4444
pub(crate) async fn array(&self) -> VortexResult<&ArrayRef> {
4545
self.array
46-
.get_or_try_init(async move {
47-
let segment_id = self
48-
.layout()
49-
.segment_id(0)
50-
.ok_or_else(|| vortex_err!("FlatLayout missing segment"))?;
46+
.get_or_try_init(instrument!(
47+
"flat_read",
48+
{ name = self.layout().name() },
49+
async move {
50+
let segment_id = self
51+
.layout()
52+
.segment_id(0)
53+
.ok_or_else(|| vortex_err!("FlatLayout missing segment"))?;
5154

52-
log::debug!(
53-
"Requesting segment {} for flat layout {} expr",
54-
segment_id,
55-
self.layout().name(),
56-
);
55+
log::debug!(
56+
"Requesting segment {} for flat layout {} expr",
57+
segment_id,
58+
self.layout().name(),
59+
);
5760

58-
// Fetch all the array segment.
59-
let buffer = self.segment_reader.get(segment_id).await?;
60-
let row_count = usize::try_from(self.layout().row_count())
61-
.vortex_expect("FlatLayout row count does not fit within usize");
61+
// Fetch all the array segment.
62+
let buffer = self.segment_reader.get(segment_id).await?;
63+
let row_count = usize::try_from(self.layout().row_count())
64+
.vortex_expect("FlatLayout row count does not fit within usize");
6265

63-
ArrayParts::try_from(buffer)?.decode(self.ctx(), self.dtype().clone(), row_count)
64-
})
66+
ArrayParts::try_from(buffer)?.decode(
67+
self.ctx(),
68+
self.dtype().clone(),
69+
row_count,
70+
)
71+
}
72+
))
6573
.await
6674
}
6775
}

vortex-layout/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ mod row_mask;
1515
pub use row_mask::*;
1616
use vortex_array::arcref::ArcRef;
1717

18+
pub mod macros;
1819
mod reader;
1920
pub mod scan;
2021
pub mod segments;

vortex-layout/src/macros.rs

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/// A macro for optionally instrumenting a future, if tracing feature is enabled.
2+
#[macro_export]
3+
macro_rules! instrument {
4+
($span_name:expr, $expr:expr) => {
5+
instrument!($span_name, {}, $expr)
6+
};
7+
($span_name:expr, { $($key:ident = $value:expr),* $(,)? }, $expr:expr) => {
8+
{
9+
let task = $expr;
10+
#[cfg(feature = "tracing")]
11+
{
12+
use tracing_futures::Instrument;
13+
task.instrument(tracing::info_span!(
14+
$span_name,
15+
$($key = $value,)*
16+
))
17+
}
18+
#[cfg(not(feature = "tracing"))]
19+
{
20+
task
21+
}
22+
}
23+
};
24+
}

vortex-layout/src/scan/mod.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use vortex_mask::Mask;
1818
use crate::scan::filter::FilterExpr;
1919
use crate::scan::unified::UnifiedDriverStream;
2020
use crate::segments::AsyncSegmentReader;
21-
use crate::{ExprEvaluator, Layout, LayoutReader, LayoutReaderExt, RowMask};
21+
use crate::{ExprEvaluator, Layout, LayoutReader, LayoutReaderExt, RowMask, instrument};
2222

2323
pub mod executor;
2424
pub(crate) mod filter;
@@ -261,7 +261,7 @@ impl<D: ScanDriver> Scan<D> {
261261
let reader = reader.clone();
262262

263263
// This future is the processing task
264-
async move {
264+
instrument!("process", async move {
265265
let row_mask = match pruning {
266266
None => row_mask,
267267
Some(pruning_filter) => {
@@ -284,12 +284,13 @@ impl<D: ScanDriver> Scan<D> {
284284
}
285285
VortexResult::Ok(Some(array))
286286
}
287-
}
287+
})
288288
})
289289
.map(move |processing_task| task_executor.spawn(processing_task))
290290
.buffered(self.concurrency)
291291
.filter_map(|v| async move { v.unnest().transpose() });
292292

293+
let exec_stream = instrument!("exec_stream", exec_stream);
293294
let io_stream = self.driver.io_stream();
294295

295296
let unified = UnifiedDriverStream {

0 commit comments

Comments
 (0)