Skip to content

Commit

Permalink
Incorporate breaking changes from Timely's logging update (#558)
Browse files Browse the repository at this point in the history
* Incorporate breaking changes from Timely's logging update

TimelyDataflow/timely-dataflow#615

Signed-off-by: Moritz Hoffmann <[email protected]>

* Back out of some changes

Signed-off-by: Moritz Hoffmann <[email protected]>

* Upgrade timely

Signed-off-by: Moritz Hoffmann <[email protected]>

* Fix compile errors

Signed-off-by: Moritz Hoffmann <[email protected]>

---------

Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru authored Jan 9, 2025
1 parent c3871fb commit 17a053b
Show file tree
Hide file tree
Showing 10 changed files with 23 additions and 22 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ timely = {workspace = true}
columnar = "0.2"

[workspace.dependencies]
timely = { version = "0.15", default-features = false }
timely = { version = "0.16", default-features = false }
#timely = { path = "../timely-dataflow/timely/", default-features = false }

[features]
Expand Down
4 changes: 2 additions & 2 deletions examples/bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use differential_dataflow::input::Input;
use differential_dataflow::Collection;
use differential_dataflow::operators::*;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::logging::DifferentialEvent;
use differential_dataflow::logging::DifferentialEventBuilder;

type Node = u32;
type Edge = (Node, Node);
Expand All @@ -30,7 +30,7 @@ fn main() {
if let Ok(stream) = ::std::net::TcpStream::connect(&addr) {
let writer = ::timely::dataflow::operators::capture::EventWriter::new(stream);
let mut logger = ::timely::logging::BatchLogger::new(writer);
worker.log_register().insert::<DifferentialEvent,_>("differential/arrange", move |time, data|
worker.log_register().insert::<DifferentialEventBuilder,_>("differential/arrange", move |time, data|
logger.publish_batch(time, data)
);
}
Expand Down
4 changes: 2 additions & 2 deletions examples/dynamic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use differential_dataflow::input::Input;
use differential_dataflow::Collection;
use differential_dataflow::operators::*;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::logging::DifferentialEvent;
use differential_dataflow::logging::DifferentialEventBuilder;

type Node = u32;
type Edge = (Node, Node);
Expand All @@ -30,7 +30,7 @@ fn main() {
if let Ok(stream) = ::std::net::TcpStream::connect(&addr) {
let writer = ::timely::dataflow::operators::capture::EventWriter::new(stream);
let mut logger = ::timely::logging::BatchLogger::new(writer);
worker.log_register().insert::<DifferentialEvent,_>("differential/arrange", move |time, data|
worker.log_register().insert::<DifferentialEventBuilder,_>("differential/arrange", move |time, data|
logger.publish_batch(time, data)
);
}
Expand Down
10 changes: 5 additions & 5 deletions interactive/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::hash::Hash;
use timely::dataflow::ProbeHandle;
use timely::communication::Allocate;
use timely::worker::Worker;
use timely::logging::TimelyEvent;
use timely::logging::TimelyEventBuilder;

// use timely::dataflow::operators::capture::event::EventIterator;

Expand All @@ -16,7 +16,7 @@ use differential_dataflow::trace::implementations::{KeySpine, ValSpine};
use differential_dataflow::operators::arrange::TraceAgent;
use differential_dataflow::input::InputSession;

use differential_dataflow::logging::DifferentialEvent;
use differential_dataflow::logging::DifferentialEventBuilder;

use crate::{Time, Diff, Plan, Datum};

Expand Down Expand Up @@ -87,11 +87,11 @@ impl<V: ExchangeData+Datum> Manager<V>
// Deregister loggers, so that the logging dataflows can shut down.
worker
.log_register()
.insert::<TimelyEvent,_>("timely", move |_time, _data| { });
.insert::<TimelyEventBuilder,_>("timely", move |_time, _data| { });

worker
.log_register()
.insert::<DifferentialEvent,_>("differential/arrange", move |_time, _data| { });
.insert::<DifferentialEventBuilder,_>("differential/arrange", move |_time, _data| { });
}

/// Inserts a new input session by name.
Expand Down Expand Up @@ -226,4 +226,4 @@ impl<V: ExchangeData+Hash+Datum> TraceManager<V> {
.insert(keys.to_vec(), handle.clone());
}

}
}
7 changes: 5 additions & 2 deletions src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
use columnar::Columnar;
use serde::{Deserialize, Serialize};

/// Container builder for differential log events.
pub type DifferentialEventBuilder = timely::container::CapacityContainerBuilder<Vec<(std::time::Duration, DifferentialEvent)>>;

/// Logger for differential dataflow events.
pub type Logger = ::timely::logging::Logger<DifferentialEvent>;
pub type Logger = ::timely::logging_core::TypedLogger<DifferentialEventBuilder, DifferentialEvent>;

/// Enables logging of differential dataflow events.
pub fn enable<A, W>(worker: &mut timely::worker::Worker<A>, writer: W) -> Option<Box<dyn std::any::Any+'static>>
Expand All @@ -16,7 +19,7 @@ where
let mut logger = ::timely::logging::BatchLogger::new(writer);
worker
.log_register()
.insert::<DifferentialEvent,_>("differential/arrange", move |time, data| logger.publish_batch(time, data))
.insert::<DifferentialEventBuilder,_>("differential/arrange", move |time, data| logger.publish_batch(time, data))
}

/// Possible different differential events.
Expand Down
2 changes: 1 addition & 1 deletion src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ where
// Acquire a logger for arrange events.
let logger = {
let register = scope.log_register();
register.get::<crate::logging::DifferentialEvent>("differential/arrange")
register.get::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into)
};

// Where we will deposit received updates, and from which we extract batches.
Expand Down
2 changes: 1 addition & 1 deletion src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ where
let logger = {
let scope = stream.scope();
let register = scope.log_register();
register.get::<crate::logging::DifferentialEvent>("differential/arrange")
register.get::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into)
};

// Tracks the lower envelope of times in `priority_queue`.
Expand Down
2 changes: 1 addition & 1 deletion src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ where
let logger = {
let scope = trace.stream.scope();
let register = scope.log_register();
register.get::<crate::logging::DifferentialEvent>("differential/arrange")
register.get::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into)
};

let activator = Some(trace.stream.scope().activator_for(operator_info.address.clone()));
Expand Down
7 changes: 3 additions & 4 deletions src/trace/implementations/merge_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,12 @@
use std::marker::PhantomData;

use timely::logging_core::Logger;
use timely::progress::frontier::AntichainRef;
use timely::progress::{frontier::Antichain, Timestamp};
use timely::Container;
use timely::container::{ContainerBuilder, PushInto};

use crate::logging::{BatcherEvent, DifferentialEvent};
use crate::logging::{BatcherEvent, Logger};
use crate::trace::{Batcher, Builder, Description};

/// Creates batches from containers of unordered tuples.
Expand All @@ -41,7 +40,7 @@ pub struct MergeBatcher<Input, C, M: Merger> {
/// The lower-bound frontier of the data, after the last call to seal.
frontier: Antichain<M::Time>,
/// Logger for size accounting.
logger: Option<Logger<DifferentialEvent>>,
logger: Option<Logger>,
/// Timely operator ID.
operator_id: usize,
/// The `Input` type needs to be called out as the type of container accepted, but it is not otherwise present.
Expand All @@ -58,7 +57,7 @@ where
type Time = M::Time;
type Output = M::Chunk;

fn new(logger: Option<Logger<DifferentialEvent>>, operator_id: usize) -> Self {
fn new(logger: Option<Logger>, operator_id: usize) -> Self {
Self {
logger,
operator_id,
Expand Down
5 changes: 2 additions & 3 deletions src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@ pub mod description;
pub mod implementations;
pub mod wrappers;

use timely::logging_core::Logger;
use timely::progress::{Antichain, frontier::AntichainRef};
use timely::progress::Timestamp;

use crate::logging::DifferentialEvent;
use crate::logging::Logger;
use crate::trace::cursor::IntoOwned;
use crate::difference::Semigroup;
use crate::lattice::Lattice;
Expand Down Expand Up @@ -309,7 +308,7 @@ pub trait Batcher {
/// Times at which batches are formed.
type Time: Timestamp;
/// Allocates a new empty batcher.
fn new(logger: Option<Logger<DifferentialEvent>>, operator_id: usize) -> Self;
fn new(logger: Option<Logger>, operator_id: usize) -> Self;
/// Adds an unordered container of elements to the batcher.
fn push_container(&mut self, batch: &mut Self::Input);
/// Returns all updates not greater or equal to an element of `upper`.
Expand Down

0 comments on commit 17a053b

Please sign in to comment.