Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Graceful shutdown for mining-proxy #1021

Merged
merged 2 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 58 additions & 103 deletions roles/mining-proxy/src/lib/downstream_mining.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
#![allow(dead_code)]
use std::{convert::TryInto, sync::Arc};

use super::upstream_mining::{StdFrame as UpstreamFrame, UpstreamMiningNode};
use async_channel::{Receiver, SendError, Sender};
use tokio::{net::TcpListener, sync::oneshot::Receiver as TokioReceiver};
use tracing::{info, warn};

use codec_sv2::{StandardEitherFrame, StandardSv2Frame};
use network_helpers_sv2::plain_connection_tokio::PlainConnection;
use roles_logic_sv2::{
common_messages_sv2::{SetupConnection, SetupConnectionSuccess},
common_properties::{CommonDownstreamData, IsDownstream, IsMiningDownstream},
Expand All @@ -15,25 +19,23 @@ use roles_logic_sv2::{
routing_logic::MiningProxyRoutingLogic,
utils::Mutex,
};
use tracing::info;

use codec_sv2::{StandardEitherFrame, StandardSv2Frame};
use super::upstream_mining::{ProxyRemoteSelector, StdFrame as UpstreamFrame, UpstreamMiningNode};

pub type Message = MiningDeviceMessages<'static>;
pub type StdFrame = StandardSv2Frame<Message>;
pub type EitherFrame = StandardEitherFrame<Message>;

/// 1 to 1 connection with a downstream node that implement the mining (sub)protocol can be either
/// a mining device or a downstream proxy.
/// A downstream can only be linked with an upstream at a time. Support multi upstrems for
/// downstream do no make much sense.
/// A downstream can only be linked with an upstream at a time. Support multi upstreams for
/// downstream do not make much sense.
#[derive(Debug)]
pub struct DownstreamMiningNode {
id: u32,
receiver: Receiver<EitherFrame>,
sender: Sender<EitherFrame>,
pub status: DownstreamMiningNodeStatus,
pub prev_job_id: Option<u32>,
upstream: Option<Arc<Mutex<UpstreamMiningNode>>>,
}

Expand All @@ -47,22 +49,14 @@ pub enum DownstreamMiningNodeStatus {
#[derive(Debug, Clone)]
#[allow(clippy::enum_variant_names)]
pub enum Channel {
DowntreamHomUpstreamGroup {
DownstreamHomUpstreamGroup {
data: CommonDownstreamData,
channel_id: u32,
group_id: u32,
},
DowntreamHomUpstreamExtended {
DownstreamHomUpstreamExtended {
data: CommonDownstreamData,
channel_id: u32,
group_id: u32,
},
// Below variant is not supported cause do not have much sense
// DowntreamNonHomUpstreamGroup { data: CommonDownstreamData, group_ids: Vec<u32>, extended_ids: Vec<u32>},
DowntreamNonHomUpstreamExtended {
data: CommonDownstreamData,
group_ids: Vec<u32>,
extended_ids: Vec<u32>,
},
}

Expand Down Expand Up @@ -101,7 +95,7 @@ impl DownstreamMiningNodeStatus {
match self {
DownstreamMiningNodeStatus::Initializing => panic!(),
DownstreamMiningNodeStatus::Paired(data) => {
let channel = Channel::DowntreamHomUpstreamGroup {
let channel = Channel::DownstreamHomUpstreamGroup {
data: *data,
channel_id,
group_id,
Expand All @@ -113,50 +107,22 @@ impl DownstreamMiningNodeStatus {
}
}

fn open_channel_for_down_hom_up_extended(&mut self, channel_id: u32, group_id: u32) {
fn open_channel_for_down_hom_up_extended(&mut self, channel_id: u32, _group_id: u32) {
match self {
DownstreamMiningNodeStatus::Initializing => panic!(),
DownstreamMiningNodeStatus::Paired(data) => {
let channel = Channel::DowntreamHomUpstreamExtended {
let channel = Channel::DownstreamHomUpstreamExtended {
data: *data,
channel_id,
group_id,
};
let self_ = Self::ChannelOpened(channel);
let _ = std::mem::replace(self, self_);
}
DownstreamMiningNodeStatus::ChannelOpened(..) => panic!("Channel already opened"),
}
}

fn add_extended_from_non_hom_for_up_extended(&mut self, id: u32) {
match self {
DownstreamMiningNodeStatus::Initializing => panic!(),
DownstreamMiningNodeStatus::Paired(data) => {
let channel = Channel::DowntreamNonHomUpstreamExtended {
data: *data,
group_ids: vec![],
extended_ids: vec![id],
};
let self_ = Self::ChannelOpened(channel);
let _ = std::mem::replace(self, self_);
}
DownstreamMiningNodeStatus::ChannelOpened(
Channel::DowntreamNonHomUpstreamExtended { extended_ids, .. },
) => {
if !extended_ids.contains(&id) {
extended_ids.push(id)
}
}
_ => panic!(),
}
}
}

use core::convert::TryInto;
use std::sync::Arc;
use tokio::task;

impl PartialEq for DownstreamMiningNode {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
Expand All @@ -177,16 +143,12 @@ impl DownstreamMiningNode {
self.status
.open_channel_for_down_hom_up_extended(channel_id, group_id);
}
pub fn add_extended_from_non_hom_for_up_extended(&mut self, id: u32) {
self.status.add_extended_from_non_hom_for_up_extended(id);
}

pub fn new(receiver: Receiver<EitherFrame>, sender: Sender<EitherFrame>, id: u32) -> Self {
Self {
receiver,
sender,
status: DownstreamMiningNodeStatus::Initializing,
prev_job_id: None,
upstream: None,
id,
}
Expand Down Expand Up @@ -316,7 +278,7 @@ impl DownstreamMiningNode {

pub fn exit(self_: Arc<Mutex<Self>>) {
if let Some(up) = self_.safe_lock(|s| s.upstream.clone()).unwrap() {
super::upstream_mining::UpstreamMiningNode::remove_dowstream(up, &self_);
UpstreamMiningNode::remove_dowstream(up, &self_);
};
self_
.safe_lock(|s| {
Expand All @@ -326,8 +288,6 @@ impl DownstreamMiningNode {
}
}

use super::upstream_mining::ProxyRemoteSelector;

/// It impl UpstreamMining cause the proxy act as an upstream node for the DownstreamMiningNode
impl
ParseDownstreamMiningMessages<
Expand Down Expand Up @@ -414,14 +374,14 @@ impl
match &self.status {
DownstreamMiningNodeStatus::Initializing => todo!(),
DownstreamMiningNodeStatus::Paired(_) => todo!(),
DownstreamMiningNodeStatus::ChannelOpened(Channel::DowntreamHomUpstreamGroup {
DownstreamMiningNodeStatus::ChannelOpened(Channel::DownstreamHomUpstreamGroup {
..
}) => {
let remote = self.upstream.as_ref().unwrap();
let message = Mining::SubmitSharesStandard(m);
Ok(SendTo::RelayNewMessageToRemote(remote.clone(), message))
}
DownstreamMiningNodeStatus::ChannelOpened(Channel::DowntreamHomUpstreamExtended {
DownstreamMiningNodeStatus::ChannelOpened(Channel::DownstreamHomUpstreamExtended {
..
}) => {
// Safe unwrap is channel have been opened it means that the dowsntream is paired
Expand All @@ -430,12 +390,6 @@ impl
let res = UpstreamMiningNode::handle_std_shr(remote.clone(), m).unwrap();
Ok(SendTo::Respond(res))
}
DownstreamMiningNodeStatus::ChannelOpened(
Channel::DowntreamNonHomUpstreamExtended { .. },
) => {
// unreachable cause the proxy do not support this kind of channel
unreachable!();
}
}
}

Expand Down Expand Up @@ -483,44 +437,48 @@ impl
}
}

use network_helpers_sv2::plain_connection_tokio::PlainConnection;
use std::net::SocketAddr;
use tokio::net::TcpListener;

pub async fn listen_for_downstream_mining(address: SocketAddr) {
info!("Listening for downstream mining connections on {}", address);
let listner = TcpListener::bind(address).await.unwrap();
pub async fn listen_for_downstream_mining(
listener: TcpListener,
mut shutdown_rx: TokioReceiver<()>,
) {
let mut ids = roles_logic_sv2::utils::Id::new();

while let Ok((stream, _)) = listner.accept().await {
let (receiver, sender): (Receiver<EitherFrame>, Sender<EitherFrame>) =
PlainConnection::new(stream).await;
let node = DownstreamMiningNode::new(receiver, sender, ids.next());

task::spawn(async move {
let mut incoming: StdFrame = node.receiver.recv().await.unwrap().try_into().unwrap();
let message_type = incoming.get_header().unwrap().msg_type();
let payload = incoming.payload();
let routing_logic = super::get_common_routing_logic();
let node = Arc::new(Mutex::new(node));

// Call handle_setup_connection or fail
match DownstreamMiningNode::handle_message_common(
node.clone(),
message_type,
payload,
routing_logic,
) {
Ok(SendToCommon::RelayNewMessageToRemote(_, message)) => {
let message = match message {
roles_logic_sv2::parsers::CommonMessages::SetupConnectionSuccess(m) => m,
_ => panic!(),
};
DownstreamMiningNode::start(node, message).await
loop {
tokio::select! {
accept_result = listener.accept() => {
let (stream, _) = accept_result.expect("failed to accept downstream connection");
let (receiver, sender): (Receiver<EitherFrame>, Sender<EitherFrame>) =
PlainConnection::new(stream).await;
let node = DownstreamMiningNode::new(receiver, sender, ids.next());

let mut incoming: StdFrame =
node.receiver.recv().await.unwrap().try_into().unwrap();
let message_type = incoming.get_header().unwrap().msg_type();
let payload = incoming.payload();
let routing_logic = super::get_common_routing_logic();
let node = Arc::new(Mutex::new(node));

// Call handle_setup_connection or fail
let common_msg = DownstreamMiningNode::handle_message_common(
node.clone(),
message_type,
payload,
routing_logic
).expect("failed to process downstream message");


if let SendToCommon::RelayNewMessageToRemote(_, relay_msg) = common_msg {
if let roles_logic_sv2::parsers::CommonMessages::SetupConnectionSuccess(setup_msg) = relay_msg {
DownstreamMiningNode::start(node, setup_msg).await;
}
} else {
warn!("Received unexpected message from downstream");
}
_ => panic!(),
}
});
_ = &mut shutdown_rx => {
info!("Closing listener");
johnnyasantoss marked this conversation as resolved.
Show resolved Hide resolved
return;
}
}
}
}

Expand All @@ -529,14 +487,11 @@ impl IsDownstream for DownstreamMiningNode {
match self.status {
DownstreamMiningNodeStatus::Initializing => panic!(),
DownstreamMiningNodeStatus::Paired(data) => data,
DownstreamMiningNodeStatus::ChannelOpened(Channel::DowntreamHomUpstreamGroup {
DownstreamMiningNodeStatus::ChannelOpened(Channel::DownstreamHomUpstreamGroup {
data,
..
}) => data,
DownstreamMiningNodeStatus::ChannelOpened(
Channel::DowntreamNonHomUpstreamExtended { data, .. },
) => data,
DownstreamMiningNodeStatus::ChannelOpened(Channel::DowntreamHomUpstreamExtended {
DownstreamMiningNodeStatus::ChannelOpened(Channel::DownstreamHomUpstreamExtended {
data,
..
}) => data,
Expand Down
35 changes: 17 additions & 18 deletions roles/mining-proxy/src/lib/upstream_mining.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
#![allow(dead_code)]

use super::EXTRANONCE_RANGE_1_LENGTH;
use roles_logic_sv2::utils::Id;
use core::convert::TryInto;
use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration};

use super::downstream_mining::{Channel, DownstreamMiningNode, StdFrame as DownstreamFrame};
use async_channel::{Receiver, SendError, Sender};
use async_recursion::async_recursion;
use nohash_hasher::BuildNoHashHasher;
use tokio::{net::TcpStream, task};
use tracing::{debug, error, info};

use codec_sv2::{HandshakeRole, Initiator, StandardEitherFrame, StandardSv2Frame};
use network_helpers_sv2::noise_connection_tokio::Connection;
use nohash_hasher::BuildNoHashHasher;
use roles_logic_sv2::{
channel_logic::{
channel_factory::{ExtendedChannelKind, OnNewShare, ProxyExtendedChannelFactory, Share},
Expand All @@ -26,14 +28,15 @@ use roles_logic_sv2::{
routing_logic::MiningProxyRoutingLogic,
selectors::{DownstreamMiningSelector, ProxyDownstreamMiningSelector as Prs},
template_distribution_sv2::SubmitSolution,
utils::{GroupId, Mutex},
utils::{GroupId, Id, Mutex},
};
use std::{collections::HashMap, sync::Arc};
use tokio::{net::TcpStream, task};
use tracing::error;

use stratum_common::bitcoin::TxOut;

use super::{
downstream_mining::{Channel, DownstreamMiningNode, StdFrame as DownstreamFrame},
EXTRANONCE_RANGE_1_LENGTH,
};

pub type Message = PoolMessages<'static>;
pub type StdFrame = StandardSv2Frame<Message>;
pub type EitherFrame = StandardEitherFrame<Message>;
Expand Down Expand Up @@ -188,10 +191,6 @@ pub struct UpstreamMiningNode {
reconnect: bool,
}

use core::convert::TryInto;
use std::{net::SocketAddr, time::Duration};
use tracing::{debug, info};

/// It assume that endpoint NEVER change flags and version!
/// I can open both extended and group channel with upstream.
impl UpstreamMiningNode {
Expand Down Expand Up @@ -471,11 +470,10 @@ impl UpstreamMiningNode {
super::downstream_mining::DownstreamMiningNodeStatus::ChannelOpened(
channel,
) => match channel {
Channel::DowntreamHomUpstreamGroup { channel_id, .. } => Some(*channel_id),
Channel::DowntreamHomUpstreamExtended { channel_id, .. } => {
Channel::DownstreamHomUpstreamGroup { channel_id, .. } => Some(*channel_id),
Channel::DownstreamHomUpstreamExtended { channel_id, .. } => {
Some(*channel_id)
}
Channel::DowntreamNonHomUpstreamExtended { .. } => todo!(),
},
})
.unwrap()
Expand Down Expand Up @@ -1048,7 +1046,7 @@ impl
.ok_or(Error::NoDownstreamsConnected)?;
for downstream in downstreams {
match downstream.safe_lock(|r| r.get_channel().clone()).unwrap() {
Channel::DowntreamHomUpstreamGroup {
Channel::DownstreamHomUpstreamGroup {
channel_id,
group_id,
..
Expand Down Expand Up @@ -1257,9 +1255,10 @@ impl IsMiningUpstream<DownstreamMiningNode, ProxyRemoteSelector> for UpstreamMin

#[cfg(test)]
mod tests {
use super::*;
use std::net::{IpAddr, Ipv4Addr};

use super::*;

#[test]
fn new_upstream_minining_node() {
let id = 0;
Expand Down
Loading
Loading