diff --git a/Cargo.lock b/Cargo.lock index e4dae8c42..d42b8bc8c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5930,6 +5930,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b838eba278d213a8beaf485bd313fd580ca4505a00d5871caeb1457c55322cae" dependencies = [ "bitflags 2.5.0", + "chrono", "fallible-iterator", "fallible-streaming-iterator", "hashlink", @@ -7137,6 +7138,7 @@ name = "task_manager" version = "0.1.0" dependencies = [ "alloy-primitives", + "chrono", "num_enum 0.7.2", "raiko-primitives", "rand 0.9.0-alpha.1", diff --git a/task_manager/Cargo.toml b/task_manager/Cargo.toml index 2517e1783..e3052a5e9 100644 --- a/task_manager/Cargo.toml +++ b/task_manager/Cargo.toml @@ -6,8 +6,9 @@ edition = "2021" # { workspace = true } [dependencies] raiko-primitives = { workspace = true } -rusqlite = { workspace = true } +rusqlite = { workspace = true, features = ["chrono"] } num_enum = { workspace = true } +chrono = { workspace = true } [dev-dependencies] rand = "0.9.0-alpha.1" # This is an alpha version, that has rng.gen_iter::() diff --git a/task_manager/src/lib.rs b/task_manager/src/lib.rs index 33746c686..ccd511621 100644 --- a/task_manager/src/lib.rs +++ b/task_manager/src/lib.rs @@ -160,9 +160,10 @@ use std::path::Path; use raiko_primitives::{BlockNumber, ChainId, B256}; -use rusqlite::{named_params, Statement}; +use rusqlite::{named_params, Statement, MappedRows}; use rusqlite::{Connection, OpenFlags}; +use chrono::{DateTime, Utc}; use num_enum::{IntoPrimitive, FromPrimitive}; // Types @@ -382,7 +383,7 @@ impl TaskDb { id_task INTEGER NOT NULL, id_thirdparty INTEGER, id_status INTEGER NOT NULL, - timestamp TEXT NOT NULL, + timestamp TIMESTAMP DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')) NOT NULL, FOREIGN KEY(id_task) REFERENCES tasks(id_task) FOREIGN KEY(id_thirdparty) REFERENCES thirdparties(id_thirdparty) FOREIGN KEY(id_status) REFERENCES status_codes(id_status) @@ -516,8 +517,9 @@ impl TaskDb { VALUES (new.submitter); -- Tasks are initialized at status 1000 - registered - INSERT INTO task_status(id_task, id_thirdparty, id_status, timestamp) - SELECT tmp.id_task, t3p.id_thirdparty, 1000, datetime('now') + -- timestamp is auto-filled with datetime('now'), see its field definition + INSERT INTO task_status(id_task, id_thirdparty, id_status) + SELECT tmp.id_task, t3p.id_thirdparty, 1000 FROM current_task tmp JOIN thirdparties t3p WHERE t3p.thirdparty_desc = new.submitter @@ -543,8 +545,9 @@ impl TaskDb { INSERT OR IGNORE INTO thirdparties(thirdparty_desc) VALUES (new.fulfiller); - INSERT INTO task_status(id_task, id_thirdparty, id_status, timestamp) - SELECT tmp.id_task, t3p.id_thirdparty, new.id_status, datetime('now') + -- timestamp is auto-filled with datetime('now'), see its field definition + INSERT INTO task_status(id_task, id_thirdparty, id_status) + SELECT tmp.id_task, t3p.id_thirdparty, new.id_status FROM current_task tmp LEFT JOIN thirdparties t3p -- fulfiller can be NULL, for example @@ -721,17 +724,23 @@ impl<'db> TaskManager<'db> { Ok(()) } + /// Returns the latest triplet (submitter or fulfiller, status, last update time) pub fn get_task_proving_status( &mut self, chain_id: ChainId, blockhash: &B256, proof_system: TaskProofsys, - ) -> Result { - let proving_status = self.get_task_proving_status.query_row(named_params! { + ) -> Result, TaskStatus, DateTime)>, TaskManagerError> { + let rows = self.get_task_proving_status.query_map(named_params! { ":chain_id": chain_id as u64, ":blockhash": blockhash.as_slice(), ":id_proofsys": proof_system as u8, - }, |r| r.get::<_, i32>(0).map(TaskStatus::from))?; + }, |row| Ok(( + row.get::<_, Option>(0)?, + TaskStatus::from(row.get::<_, i32>(1)?), + row.get::<_, DateTime>(2)?, + )))?; + let proving_status = rows.collect::, _>>()?; Ok(proving_status) } diff --git a/task_manager/tests/main.rs b/task_manager/tests/main.rs index 2da68b46a..b0adf34c6 100644 --- a/task_manager/tests/main.rs +++ b/task_manager/tests/main.rs @@ -7,6 +7,8 @@ #[cfg(test)] mod tests { + use std::time::Duration; + use rand::{Rng, SeedableRng}; use rand_chacha::ChaCha8Rng; @@ -164,112 +166,237 @@ mod tests { &payload, ).unwrap(); - assert_eq!( - TaskStatus::Registered, - tama.get_task_proving_status(chain_id, &blockhash, proofsys).unwrap() - ); + let task_status = tama.get_task_proving_status(chain_id, &blockhash, proofsys).unwrap(); + assert_eq!(task_status.len(), 1); + assert_eq!(task_status[0].0, Some(submitter.clone())); + assert_eq!(task_status[0].1, TaskStatus::Registered); tasks.push(( chain_id, blockhash, proofsys, + submitter, )); } - tama.update_task_progress( - tasks[0].0, - &tasks[0].1, - tasks[0].2, - None, - TaskStatus::Cancelled_NeverStarted, - None - ).unwrap(); + std::thread::sleep(Duration::from_millis(1)); - assert_eq!( - TaskStatus::Cancelled_NeverStarted, - tama.get_task_proving_status(tasks[0].0, &tasks[0].1, tasks[0].2).unwrap() - ); + { + tama.update_task_progress( + tasks[0].0, + &tasks[0].1, + tasks[0].2, + None, + TaskStatus::Cancelled_NeverStarted, + None + ).unwrap(); + { + let task_status = tama.get_task_proving_status(tasks[0].0, &tasks[0].1, tasks[0].2).unwrap(); + assert_eq!(task_status.len(), 2); + assert_eq!(task_status[0].0, None); + assert_eq!(task_status[0].1, TaskStatus::Cancelled_NeverStarted); + assert_eq!(task_status[1].0, Some(tasks[0].3.clone())); + assert_eq!(task_status[1].1, TaskStatus::Registered); + } + } // ----------------------- + { + tama.update_task_progress( + tasks[1].0, + &tasks[1].1, + tasks[1].2, + Some("A prover Network"), + TaskStatus::WorkInProgress, + None + ).unwrap(); - tama.update_task_progress( - tasks[1].0, - &tasks[1].1, - tasks[1].2, - Some("A prover Network"), - TaskStatus::WorkInProgress, - None - ).unwrap(); + { + let task_status = tama.get_task_proving_status(tasks[1].0, &tasks[1].1, tasks[1].2).unwrap(); + assert_eq!(task_status.len(), 2); + assert_eq!(task_status[0].0, Some(String::from("A prover Network"))); + assert_eq!(task_status[0].1, TaskStatus::WorkInProgress); + assert_eq!(task_status[1].0, Some(tasks[1].3.clone())); + assert_eq!(task_status[1].1, TaskStatus::Registered); + } + + std::thread::sleep(Duration::from_millis(1)); + + tama.update_task_progress( + tasks[1].0, + &tasks[1].1, + tasks[1].2, + Some("A prover Network"), + TaskStatus::CancellationInProgress, + None + ).unwrap(); - assert_eq!( - TaskStatus::WorkInProgress, - tama.get_task_proving_status(tasks[1].0, &tasks[1].1, tasks[1].2).unwrap() - ); - - tama.update_task_progress( - tasks[1].0, - &tasks[1].1, - tasks[1].2, - Some("A prover Network"), - TaskStatus::CancellationInProgress, - None - ).unwrap(); + { + let task_status = tama.get_task_proving_status(tasks[1].0, &tasks[1].1, tasks[1].2).unwrap(); + assert_eq!(task_status.len(), 2); + assert_eq!(task_status[0].0, Some(String::from("A prover Network"))); + assert_eq!(task_status[0].1, TaskStatus::CancellationInProgress); + assert_eq!(task_status[1].0, Some(tasks[1].3.clone())); + assert_eq!(task_status[1].1, TaskStatus::Registered); + } + + std::thread::sleep(Duration::from_millis(1)); + + tama.update_task_progress( + tasks[1].0, + &tasks[1].1, + tasks[1].2, + Some("A prover Network"), + TaskStatus::Cancelled, + None + ).unwrap(); - assert_eq!( - TaskStatus::CancellationInProgress, - tama.get_task_proving_status(tasks[1].0, &tasks[1].1, tasks[1].2).unwrap() - ); - - tama.update_task_progress( - tasks[1].0, - &tasks[1].1, - tasks[1].2, - Some("A prover Network"), - TaskStatus::Cancelled, - None - ).unwrap(); + { + let task_status = tama.get_task_proving_status(tasks[1].0, &tasks[1].1, tasks[1].2).unwrap(); + assert_eq!(task_status.len(), 2); + assert_eq!(task_status[0].0, Some(String::from("A prover Network"))); + assert_eq!(task_status[0].1, TaskStatus::Cancelled); + assert_eq!(task_status[1].0, Some(tasks[1].3.clone())); + assert_eq!(task_status[1].1, TaskStatus::Registered); + } + } + + // ----------------------- + { + tama.update_task_progress( + tasks[2].0, + &tasks[2].1, + tasks[2].2, + Some("A based prover"), + TaskStatus::WorkInProgress, + None + ).unwrap(); - assert_eq!( - TaskStatus::Cancelled, - tama.get_task_proving_status(tasks[1].0, &tasks[1].1, tasks[1].2).unwrap() - ); + { + let task_status = tama.get_task_proving_status(tasks[2].0, &tasks[2].1, tasks[2].2).unwrap(); + assert_eq!(task_status.len(), 2); + assert_eq!(task_status[0].0, Some(String::from("A based prover"))); + assert_eq!(task_status[0].1, TaskStatus::WorkInProgress); + assert_eq!(task_status[1].0, Some(tasks[2].3.clone())); + assert_eq!(task_status[1].1, TaskStatus::Registered); + } + + std::thread::sleep(Duration::from_millis(1)); + + let proof: Vec<_> = (&mut rng).gen_iter::().take(128).collect(); + tama.update_task_progress( + tasks[2].0, + &tasks[2].1, + tasks[2].2, + Some("A based prover"), + TaskStatus::Success, + Some(&proof) + ).unwrap(); + + { + let task_status = tama.get_task_proving_status(tasks[2].0, &tasks[2].1, tasks[2].2).unwrap(); + assert_eq!(task_status.len(), 2); + assert_eq!(task_status[0].0, Some(String::from("A based prover"))); + assert_eq!(task_status[0].1, TaskStatus::Success); + assert_eq!(task_status[1].0, Some(tasks[2].3.clone())); + assert_eq!(task_status[1].1, TaskStatus::Registered); + } + + assert_eq!( + proof, + tama.get_task_proof(tasks[2].0, &tasks[2].1, tasks[2].2).unwrap() + ); + } // ----------------------- + { + tama.update_task_progress( + tasks[3].0, + &tasks[3].1, + tasks[3].2, + Some("A flaky prover"), + TaskStatus::WorkInProgress, + None + ).unwrap(); - tama.update_task_progress( - tasks[2].0, - &tasks[2].1, - tasks[2].2, - Some("A based prover"), - TaskStatus::WorkInProgress, - None - ).unwrap(); + { + let task_status = tama.get_task_proving_status(tasks[3].0, &tasks[3].1, tasks[3].2).unwrap(); + assert_eq!(task_status.len(), 2); + assert_eq!(task_status[0].0, Some(String::from("A flaky prover"))); + assert_eq!(task_status[0].1, TaskStatus::WorkInProgress); + assert_eq!(task_status[1].0, Some(tasks[3].3.clone())); + assert_eq!(task_status[1].1, TaskStatus::Registered); + } + + std::thread::sleep(Duration::from_millis(1)); + + tama.update_task_progress( + tasks[3].0, + &tasks[3].1, + tasks[3].2, + Some("A flaky prover"), + TaskStatus::NetworkFailure, + None + ).unwrap(); - assert_eq!( - TaskStatus::WorkInProgress, - tama.get_task_proving_status(tasks[2].0, &tasks[2].1, tasks[2].2).unwrap() - ); - - let proof: Vec<_> = (&mut rng).gen_iter::().take(128).collect(); - tama.update_task_progress( - tasks[2].0, - &tasks[2].1, - tasks[2].2, - Some("A based prover"), - TaskStatus::Success, - Some(&proof) - ).unwrap(); + { + let task_status = tama.get_task_proving_status(tasks[3].0, &tasks[3].1, tasks[3].2).unwrap(); + assert_eq!(task_status.len(), 2); + assert_eq!(task_status[0].0, Some(String::from("A flaky prover"))); + assert_eq!(task_status[0].1, TaskStatus::NetworkFailure); + assert_eq!(task_status[1].0, Some(tasks[3].3.clone())); + assert_eq!(task_status[1].1, TaskStatus::Registered); + } + + std::thread::sleep(Duration::from_millis(1)); + + tama.update_task_progress( + tasks[3].0, + &tasks[3].1, + tasks[3].2, + Some("A based prover"), + TaskStatus::WorkInProgress, + None + ).unwrap(); - assert_eq!( - TaskStatus::Success, - tama.get_task_proving_status(tasks[2].0, &tasks[2].1, tasks[2].2).unwrap() - ); + { + let task_status = tama.get_task_proving_status(tasks[3].0, &tasks[3].1, tasks[3].2).unwrap(); + assert_eq!(task_status.len(), 3); + assert_eq!(task_status[0].0, Some(String::from("A based prover"))); + assert_eq!(task_status[0].1, TaskStatus::WorkInProgress); + assert_eq!(task_status[1].0, Some(String::from("A flaky prover"))); + assert_eq!(task_status[1].1, TaskStatus::NetworkFailure); + assert_eq!(task_status[2].0, Some(tasks[3].3.clone())); + assert_eq!(task_status[2].1, TaskStatus::Registered); + } + + std::thread::sleep(Duration::from_millis(1)); + + let proof: Vec<_> = (&mut rng).gen_iter::().take(128).collect(); + tama.update_task_progress( + tasks[3].0, + &tasks[3].1, + tasks[3].2, + Some("A based prover"), + TaskStatus::Success, + Some(&proof) + ).unwrap(); - assert_eq!( - proof, - tama.get_task_proof(tasks[2].0, &tasks[2].1, tasks[2].2).unwrap() - ); + { + let task_status = tama.get_task_proving_status(tasks[3].0, &tasks[3].1, tasks[3].2).unwrap(); + assert_eq!(task_status.len(), 3); + assert_eq!(task_status[0].0, Some(String::from("A based prover"))); + assert_eq!(task_status[0].1, TaskStatus::Success); + assert_eq!(task_status[1].0, Some(String::from("A flaky prover"))); + assert_eq!(task_status[1].1, TaskStatus::NetworkFailure); + assert_eq!(task_status[2].0, Some(tasks[3].3.clone())); + assert_eq!(task_status[2].1, TaskStatus::Registered); + } + assert_eq!( + proof, + tama.get_task_proof(tasks[3].0, &tasks[3].1, tasks[3].2).unwrap() + ); + } } - }