Skip to content

Commit

Permalink
Merge pull request #239 from dora-rs/hot-reloading
Browse files Browse the repository at this point in the history
Hot reloading Python Operator
  • Loading branch information
haixuanTao authored Mar 31, 2023
2 parents dfa914a + c2071e7 commit 4f4dfab
Show file tree
Hide file tree
Showing 17 changed files with 507 additions and 27 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ jobs:
dora-cli new test_project --internal-create-with-path-dependencies
cd test_project
cargo build --all
UUID=$(dora-cli start dataflow.yml)
dora-cli start dataflow.yml --name ci-test
sleep 10
dora-cli stop $UUID
dora-cli stop --name ci-test
cd ..
dora-cli destroy
Expand Down
83 changes: 83 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions apis/rust/node/src/daemon_connection/event_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ impl EventStream {
let event = match event {
EventItem::NodeEvent { event, ack_channel } => match event {
NodeEvent::Stop => Event::Stop,
NodeEvent::Reload { operator_id } => Event::Reload { operator_id },
NodeEvent::InputClosed { id } => Event::InputClosed { id },
NodeEvent::Input { id, metadata, data } => {
let data = match data {
Expand Down
8 changes: 7 additions & 1 deletion apis/rust/node/src/event.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
use std::marker::PhantomData;

use dora_core::{config::DataId, message::Metadata};
use dora_core::{
config::{DataId, OperatorId},
message::Metadata,
};
use eyre::Context;
use shared_memory::{Shmem, ShmemConf};

#[derive(Debug)]
#[non_exhaustive]
pub enum Event<'a> {
Stop,
Reload {
operator_id: Option<OperatorId>,
},
Input {
id: DataId,
metadata: Metadata<'static>,
Expand Down
8 changes: 8 additions & 0 deletions binaries/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ license.workspace = true
name = "dora-cli"
path = "src/main.rs"

[features]
default = ["tracing"]
tracing = ["dep:dora-tracing"]

[dependencies]
clap = { version = "4.0.3", features = ["derive"] }
eyre = "0.6.8"
Expand All @@ -27,3 +31,7 @@ atty = "0.2.14"
uuid = { version = "1.2.1", features = ["v4", "serde"] }
inquire = "0.5.2"
communication-layer-request-reply = { workspace = true }
notify = "5.1.0"
ctrlc = "3.2.5"
tracing = "0.1.36"
dora-tracing = { workspace = true, optional = true }
145 changes: 145 additions & 0 deletions binaries/cli/src/attach.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
use communication_layer_request_reply::TcpRequestReplyConnection;
use dora_core::{
descriptor::{resolve_path, CoreNodeKind, Descriptor},
topics::{ControlRequest, ControlRequestReply},
};
use eyre::Context;
use notify::event::ModifyKind;
use notify::{Config, Event as NotifyEvent, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use std::collections::HashMap;
use std::{path::PathBuf, sync::mpsc, time::Duration};
use tracing::{error, info};
use uuid::Uuid;

use crate::control_connection;

pub fn attach_dataflow(
dataflow: Descriptor,
dataflow_path: PathBuf,
dataflow_id: Uuid,
session: &mut Option<Box<TcpRequestReplyConnection>>,
hot_reload: bool,
) -> Result<(), eyre::ErrReport> {
let (tx, rx) = mpsc::sync_channel(2);

// Generate path hashmap
let mut node_path_lookup = HashMap::new();

let nodes = dataflow.resolve_aliases();

let working_dir = dataflow_path
.canonicalize()
.context("failed to canoncialize dataflow path")?
.parent()
.ok_or_else(|| eyre::eyre!("canonicalized dataflow path has no parent"))?
.to_owned();

for node in nodes {
match node.kind {
// Reloading Custom Nodes is not supported. See: https://github.com/dora-rs/dora/pull/239#discussion_r1154313139
CoreNodeKind::Custom(_cn) => (),
CoreNodeKind::Runtime(rn) => {
for op in rn.operators.iter() {
if let dora_core::descriptor::OperatorSource::Python(source) = &op.config.source
{
let path = resolve_path(source, &working_dir).wrap_err_with(|| {
format!("failed to resolve node source `{}`", source)
})?;
node_path_lookup
.insert(path, (dataflow_id, node.id.clone(), Some(op.id.clone())));
}
// Reloading non-python operator is not supported. See: https://github.com/dora-rs/dora/pull/239#discussion_r1154313139
}
}
}
}

// Setup dataflow file watcher if reload option is set.
let watcher_tx = tx.clone();
let _watcher = if hot_reload {
let hash = node_path_lookup.clone();
let paths = hash.keys();
let notifier = move |event| {
if let Ok(NotifyEvent {
paths,
kind: EventKind::Modify(ModifyKind::Data(_data)),
..
}) = event
{
for path in paths {
if let Some((dataflow_id, node_id, operator_id)) = node_path_lookup.get(&path) {
watcher_tx
.send(ControlRequest::Reload {
dataflow_id: dataflow_id.clone(),
node_id: node_id.clone(),
operator_id: operator_id.clone(),
})
.context("Could not send reload request to the cli loop")
.unwrap();
}
}
// TODO: Manage different file event
}
};

let mut watcher = RecommendedWatcher::new(
notifier,
Config::default().with_poll_interval(Duration::from_secs(1)),
)?;

for path in paths {
watcher.watch(path, RecursiveMode::Recursive)?;
}
Some(watcher)
} else {
None
};

// Setup Ctrlc Watcher to stop dataflow after ctrlc
let ctrlc_tx = tx;
let mut ctrlc_sent = false;
ctrlc::set_handler(move || {
if ctrlc_sent {
std::process::abort();
} else {
if ctrlc_tx
.send(ControlRequest::Stop {
dataflow_uuid: dataflow_id,
})
.is_err()
{
// bail!("failed to report ctrl-c event to dora-daemon");
}
ctrlc_sent = true;
}
})
.wrap_err("failed to set ctrl-c handler")?;

loop {
let control_request = match rx.recv_timeout(Duration::from_secs(1)) {
Err(_err) => ControlRequest::Check {
dataflow_uuid: dataflow_id,
},
Ok(reload_event) => reload_event,
};

let reply_raw = control_connection(session)?
.request(&serde_json::to_vec(&control_request)?)
.wrap_err("failed to send request message to coordinator")?;
let result: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
match result {
ControlRequestReply::DataflowStarted { uuid: _ } => (),
ControlRequestReply::DataflowStopped { uuid } => {
info!("dataflow {uuid} stopped");
break;
}
ControlRequestReply::DataflowReloaded { uuid } => {
info!("dataflow {uuid} reloaded")
}
other => error!("Received unexpected Coordinator Reply: {:#?}", other),
};
}

Ok(())
}
Loading

0 comments on commit 4f4dfab

Please sign in to comment.