Skip to content

Commit fd849f4

Browse files
authored
Add (script add) and restrict (script remove) to owners (#1025)
1 parent 018af8a commit fd849f4

File tree

13 files changed

+364
-61
lines changed

13 files changed

+364
-61
lines changed

Cargo.lock

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

aquamarine/src/actor.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use aquamarine_vm::AquamarineVM;
2121
use particle_protocol::Particle;
2222

2323
use futures::FutureExt;
24+
use std::ops::Mul;
2425
use std::{
2526
collections::VecDeque,
2627
fmt::Debug,
@@ -42,11 +43,12 @@ impl Deadline {
4243
}
4344
}
4445

45-
pub fn is_expired(&self, now: u64) -> bool {
46+
pub fn is_expired(&self, now_ms: u64) -> bool {
4647
self.timestamp
48+
.mul(1000)
4749
.checked_add(self.ttl as u64)
4850
// Whether ts is in the past
49-
.map(|ts| ts < now)
51+
.map(|ts| ts < now_ms)
5052
// If timestamp + ttl gives overflow, consider particle expired
5153
.unwrap_or_else(|| {
5254
log::warn!("timestamp {} + ttl {} overflowed", self.timestamp, self.ttl);
@@ -73,8 +75,8 @@ impl Actor {
7375
}
7476
}
7577

76-
pub fn is_expired(&self, now: u64) -> bool {
77-
self.deadline.is_expired(now)
78+
pub fn is_expired(&self, now_ms: u64) -> bool {
79+
self.deadline.is_expired(now_ms)
7880
}
7981

8082
pub fn ingest(&mut self, particle: AwaitedParticle) {

aquamarine/src/plumber.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,13 @@ use std::{
2727

2828
/// Get current time from OS
2929
#[cfg(not(test))]
30-
use real_time::now;
30+
use real_time::now_ms;
3131

3232
use crate::awaited_particle::{AwaitedEffects, AwaitedParticle};
3333
use futures::task::Waker;
3434
/// For tests, mocked time is used
3535
#[cfg(test)]
36-
use mock_time::now;
36+
use mock_time::now_ms;
3737

3838
pub struct Plumber {
3939
events: VecDeque<AwaitedEffects>,
@@ -58,7 +58,7 @@ impl Plumber {
5858
self.wake();
5959

6060
let deadline = Deadline::from(&particle);
61-
if deadline.is_expired(now()) {
61+
if deadline.is_expired(now_ms()) {
6262
log::info!("Particle {} is expired, ignoring", particle.id);
6363
self.events.push_back(AwaitedEffects::expired(particle));
6464
return;
@@ -80,7 +80,7 @@ impl Plumber {
8080
}
8181

8282
// Remove expired actors
83-
let now = now();
83+
let now = now_ms();
8484
self.actors.retain(|_, actor| !actor.is_expired(now));
8585

8686
// Gather effects and put VMs back
@@ -124,8 +124,8 @@ impl Plumber {
124124
/// Implements `now` by taking number of non-leap seconds from `Utc::now()`
125125
mod real_time {
126126
#[allow(dead_code)]
127-
pub fn now() -> u64 {
128-
chrono::Utc::now().timestamp() as u64
127+
pub fn now_ms() -> u64 {
128+
(chrono::Utc::now().timestamp() * 1000) as u64
129129
}
130130
}
131131

@@ -214,7 +214,7 @@ pub mod mock_time {
214214
static MOCK_TIME: RefCell<u64> = RefCell::new(0);
215215
}
216216

217-
pub fn now() -> u64 {
217+
pub fn now_ms() -> u64 {
218218
MOCK_TIME.with(|cell| cell.borrow().clone())
219219
}
220220

connection-pool/src/behaviour.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ impl ConnectionPoolBehaviour {
158158
// If particle is sent to the current node, process it locally
159159
self.queue.push_back(particle);
160160
outlet.send(true).ok();
161+
self.wake();
161162
} else {
162163
// Send particle to remote peer
163164
self.push_event(NetworkBehaviourAction::NotifyHandler {

crates/server-config/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ config-utils = { path = "../config-utils" }
99
trust-graph = "0.2.0"
1010
particle-protocol = { path = "../../particle-protocol"}
1111

12-
air-interpreter-wasm = "0.3.4"
12+
air-interpreter-wasm = "0.4.2"
1313
libp2p = { package = "fluence-fork-libp2p", version = "0.34.0" }
1414

1515
serde = { version = "1.0.118", features = ["derive"] }

crates/test-utils/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ connection-pool = { path = "../../connection-pool" }
1919
script-storage = { path = "../../script-storage" }
2020

2121
stepper-interface = "0.1.2"
22-
air-interpreter-wasm = "0.3.4"
22+
air-interpreter-wasm = "0.4.2"
2323
aquamarine-vm = "0.1.29"
2424

2525
libp2p = { package = "fluence-fork-libp2p", version = "0.34.0" }

crates/test-utils/src/connected_client.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use crate::{
2525
use fluence_client::{Client, Transport};
2626
use particle_protocol::Particle;
2727

28+
use anyhow::bail;
2829
use async_std::task;
2930
use core::ops::Deref;
3031
use libp2p::{core::Multiaddr, PeerId};
@@ -175,4 +176,19 @@ impl ConnectedClient {
175176
let particle = self.receive();
176177
read_args(particle, &self.peer_id)
177178
}
179+
180+
/// Wait for a particle with specified `particle_id`, and read "op" "return" result from it
181+
pub fn wait_particle_args(&mut self, particle_id: String) -> anyhow::Result<Vec<JValue>> {
182+
let mut max = 10;
183+
loop {
184+
max -= 1;
185+
if max <= 0 {
186+
bail!("timed out waiting for particle {}", particle_id);
187+
}
188+
let particle = self.receive();
189+
if particle.id == particle_id {
190+
break Ok(read_args(particle, &self.peer_id));
191+
}
192+
}
193+
}
178194
}

deploy/deployment_config.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
"159.89.2.70",
1414
"157.230.98.75"
1515
],
16-
"branch": "script_storage",
16+
"branch": "script_storage_list",
1717
"bootstrap": "138.197.177.2",
1818
"nodes": [
1919
"138.197.177.2"

particle-closures/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,3 +37,4 @@ parking_lot = "0.11.1"
3737
uuid = "0.8.2"
3838
chrono = "0.4.19"
3939
thiserror = "1.0.23"
40+
humantime-serde = "1.0.1"

particle-closures/src/host_closures.rs

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,11 @@ use script_storage::ScriptStorageApi;
2828
use server_config::ServicesConfig;
2929

3030
use async_std::task;
31+
use humantime_serde::re::humantime::format_duration as pretty;
3132
use libp2p::{core::Multiaddr, PeerId};
3233
use multihash::{Code, MultihashDigest};
3334
use serde_json::{json, Value as JValue};
35+
use std::borrow::Borrow;
3436
use std::num::ParseIntError;
3537
use std::time::Duration;
3638
use std::{str::FromStr, sync::Arc};
@@ -92,7 +94,7 @@ impl<C: Clone + Send + Sync + 'static + AsRef<KademliaApi> + AsRef<ConnectionPoo
9294
})
9395
}
9496

95-
fn route(&self, particle: ParticleParameters, args: Vec<IValue>) -> Option<IValue> {
97+
fn route(&self, params: ParticleParameters, args: Vec<IValue>) -> Option<IValue> {
9698
let args = match Args::parse(args) {
9799
Ok(args) => args,
98100
Err(err) => {
@@ -118,7 +120,7 @@ impl<C: Clone + Send + Sync + 'static + AsRef<KademliaApi> + AsRef<ConnectionPoo
118120
("dht", "add_provider") => (self.add_provider)(args),
119121
("dht", "get_providers") => (self.get_providers)(args),
120122

121-
("srv", "create") => (self.create_service)(particle, args),
123+
("srv", "create") => (self.create_service)(params, args),
122124
("srv", "get_interface") => (self.get_interface)(args),
123125
("srv", "get_interfaces") => (self.get_active_interfaces)(args),
124126

@@ -127,13 +129,14 @@ impl<C: Clone + Send + Sync + 'static + AsRef<KademliaApi> + AsRef<ConnectionPoo
127129
("dist", "get_modules") => (self.get_modules)(args),
128130
("dist", "get_blueprints") => (self.get_blueprints)(args),
129131

130-
("script", "add") => wrap(self.add_script(args)),
131-
("script", "remove") => wrap(self.remove_script(args)),
132+
("script", "add") => wrap(self.add_script(args, params)),
133+
("script", "remove") => wrap(self.remove_script(args, params)),
134+
("script", "list") => wrap(self.list_scripts()),
132135

133136
("op", "identify") => (self.identify)(args),
134137
("op", "identity") => ok(Array(args.function_args)),
135138

136-
_ => (self.call_service)(particle, args),
139+
_ => (self.call_service)(params, args),
137140
}
138141
}
139142

@@ -174,11 +177,10 @@ impl<C: Clone + Send + Sync + 'static + AsRef<KademliaApi> + AsRef<ConnectionPoo
174177
Ok(contact.map(|c| json!(c)))
175178
}
176179

177-
fn add_script(&self, args: Args) -> Result<JValue, JError> {
180+
fn add_script(&self, args: Args, params: ParticleParameters) -> Result<JValue, JError> {
178181
#[derive(thiserror::Error, Debug)]
179182
#[error("Error while deserializing field interval_sec: not a valid u64")]
180183
struct Error(#[source] ParseIntError);
181-
182184
let mut args = args.function_args.into_iter();
183185

184186
let script: String = Args::next("script", &mut args)?;
@@ -188,18 +190,46 @@ impl<C: Clone + Send + Sync + 'static + AsRef<KademliaApi> + AsRef<ConnectionPoo
188190
.transpose()
189191
.map_err(Error)?;
190192
let interval = interval.map(Duration::from_secs);
191-
let id = self.script_storage.add_script(script, interval)?;
193+
let creator = PeerId::from_str(&params.init_user_id)?;
194+
let id = self.script_storage.add_script(script, interval, creator)?;
192195

193196
Ok(json!(id))
194197
}
195198

196-
fn remove_script(&self, args: Args) -> Result<JValue, JError> {
197-
let uuid: String = Args::next("uuid", &mut args.function_args.into_iter())?;
198-
let ok = task::block_on(self.script_storage.remove_script(uuid))?;
199+
fn remove_script(&self, args: Args, params: ParticleParameters) -> Result<JValue, JError> {
200+
let mut args = args.function_args.into_iter();
201+
202+
let uuid: String = Args::next("uuid", &mut args)?;
203+
let force: Option<String> = Args::maybe_next("force", &mut args)?;
204+
// TODO HACK: this is a hack to allow anyone to delete any script if they know this secret
205+
let force = force.map_or(false, |s| s == "--force");
206+
let actor = PeerId::from_str(&params.init_user_id)?;
207+
208+
let ok = task::block_on(self.script_storage.remove_script(uuid, actor, force))?;
199209

200210
Ok(json!(ok))
201211
}
202212

213+
fn list_scripts(&self) -> Result<JValue, JError> {
214+
let scripts = task::block_on(self.script_storage.list_scripts())?;
215+
216+
Ok(JValue::Array(
217+
scripts
218+
.into_iter()
219+
.map(|(id, script)| {
220+
let id: &String = id.borrow();
221+
json!({
222+
"id": id,
223+
"src": script.src,
224+
"failures": script.failures,
225+
"interval": script.interval.map(|i| pretty(i).to_string()),
226+
"owner": script.owner.to_string(),
227+
})
228+
})
229+
.collect(),
230+
))
231+
}
232+
203233
fn kademlia(&self) -> &KademliaApi {
204234
self.connectivity.as_ref()
205235
}

particle-node/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ config-utils = { path = "../crates/config-utils" }
2020
kademlia = { path = "../crates/kademlia" }
2121

2222
trust-graph = "0.2.0"
23-
air-interpreter-wasm = "0.3.4"
23+
air-interpreter-wasm = "0.4.2"
2424

2525
libp2p = { package = "fluence-fork-libp2p", version = "0.34.0" }
2626
multihash ="0.13"

0 commit comments

Comments
 (0)