Commit

Merge branch 'darkfi-master-track' into fix/dependencies_sqlcipher
spital committed Nov 9, 2023
2 parents f3f0e69 + 77eb05c commit f3e4a0a
Showing 10 changed files with 246 additions and 95 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions bin/darkfi-mmproxy/Cargo.toml
@@ -14,6 +14,7 @@ darkfi-serial = {path = "../../src/serial", features = ["async"]}

# Misc
log = "0.4.20"
rand = "0.8.5"

# Monero
epee-encoding = {version = "0.5.0", features = ["derive"]}
27 changes: 24 additions & 3 deletions bin/darkfi-mmproxy/src/main.rs
@@ -26,6 +26,7 @@ use darkfi::{
rpc::{
jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResult, JsonSubscriber},
server::{listen_and_serve, RequestHandler},
util::JsonValue,
},
system::{StoppableTask, StoppableTaskPtr},
Error, Result,
@@ -94,6 +95,8 @@ struct Monerod {
struct Worker {
/// JSON-RPC notification subscriber, used to send job notifications
job_sub: JsonSubscriber,
/// Current job ID for the worker
job_id: Uuid,
/// Keepalive sender channel, pinged from stratum keepalived
ka_send: channel::Sender<()>,
/// Background keepalive task reference
@@ -106,7 +109,24 @@ impl Worker {
ka_send: channel::Sender<()>,
ka_task: StoppableTaskPtr,
) -> Self {
Self { job_sub, ka_send, ka_task }
Self { job_sub, job_id: Uuid::new_v4(), ka_send, ka_task }
}

async fn send_job(&mut self, blob: String, target: String) -> Result<()> {
// Update job id
self.job_id = Uuid::new_v4();

let params: JsonValue = HashMap::from([
("blob".to_string(), blob.into()),
("job_id".to_string(), self.job_id.to_string().into()),
("target".to_string(), target.into()),
])
.into();

info!("Sending mining job notification to worker");
self.job_sub.notify(params).await;

Ok(())
}
}

@@ -169,10 +189,11 @@ impl RequestHandler for MiningProxy {
"on_getblockhash" => self.monero_on_get_block_hash(req.id, req.params).await,
"get_block_template" => self.monero_get_block_template(req.id, req.params).await,
"getblocktemplate" => self.monero_get_block_template(req.id, req.params).await,

/*
"submit_block" => self.monero_submit_block(req.id, req.params).await,
"submitblock" => self.monero_submit_block(req.id, req.params).await,
"generateblocks" => self.monero_generateblocks(req.id, req.params).await,

/*
"get_last_block_header" => self.monero_get_last_block_header(req.id, req.params).await,
"get_block_header_by_hash" => self.monero_get_block_header_by_hash(req.id, req.params).await,
"get_block_header_by_height" => self.monero_get_block_header_by_height(req.id, req.params).await,
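A note on the new `job_id` field added to `Worker` above: in Stratum-style mining, the job ID rotated by `send_job` is what lets the proxy tell whether a submitted share was mined against the worker's current job. The sketch below is illustrative only and not part of this commit; the helper name `share_matches_current_job` and the `worker_uuid`/`submitted_job_id` parameters are hypothetical stand-ins for values a submit handler would parse from its params.

```rust
use std::collections::HashMap;

use smol::lock::RwLock;
use uuid::Uuid;

/// Illustrative helper: does a submitted share reference the worker's
/// current job? Stale shares carrying an older job_id can be rejected.
async fn share_matches_current_job(
    workers: &RwLock<HashMap<Uuid, Worker>>,
    worker_uuid: &Uuid,
    submitted_job_id: &str,
) -> bool {
    // Read-lock the workers map, as the proxy does elsewhere.
    let workers = workers.read().await;
    match workers.get(worker_uuid) {
        Some(worker) => worker.job_id.to_string() == submitted_job_id,
        None => false,
    }
}
```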
76 changes: 65 additions & 11 deletions bin/darkfi-mmproxy/src/monero.rs
@@ -28,7 +28,7 @@ use darkfi::{
},
Error, Result,
};
use log::{debug, error};
use log::{debug, error, info};
use monero::blockdata::transaction::{ExtraField, RawExtraField, SubField::MergeMining};

use super::MiningProxy;
@@ -48,31 +48,31 @@ impl MiningProxy {
{
Ok(v) => v,
Err(e) => {
error!(target: "rpc::monero::oneshot_request", "Error sending RPC request to monerod: {}", e);
error!(target: "rpc::monero::oneshot_request", "[RPC] Error sending RPC request to monerod: {}", e);
return Err(Error::ParseFailed("Failed sending monerod RPC request"))
}
};

let response_bytes = match response.body_bytes().await {
Ok(v) => v,
Err(e) => {
error!(target: "rpc::monero::get_block_count", "Error reading monerod RPC response: {}", e);
error!(target: "rpc::monero::get_block_count", "[RPC] Error reading monerod RPC response: {}", e);
return Err(Error::ParseFailed("Failed reading monerod RPC reponse"))
}
};

let response_string = match String::from_utf8(response_bytes) {
Ok(v) => v,
Err(e) => {
error!(target: "rpc::monero::get_block_count", "Error parsing monerod RPC response: {}", e);
error!(target: "rpc::monero::get_block_count", "[RPC] Error parsing monerod RPC response: {}", e);
return Err(Error::ParseFailed("Failed parsing monerod RPC reponse"))
}
};

let response_json: JsonValue = match response_string.parse() {
Ok(v) => v,
Err(e) => {
error!(target: "rpc::monero::get_block_count", "Error parsing monerod RPC response: {}", e);
error!(target: "rpc::monero::get_block_count", "[RPC] Error parsing monerod RPC response: {}", e);
return Err(Error::ParseFailed("Failed parsing monerod RPC reponse"))
}
};
@@ -90,7 +90,7 @@ impl MiningProxy {
let rep = match self.oneshot_request(req).await {
Ok(v) => v,
Err(e) => {
error!(target: "rpc::monero::get_block_count", "{}", e);
error!(target: "rpc::monero::get_block_count", "[RPC] {}", e);
return JsonError::new(InternalError, Some(e.to_string()), id).into()
}
};
@@ -120,7 +120,7 @@ impl MiningProxy {
let rep = match self.oneshot_request(req).await {
Ok(v) => v,
Err(e) => {
error!(target: "rpc::monero::get_block_count", "{}", e);
error!(target: "rpc::monero::get_block_count", "[RPC] {}", e);
return JsonError::new(InternalError, Some(e.to_string()), id).into()
}
};
@@ -180,7 +180,7 @@ impl MiningProxy {
let mut rep = match self.oneshot_request(req).await {
Ok(v) => v,
Err(e) => {
error!(target: "rpc::monero::get_block_template", "{}", e);
error!(target: "rpc::monero::get_block_template", "[RPC] {}", e);
return JsonError::new(InternalError, Some(e.to_string()), id).into()
}
};
@@ -212,15 +212,69 @@ impl MiningProxy {
JsonResponse::new(rep, id).into()
}

/*
/// Submit a mined block to the network
/// <https://www.getmonero.org/resources/developer-guides/daemon-rpc.html#submit_block>
pub async fn monero_submit_block(&self, id: u16, params: JsonValue) -> JsonResult {
todo!()
debug!(target: "rpc::monero", "submit_block()");

let Some(params_vec) = params.get::<Vec<JsonValue>>() else {
return JsonError::new(InvalidParams, None, id).into()
};

if params_vec.is_empty() {
return JsonError::new(InvalidParams, None, id).into()
}

// Deserialize the block blob(s) to make sure it's a valid block
for element in params_vec.iter() {
let Some(block_hex) = element.get::<String>() else {
return JsonError::new(InvalidParams, None, id).into()
};

let Ok(block_bytes) = hex::decode(block_hex) else {
return JsonError::new(InvalidParams, None, id).into()
};

let Ok(block) = monero::consensus::deserialize::<monero::Block>(&block_bytes) else {
return JsonError::new(InvalidParams, None, id).into()
};

info!("[RPC] Got submitted Monero block id {}", block.id());
}

// Now that all the submitted blocks are valid, just forward them to
// monerod to submit onto the network.
let req = JsonRequest::new("submit_block", params);
let rep = match self.oneshot_request(req).await {
Ok(v) => v,
Err(e) => {
error!(target: "rpc::monero::submit_block", "[RPC] {}", e);
return JsonError::new(InternalError, Some(e.to_string()), id).into()
}
};

JsonResponse::new(rep, id).into()
}

/// Generate a block and specify the address to receive the coinbase reward.
/// <https://www.getmonero.org/resources/developer-guides/daemon-rpc.html#generateblocks>
pub async fn monero_generateblocks(&self, id: u16, params: JsonValue) -> JsonResult {
todo!()
debug!(target: "rpc::monero", "generateblocks()");

// This request can just pass through
let req = JsonRequest::new("generateblocks", params);
let rep = match self.oneshot_request(req).await {
Ok(v) => v,
Err(e) => {
error!(target: "rpc::monero::generateblocks", "[RPC] {}", e);
return JsonError::new(InternalError, Some(e.to_string()), id).into()
}
};

JsonResponse::new(rep, id).into()
}

/*
pub async fn monero_get_last_block_header(&self, id: u16, params: JsonValue) -> JsonResult {
todo!()
}
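For context, the validation loop in the new `monero_submit_block` above boils down to hex-decoding each blob and letting the `monero` crate parse it. Below is a standalone sketch of that same per-blob check using the `hex` and `monero` calls this file already relies on; it is illustrative only (the `validate_block_blob` helper and its error strings are not part of the commit), and a real call needs a full hex-encoded block blob.

```rust
use darkfi::{Error, Result};

/// Illustrative helper mirroring the per-blob validation in submit_block:
/// hex-decode the blob, then deserialize it into a monero::Block.
fn validate_block_blob(block_hex: &str) -> Result<monero::Block> {
    // Reject blobs that are not valid hex.
    let block_bytes =
        hex::decode(block_hex).map_err(|_| Error::ParseFailed("Invalid block hex"))?;

    // Reject byte strings that do not deserialize into a Monero block.
    let block: monero::Block = monero::consensus::deserialize(&block_bytes)
        .map_err(|_| Error::ParseFailed("Invalid Monero block"))?;

    println!("Parsed Monero block id {}", block.id());
    Ok(block)
}
```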
40 changes: 30 additions & 10 deletions bin/darkfi-mmproxy/src/stratum.rs
@@ -27,6 +27,7 @@ use darkfi::{
Error, Result,
};
use log::{debug, error, info, warn};
use rand::{rngs::OsRng, Rng};
use smol::{channel, lock::RwLock};
use uuid::Uuid;

@@ -159,7 +160,7 @@ impl MiningProxy {
let ka_task = StoppableTask::new();

// Create worker
let worker = Worker::new(job_sub, ka_send, ka_task.clone());
let worker = Worker::new(job_sub.clone(), ka_send, ka_task.clone());

// Insert into connections map
self.workers.write().await.insert(uuid, worker);
@@ -174,15 +175,34 @@

info!("Added worker {} ({})", login, uuid);

// TODO: Send current job
JsonResponse::new(
JsonValue::Object(HashMap::from([(
"status".to_string(),
JsonValue::String("KEEPALIVED".to_string()),
)])),
id,
)
.into()
// Get block template for mining
let gbt_params: JsonValue = HashMap::from([
("wallet_address".to_string(), self.monerod.wallet_address.clone().into()),
("reserve_size".to_string(), (0_f64).into()),
])
.into();

let block_template = match self.monero_get_block_template(OsRng.gen(), gbt_params).await {
JsonResult::Response(resp) => {
resp.result.get::<HashMap<String, JsonValue>>().unwrap().clone()
}
_ => {
error!("[STRATUM] Failed getting block template from monerod");
return JsonError::new(ErrorCode::InternalError, None, id).into()
}
};

// Send the job to the worker
let mut workers = self.workers.write().await;
let worker = workers.get_mut(&uuid).unwrap();
let blob = block_template["blockhashing_blob"].get::<String>().unwrap();
let target = block_template["wide_difficulty"].get::<String>().unwrap();
if let Err(e) = worker.send_job(blob.clone(), target.clone()).await {
error!("[STRATUM] Failed sending job to {}: {}", uuid, e);
return JsonError::new(ErrorCode::InternalError, None, id).into()
}

job_sub.into()
}

pub async fn stratum_submit(&self, id: u16, params: JsonValue) -> JsonResult {
16 changes: 8 additions & 8 deletions bin/dnet/model.py
@@ -53,11 +53,11 @@ def add_node(self, node):

for i, id in enumerate(info["outbound_slots"]):
if id == 0:
outbounds = self.nodes[name]['outbound'][f"{i}"] = "none"
outbounds = self.nodes[name]['outbound'][f"{i}"] = ["none", 0]
continue
assert id in channel_lookup
url = channel_lookup[id]["url"]
outbounds = self.nodes[name]['outbound'][f"{i}"] = url
outbounds = self.nodes[name]['outbound'][f"{i}"] = [url, id]

for channel in channels:
if channel["session"] != "seed":
@@ -122,26 +122,26 @@ def add_event(self, event):
logging.debug(f"{current_time} inbound (disconnect): {addr}")
case "outbound_slot_sleeping":
slot = info["slot"]
event = self.nodes[name]['event']
event[(f"{name}", f"{slot}")] = ["sleeping", 0]
logging.debug(f"{current_time} slot {slot}: sleeping")
self.nodes[name]['event'][(f"{name}", f"{slot}")] = "sleeping"
case "outbound_slot_connecting":
slot = info["slot"]
addr = info["addr"]
event = self.nodes[name]['event']
event[(f"{name}", f"{slot}")] = f"connecting: addr={addr}"
event[(f"{name}", f"{slot}")] = [f"connecting: addr={addr}", 0]
logging.debug(f"{current_time} slot {slot}: connecting addr={addr}")
case "outbound_slot_connected":
slot = info["slot"]
addr = info["addr"]
channel_id = info["channel_id"]
event = self.nodes[name]['event']
event[(f"{name}", f"{slot}")] = f"connected: addr={addr}"
id = info["channel_id"]
self.nodes[name]['outbound'][f"{slot}"] = [addr, id]
logging.debug(f"{current_time} slot {slot}: connected addr={addr}")
case "outbound_slot_disconnected":
slot = info["slot"]
err = info["err"]
event = self.nodes[name]['event']
event[(f"{name}", f"{slot}")] = f"disconnected: {err}"
event[(f"{name}", f"{slot}")] = [f"disconnected: {err}", 0]
logging.debug(f"{current_time} slot {slot}: disconnected err='{err}'")
case "outbound_peer_discovery":
attempt = info["attempt"]
2 changes: 1 addition & 1 deletion bin/dnet/rpc.py
@@ -25,7 +25,7 @@
class JsonRpc:

async def start(self, host, port):
logging.info(f"trying to connect to {host}:{port}")
#logging.info(f"trying to connect to {host}:{port}")
reader, writer = await asyncio.open_connection(host, port)
self.reader = reader
self.writer = writer