Skip to content

Commit 2400a6b

Browse files
authored
oracle.rs: Account -> Entry for clarity, lookup mapping with fewer requests (#53)
* oracle.rs: Account -> Entry for clarity, lookup with fewer requests * oracle.rs: Clarify how the hashmaps are used * oracle.rs: Add a batching option to on chain lookups
1 parent e0c3be4 commit 2400a6b

File tree

5 files changed

+137
-86
lines changed

5 files changed

+137
-86
lines changed

config/config.toml

+7
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,13 @@ key_store.root_path = "/path/to/keystore"
3131
# Whether subscribing to account updates over websocket is enabled
3232
# oracle.subscriber_enabled = true
3333

34+
# Ask the RPC for up to this many product/price accounts in a
35+
# single request. Tune this setting if you're experiencing
36+
# timeouts on data fetching. In order to keep concurrent open
37+
# socket count at bay, the batches are looked up sequentially,
38+
# trading off overall time it takes to fetch all symbols.
39+
# oracle.max_lookup_batch_size = 200
40+
3441
# Duration of the interval at which to refresh the cached network state (current slot and blockhash).
3542
# It is recommended to set this to slightly less than the network's block time,
3643
# as the slot fetched will be used as the time of the price update.

src/agent/metrics.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use {
22
super::{
3-
solana::oracle::PriceAccount,
3+
solana::oracle::PriceEntry,
44
store::{
55
global::{
66
AllAccountsData,
@@ -262,7 +262,7 @@ pub struct DashboardSymbolView {
262262
#[derive(Debug)]
263263
pub struct DashboardPriceView {
264264
local_data: Option<PriceInfo>,
265-
global_data: Option<PriceAccount>,
265+
global_data: Option<PriceEntry>,
266266
global_metadata: Option<PriceAccountMetadata>,
267267
}
268268

src/agent/pythd/adapter.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@ impl Adapter {
363363
}
364364

365365
fn solana_product_account_to_pythd_api_product_account(
366-
product_account: &solana::oracle::ProductAccount,
366+
product_account: &solana::oracle::ProductEntry,
367367
all_accounts_data: &AllAccountsData,
368368
product_account_key: &solana_sdk::pubkey::Pubkey,
369369
) -> ProductAccount {
@@ -947,7 +947,7 @@ mod tests {
947947
"CkMrDWtmFJZcmAUC11qNaWymbXQKvnRx4cq1QudLav7t",
948948
)
949949
.unwrap(),
950-
solana::oracle::ProductAccount {
950+
solana::oracle::ProductEntry {
951951
account_data: pyth_sdk_solana::state::ProductAccount {
952952
magic: 0xa1b2c3d4,
953953
ver: 6,
@@ -1006,7 +1006,7 @@ mod tests {
10061006
"BjHoZWRxo9dgbR1NQhPyTiUs6xFiX6mGS4TMYvy3b2yc",
10071007
)
10081008
.unwrap(),
1009-
solana::oracle::ProductAccount {
1009+
solana::oracle::ProductEntry {
10101010
account_data: pyth_sdk_solana::state::ProductAccount {
10111011
magic: 0xa1b2c3d4,
10121012
ver: 5,

src/agent/solana/oracle.rs

+115-71
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ use {
1010
Context,
1111
Result,
1212
},
13-
futures_util::future::join_all,
1413
pyth_sdk_solana::state::{
1514
load_mapping_account,
1615
load_price_account,
@@ -35,7 +34,6 @@ use {
3534
HashMap,
3635
HashSet,
3736
},
38-
iter::zip,
3937
time::Duration,
4038
},
4139
tokio::{
@@ -48,15 +46,15 @@ use {
4846
#[derive(Default, Debug, Clone)]
4947
pub struct Data {
5048
pub mapping_accounts: HashMap<Pubkey, MappingAccount>,
51-
pub product_accounts: HashMap<Pubkey, ProductAccount>,
52-
pub price_accounts: HashMap<Pubkey, PriceAccount>,
49+
pub product_accounts: HashMap<Pubkey, ProductEntry>,
50+
pub price_accounts: HashMap<Pubkey, PriceEntry>,
5351
}
5452

5553
impl Data {
5654
fn new(
5755
mapping_accounts: HashMap<Pubkey, MappingAccount>,
58-
product_accounts: HashMap<Pubkey, ProductAccount>,
59-
price_accounts: HashMap<Pubkey, PriceAccount>,
56+
product_accounts: HashMap<Pubkey, ProductEntry>,
57+
price_accounts: HashMap<Pubkey, PriceEntry>,
6058
) -> Self {
6159
Data {
6260
mapping_accounts,
@@ -68,11 +66,11 @@ impl Data {
6866

6967
pub type MappingAccount = pyth_sdk_solana::state::MappingAccount;
7068
#[derive(Debug, Clone)]
71-
pub struct ProductAccount {
69+
pub struct ProductEntry {
7270
pub account_data: pyth_sdk_solana::state::ProductAccount,
7371
pub price_accounts: Vec<Pubkey>,
7472
}
75-
pub type PriceAccount = pyth_sdk_solana::state::PriceAccount;
73+
pub type PriceEntry = pyth_sdk_solana::state::PriceAccount;
7674

7775
// Oracle is responsible for fetching Solana account data stored in the Pyth on-chain Oracle.
7876
pub struct Oracle {
@@ -105,6 +103,13 @@ pub struct Config {
105103
pub updates_channel_capacity: usize,
106104
/// Capacity of the channel over which the Poller sends data to the Oracle
107105
pub data_channel_capacity: usize,
106+
107+
/// Ask the RPC for up to this many product/price accounts in a
108+
/// single request. Tune this setting if you're experiencing
109+
/// timeouts on data fetching. In order to keep concurrent open
110+
/// socket count at bay, the batches are looked up sequentially,
111+
/// trading off overall time it takes to fetch all symbols.
112+
pub max_lookup_batch_size: usize,
108113
}
109114

110115
impl Default for Config {
@@ -115,6 +120,7 @@ impl Default for Config {
115120
subscriber_enabled: true,
116121
updates_channel_capacity: 10000,
117122
data_channel_capacity: 10000,
123+
max_lookup_batch_size: 200,
118124
}
119125
}
120126
}
@@ -154,6 +160,7 @@ pub fn spawn_oracle(
154160
rpc_timeout,
155161
config.commitment,
156162
config.poll_interval_duration,
163+
config.max_lookup_batch_size,
157164
logger.clone(),
158165
);
159166
jhs.push(tokio::spawn(async move { poller.run().await }));
@@ -292,7 +299,7 @@ impl Oracle {
292299
async fn notify_product_account_update(
293300
&self,
294301
account_key: &Pubkey,
295-
account: &ProductAccount,
302+
account: &ProductEntry,
296303
) -> Result<()> {
297304
self.global_store_tx
298305
.send(global::Update::ProductAccountUpdate {
@@ -306,7 +313,7 @@ impl Oracle {
306313
async fn notify_price_account_update(
307314
&self,
308315
account_key: &Pubkey,
309-
account: &PriceAccount,
316+
account: &PriceEntry,
310317
) -> Result<()> {
311318
self.global_store_tx
312319
.send(global::Update::PriceAccountUpdate {
@@ -331,6 +338,9 @@ struct Poller {
331338
/// The interval with which to poll for data
332339
poll_interval: Interval,
333340

341+
/// Passed from Oracle config
342+
max_lookup_batch_size: usize,
343+
334344
/// Logger
335345
logger: Logger,
336346
}
@@ -343,6 +353,7 @@ impl Poller {
343353
rpc_timeout: Duration,
344354
commitment: CommitmentLevel,
345355
poll_interval_duration: Duration,
356+
max_lookup_batch_size: usize,
346357
logger: Logger,
347358
) -> Self {
348359
let rpc_client = RpcClient::new_with_timeout_and_commitment(
@@ -357,6 +368,7 @@ impl Poller {
357368
mapping_account_key,
358369
rpc_client,
359370
poll_interval,
371+
max_lookup_batch_size,
360372
logger,
361373
}
362374
}
@@ -419,88 +431,120 @@ impl Poller {
419431
async fn fetch_product_and_price_accounts<'a, A>(
420432
&self,
421433
mapping_accounts: A,
422-
) -> Result<(
423-
HashMap<Pubkey, ProductAccount>,
424-
HashMap<Pubkey, PriceAccount>,
425-
)>
434+
) -> Result<(HashMap<Pubkey, ProductEntry>, HashMap<Pubkey, PriceEntry>)>
426435
where
427436
A: IntoIterator<Item = &'a MappingAccount>,
428437
{
429-
let mut pubkeys = vec![];
430-
let mut futures = vec![];
438+
let mut product_keys = vec![];
431439

432-
// Fetch all product accounts in parallel
440+
// Get all product keys
433441
for mapping_account in mapping_accounts {
434442
for account_key in mapping_account
435443
.products
436444
.iter()
437445
.filter(|pubkey| **pubkey != Pubkey::default())
438446
{
439-
pubkeys.push(account_key.clone());
440-
futures.push(self.fetch_product_account(account_key));
447+
product_keys.push(account_key.clone());
441448
}
442449
}
443450

444-
let future_results = join_all(futures)
445-
.await
446-
.into_iter()
447-
.collect::<Result<Vec<_>>>()?;
448-
449-
let product_accounts = zip(
450-
pubkeys.into_iter(),
451-
future_results
452-
.clone()
453-
.into_iter()
454-
.map(|(product_account, _)| product_account),
455-
)
456-
.collect();
457-
458-
let price_accounts = future_results
459-
.into_iter()
460-
.flat_map(|(_, price_accounts)| price_accounts.into_iter())
461-
.collect();
462-
463-
Ok((product_accounts, price_accounts))
451+
let mut product_entries = HashMap::new();
452+
let mut price_entries = HashMap::new();
453+
454+
// Lookup products and their prices using the configured batch size
455+
for product_key_batch in product_keys.as_slice().chunks(self.max_lookup_batch_size) {
456+
let (mut batch_products, mut batch_prices) = self
457+
.fetch_batch_of_product_and_price_accounts(product_key_batch)
458+
.await?;
459+
460+
product_entries.extend(batch_products.drain());
461+
price_entries.extend(batch_prices.drain());
462+
}
463+
464+
Ok((product_entries, price_entries))
464465
}
465466

466-
async fn fetch_product_account(
467+
async fn fetch_batch_of_product_and_price_accounts(
467468
&self,
468-
product_account_key: &Pubkey,
469-
) -> Result<(ProductAccount, HashMap<Pubkey, PriceAccount>)> {
470-
// Fetch the product account
471-
let product_account = *load_product_account(
472-
&self
473-
.rpc_client
474-
.get_account_data(product_account_key)
475-
.await?,
476-
)
477-
.with_context(|| format!("load product account {}", product_account_key))?;
478-
479-
// Fetch the price accounts associated with this product account
480-
let mut price_accounts = HashMap::new();
481-
let mut price_account_key = product_account.px_acc;
482-
while price_account_key != Pubkey::default() {
483-
let price_account = self.fetch_price_account(&price_account_key).await?;
484-
price_accounts.insert(price_account_key, price_account);
485-
486-
price_account_key = price_account.next;
469+
product_key_batch: &[Pubkey],
470+
) -> Result<(HashMap<Pubkey, ProductEntry>, HashMap<Pubkey, PriceEntry>)> {
471+
let mut product_entries = HashMap::new();
472+
473+
let product_keys = product_key_batch;
474+
475+
// Look up the batch with a single request
476+
let product_accounts = self.rpc_client.get_multiple_accounts(product_keys).await?;
477+
478+
// Log missing products, fill the product entries with initial values
479+
for (product_key, product_account) in product_keys.iter().zip(product_accounts) {
480+
if let Some(prod_acc) = product_account {
481+
let product = load_product_account(prod_acc.data.as_slice())
482+
.context(format!("Could not parse product account {}", product_key))?;
483+
484+
product_entries.insert(
485+
*product_key,
486+
ProductEntry {
487+
account_data: *product,
488+
price_accounts: vec![],
489+
},
490+
);
491+
} else {
492+
warn!(self.logger, "Oracle: Could not find product on chain, skipping";
493+
"product_key" => product_key.to_string(),);
494+
}
487495
}
488496

489-
// Create the product account object
490-
let product_account = ProductAccount {
491-
account_data: product_account,
492-
price_accounts: price_accounts.keys().cloned().collect(),
493-
};
497+
let mut price_entries = HashMap::new();
494498

495-
Ok((product_account, price_accounts))
496-
}
499+
// Starting with top-level prices, look up price accounts in
500+
// batches, filling price entries and adding found prices to
501+
// the product entries
502+
let mut todo = product_entries
503+
.values()
504+
.map(|p| p.account_data.px_acc)
505+
.collect::<Vec<_>>();
506+
507+
while !todo.is_empty() {
508+
let price_accounts = self
509+
.rpc_client
510+
.get_multiple_accounts(todo.as_slice())
511+
.await?;
497512

498-
async fn fetch_price_account(&self, price_account_key: &Pubkey) -> Result<PriceAccount> {
499-
let data = self.rpc_client.get_account_data(price_account_key).await?;
500-
let price_account = *load_price_account(&data)
501-
.with_context(|| format!("load price account {}", price_account_key))?;
513+
// Any non-zero price.next pubkey will be gathered here and looked up on next iteration
514+
let mut next_todo = vec![];
515+
516+
// Process the response of each lookup request. If there's
517+
// a next price, it will be looked up on next iteration,
518+
// as todo gets replaced with next_todo.
519+
for (price_key, price_account) in todo.iter().zip(price_accounts) {
520+
if let Some(price_acc) = price_account {
521+
let price = load_price_account(&price_acc.data)
522+
.context(format!("Could not parse price account at {}", price_key))?;
523+
524+
if let Some(prod) = product_entries.get_mut(&price.prod) {
525+
prod.price_accounts.push(*price_key);
526+
price_entries.insert(*price_key, *price);
527+
} else {
528+
warn!(self.logger, "Could not find product entry for price, listed in its prod field, skipping";
529+
"missing_product" => price.prod.to_string(),
530+
"price_key" => price_key.to_string(),
531+
);
532+
533+
continue;
534+
}
535+
536+
if price.next != Pubkey::default() {
537+
next_todo.push(price.next.clone());
538+
}
539+
} else {
540+
warn!(self.logger, "Could not look up price account on chain, skipping"; "price_key" => price_key.to_string(),);
541+
continue;
542+
}
543+
}
502544

503-
Ok(price_account)
545+
todo = next_todo;
546+
}
547+
Ok((product_entries, price_entries))
504548
}
505549
}
506550

0 commit comments

Comments
 (0)