Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit ddc8d70

Browse files
rphmeierpepyakin
authored andcommitted
Collator node workflow (#280)
* arbitrary application logic in CLI * collation work * split up exit and work futures in application * collation node workflow * typo * indentation fix * doc grumbles * rename Application to Worker * refactor Worker::exit to exit_only
1 parent 6f7f8bf commit ddc8d70

File tree

15 files changed

+293
-96
lines changed

15 files changed

+293
-96
lines changed

Cargo.lock

Lines changed: 7 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ authors = ["Parity Technologies <[email protected]>"]
1010
[dependencies]
1111
error-chain = "0.12"
1212
polkadot-cli = { path = "polkadot/cli" }
13+
futures = "0.1"
14+
ctrlc = { git = "https://github.com/paritytech/rust-ctrlc.git" }
1315

1416
[workspace]
1517
members = [

polkadot/cli/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ ed25519 = { path = "../../substrate/ed25519" }
2020
app_dirs = "1.2"
2121
tokio = "0.1.7"
2222
futures = "0.1.17"
23-
ctrlc = { git = "https://github.com/paritytech/rust-ctrlc.git" }
2423
fdlimit = "0.1"
2524
parking_lot = "0.4"
2625
serde_json = "1.0"

polkadot/cli/src/lib.rs

Lines changed: 60 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,9 @@ extern crate atty;
2424
extern crate ansi_term;
2525
extern crate regex;
2626
extern crate time;
27+
extern crate fdlimit;
2728
extern crate futures;
2829
extern crate tokio;
29-
extern crate ctrlc;
30-
extern crate fdlimit;
3130
extern crate ed25519;
3231
extern crate triehash;
3332
extern crate parking_lot;
@@ -66,6 +65,11 @@ mod informant;
6665
mod chain_spec;
6766

6867
pub use chain_spec::ChainSpec;
68+
pub use client::error::Error as ClientError;
69+
pub use client::backend::Backend as ClientBackend;
70+
pub use state_machine::Backend as StateMachineBackend;
71+
pub use polkadot_primitives::Block as PolkadotBlock;
72+
pub use service::{Components as ServiceComponents, Service};
6973

7074
use std::io::{self, Write, Read, stdin, stdout};
7175
use std::fs::File;
@@ -117,6 +121,26 @@ fn base_path(matches: &clap::ArgMatches) -> PathBuf {
117121
.unwrap_or_else(default_base_path)
118122
}
119123

124+
/// Additional worker making use of the node, to run asynchronously before shutdown.
125+
///
126+
/// This will be invoked with the service and spawn a future that resolves
127+
/// when complete.
128+
pub trait Worker {
129+
/// A future that resolves when the work is done or the node should exit.
130+
/// This will be run on a tokio runtime.
131+
type Work: Future<Item=(),Error=()>;
132+
133+
/// An exit scheduled for the future.
134+
type Exit: Future<Item=(),Error=()> + Send + 'static;
135+
136+
/// Don't work, but schedule an exit.
137+
fn exit_only(self) -> Self::Exit;
138+
139+
/// Do work and schedule exit.
140+
fn work<C: ServiceComponents>(self, service: &Service<C>) -> Self::Work
141+
where ClientError: From<<<<C as ServiceComponents>::Backend as ClientBackend<PolkadotBlock>>::State as StateMachineBackend>::Error>;
142+
}
143+
120144
/// Parse command line arguments and start the node.
121145
///
122146
/// IANA unassigned port ranges that we could use:
@@ -125,9 +149,10 @@ fn base_path(matches: &clap::ArgMatches) -> PathBuf {
125149
/// 9556-9591 Unassigned
126150
/// 9803-9874 Unassigned
127151
/// 9926-9949 Unassigned
128-
pub fn run<I, T>(args: I) -> error::Result<()> where
152+
pub fn run<I, T, W>(args: I, worker: W) -> error::Result<()> where
129153
I: IntoIterator<Item = T>,
130154
T: Into<std::ffi::OsString> + Clone,
155+
W: Worker,
131156
{
132157
let yaml = load_yaml!("./cli.yml");
133158
let matches = match clap::App::from_yaml(yaml).version(&(crate_version!().to_owned() + "\n")[..]).get_matches_from_safe(args) {
@@ -154,11 +179,11 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
154179
}
155180

156181
if let Some(matches) = matches.subcommand_matches("export-blocks") {
157-
return export_blocks(matches);
182+
return export_blocks(matches, worker.exit_only());
158183
}
159184

160185
if let Some(matches) = matches.subcommand_matches("import-blocks") {
161-
return import_blocks(matches);
186+
return import_blocks(matches, worker.exit_only());
162187
}
163188

164189
let spec = load_spec(&matches)?;
@@ -255,8 +280,8 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
255280
};
256281

257282
match role == service::Role::LIGHT {
258-
true => run_until_exit(&mut runtime, service::new_light(config, executor)?, &matches, sys_conf)?,
259-
false => run_until_exit(&mut runtime, service::new_full(config, executor)?, &matches, sys_conf)?,
283+
true => run_until_exit(&mut runtime, service::new_light(config, executor)?, &matches, sys_conf, worker)?,
284+
false => run_until_exit(&mut runtime, service::new_full(config, executor)?, &matches, sys_conf, worker)?,
260285
}
261286

262287
// TODO: hard exit if this stalls?
@@ -272,16 +297,19 @@ fn build_spec(matches: &clap::ArgMatches) -> error::Result<()> {
272297
Ok(())
273298
}
274299

275-
fn export_blocks(matches: &clap::ArgMatches) -> error::Result<()> {
300+
fn export_blocks<E>(matches: &clap::ArgMatches, exit: E) -> error::Result<()>
301+
where E: Future<Item=(),Error=()> + Send + 'static
302+
{
276303
let base_path = base_path(matches);
277304
let spec = load_spec(&matches)?;
278305
let mut config = service::Configuration::default_with_spec(spec);
279306
config.database_path = db_path(&base_path).to_string_lossy().into();
280307
info!("DB path: {}", config.database_path);
281308
let client = service::new_client(config)?;
282-
let (exit_send, exit) = std::sync::mpsc::channel();
283-
ctrlc::CtrlC::set_handler(move || {
284-
exit_send.clone().send(()).expect("Error sending exit notification");
309+
let (exit_send, exit_recv) = std::sync::mpsc::channel();
310+
::std::thread::spawn(move || {
311+
let _ = exit.wait();
312+
let _ = exit_send.send(());
285313
});
286314
info!("Exporting blocks");
287315
let mut block: u32 = match matches.value_of("from") {
@@ -310,7 +338,7 @@ fn export_blocks(matches: &clap::ArgMatches) -> error::Result<()> {
310338
}
311339

312340
loop {
313-
if exit.try_recv().is_ok() {
341+
if exit_recv.try_recv().is_ok() {
314342
break;
315343
}
316344
match client.block(&BlockId::number(block as u64))? {
@@ -334,15 +362,19 @@ fn export_blocks(matches: &clap::ArgMatches) -> error::Result<()> {
334362
Ok(())
335363
}
336364

337-
fn import_blocks(matches: &clap::ArgMatches) -> error::Result<()> {
365+
fn import_blocks<E>(matches: &clap::ArgMatches, exit: E) -> error::Result<()>
366+
where E: Future<Item=(),Error=()> + Send + 'static
367+
{
338368
let spec = load_spec(&matches)?;
339369
let base_path = base_path(matches);
340370
let mut config = service::Configuration::default_with_spec(spec);
341371
config.database_path = db_path(&base_path).to_string_lossy().into();
342372
let client = service::new_client(config)?;
343-
let (exit_send, exit) = std::sync::mpsc::channel();
344-
ctrlc::CtrlC::set_handler(move || {
345-
exit_send.clone().send(()).expect("Error sending exit notification");
373+
let (exit_send, exit_recv) = std::sync::mpsc::channel();
374+
375+
::std::thread::spawn(move || {
376+
let _ = exit.wait();
377+
let _ = exit_send.send(());
346378
});
347379

348380
let mut file: Box<Read> = match matches.value_of("INPUT") {
@@ -354,7 +386,7 @@ fn import_blocks(matches: &clap::ArgMatches) -> error::Result<()> {
354386
let count: u32 = Slicable::decode(&mut file).ok_or("Error reading file")?;
355387
let mut block = 0;
356388
for _ in 0 .. count {
357-
if exit.try_recv().is_ok() {
389+
if exit_recv.try_recv().is_ok() {
358390
break;
359391
}
360392
match SignedBlock::decode(&mut file) {
@@ -377,27 +409,19 @@ fn import_blocks(matches: &clap::ArgMatches) -> error::Result<()> {
377409
Ok(())
378410
}
379411

380-
fn run_until_exit<C>(runtime: &mut Runtime, service: service::Service<C>, matches: &clap::ArgMatches, sys_conf: SystemConfiguration) -> error::Result<()>
412+
fn run_until_exit<C, W>(
413+
runtime: &mut Runtime,
414+
service: service::Service<C>,
415+
matches: &clap::ArgMatches,
416+
sys_conf: SystemConfiguration,
417+
worker: W,
418+
) -> error::Result<()>
381419
where
382420
C: service::Components,
421+
W: Worker,
383422
client::error::Error: From<<<<C as service::Components>::Backend as client::backend::Backend<Block>>::State as state_machine::Backend>::Error>,
384423
{
385-
let exit = {
386-
let (exit_send, exit) = exit_future::signal();
387-
let exit_send = ::std::cell::RefCell::new(Some(exit_send));
388-
ctrlc::CtrlC::set_handler(move || {
389-
let exit_send = exit_send
390-
.try_borrow_mut()
391-
.expect("only borrowed in non-reetrant signal handler; qed")
392-
.take();
393-
394-
if let Some(signal) = exit_send {
395-
signal.fire();
396-
}
397-
});
398-
399-
exit
400-
};
424+
let (exit_send, exit) = exit_future::signal();
401425

402426
let executor = runtime.executor();
403427
informant::start(&service, exit.clone(), executor.clone());
@@ -422,7 +446,8 @@ fn run_until_exit<C>(runtime: &mut Runtime, service: service::Service<C>, matche
422446
)
423447
};
424448

425-
let _ = exit.wait();
449+
let _ = worker.work(&service).wait();
450+
exit_send.fire();
426451
Ok(())
427452
}
428453

polkadot/collator/Cargo.toml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,16 @@
22
name = "polkadot-collator"
33
version = "0.1.0"
44
authors = ["Parity Technologies <[email protected]>"]
5-
description = "Abstract collation logic"
5+
description = "Collator node implementation"
66

77
[dependencies]
88
futures = "0.1.17"
9+
substrate-client = { path = "../../substrate/client" }
910
substrate-codec = { path = "../../substrate/codec", version = "0.1" }
1011
substrate-primitives = { path = "../../substrate/primitives", version = "0.1" }
12+
polkadot-api = { path = "../api" }
1113
polkadot-runtime = { path = "../runtime", version = "0.1" }
1214
polkadot-primitives = { path = "../primitives", version = "0.1" }
13-
polkadot-parachain = { path = "../parachain", version = "0.1" }
15+
polkadot-cli = { path = "../cli" }
16+
log = "0.4"
17+
ed25519 = { path = "../../substrate/ed25519" }

0 commit comments

Comments
 (0)