Skip to content

Commit

Permalink
Merge branch 'development' into feat-resource-metadata-url
Browse files Browse the repository at this point in the history
  • Loading branch information
therealdannzor committed Sep 4, 2024
2 parents 00f3f4c + 9b6b704 commit cf78331
Show file tree
Hide file tree
Showing 133 changed files with 2,376 additions and 5,117 deletions.
31 changes: 21 additions & 10 deletions applications/tari_indexer/src/transaction_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ where
let transaction_substate_address = SubstateAddress::for_transaction_receipt(tx_hash.into_array().into());

if transaction.all_inputs_iter().next().is_none() {
self.try_with_committee(iter::once(transaction_substate_address), |mut client| {
self.try_with_committee(iter::once(transaction_substate_address), 2, |mut client| {
let transaction = transaction.clone();
async move { client.submit_transaction(transaction).await }
})
Expand All @@ -98,7 +98,7 @@ where
.all_inputs_iter()
// If there is no version specified, submit to the validator node with version 0
.map(|i| i.or_zero_version().to_substate_address());
self.try_with_committee(involved, |mut client| {
self.try_with_committee(involved, 2, |mut client| {
let transaction = transaction.clone();
async move { client.submit_transaction(transaction).await }
})
Expand All @@ -123,7 +123,7 @@ where
transaction_id: TransactionId,
) -> Result<TransactionResultStatus, TransactionManagerError> {
let transaction_substate_address = SubstateAddress::for_transaction_receipt(transaction_id.into_array().into());
self.try_with_committee(iter::once(transaction_substate_address), |mut client| async move {
self.try_with_committee(iter::once(transaction_substate_address), 1, |mut client| async move {
client.get_finalized_transaction_result(transaction_id).await.optional()
})
.await?
Expand All @@ -140,7 +140,7 @@ where
) -> Result<SubstateResult, TransactionManagerError> {
let shard = SubstateAddress::from_substate_id(&substate_address, version);

self.try_with_committee(iter::once(shard), |mut client| {
self.try_with_committee(iter::once(shard), 1, |mut client| {
// This double clone looks strange, but it's needed because this function is called in a loop
// and each iteration needs its own copy of the address (because of the move).
let substate_address = substate_address.clone();
Expand All @@ -160,6 +160,7 @@ where
async fn try_with_committee<'a, F, T, E, TFut, IShard>(
&self,
substate_addresses: IShard,
mut num_to_query: usize,
mut callback: F,
) -> Result<T, TransactionManagerError>
where
Expand All @@ -186,27 +187,37 @@ where
return Err(TransactionManagerError::NoCommitteeMembers);
}

let mut num_succeeded = 0;
let mut last_error = None;
let mut last_return = None;
for validator in all_members {
let client = self.client_provider.create_client(&validator);
match callback(client).await {
Ok(ret) => {
return Ok(ret);
num_to_query = num_to_query.saturating_sub(1);
num_succeeded += 1;
last_return = Some(ret);
if num_to_query == 0 {
break;
}
},
Err(err) => {
warn!(
target: LOG_TARGET,
"Request failed for validator '{}': {}", validator, err
);
last_error = Some(err.to_string());
continue;
},
}
}

Err(TransactionManagerError::AllValidatorsFailed {
committee_size,
last_error,
})
if num_succeeded == 0 {
return Err(TransactionManagerError::AllValidatorsFailed {
committee_size,
last_error,
});
}

Ok(last_return.expect("last_return must be Some if num_succeeded > 0"))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2024 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

import React from "react";
import { jsonRpc } from "../utils/json_rpc.tsx";
import NodeControls from "./NodeControls.tsx";

interface Props {
showLogs: boolean;
}

export default function MinotariNodes(props: Props) {
const [nodes, setNodes] = React.useState<null | [any]>(null);
const [isLoading, setIsLoading] = React.useState(true);


React.useEffect(() => {
jsonRpc("list_instances", { by_type: "MinoTariNode" }).then((nodes: any) => setNodes(nodes.instances))
.then(() => setIsLoading(false));
}, []);

if (isLoading) {
return <div>Loading...</div>;
}

return (
<div>
{nodes!.map((node: any, i: number) => (
<Node key={i} {...node} showLogs={props.showLogs} />
))}
</div>
);
}

function Node(props: any) {
const onStart = () => {
jsonRpc("start_instance", { instance_id: props.id });
};

const onStop = () => {
jsonRpc("stop_instance", { instance_id: props.id });
};

const onDeleteData = () => {
jsonRpc("delete_instance_data", { instance_id: props.id });
};

return (
<div className="info">
<div>
<b>Name</b>
{props.name}
</div>

<div>
<b>GRPC</b>
{props.ports.grpc}
</div>
<NodeControls isRunning={props.is_running} onStart={onStart} onStop={onStop} onDeleteData={onDeleteData} />
{props.showLogs && <div>TODO</div>}
</div>
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ function Wallet(props: any) {
jsonRpc("delete_instance_data", { instance_id: props.id });
};

const wallet = props.danWallets[0];

return (
<div className="info">
<div>
Expand All @@ -60,7 +62,8 @@ function Wallet(props: any) {
{props.ports.grpc}
</div>
<NodeControls isRunning={props.is_running} onStart={onStart} onStop={onStop} onDeleteData={onDeleteData} />
<BurnFunds instanceId={props.id} danWallet={props.danWallets[0]} />
{(wallet) ?
<BurnFunds instanceId={props.id} danWallet={wallet} /> : <></>}
{props.showLogs && <div>TODO</div>}
</div>
);
Expand Down
6 changes: 2 additions & 4 deletions applications/tari_swarm_daemon/webui/src/routes/Main.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { jsonRpc } from "../utils/json_rpc";
import { ExecutedTransaction } from "../Types.ts";
import MinotariWallet from "../components/MinotariWallet";
import NodeControls from "../components/NodeControls.tsx";
import MinotariNodes from "../components/MinotariNodes.tsx";

enum Executable {
BaseNode = 1,
Expand Down Expand Up @@ -377,7 +378,6 @@ export default function Main() {
const [vns, setVns] = useState({});
const [danWallet, setDanWallets] = useState({});
const [indexers, setIndexers] = useState({});
const [node, setNode] = useState<{ grpc: any }>();
const [logs, setLogs] = useState<any | null>({});
const [stdoutLogs, setStdoutLogs] = useState<any | null>({});
const [connectorSample, setConnectorSample] = useState(null);
Expand Down Expand Up @@ -470,7 +470,6 @@ export default function Main() {
jsonRpc("get_stdout", "miner").then((resp) => {
setStdoutLogs((state: any) => ({ ...state, miner: resp }));
});
jsonRpc("grpc_node").then((resp) => setNode({ grpc: resp }));
jsonRpc("list_instances", { by_type: null }).then(({ instances }) => setInstances(instances));
};

Expand Down Expand Up @@ -502,8 +501,7 @@ export default function Main() {
<button onClick={() => setHorizontal(!horizontal)}>Swap rows/columns</button>
<div className="label">Base layer</div>
<div className="infos">
<ShowInfo executable={Executable.BaseNode} name="node" node={node} logs={logs?.node}
stdoutLogs={stdoutLogs?.node} showLogs={showLogs} horizontal={horizontal} onReload={getInfo} />
<MinotariNodes showLogs={showLogs} />
<MinotariWallet showLogs={showLogs} />
<ShowInfo executable={Executable.Miner} name="miner" logs={logs?.miner}
stdoutLogs={stdoutLogs?.miner} showLogs={showLogs} horizontal={horizontal}>
Expand Down
3 changes: 2 additions & 1 deletion applications/tari_validator_node/src/p2p/rpc/service_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,14 @@ impl ValidatorNodeRpcService for ValidatorNodeRpcServiceImpl {
.map_err(|e| RpcStatus::bad_request(format!("Malformed transaction: {}", e)))?;

let transaction_id = *transaction.id();
info!(target: LOG_TARGET, "🌐 Received transaction {transaction_id} from peer");

self.mempool
.submit_transaction(transaction)
.await
.map_err(|e| RpcStatus::bad_request(format!("Invalid transaction: {}", e)))?;

debug!(target: LOG_TARGET, "Accepted instruction into mempool");
debug!(target: LOG_TARGET, "Accepted transaction {transaction_id} into mempool");

Ok(Response::new(proto::rpc::SubmitTransactionResponse {
transaction_id: transaction_id.as_bytes().to_vec(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,63 +106,6 @@ impl MempoolGossip<PeerAddress> {
self.gossip.publish_message(topic, msg.clone()).await?;
}

// let committees = self.epoch_manager.get_committees_by_shards(epoch, shards).await?;
// let local_shard = self.epoch_manager.get_local_committee_shard(epoch).await?;
// let local_committee = self.epoch_manager.get_local_committee(epoch).await?;
//
// if local_committee.is_empty() {
// error!(target: LOG_TARGET, "BUG: forward_to_foreign_replicas: get_local_committee returned empty
// committee"); return Ok(());
// }
//
// let Some(our_index) = local_committee
// .members()
// .position(|addr| addr == &self.validator_address)
// else {
// error!(target: LOG_TARGET, "BUG: forward_to_foreign_replicas: get_local_committee returned committee that
// this node is not part of"); return Ok(());
// };
//
// let mut selected_members = vec![];
// for (bucket, committee) in committees {
// // Dont forward locally
// if bucket == local_shard.bucket() {
// continue;
// }
// if exclude_bucket.map(|b| b == bucket).unwrap_or(false) {
// continue;
// }
// if committee.is_empty() {
// error!(
// target: LOG_TARGET,
// "BUG: forward_to_foreign_replicas: get_committees_by_shards returned empty committee"
// );
// continue;
// }
// let n = if local_committee.len() > committee.len() {
// // Our local committee is bigger, so we send to a single node
// 1
// } else {
// // Our local committee is smaller, so we send to a portion of their nodes
// committee.len() / local_committee.len()
// };
//
// selected_members.extend(committee.select_n_starting_from(n, our_index).cloned());
// }
//
// debug!(
// target: LOG_TARGET,
// "forward_to_foreign_replicas: {} member(s) selected",
// selected_members.len(),
// );
//
// if selected_members.is_empty() {
// return Ok(());
// }
//
// // TODO: change this to use goissipsub
// self.outbound.broadcast(selected_members.iter(), msg).await?;

Ok(())
}

Expand All @@ -172,41 +115,7 @@ impl MempoolGossip<PeerAddress> {
addresses: HashSet<SubstateAddress>,
msg: T,
) -> Result<(), MempoolError> {
// let committees = self.epoch_manager.get_committees_by_shards(epoch, shards).await?;
// let local_shard = self.epoch_manager.get_local_committee_shard(epoch).await?;
//
// let mut selected_members = vec![];
// for (bucket, committee) in committees {
// // Dont forward locally
// if bucket == local_shard.bucket() {
// continue;
// }
// if committee.is_empty() {
// error!(
// target: LOG_TARGET,
// "BUG: gossip_to_foreign_replicas: get_committees_by_shards returned empty committee"
// );
// continue;
// }
// let f = committee.max_failures();
//
// selected_members.extend(committee.select_n_random(f + 1).cloned());
// }
//
// debug!(
// target: LOG_TARGET,
// "gossip_to_foreign_replicas: {} member(s) selected",
// selected_members.len(),
// );
//
// if selected_members.is_empty() {
// return Ok(());
// }
//
// self.outbound.broadcast(selected_members.iter(), msg).await?;

self.forward_to_foreign_replicas(epoch, addresses, msg, None).await?;

Ok(())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ use crate::p2p::services::mempool::MempoolError;
pub enum MempoolRequest {
SubmitTransaction {
transaction: Box<Transaction>,
/// If true, the transaction will be propagated to peers
should_propagate: bool,
reply: oneshot::Sender<Result<(), MempoolError>>,
},
RemoveTransactions {
Expand Down Expand Up @@ -64,7 +62,6 @@ impl MempoolHandle {
self.tx_mempool_request
.send(MempoolRequest::SubmitTransaction {
transaction: Box::new(transaction),
should_propagate: true,
reply,
})
.await?;
Expand Down
Loading

0 comments on commit cf78331

Please sign in to comment.