From 86657028d5c0a1fbad3b2222d1b248f155fa52f1 Mon Sep 17 00:00:00 2001 From: parazyd Date: Thu, 19 Oct 2023 14:18:51 +0200 Subject: [PATCH] mmproxy: Worker login, notifications, and keepalive. --- bin/darkfi-mmproxy/src/error.rs | 31 ++++++++ bin/darkfi-mmproxy/src/main.rs | 37 ++++++++- bin/darkfi-mmproxy/src/stratum.rs | 125 +++++++++++++++++++++++++----- 3 files changed, 169 insertions(+), 24 deletions(-) create mode 100644 bin/darkfi-mmproxy/src/error.rs diff --git a/bin/darkfi-mmproxy/src/error.rs b/bin/darkfi-mmproxy/src/error.rs new file mode 100644 index 000000000000..aa5a81d49faa --- /dev/null +++ b/bin/darkfi-mmproxy/src/error.rs @@ -0,0 +1,31 @@ +/* This file is part of DarkFi (https://dark.fi) + * + * Copyright (C) 2020-2023 Dyne.org foundation + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +use darkfi::rpc::jsonrpc::ErrorCode; + +/// Custom RPC error implementations +pub enum RpcError { + InvalidWorkerLogin = -32110, + UnsupportedMiningAlgo = -32111, +} + +impl From for ErrorCode { + fn from(x: RpcError) -> ErrorCode { + ErrorCode::ServerError(x as i32) + } +} diff --git a/bin/darkfi-mmproxy/src/main.rs b/bin/darkfi-mmproxy/src/main.rs index 76a041ec546f..253773ba015b 100644 --- a/bin/darkfi-mmproxy/src/main.rs +++ b/bin/darkfi-mmproxy/src/main.rs @@ -24,16 +24,17 @@ use std::{ use darkfi::{ async_daemonize, cli_desc, rpc::{ - jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResult}, + jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResult, JsonSubscriber}, server::{listen_and_serve, RequestHandler}, }, system::{StoppableTask, StoppableTaskPtr}, Error, Result, }; use darkfi_serial::async_trait; -use log::{debug, error, info}; +use log::{error, info}; use serde::Deserialize; use smol::{ + channel, lock::{Mutex, MutexGuard, RwLock}, stream::StreamExt, Executor, @@ -41,7 +42,9 @@ use smol::{ use structopt::StructOpt; use structopt_toml::StructOptToml; use url::Url; +use uuid::Uuid; +mod error; mod monero; mod stratum; @@ -64,6 +67,10 @@ struct Args { /// JSON-RPC server listen URL rpc_listen: Url, + #[structopt(long, default_value = "tcp://127.0.0.1:18081")] + /// monerod JSON-RPC server listen URL + monerod_rpc: Url, + #[structopt(long)] /// List of worker logins workers: Vec, @@ -73,11 +80,30 @@ struct Args { log: Option, } +struct Worker { + /// JSON-RPC notification subscriber, used to send job notifications + job_sub: JsonSubscriber, + /// Keepalive sender channel, pinged from stratum keepalived + ka_send: channel::Sender<()>, + /// Background keepalive task reference + ka_task: StoppableTaskPtr, +} + +impl Worker { + fn new( + job_sub: JsonSubscriber, + ka_send: channel::Sender<()>, + ka_task: StoppableTaskPtr, + ) -> Self { + Self { job_sub, ka_send, ka_task } + } +} + struct MiningProxy { /// Worker logins logins: HashMap, /// Workers UUIDs - workers: RwLock>, + workers: Arc>>, /// JSON-RPC connection tracker rpc_connections: Mutex>, /// Main async executor reference @@ -88,7 +114,7 @@ impl MiningProxy { fn new(logins: HashMap, executor: Arc>) -> Self { Self { logins, - workers: RwLock::new(HashMap::new()), + workers: Arc::new(RwLock::new(HashMap::new())), rpc_connections: Mutex::new(HashSet::new()), executor, } @@ -153,6 +179,7 @@ impl RequestHandler for MiningProxy { async_daemonize!(realmain); async fn realmain(args: Args, ex: Arc>) -> Result<()> { + info!("Starting DarkFi x Monero merge mining proxy..."); // Parse worker logins let mut logins = HashMap::new(); for worker in args.workers { @@ -180,6 +207,8 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { ex.clone(), ); + info!("Merge mining proxy ready, waiting for connections..."); + // Signal handling for graceful termination. let (signals_handler, signals_task) = SignalHandler::new(ex)?; signals_handler.wait_termination(signals_task).await?; diff --git a/bin/darkfi-mmproxy/src/stratum.rs b/bin/darkfi-mmproxy/src/stratum.rs index c2bfdcf444de..e4f129928b02 100644 --- a/bin/darkfi-mmproxy/src/stratum.rs +++ b/bin/darkfi-mmproxy/src/stratum.rs @@ -16,20 +16,56 @@ * along with this program. If not, see . */ -use std::collections::HashMap; - -use darkfi::rpc::{ - jsonrpc::{ErrorCode, JsonError, JsonResponse, JsonResult}, - util::JsonValue, +use std::{collections::HashMap, sync::Arc, time::Duration}; + +use darkfi::{ + rpc::{ + jsonrpc::{ErrorCode, JsonError, JsonResponse, JsonResult, JsonSubscriber}, + util::JsonValue, + }, + system::{timeout::timeout, StoppableTask}, + Error, Result, }; +use log::{debug, error, warn}; +use smol::{channel, lock::RwLock}; use uuid::Uuid; -use super::MiningProxy; +use super::{error::RpcError, MiningProxy, Worker}; /// Algo string representing Monero's RandomX pub const RANDOMX_ALGO: &str = "rx/0"; impl MiningProxy { + /// Background task listening for keepalives from a worker, if timeout is reached + /// the worker will be dropped. + async fn keepalive_task( + workers: Arc>>, + uuid: Uuid, + ka_recv: channel::Receiver<()>, + ) -> Result<()> { + const TIMEOUT: Duration = Duration::from_secs(60); + + loop { + let Ok(r) = timeout(TIMEOUT, ka_recv.recv()).await else { + // Timeout, remove worker + warn!("keepalive_task {} worker timed out", uuid); + workers.write().await.remove(&uuid); + break + }; + + match r { + Ok(()) => continue, + Err(e) => { + error!("keepalive_task {} channel recv error: {}", uuid, e); + workers.write().await.remove(&uuid); + break + } + } + } + + Ok(()) + } + /// Stratum login method. `darkfi-mmproxy` will check that it is a valid worker /// login, and will also search for `RANDOMX_ALGO`. /// TODO: More proper error codes @@ -79,22 +115,60 @@ impl MiningProxy { } if !found_xmr_algo { - return JsonError::new(ErrorCode::InvalidParams, None, id).into() + return JsonError::new( + RpcError::UnsupportedMiningAlgo.into(), + Some("Unsupported mining algo".to_string()), + id, + ) + .into() } // Check valid login let Some(known_pass) = self.logins.get(login) else { - return JsonError::new(ErrorCode::InvalidParams, None, id).into() + return JsonError::new( + RpcError::InvalidWorkerLogin.into(), + Some("Unknown worker username".to_string()), + id, + ) + .into() }; if known_pass != pass { - return JsonError::new(ErrorCode::InvalidParams, None, id).into() + return JsonError::new( + RpcError::InvalidWorkerLogin.into(), + Some("Invalid worker password".to_string()), + id, + ) + .into() } // Login success, generate UUID let uuid = Uuid::new_v4(); - todo!() + // Create job subscriber + let job_sub = JsonSubscriber::new("job"); + + // Create keepalive channel + let (ka_send, ka_recv) = channel::unbounded(); + + // Create background keepalive task + let ka_task = StoppableTask::new(); + + // Create worker + let worker = Worker::new(job_sub, ka_send, ka_task.clone()); + + // Insert into connections map + self.workers.write().await.insert(uuid, worker); + + // Spawn background task + ka_task.start( + Self::keepalive_task(self.workers.clone(), uuid.clone(), ka_recv), + move |_| async move { debug!("keepalive_task for {} exited", uuid) }, + Error::DetachedTaskStopped, + self.executor.clone(), + ); + + todo!("send current job") } pub async fn stratum_submit(&self, id: u16, params: JsonValue) -> JsonResult { @@ -150,17 +224,28 @@ impl MiningProxy { return JsonError::new(ErrorCode::InvalidParams, None, id).into() }; - if self.workers.read().await.contains_key(uuid) { - return JsonResponse::new( - JsonValue::Object(HashMap::from([( - "status".to_string(), - JsonValue::String("KEEPALIVED".to_string()), - )])), - id, - ) - .into() + let Ok(uuid) = Uuid::try_from(uuid.as_str()) else { + return JsonError::new(ErrorCode::InvalidParams, None, id).into() + }; + + // Ping the keepalive task + let workers = self.workers.read().await; + let Some(worker) = workers.get(&uuid) else { + return JsonError::new(ErrorCode::InvalidParams, None, id).into() + }; + + if let Err(e) = worker.ka_send.send(()).await { + error!("stratum_keepalived: keepalive task ping error: {}", e); + return JsonError::new(ErrorCode::InvalidParams, None, id).into() } - return JsonError::new(ErrorCode::InvalidParams, None, id).into() + return JsonResponse::new( + JsonValue::Object(HashMap::from([( + "status".to_string(), + JsonValue::String("KEEPALIVED".to_string()), + )])), + id, + ) + .into() } }