Skip to content

Commit

Permalink
Send HMAC signatures to workers
Browse files Browse the repository at this point in the history
  • Loading branch information
kalabukdima committed May 15, 2024
1 parent 6b8f309 commit c1142bd
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 3 deletions.
6 changes: 5 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion crates/network-scheduler/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "network-scheduler"
version = "0.4.4"
version = "0.4.5"
edition = "2021"

[dependencies]
Expand All @@ -9,11 +9,13 @@ async-trait = "0.1"
aws-config = { version = "1", features = ["behavior-version-latest"] }
aws-sdk-s3 = "1"
axum = { version = "0.6", features = ["json"] }
base64 = "0.22.1"
clap = { version = "4", features = ["derive", "env"] }
derive-enum-from-into = "0.1"
env_logger = "0.11"
futures = "0.3"
hex = "0.4"
hmac = "0.12.1"
iter_num_tools = "0.7"
itertools = "0.12"
lazy_static = "1"
Expand All @@ -27,8 +29,10 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1"
serde_with = { version = "3", features = ["hex"] }
serde_yaml = "0.9"
sha2 = "0.10.8"
sha3 = "0.10"
tokio = { version = "1", features = ["full"] }
url = "2.5.0"

contract-client = { version = "0.1", path = "../contract-client" }
subsquid-messages = { workspace = true, features = ["semver"] }
Expand Down
5 changes: 5 additions & 0 deletions crates/network-scheduler/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ pub struct Config {
#[serde_as(as = "DurationSeconds")]
#[serde(rename = "successful_dial_retry_sec")]
pub successful_dial_retry: Duration,
#[serde_as(as = "DurationSeconds")]
#[serde(rename = "signature_refresh_interval_sec")]
pub signature_refresh_interval: Duration,
pub replication_factor: usize, // this is minimum
pub dynamic_replication: bool,
pub scheduling_unit_size: usize,
Expand All @@ -45,6 +48,8 @@ pub struct Config {
pub storage_domain: String,
pub dataset_buckets: Vec<String>,
pub scheduler_state_bucket: String,
#[serde(skip_serializing)]
pub cloudflare_storage_secret: String,
}

impl Config {
Expand Down
1 change: 1 addition & 0 deletions crates/network-scheduler/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod metrics_server;
mod scheduler;
mod scheduling_unit;
mod server;
mod signature;
mod storage;
mod worker_state;

Expand Down
34 changes: 33 additions & 1 deletion crates/network-scheduler/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use semver::VersionReq;
use serde::{Deserialize, Serialize};

use contract_client::Worker;
use subsquid_messages::HttpHeader;
use subsquid_messages::{pong::Status as WorkerStatus, Ping};
use subsquid_network_transport::PeerId;

Expand All @@ -22,6 +23,9 @@ use crate::worker_state::{JailReason, WorkerState};
lazy_static! {
pub static ref SUPPORTED_WORKER_VERSIONS: VersionReq = ">=0.4.0".parse().unwrap();
}
const WORKER_ID_HEADER: &str = "worker-id";
const WORKER_SIGNATURE_HEADER: &str = "worker-signature";

#[derive(Default, Serialize, Deserialize)]
pub struct Scheduler {
known_units: HashMap<UnitId, SchedulingUnit>,
Expand Down Expand Up @@ -74,6 +78,12 @@ impl Scheduler {
}
}

/// Refreshes the signature stored on every tracked worker state by calling
/// `regenerate_signature` on each entry of `worker_states`.
pub fn regenerate_signatures(&mut self) {
    self.worker_states
        .values_mut()
        .for_each(|worker| worker.regenerate_signature());
}

/// Register ping msg from a worker. Returns worker status if ping was accepted, otherwise None
pub fn ping(&mut self, worker_id: PeerId, msg: Ping) -> WorkerStatus {
let version = msg.sem_version();
Expand All @@ -93,7 +103,9 @@ impl Scheduler {
return WorkerStatus::Jailed(worker_state.jail_reason_str());
}
let assigned_chunks = worker_state.assigned_chunks(&self.known_units);
WorkerStatus::Active(chunks_to_assignment(assigned_chunks))
let mut assignment = chunks_to_assignment(assigned_chunks);
add_signature_headers(&mut assignment, &worker_id, worker_state);
WorkerStatus::Active(assignment)
}

pub fn workers_to_dial(&self) -> Vec<PeerId> {
Expand Down Expand Up @@ -421,3 +433,23 @@ impl Scheduler {
});
}
}

/// Appends the worker identification headers to `assignment.http_headers`.
///
/// Two headers are added, in this order:
/// * `WORKER_ID_HEADER` carrying the string form of `worker_id`;
/// * `WORKER_SIGNATURE_HEADER` carrying a copy of `worker_state.signature`.
///
/// # Panics
///
/// Panics if `worker_state.signature` is `None`.
fn add_signature_headers(
    assignment: &mut subsquid_messages::WorkerAssignment,
    worker_id: &PeerId,
    worker_state: &WorkerState,
) {
    let id_header = HttpHeader {
        name: WORKER_ID_HEADER.to_string(),
        value: worker_id.to_string(),
    };
    // Both headers are built before anything is appended, so a missing
    // signature aborts without leaving a half-populated header list.
    let signature_header = HttpHeader {
        name: WORKER_SIGNATURE_HEADER.to_string(),
        value: worker_state
            .signature
            .as_ref()
            .expect("Worker signature not initialized")
            .clone(),
    };
    assignment.http_headers.extend([id_header, signature_header]);
}
13 changes: 13 additions & 0 deletions crates/network-scheduler/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ impl<S: Stream<Item = SchedulerEvent> + Send + Unpin + 'static> Server<S> {
self.spawn_jail_inactive_workers_task(storage_client.clone());
self.spawn_jail_stale_workers_task(storage_client.clone());
self.spawn_jail_unreachable_workers_task(storage_client);
self.spawn_regenerate_signatures_task();

let mut sigint = signal(SignalKind::interrupt())?;
let mut sigterm = signal(SignalKind::terminate())?;
Expand Down Expand Up @@ -219,6 +220,18 @@ impl<S: Stream<Item = SchedulerEvent> + Send + Unpin + 'static> Server<S> {
self.task_manager.spawn(task);
}

/// Registers a task with the task manager that regenerates all worker
/// signatures. The task is handed to `spawn_periodic` together with the
/// configured `signature_refresh_interval`.
fn spawn_regenerate_signatures_task(&mut self) {
    let refresh_interval = Config::get().signature_refresh_interval;
    let shared_scheduler = self.scheduler.clone();
    let refresh = move |_| {
        // Each invocation gets its own handle so the returned future owns it.
        let handle = shared_scheduler.clone();
        async move {
            log::info!("Regenerating signatures");
            let mut guard = handle.write().await;
            guard.regenerate_signatures();
        }
    };
    self.task_manager.spawn_periodic(refresh, refresh_interval);
}

fn spawn_jail_inactive_workers_task(&mut self, storage_client: S3Storage) {
let timeout = Config::get().worker_inactive_timeout;
self.spawn_jail_task(storage_client, timeout, |s| s.jail_inactive_workers());
Expand Down
34 changes: 34 additions & 0 deletions crates/network-scheduler/src/signature.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use base64::{engine::general_purpose::STANDARD as base64, Engine};
use hmac::{Hmac, Mac};
use sha2::Sha256;
use url::form_urlencoded;

/// Generate signature with timestamp as required by Cloudflare's
/// `is_timed_hmac_valid_v0` function.
/// <https://developers.cloudflare.com/ruleset-engine/rules-language/functions/#hmac-validation>
///
/// The HMAC-SHA256 is computed, keyed with `secret`, over `message` immediately
/// followed by the decimal `timestamp`. The digest is base64-encoded (standard
/// alphabet), then URL-encoded, and returned as `"<timestamp>-<token>"`.
pub fn timed_hmac(message: &str, secret: &str, timestamp: usize) -> String {
    let mut signer = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).unwrap();
    // Feeding the two parts one after another authenticates the same byte
    // stream as their concatenation.
    signer.update(message.as_bytes());
    signer.update(timestamp.to_string().as_bytes());
    let digest = signer.finalize().into_bytes();

    let encoded = base64.encode(digest.as_slice());
    let token = form_urlencoded::byte_serialize(encoded.as_bytes()).collect::<String>();
    format!("{}-{}", timestamp, token)
}

/// Signs `message` with `secret` via [`timed_hmac`], using the current number of
/// whole seconds since the Unix epoch as the timestamp.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch, or if the
/// number of seconds does not fit into `usize`.
pub fn timed_hmac_now(message: &str, secret: &str) -> String {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap();
    let seconds: usize = since_epoch.as_secs().try_into().unwrap();
    timed_hmac(message, secret, seconds)
}

// Pins the output of `timed_hmac` for one fixed message, secret and timestamp.
#[test]
fn test_hmac_sign() {
    let signed = timed_hmac(
        "12D3KooWBwbQFT48cNYGPbDwm8rjasbZkc1VMo6rCR6217qr165S",
        "test_secret",
        1715662737,
    );
    assert_eq!(
        signed,
        "1715662737-E%2BaW1Y5hS587YGeJFKGTnp%2Fhn8rEMmSRlEslPiOQsuE%3D"
    );
}
1 change: 1 addition & 0 deletions crates/network-scheduler/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ impl S3Storage {
let mut scheduler = Scheduler::from_json(bytes.as_ref())?;
// List of datasets could have changed since last run, need to clear deprecated units
scheduler.clear_deprecated_units();
scheduler.regenerate_signatures();
Ok(scheduler)
}

Expand Down
14 changes: 14 additions & 0 deletions crates/network-scheduler/src/worker_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use subsquid_network_transport::PeerId;
use crate::cli::Config;
use crate::data_chunk::DataChunk;
use crate::scheduling_unit::{SchedulingUnit, UnitId};
use crate::signature::timed_hmac_now;

#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize)]
Expand All @@ -37,6 +38,8 @@ pub struct WorkerState {
pub unreachable_since: Option<SystemTime>,
#[serde(default)]
pub jail_reason: Option<JailReason>,
#[serde(skip)]
pub signature: Option<String>,
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
Expand Down Expand Up @@ -82,6 +85,10 @@ impl WorkerState {
last_dial_ok: false,
unreachable_since: None,
jail_reason: None,
signature: Some(timed_hmac_now(
&peer_id.to_string(),
&Config::get().cloudflare_storage_secret,
)),
}
}

Expand Down Expand Up @@ -248,6 +255,13 @@ impl WorkerState {
self.jailed = false;
self.jail_reason = None;
}

/// Replaces the stored signature with a fresh one produced by `timed_hmac_now`
/// from this worker's peer ID string and the configured
/// `cloudflare_storage_secret`.
pub fn regenerate_signature(&mut self) {
    let message = self.peer_id.to_string();
    let fresh = timed_hmac_now(&message, &Config::get().cloudflare_storage_secret);
    self.signature = Some(fresh);
}
}

impl Display for WorkerState {
Expand Down

0 comments on commit c1142bd

Please sign in to comment.