Skip to content

Commit

Permalink
Add automatically swicth upstream for JDC
Browse files Browse the repository at this point in the history
  • Loading branch information
UnidenifiedUser committed Sep 19, 2023
1 parent 241693a commit 7ded80c
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 39 deletions.
8 changes: 4 additions & 4 deletions roles/jd-client/proxy-config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ min_extranonce2_size = 8
withhold = true

# Auth keys for open encrypted connection downstream
authority_public_key = "3VANfft6ei6jQq1At7d8nmiZzVhBFS4CiQujdgim1ign"
authority_secret_key = "7qbpUjScc865jyX2kiB4NVJANoC7GA7TAJupdzXWkc62"
authority_public_key = "2di19GHYQnAZJmEpoUeP7C3Eg9TCcksHr23rZCC83dvUiZgiDL"
authority_secret_key = "2Z1FZug7mZNyM63ggkm37r4oKQ29khLjAvEx43rGkFN47RcJ2t"
cert_validity_sec = 3600

# How many time the JDC try to reinitialize itself after a failure
Expand All @@ -29,11 +29,11 @@ retry = 10
tp_address = "75.119.150.111:8442"

[[upstreams]]
authority_pubkey = "3VANfft6ei6jQq1At7d8nmiZzVhBFS4CiQujdgim1ign"
authority_pubkey = "2di19GHYQnAZJmEpoUeP7C3Eg9TCcksHr23rZCC83dvUiZgiDL"
pool_address = "127.0.0.1:34254"
jd_address = "127.0.0.1:34264"

[[upstreams]]
authority_pubkey = "3VANfft6ei6jQq1At7d8nmiZzVhBFS4CiQujdgim1ign"
authority_pubkey = "2di19GHYQnAZJmEpoUeP7C3Eg9TCcksHr23rZCC83dvUiZgiDL"
pool_address = "127.0.0.1:34254"
jd_address = "127.0.0.1:34264"
62 changes: 37 additions & 25 deletions roles/jd-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ fn process_cli_args<'a>() -> ProxyResult<'a, ProxyConfig> {
async fn main() {
tracing_subscriber::fmt::init();

let mut failed = 0;
let mut failed: i32 = 0;
let mut upstream_index = 0;
let mut interrupt_signal_future = Box::pin(tokio::signal::ctrl_c().fuse());

// Channel used to manage failed tasks
Expand All @@ -111,11 +112,19 @@ async fn main() {

let proxy_config = process_cli_args().unwrap();

while failed < proxy_config.retry {
while failed < proxy_config.retry as i32 {
{
let task_collector = task_collector.clone();
let tx_status = tx_status.clone();
let initialize = initialize_jd(tx_status.clone(), task_collector.clone());
let initialize = initialize_jd(
tx_status.clone(),
task_collector.clone(),
proxy_config
.upstreams
.get(upstream_index)
.expect("No more upstreams in config")
.clone(),
);
tokio::task::spawn(initialize);
}
// Check all tasks if is_finished() is true, if so exit
Expand Down Expand Up @@ -178,6 +187,20 @@ async fn main() {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
break;
}
State::UpstreamRogue => {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
task_collector
.safe_lock(|s| {
for handle in s {
handle.abort();
}
})
.unwrap();
failed -= 1;
upstream_index += 1;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
break;
}
State::Healthy(msg) => {
info!("HEALTHY message: {}", msg);
}
Expand All @@ -190,33 +213,22 @@ async fn main() {
async fn initialize_jd(
tx_status: async_channel::Sender<Status<'static>>,
task_collector: Arc<Mutex<Vec<AbortHandle>>>,
upstream_config: proxy_config::Upstream,
) {
let proxy_config = process_cli_args().unwrap();

// Format `Upstream` connection address
let mut parts = proxy_config.upstreams[0].pool_address.split(':');
let address = parts.next().unwrap_or_else(|| {
panic!(
"Invalid pool address {}",
proxy_config.upstreams[0].pool_address
)
});
let mut parts = upstream_config.pool_address.split(':');
let address = parts
.next()
.unwrap_or_else(|| panic!("Invalid pool address {}", upstream_config.pool_address));
let port = parts
.next()
.and_then(|p| p.parse::<u16>().ok())
.unwrap_or_else(|| {
panic!(
"Invalid pool address {}",
proxy_config.upstreams[0].pool_address
)
});
.unwrap_or_else(|| panic!("Invalid pool address {}", upstream_config.pool_address));
let upstream_addr = SocketAddr::new(
IpAddr::from_str(address).unwrap_or_else(|_| {
panic!(
"Invalid pool address {}",
proxy_config.upstreams[0].pool_address
)
}),
IpAddr::from_str(address)
.unwrap_or_else(|_| panic!("Invalid pool address {}", upstream_config.pool_address)),
port,
);

Expand All @@ -234,7 +246,7 @@ async fn initialize_jd(
// Instantiate a new `Upstream` (SV2 Pool)
let upstream = match upstream_sv2::Upstream::new(
upstream_addr,
proxy_config.upstreams[0].authority_pubkey.clone(),
upstream_config.authority_pubkey.clone(),
0, // TODO
status::Sender::Upstream(tx_status.clone()),
send_channel_factory,
Expand Down Expand Up @@ -296,12 +308,12 @@ async fn initialize_jd(
let ip_tp = parts.next().unwrap().to_string();
let port_tp = parts.next().unwrap().parse::<u16>().unwrap();

let mut parts = proxy_config.upstreams[0].jd_address.split(':');
let mut parts = upstream_config.jd_address.split(':');
let ip_jd = parts.next().unwrap().to_string();
let port_jd = parts.next().unwrap().parse::<u16>().unwrap();
let jd = JobDeclarator::new(
SocketAddr::new(IpAddr::from_str(ip_jd.as_str()).unwrap(), port_jd),
proxy_config.upstreams[0]
upstream_config
.authority_pubkey
.clone()
.into_inner()
Expand Down
1 change: 1 addition & 0 deletions roles/jd-client/src/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub enum State<'a> {
DownstreamShutdown(Error<'a>),
BridgeShutdown(Error<'a>),
UpstreamShutdown(Error<'a>),
UpstreamRogue,
Healthy(String),
}

Expand Down
14 changes: 10 additions & 4 deletions roles/jd-client/src/upstream_sv2/upstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,15 @@ impl Upstream {
.await
.unwrap();
}
Ok(SendTo::None(None)) => (),
// Used when we receive s submit share error
Ok(SendTo::None(None)) => {
let sender = self_.safe_lock(|s| s.tx_status.clone()).unwrap();
let _ = sender
.send(status::Status {
state: status::State::UpstreamRogue,
})
.await;
}
// No need to handle impossible state just panic cause are impossible and we
// will never panic ;-) Verified: handle_message_mining only either panics,
// returns Ok(SendTo::None(None)) or Ok(SendTo::None(Some(m))), or returns Err
Expand Down Expand Up @@ -542,9 +550,7 @@ impl ParseUpstreamMiningMessages<Downstream, NullDownstreamMiningSelector, NoRou
) -> Result<roles_logic_sv2::handlers::mining::SendTo<Downstream>, RolesLogicError> {
info!("Up: Rejected Submitted Share");
debug!("Up: Handling SubmitSharesError: {:?}", &m);
Ok(SendTo::RelaySameMessageToRemote(
self.downstream.as_ref().unwrap().clone(),
))
Ok(SendTo::None(None))
}

/// The SV2 `NewMiningJob` message is NOT handled because it is NOT used for the Translator
Expand Down
2 changes: 1 addition & 1 deletion roles/jd-server/jds-config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@ tp_address = "127.0.0.1:8442"
# tp_address = "75.119.150.111:8442"

# SRI Pool JD config
listen_jd_address = "127.0.0.1:34264"
listen_jd_address = "127.0.0.1:34264"
12 changes: 7 additions & 5 deletions test/config/interop-jd-translator/jdc-config.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
# Local SRI Pool Upstream Connection
upstream_address = "127.0.0.1"
upstream_port = 34254
upstream_authority_pubkey = "2di19GHYQnAZJmEpoUeP7C3Eg9TCcksHr23rZCC83dvUiZgiDL"

# Local Mining Device Downstream Connection
downstream_address = "0.0.0.0"
downstream_port = 34265
Expand All @@ -28,6 +23,13 @@ cert_validity_sec = 3600
# How many time the JDC try to reinitialize itself after a failure
retry = 10

tp_address = "75.119.150.111:8442"

[jd_config]
jd_address = "127.0.0.1:34264"
tp_address = "75.119.150.111:8442"

[[upstreams]]
authority_pubkey = "2di19GHYQnAZJmEpoUeP7C3Eg9TCcksHr23rZCC83dvUiZgiDL"
pool_address = "127.0.0.1:34254"
jd_address = "127.0.0.1:34264"

0 comments on commit 7ded80c

Please sign in to comment.