Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(consensus)!: state change on foreign proposals in local blocks #1130

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 21 additions & 10 deletions applications/tari_indexer/src/transaction_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ where
let transaction_substate_address = SubstateAddress::for_transaction_receipt(tx_hash.into_array().into());

if transaction.all_inputs_iter().next().is_none() {
self.try_with_committee(iter::once(transaction_substate_address), |mut client| {
self.try_with_committee(iter::once(transaction_substate_address), 2, |mut client| {
let transaction = transaction.clone();
async move { client.submit_transaction(transaction).await }
})
Expand All @@ -98,7 +98,7 @@ where
.all_inputs_iter()
// If there is no version specified, submit to the validator node with version 0
.map(|i| i.or_zero_version().to_substate_address());
self.try_with_committee(involved, |mut client| {
self.try_with_committee(involved, 2, |mut client| {
let transaction = transaction.clone();
async move { client.submit_transaction(transaction).await }
})
Expand All @@ -123,7 +123,7 @@ where
transaction_id: TransactionId,
) -> Result<TransactionResultStatus, TransactionManagerError> {
let transaction_substate_address = SubstateAddress::for_transaction_receipt(transaction_id.into_array().into());
self.try_with_committee(iter::once(transaction_substate_address), |mut client| async move {
self.try_with_committee(iter::once(transaction_substate_address), 1, |mut client| async move {
client.get_finalized_transaction_result(transaction_id).await.optional()
})
.await?
Expand All @@ -140,7 +140,7 @@ where
) -> Result<SubstateResult, TransactionManagerError> {
let shard = SubstateAddress::from_substate_id(&substate_address, version);

self.try_with_committee(iter::once(shard), |mut client| {
self.try_with_committee(iter::once(shard), 1, |mut client| {
// This double clone looks strange, but it's needed because this function is called in a loop
// and each iteration needs its own copy of the address (because of the move).
let substate_address = substate_address.clone();
Expand All @@ -160,6 +160,7 @@ where
async fn try_with_committee<'a, F, T, E, TFut, IShard>(
&self,
substate_addresses: IShard,
mut num_to_query: usize,
mut callback: F,
) -> Result<T, TransactionManagerError>
where
Expand All @@ -186,27 +187,37 @@ where
return Err(TransactionManagerError::NoCommitteeMembers);
}

let mut num_succeeded = 0;
let mut last_error = None;
let mut last_return = None;
for validator in all_members {
let client = self.client_provider.create_client(&validator);
match callback(client).await {
Ok(ret) => {
return Ok(ret);
num_to_query = num_to_query.saturating_sub(1);
num_succeeded += 1;
last_return = Some(ret);
if num_to_query == 0 {
break;
}
},
Err(err) => {
warn!(
target: LOG_TARGET,
"Request failed for validator '{}': {}", validator, err
);
last_error = Some(err.to_string());
continue;
},
}
}

Err(TransactionManagerError::AllValidatorsFailed {
committee_size,
last_error,
})
if num_succeeded == 0 {
return Err(TransactionManagerError::AllValidatorsFailed {
committee_size,
last_error,
});
}

Ok(last_return.expect("last_return must be Some if num_succeeded > 0"))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2024 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

import React from "react";
import { jsonRpc } from "../utils/json_rpc.tsx";
import NodeControls from "./NodeControls.tsx";

interface Props {
showLogs: boolean;
}

export default function MinotariNodes(props: Props) {
const [nodes, setNodes] = React.useState<null | [any]>(null);
const [isLoading, setIsLoading] = React.useState(true);


React.useEffect(() => {
jsonRpc("list_instances", { by_type: "MinoTariNode" }).then((nodes: any) => setNodes(nodes.instances))
.then(() => setIsLoading(false));
}, []);

if (isLoading) {
return <div>Loading...</div>;
}

return (
<div>
{nodes!.map((node: any, i: number) => (
<Node key={i} {...node} showLogs={props.showLogs} />
))}
</div>
);
}

function Node(props: any) {
const onStart = () => {
jsonRpc("start_instance", { instance_id: props.id });
};

const onStop = () => {
jsonRpc("stop_instance", { instance_id: props.id });
};

const onDeleteData = () => {
jsonRpc("delete_instance_data", { instance_id: props.id });
};

return (
<div className="info">
<div>
<b>Name</b>
{props.name}
</div>

<div>
<b>GRPC</b>
{props.ports.grpc}
</div>
<NodeControls isRunning={props.is_running} onStart={onStart} onStop={onStop} onDeleteData={onDeleteData} />
{props.showLogs && <div>TODO</div>}
</div>
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ function Wallet(props: any) {
jsonRpc("delete_instance_data", { instance_id: props.id });
};

const wallet = props.danWallets[0];

return (
<div className="info">
<div>
Expand All @@ -60,7 +62,8 @@ function Wallet(props: any) {
{props.ports.grpc}
</div>
<NodeControls isRunning={props.is_running} onStart={onStart} onStop={onStop} onDeleteData={onDeleteData} />
<BurnFunds instanceId={props.id} danWallet={props.danWallets[0]} />
{(wallet) ?
<BurnFunds instanceId={props.id} danWallet={wallet} /> : <></>}
{props.showLogs && <div>TODO</div>}
</div>
);
Expand Down
6 changes: 2 additions & 4 deletions applications/tari_swarm_daemon/webui/src/routes/Main.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { jsonRpc } from "../utils/json_rpc";
import { ExecutedTransaction } from "../Types.ts";
import MinotariWallet from "../components/MinotariWallet";
import NodeControls from "../components/NodeControls.tsx";
import MinotariNodes from "../components/MinotariNodes.tsx";

enum Executable {
BaseNode = 1,
Expand Down Expand Up @@ -377,7 +378,6 @@ export default function Main() {
const [vns, setVns] = useState({});
const [danWallet, setDanWallets] = useState({});
const [indexers, setIndexers] = useState({});
const [node, setNode] = useState<{ grpc: any }>();
const [logs, setLogs] = useState<any | null>({});
const [stdoutLogs, setStdoutLogs] = useState<any | null>({});
const [connectorSample, setConnectorSample] = useState(null);
Expand Down Expand Up @@ -470,7 +470,6 @@ export default function Main() {
jsonRpc("get_stdout", "miner").then((resp) => {
setStdoutLogs((state: any) => ({ ...state, miner: resp }));
});
jsonRpc("grpc_node").then((resp) => setNode({ grpc: resp }));
jsonRpc("list_instances", { by_type: null }).then(({ instances }) => setInstances(instances));
};

Expand Down Expand Up @@ -502,8 +501,7 @@ export default function Main() {
<button onClick={() => setHorizontal(!horizontal)}>Swap rows/columns</button>
<div className="label">Base layer</div>
<div className="infos">
<ShowInfo executable={Executable.BaseNode} name="node" node={node} logs={logs?.node}
stdoutLogs={stdoutLogs?.node} showLogs={showLogs} horizontal={horizontal} onReload={getInfo} />
<MinotariNodes showLogs={showLogs} />
<MinotariWallet showLogs={showLogs} />
<ShowInfo executable={Executable.Miner} name="miner" logs={logs?.miner}
stdoutLogs={stdoutLogs?.miner} showLogs={showLogs} horizontal={horizontal}>
Expand Down
3 changes: 2 additions & 1 deletion applications/tari_validator_node/src/p2p/rpc/service_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,14 @@ impl ValidatorNodeRpcService for ValidatorNodeRpcServiceImpl {
.map_err(|e| RpcStatus::bad_request(format!("Malformed transaction: {}", e)))?;

let transaction_id = *transaction.id();
info!(target: LOG_TARGET, "🌐 Received transaction {transaction_id} from peer");

self.mempool
.submit_transaction(transaction)
.await
.map_err(|e| RpcStatus::bad_request(format!("Invalid transaction: {}", e)))?;

debug!(target: LOG_TARGET, "Accepted instruction into mempool");
debug!(target: LOG_TARGET, "Accepted transaction {transaction_id} into mempool");

Ok(Response::new(proto::rpc::SubmitTransactionResponse {
transaction_id: transaction_id.as_bytes().to_vec(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,63 +106,6 @@ impl MempoolGossip<PeerAddress> {
self.gossip.publish_message(topic, msg.clone()).await?;
}

// let committees = self.epoch_manager.get_committees_by_shards(epoch, shards).await?;
// let local_shard = self.epoch_manager.get_local_committee_shard(epoch).await?;
// let local_committee = self.epoch_manager.get_local_committee(epoch).await?;
//
// if local_committee.is_empty() {
// error!(target: LOG_TARGET, "BUG: forward_to_foreign_replicas: get_local_committee returned empty
// committee"); return Ok(());
// }
//
// let Some(our_index) = local_committee
// .members()
// .position(|addr| addr == &self.validator_address)
// else {
// error!(target: LOG_TARGET, "BUG: forward_to_foreign_replicas: get_local_committee returned committee that
// this node is not part of"); return Ok(());
// };
//
// let mut selected_members = vec![];
// for (bucket, committee) in committees {
// // Dont forward locally
// if bucket == local_shard.bucket() {
// continue;
// }
// if exclude_bucket.map(|b| b == bucket).unwrap_or(false) {
// continue;
// }
// if committee.is_empty() {
// error!(
// target: LOG_TARGET,
// "BUG: forward_to_foreign_replicas: get_committees_by_shards returned empty committee"
// );
// continue;
// }
// let n = if local_committee.len() > committee.len() {
// // Our local committee is bigger, so we send to a single node
// 1
// } else {
// // Our local committee is smaller, so we send to a portion of their nodes
// committee.len() / local_committee.len()
// };
//
// selected_members.extend(committee.select_n_starting_from(n, our_index).cloned());
// }
//
// debug!(
// target: LOG_TARGET,
// "forward_to_foreign_replicas: {} member(s) selected",
// selected_members.len(),
// );
//
// if selected_members.is_empty() {
// return Ok(());
// }
//
// // TODO: change this to use goissipsub
// self.outbound.broadcast(selected_members.iter(), msg).await?;

Ok(())
}

Expand All @@ -172,41 +115,7 @@ impl MempoolGossip<PeerAddress> {
addresses: HashSet<SubstateAddress>,
msg: T,
) -> Result<(), MempoolError> {
// let committees = self.epoch_manager.get_committees_by_shards(epoch, shards).await?;
// let local_shard = self.epoch_manager.get_local_committee_shard(epoch).await?;
//
// let mut selected_members = vec![];
// for (bucket, committee) in committees {
// // Dont forward locally
// if bucket == local_shard.bucket() {
// continue;
// }
// if committee.is_empty() {
// error!(
// target: LOG_TARGET,
// "BUG: gossip_to_foreign_replicas: get_committees_by_shards returned empty committee"
// );
// continue;
// }
// let f = committee.max_failures();
//
// selected_members.extend(committee.select_n_random(f + 1).cloned());
// }
//
// debug!(
// target: LOG_TARGET,
// "gossip_to_foreign_replicas: {} member(s) selected",
// selected_members.len(),
// );
//
// if selected_members.is_empty() {
// return Ok(());
// }
//
// self.outbound.broadcast(selected_members.iter(), msg).await?;

self.forward_to_foreign_replicas(epoch, addresses, msg, None).await?;

Ok(())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ use crate::p2p::services::mempool::MempoolError;
pub enum MempoolRequest {
SubmitTransaction {
transaction: Box<Transaction>,
/// If true, the transaction will be propagated to peers
should_propagate: bool,
reply: oneshot::Sender<Result<(), MempoolError>>,
},
RemoveTransactions {
Expand Down Expand Up @@ -64,7 +62,6 @@ impl MempoolHandle {
self.tx_mempool_request
.send(MempoolRequest::SubmitTransaction {
transaction: Box::new(transaction),
should_propagate: true,
reply,
})
.await?;
Expand Down
Loading
Loading