diff --git a/Cargo.toml b/Cargo.toml index 3d588fd1f..4cf33066c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,7 +48,7 @@ timely = {workspace = true} columnar = "0.2" [workspace.dependencies] -timely = { version = "0.15", default-features = false } +timely = { version = "0.16", default-features = false } #timely = { path = "../timely-dataflow/timely/", default-features = false } [features] diff --git a/examples/bfs.rs b/examples/bfs.rs index 26d58fe24..fa6bad616 100644 --- a/examples/bfs.rs +++ b/examples/bfs.rs @@ -7,7 +7,7 @@ use differential_dataflow::input::Input; use differential_dataflow::Collection; use differential_dataflow::operators::*; use differential_dataflow::lattice::Lattice; -use differential_dataflow::logging::DifferentialEvent; +use differential_dataflow::logging::DifferentialEventBuilder; type Node = u32; type Edge = (Node, Node); @@ -30,7 +30,7 @@ fn main() { if let Ok(stream) = ::std::net::TcpStream::connect(&addr) { let writer = ::timely::dataflow::operators::capture::EventWriter::new(stream); let mut logger = ::timely::logging::BatchLogger::new(writer); - worker.log_register().insert::("differential/arrange", move |time, data| + worker.log_register().insert::("differential/arrange", move |time, data| logger.publish_batch(time, data) ); } diff --git a/examples/dynamic.rs b/examples/dynamic.rs index 01422ec57..6a2daf58a 100644 --- a/examples/dynamic.rs +++ b/examples/dynamic.rs @@ -7,7 +7,7 @@ use differential_dataflow::input::Input; use differential_dataflow::Collection; use differential_dataflow::operators::*; use differential_dataflow::lattice::Lattice; -use differential_dataflow::logging::DifferentialEvent; +use differential_dataflow::logging::DifferentialEventBuilder; type Node = u32; type Edge = (Node, Node); @@ -30,7 +30,7 @@ fn main() { if let Ok(stream) = ::std::net::TcpStream::connect(&addr) { let writer = ::timely::dataflow::operators::capture::EventWriter::new(stream); let mut logger = ::timely::logging::BatchLogger::new(writer); - worker.log_register().insert::("differential/arrange", move |time, data| + worker.log_register().insert::("differential/arrange", move |time, data| logger.publish_batch(time, data) ); } diff --git a/interactive/src/manager.rs b/interactive/src/manager.rs index 164ca42f4..de4f9b30c 100644 --- a/interactive/src/manager.rs +++ b/interactive/src/manager.rs @@ -7,7 +7,7 @@ use std::hash::Hash; use timely::dataflow::ProbeHandle; use timely::communication::Allocate; use timely::worker::Worker; -use timely::logging::TimelyEvent; +use timely::logging::TimelyEventBuilder; // use timely::dataflow::operators::capture::event::EventIterator; @@ -16,7 +16,7 @@ use differential_dataflow::trace::implementations::{KeySpine, ValSpine}; use differential_dataflow::operators::arrange::TraceAgent; use differential_dataflow::input::InputSession; -use differential_dataflow::logging::DifferentialEvent; +use differential_dataflow::logging::DifferentialEventBuilder; use crate::{Time, Diff, Plan, Datum}; @@ -87,11 +87,11 @@ impl Manager // Deregister loggers, so that the logging dataflows can shut down. worker .log_register() - .insert::("timely", move |_time, _data| { }); + .insert::("timely", move |_time, _data| { }); worker .log_register() - .insert::("differential/arrange", move |_time, _data| { }); + .insert::("differential/arrange", move |_time, _data| { }); } /// Inserts a new input session by name. @@ -226,4 +226,4 @@ impl TraceManager { .insert(keys.to_vec(), handle.clone()); } -} \ No newline at end of file +} diff --git a/src/logging.rs b/src/logging.rs index 628f283af..a0caa7336 100644 --- a/src/logging.rs +++ b/src/logging.rs @@ -3,8 +3,11 @@ use columnar::Columnar; use serde::{Deserialize, Serialize}; +/// Container builder for differential log events. +pub type DifferentialEventBuilder = timely::container::CapacityContainerBuilder>; + /// Logger for differential dataflow events. -pub type Logger = ::timely::logging::Logger; +pub type Logger = ::timely::logging_core::TypedLogger; /// Enables logging of differential dataflow events. pub fn enable(worker: &mut timely::worker::Worker, writer: W) -> Option> @@ -16,7 +19,7 @@ where let mut logger = ::timely::logging::BatchLogger::new(writer); worker .log_register() - .insert::("differential/arrange", move |time, data| logger.publish_batch(time, data)) + .insert::("differential/arrange", move |time, data| logger.publish_batch(time, data)) } /// Possible different differential events. diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index 16d4c16be..93df7bc77 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -437,7 +437,7 @@ where // Acquire a logger for arrange events. let logger = { let register = scope.log_register(); - register.get::("differential/arrange") + register.get::("differential/arrange").map(Into::into) }; // Where we will deposit received updates, and from which we extract batches. diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index cfabfd472..62f9fe844 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -156,7 +156,7 @@ where let logger = { let scope = stream.scope(); let register = scope.log_register(); - register.get::("differential/arrange") + register.get::("differential/arrange").map(Into::into) }; // Tracks the lower envelope of times in `priority_queue`. diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index 886c13d67..b7a61e89b 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -336,7 +336,7 @@ where let logger = { let scope = trace.stream.scope(); let register = scope.log_register(); - register.get::("differential/arrange") + register.get::("differential/arrange").map(Into::into) }; let activator = Some(trace.stream.scope().activator_for(operator_info.address.clone())); diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs index 7923ba3e7..5ffab9c64 100644 --- a/src/trace/implementations/merge_batcher.rs +++ b/src/trace/implementations/merge_batcher.rs @@ -12,13 +12,12 @@ use std::marker::PhantomData; -use timely::logging_core::Logger; use timely::progress::frontier::AntichainRef; use timely::progress::{frontier::Antichain, Timestamp}; use timely::Container; use timely::container::{ContainerBuilder, PushInto}; -use crate::logging::{BatcherEvent, DifferentialEvent}; +use crate::logging::{BatcherEvent, Logger}; use crate::trace::{Batcher, Builder, Description}; /// Creates batches from containers of unordered tuples. @@ -41,7 +40,7 @@ pub struct MergeBatcher { /// The lower-bound frontier of the data, after the last call to seal. frontier: Antichain, /// Logger for size accounting. - logger: Option>, + logger: Option, /// Timely operator ID. operator_id: usize, /// The `Input` type needs to be called out as the type of container accepted, but it is not otherwise present. @@ -58,7 +57,7 @@ where type Time = M::Time; type Output = M::Chunk; - fn new(logger: Option>, operator_id: usize) -> Self { + fn new(logger: Option, operator_id: usize) -> Self { Self { logger, operator_id, diff --git a/src/trace/mod.rs b/src/trace/mod.rs index e06183faf..e8c28a891 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -12,11 +12,10 @@ pub mod description; pub mod implementations; pub mod wrappers; -use timely::logging_core::Logger; use timely::progress::{Antichain, frontier::AntichainRef}; use timely::progress::Timestamp; -use crate::logging::DifferentialEvent; +use crate::logging::Logger; use crate::trace::cursor::IntoOwned; use crate::difference::Semigroup; use crate::lattice::Lattice; @@ -309,7 +308,7 @@ pub trait Batcher { /// Times at which batches are formed. type Time: Timestamp; /// Allocates a new empty batcher. - fn new(logger: Option>, operator_id: usize) -> Self; + fn new(logger: Option, operator_id: usize) -> Self; /// Adds an unordered container of elements to the batcher. fn push_container(&mut self, batch: &mut Self::Input); /// Returns all updates not greater or equal to an element of `upper`.