Skip to content

Commit efc17f5

Browse files
authored
fix: spawn transaction senders in exporter (#134)
1 parent 756495f commit efc17f5

File tree

4 files changed

+35
-33
lines changed

4 files changed

+35
-33
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "pyth-agent"
3-
version = "2.10.1"
3+
version = "2.10.2"
44
edition = "2021"
55

66
[[bin]]

rust-toolchain.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
[toolchain]
2-
channel = "stable"
2+
channel = "1.79.0"
33
profile = "minimal"
44
components = ["rustfmt", "clippy"]

src/agent/services/exporter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ mod exporter {
262262
config.exporter.unchanged_publish_threshold,
263263
).await {
264264
if let Err(err) = publish_batches(
265-
&*state,
265+
state.clone(),
266266
client.clone(),
267267
network,
268268
&network_state_rx,

src/agent/state/exporter.rs

Lines changed: 32 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,7 @@ async fn estimate_compute_unit_price_micro_lamports(
429429
)
430430
)]
431431
pub async fn publish_batches<S>(
432-
state: &S,
432+
state: Arc<S>,
433433
client: Arc<RpcClient>,
434434
network: Network,
435435
network_state_rx: &watch::Receiver<NetworkState>,
@@ -466,7 +466,7 @@ where
466466
let network_state = *network_state_rx.borrow();
467467
for batch in batches {
468468
batch_futures.push(publish_batch(
469-
state,
469+
state.clone(),
470470
client.clone(),
471471
network,
472472
network_state,
@@ -494,7 +494,7 @@ where
494494
.into_iter()
495495
.collect::<Result<Vec<_>>>()?;
496496

497-
Exporter::record_publish(state, batch_state).await;
497+
Exporter::record_publish(&*state, batch_state).await;
498498
Ok(())
499499
}
500500

@@ -509,7 +509,7 @@ where
509509
)
510510
)]
511511
async fn publish_batch<S>(
512-
state: &S,
512+
state: Arc<S>,
513513
client: Arc<RpcClient>,
514514
network: Network,
515515
network_state: NetworkState,
@@ -535,7 +535,7 @@ where
535535
let mut instructions = Vec::new();
536536

537537
// Refresh the data in the batch
538-
let local_store_contents = LocalStore::get_all_price_infos(state).await;
538+
let local_store_contents = LocalStore::get_all_price_infos(&*state).await;
539539
let refreshed_batch = batch.iter().map(|(identifier, _)| {
540540
(
541541
identifier,
@@ -615,7 +615,7 @@ where
615615
// Use the estimated previous price if it is higher
616616
// than the current price.
617617
let recent_compute_unit_price_micro_lamports =
618-
Exporter::get_recent_compute_unit_price_micro_lamports(state).await;
618+
Exporter::get_recent_compute_unit_price_micro_lamports(&*state).await;
619619

620620
if let Some(estimated_recent_price) = recent_compute_unit_price_micro_lamports {
621621
// Get the estimated compute unit price and wrap it so it stays below the maximum
@@ -633,7 +633,7 @@ where
633633
// in this batch. This will use the maximum total compute unit fee if the publisher
634634
// hasn't updated for >= MAXIMUM_SLOT_GAP_FOR_DYNAMIC_COMPUTE_UNIT_PRICE slots.
635635
let result = GlobalStore::price_accounts(
636-
state,
636+
&*state,
637637
network,
638638
price_accounts.clone().into_iter().collect(),
639639
)
@@ -697,31 +697,33 @@ where
697697
network_state.blockhash,
698698
);
699699

700-
let signature = match client
701-
.send_transaction_with_config(
702-
&transaction,
703-
RpcSendTransactionConfig {
704-
skip_preflight: true,
705-
..RpcSendTransactionConfig::default()
706-
},
707-
)
708-
.await
709-
{
710-
Ok(signature) => signature,
711-
Err(err) => {
712-
tracing::error!(err = ?err, "Exporter: failed to send transaction.");
713-
return Ok(());
714-
}
715-
};
700+
tokio::spawn(async move {
701+
let signature = match client
702+
.send_transaction_with_config(
703+
&transaction,
704+
RpcSendTransactionConfig {
705+
skip_preflight: true,
706+
..RpcSendTransactionConfig::default()
707+
},
708+
)
709+
.await
710+
{
711+
Ok(signature) => signature,
712+
Err(err) => {
713+
tracing::error!(err = ?err, "Exporter: failed to send transaction.");
714+
return;
715+
}
716+
};
716717

717-
tracing::debug!(
718-
signature = signature.to_string(),
719-
instructions = instructions.len(),
720-
price_accounts = ?price_accounts,
721-
"Sent upd_price transaction.",
722-
);
718+
tracing::debug!(
719+
signature = signature.to_string(),
720+
instructions = instructions.len(),
721+
price_accounts = ?price_accounts,
722+
"Sent upd_price transaction.",
723+
);
723724

724-
Transactions::add_transaction(state, signature).await;
725+
Transactions::add_transaction(&*state, signature).await;
726+
});
725727

726728
Ok(())
727729
}

0 commit comments

Comments
 (0)