Skip to content

Commit

Permalink
mmproxy: Worker login, notifications, and keepalive.
Browse files Browse the repository at this point in the history
  • Loading branch information
parazyd committed Oct 19, 2023
1 parent 1d22ee4 commit 8665702
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 24 deletions.
31 changes: 31 additions & 0 deletions bin/darkfi-mmproxy/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2023 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

use darkfi::rpc::jsonrpc::ErrorCode;

/// Custom RPC error implementations
pub enum RpcError {
InvalidWorkerLogin = -32110,
UnsupportedMiningAlgo = -32111,
}

impl From<RpcError> for ErrorCode {
fn from(x: RpcError) -> ErrorCode {
ErrorCode::ServerError(x as i32)
}
}
37 changes: 33 additions & 4 deletions bin/darkfi-mmproxy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,27 @@ use std::{
use darkfi::{
async_daemonize, cli_desc,
rpc::{
jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResult},
jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResult, JsonSubscriber},
server::{listen_and_serve, RequestHandler},
},
system::{StoppableTask, StoppableTaskPtr},
Error, Result,
};
use darkfi_serial::async_trait;
use log::{debug, error, info};
use log::{error, info};
use serde::Deserialize;
use smol::{
channel,
lock::{Mutex, MutexGuard, RwLock},
stream::StreamExt,
Executor,
};
use structopt::StructOpt;
use structopt_toml::StructOptToml;
use url::Url;
use uuid::Uuid;

mod error;
mod monero;
mod stratum;

Expand All @@ -64,6 +67,10 @@ struct Args {
/// JSON-RPC server listen URL
rpc_listen: Url,

#[structopt(long, default_value = "tcp://127.0.0.1:18081")]
/// monerod JSON-RPC server listen URL
monerod_rpc: Url,

#[structopt(long)]
/// List of worker logins
workers: Vec<String>,
Expand All @@ -73,11 +80,30 @@ struct Args {
log: Option<String>,
}

struct Worker {
/// JSON-RPC notification subscriber, used to send job notifications
job_sub: JsonSubscriber,
/// Keepalive sender channel, pinged from stratum keepalived
ka_send: channel::Sender<()>,
/// Background keepalive task reference
ka_task: StoppableTaskPtr,
}

impl Worker {
fn new(
job_sub: JsonSubscriber,
ka_send: channel::Sender<()>,
ka_task: StoppableTaskPtr,
) -> Self {
Self { job_sub, ka_send, ka_task }
}
}

struct MiningProxy {
/// Worker logins
logins: HashMap<String, String>,
/// Workers UUIDs
workers: RwLock<HashMap<String, String>>,
workers: Arc<RwLock<HashMap<Uuid, Worker>>>,
/// JSON-RPC connection tracker
rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
/// Main async executor reference
Expand All @@ -88,7 +114,7 @@ impl MiningProxy {
fn new(logins: HashMap<String, String>, executor: Arc<Executor<'static>>) -> Self {
Self {
logins,
workers: RwLock::new(HashMap::new()),
workers: Arc::new(RwLock::new(HashMap::new())),
rpc_connections: Mutex::new(HashSet::new()),
executor,
}
Expand Down Expand Up @@ -153,6 +179,7 @@ impl RequestHandler for MiningProxy {

async_daemonize!(realmain);
async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
info!("Starting DarkFi x Monero merge mining proxy...");
// Parse worker logins
let mut logins = HashMap::new();
for worker in args.workers {
Expand Down Expand Up @@ -180,6 +207,8 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
ex.clone(),
);

info!("Merge mining proxy ready, waiting for connections...");

// Signal handling for graceful termination.
let (signals_handler, signals_task) = SignalHandler::new(ex)?;
signals_handler.wait_termination(signals_task).await?;
Expand Down
125 changes: 105 additions & 20 deletions bin/darkfi-mmproxy/src/stratum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,56 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

use std::collections::HashMap;

use darkfi::rpc::{
jsonrpc::{ErrorCode, JsonError, JsonResponse, JsonResult},
util::JsonValue,
use std::{collections::HashMap, sync::Arc, time::Duration};

use darkfi::{
rpc::{
jsonrpc::{ErrorCode, JsonError, JsonResponse, JsonResult, JsonSubscriber},
util::JsonValue,
},
system::{timeout::timeout, StoppableTask},
Error, Result,
};
use log::{debug, error, warn};
use smol::{channel, lock::RwLock};
use uuid::Uuid;

use super::MiningProxy;
use super::{error::RpcError, MiningProxy, Worker};

/// Algo string representing Monero's RandomX
pub const RANDOMX_ALGO: &str = "rx/0";

impl MiningProxy {
/// Background task listening for keepalives from a worker, if timeout is reached
/// the worker will be dropped.
async fn keepalive_task(
workers: Arc<RwLock<HashMap<Uuid, Worker>>>,
uuid: Uuid,
ka_recv: channel::Receiver<()>,
) -> Result<()> {
const TIMEOUT: Duration = Duration::from_secs(60);

loop {
let Ok(r) = timeout(TIMEOUT, ka_recv.recv()).await else {
// Timeout, remove worker
warn!("keepalive_task {} worker timed out", uuid);
workers.write().await.remove(&uuid);
break
};

match r {
Ok(()) => continue,
Err(e) => {
error!("keepalive_task {} channel recv error: {}", uuid, e);
workers.write().await.remove(&uuid);
break
}
}
}

Ok(())
}

/// Stratum login method. `darkfi-mmproxy` will check that it is a valid worker
/// login, and will also search for `RANDOMX_ALGO`.
/// TODO: More proper error codes
Expand Down Expand Up @@ -79,22 +115,60 @@ impl MiningProxy {
}

if !found_xmr_algo {
return JsonError::new(ErrorCode::InvalidParams, None, id).into()
return JsonError::new(
RpcError::UnsupportedMiningAlgo.into(),
Some("Unsupported mining algo".to_string()),
id,
)
.into()
}

// Check valid login
let Some(known_pass) = self.logins.get(login) else {
return JsonError::new(ErrorCode::InvalidParams, None, id).into()
return JsonError::new(
RpcError::InvalidWorkerLogin.into(),
Some("Unknown worker username".to_string()),
id,
)
.into()
};

if known_pass != pass {
return JsonError::new(ErrorCode::InvalidParams, None, id).into()
return JsonError::new(
RpcError::InvalidWorkerLogin.into(),
Some("Invalid worker password".to_string()),
id,
)
.into()
}

// Login success, generate UUID
let uuid = Uuid::new_v4();

todo!()
// Create job subscriber
let job_sub = JsonSubscriber::new("job");

// Create keepalive channel
let (ka_send, ka_recv) = channel::unbounded();

// Create background keepalive task
let ka_task = StoppableTask::new();

// Create worker
let worker = Worker::new(job_sub, ka_send, ka_task.clone());

// Insert into connections map
self.workers.write().await.insert(uuid, worker);

// Spawn background task
ka_task.start(
Self::keepalive_task(self.workers.clone(), uuid.clone(), ka_recv),
move |_| async move { debug!("keepalive_task for {} exited", uuid) },
Error::DetachedTaskStopped,
self.executor.clone(),
);

todo!("send current job")
}

pub async fn stratum_submit(&self, id: u16, params: JsonValue) -> JsonResult {
Expand Down Expand Up @@ -150,17 +224,28 @@ impl MiningProxy {
return JsonError::new(ErrorCode::InvalidParams, None, id).into()
};

if self.workers.read().await.contains_key(uuid) {
return JsonResponse::new(
JsonValue::Object(HashMap::from([(
"status".to_string(),
JsonValue::String("KEEPALIVED".to_string()),
)])),
id,
)
.into()
let Ok(uuid) = Uuid::try_from(uuid.as_str()) else {
return JsonError::new(ErrorCode::InvalidParams, None, id).into()
};

// Ping the keepalive task
let workers = self.workers.read().await;
let Some(worker) = workers.get(&uuid) else {
return JsonError::new(ErrorCode::InvalidParams, None, id).into()
};

if let Err(e) = worker.ka_send.send(()).await {
error!("stratum_keepalived: keepalive task ping error: {}", e);
return JsonError::new(ErrorCode::InvalidParams, None, id).into()
}

return JsonError::new(ErrorCode::InvalidParams, None, id).into()
return JsonResponse::new(
JsonValue::Object(HashMap::from([(
"status".to_string(),
JsonValue::String("KEEPALIVED".to_string()),
)])),
id,
)
.into()
}
}

0 comments on commit 8665702

Please sign in to comment.