Skip to content

Commit

Permalink
done: bugfix - WsServer - pair_average_price_repositories - made dist…
Browse files Browse the repository at this point in the history
…inct repository driver instance for every pair
  • Loading branch information
formiat committed Feb 22, 2022
1 parent c23418d commit 5d44b8b
Show file tree
Hide file tree
Showing 10 changed files with 136 additions and 71 deletions.
6 changes: 3 additions & 3 deletions src/config_scheme/repositories_prepared.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::config_scheme::config_scheme::ConfigScheme;
use crate::repository::repositories::{
Repositories, RepositoriesByMarketName, RepositoryForF64ByTimestamp,
MarketRepositoriesByMarketName, Repositories, WorkerRepositoriesByPairTuple,
};
use crate::worker::market_helpers::pair_average_price::{
make_pair_average_price, PairAveragePriceType,
Expand All @@ -10,8 +10,8 @@ use crate::worker::network_helpers::ws_server::ws_channels_holder::{
};

pub struct RepositoriesPrepared {
pub pair_average_price_repository: Option<RepositoryForF64ByTimestamp>,
pub market_repositories: Option<RepositoriesByMarketName>,
pub pair_average_price_repository: Option<WorkerRepositoriesByPairTuple>,
pub market_repositories: Option<MarketRepositoriesByMarketName>,
pub ws_channels_holder: WsChannelsHolderHashMap,
pub pair_average_price: PairAveragePriceType,
}
Expand Down
13 changes: 8 additions & 5 deletions src/helper_functions.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::config_scheme::config_scheme::ConfigScheme;
use crate::config_scheme::storage::Storage;
use crate::repository::repositories::{Repositories, RepositoryForF64ByTimestamp};
use crate::repository::repositories::{
Repositories, RepositoryForF64ByTimestamp, WorkerRepositoriesByPairTuple,
};
use crate::worker::helper_functions::date_time_from_timestamp_sec;
use chrono::{DateTime, Utc};
use clap::ArgMatches;
Expand Down Expand Up @@ -76,7 +78,7 @@ fn get_daily_prices(
Some(prices)
}

fn make_repository(config: &ConfigScheme) -> RepositoryForF64ByTimestamp {
fn make_repositories(config: &ConfigScheme) -> WorkerRepositoriesByPairTuple {
match config.service.storage.as_ref().unwrap() {
Storage::Sled(tree) => Repositories::make_pair_average_price_sled(config, Arc::clone(tree)),
}
Expand Down Expand Up @@ -110,14 +112,15 @@ pub fn fill_historical_data(config: &ConfigScheme) {
let coins: Vec<String> = coins.split(',').map(|v| v.to_string()).collect();
assert!(!coins.is_empty());

let repository = make_repository(config);
let mut repositories = make_repositories(config);

let mut threads = Vec::new();
for coin in coins {
let pair_tuple = (coin.to_string(), "USD".to_string());
let prices = get_daily_prices(&coin, timestamp_to, day_count).unwrap();
let repository_2 = repository.clone();
let repository = repositories.remove(&pair_tuple).unwrap();

let thread = thread::spawn(move || fill_storage(repository_2, prices, coin));
let thread = thread::spawn(move || fill_storage(repository, prices, coin));
threads.push(thread);

// To prevent DDoS attack on cryptocompare.com
Expand Down
46 changes: 32 additions & 14 deletions src/repository/repositories.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ use std::collections::HashMap;
use std::sync::{Arc, Mutex};

pub type RepositoryForF64ByTimestamp = Box<dyn Repository<DateTime<Utc>, f64> + Send>;
pub type RepositoriesByMarketValue = HashMap<MarketValue, RepositoryForF64ByTimestamp>;
pub type RepositoriesByPairTuple = HashMap<(String, String), RepositoriesByMarketValue>;
pub type RepositoriesByMarketName = HashMap<String, RepositoriesByPairTuple>;
pub type WorkerRepositoriesByPairTuple = HashMap<(String, String), RepositoryForF64ByTimestamp>;
pub type MarketRepositoriesByMarketValue = HashMap<MarketValue, RepositoryForF64ByTimestamp>;
pub type MarketRepositoriesByPairTuple = HashMap<(String, String), MarketRepositoriesByMarketValue>;
pub type MarketRepositoriesByMarketName = HashMap<String, MarketRepositoriesByPairTuple>;

pub struct Repositories {
pub pair_average_price: RepositoryForF64ByTimestamp,
pub market_repositories: RepositoriesByMarketName,
pub pair_average_price: WorkerRepositoriesByPairTuple,
pub market_repositories: MarketRepositoriesByMarketName,
}

impl Repositories {
Expand All @@ -41,8 +42,8 @@ impl Repositories {
pub fn optionize_fields(
config: Option<Self>,
) -> (
Option<RepositoryForF64ByTimestamp>,
Option<RepositoriesByMarketName>,
Option<WorkerRepositoriesByPairTuple>,
Option<MarketRepositoriesByMarketName>,
) {
match config {
Some(config) => (
Expand All @@ -56,18 +57,35 @@ impl Repositories {
pub fn make_pair_average_price_sled(
config: &ConfigScheme,
tree: Arc<Mutex<vsdbsled::Db>>,
) -> RepositoryForF64ByTimestamp {
Box::new(F64ByTimestampSled::new(
"worker__".to_string() + &MarketValue::PairAveragePrice.to_string(),
Arc::clone(&tree),
config.service.historical_storage_frequency_ms,
))
) -> WorkerRepositoriesByPairTuple {
let market_config = &config.market;
let service_config = &config.service;

let mut hash_map = HashMap::new();
for exchange_pair in &market_config.exchange_pairs {
let pair = format!("{}_{}", exchange_pair.pair.0, exchange_pair.pair.1);
let entity_name = format!(
"worker__{}__{}",
MarketValue::PairAveragePrice.to_string(),
pair
);

let repository: RepositoryForF64ByTimestamp = Box::new(F64ByTimestampSled::new(
entity_name,
Arc::clone(&tree),
service_config.historical_storage_frequency_ms,
));

hash_map.insert(exchange_pair.pair.clone(), repository);
}

hash_map
}

fn make_market_repositories_sled(
config: &ConfigScheme,
tree: Arc<Mutex<vsdbsled::Db>>,
) -> RepositoriesByMarketName {
) -> MarketRepositoriesByMarketName {
let market_config = &config.market;
let service_config = &config.service;

Expand Down
4 changes: 2 additions & 2 deletions src/worker/market_helpers/exchange_pair_info.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::repository::repositories::RepositoriesByMarketValue;
use crate::repository::repositories::MarketRepositoriesByMarketValue;
use crate::worker::market_helpers::market_value::MarketValue;
use crate::worker::market_helpers::stored_and_ws_transmissible_f64::StoredAndWsTransmissibleF64;
use crate::worker::network_helpers::ws_server::ws_channel_name::WsChannelName;
Expand All @@ -17,7 +17,7 @@ pub struct ExchangePairInfo {

impl ExchangePairInfo {
pub fn new(
repositories: Option<RepositoriesByMarketValue>,
repositories: Option<MarketRepositoriesByMarketValue>,
ws_channels_holder: &WsChannelsHolderHashMap,
market_name: String,
pair: (String, String),
Expand Down
8 changes: 5 additions & 3 deletions src/worker/market_helpers/market.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::repository::repositories::{RepositoriesByMarketValue, RepositoriesByPairTuple};
use crate::repository::repositories::{
MarketRepositoriesByMarketValue, MarketRepositoriesByPairTuple,
};
use crate::worker::helper_functions::get_pair_ref;
use crate::worker::market_helpers::exchange_pair::ExchangePair;
use crate::worker::market_helpers::market_channels::MarketChannels;
Expand Down Expand Up @@ -26,7 +28,7 @@ use std::time;
pub fn market_factory(
mut spine: MarketSpine,
exchange_pairs: Vec<ExchangePair>,
repositories: Option<RepositoriesByPairTuple>,
repositories: Option<MarketRepositoriesByPairTuple>,
ws_channels_holder: &WsChannelsHolderHashMap,
) -> Arc<Mutex<dyn Market + Send>> {
let mut repositories = repositories.unwrap_or_default();
Expand Down Expand Up @@ -294,7 +296,7 @@ pub trait Market {
fn add_exchange_pair(
&mut self,
exchange_pair: ExchangePair,
repositories: Option<RepositoriesByMarketValue>,
repositories: Option<MarketRepositoriesByMarketValue>,
ws_channels_holder: &WsChannelsHolderHashMap,
) {
let pair_string = self.make_pair(get_pair_ref(&exchange_pair.pair));
Expand Down
4 changes: 2 additions & 2 deletions src/worker/market_helpers/market_spine.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::repository::repositories::RepositoriesByMarketValue;
use crate::repository::repositories::MarketRepositoriesByMarketValue;
use crate::worker::market_helpers::conversion_type::ConversionType;
use crate::worker::market_helpers::exchange_pair::ExchangePair;
use crate::worker::market_helpers::exchange_pair_info::ExchangePairInfo;
Expand Down Expand Up @@ -108,7 +108,7 @@ impl MarketSpine {
&mut self,
pair_string: String,
exchange_pair: ExchangePair,
repositories: Option<RepositoriesByMarketValue>,
repositories: Option<MarketRepositoriesByMarketValue>,
ws_channels_holder: &WsChannelsHolderHashMap,
) {
self.exchange_pairs.insert(
Expand Down
8 changes: 5 additions & 3 deletions src/worker/market_helpers/pair_average_price.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::config_scheme::market_config::MarketConfig;
use crate::repository::repositories::RepositoryForF64ByTimestamp;
use crate::repository::repositories::WorkerRepositoriesByPairTuple;
use crate::worker::market_helpers::market_value::MarketValue;
use crate::worker::market_helpers::stored_and_ws_transmissible_f64::StoredAndWsTransmissibleF64;
use crate::worker::network_helpers::ws_server::ws_channel_name::WsChannelName;
Expand All @@ -11,7 +11,7 @@ pub type PairAveragePriceType = HashMap<(String, String), Arc<Mutex<StoredAndWsT

pub fn make_pair_average_price(
market_config: &MarketConfig,
repository: Option<RepositoryForF64ByTimestamp>,
mut repository: Option<WorkerRepositoriesByPairTuple>,
ws_channels_holder: &WsChannelsHolderHashMap,
) -> PairAveragePriceType {
let mut hash_map = HashMap::new();
Expand All @@ -26,7 +26,9 @@ pub fn make_pair_average_price(
let ws_channels = ws_channels_holder.get(&key).unwrap();

let pair_average_price = Arc::new(Mutex::new(StoredAndWsTransmissibleF64::new(
repository.clone(),
repository
.as_mut()
.map(|v| v.remove(&exchange_pair.pair).unwrap()),
vec![
WsChannelName::CoinAveragePrice,
WsChannelName::CoinAveragePriceCandles,
Expand Down
12 changes: 12 additions & 0 deletions src/worker/network_helpers/ws_server/requests/ws_method_request.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::worker::network_helpers::ws_server::interval::Interval;
use crate::worker::network_helpers::ws_server::jsonrpc_request::JsonRpcId;
use crate::worker::network_helpers::ws_server::ws_channel_name::WsChannelName;

#[derive(Debug, Clone)]
pub enum WsMethodRequest {
Expand All @@ -18,3 +19,14 @@ pub enum WsMethodRequest {
to: Option<u64>,
},
}

impl WsMethodRequest {
pub fn get_method(&self) -> WsChannelName {
match self {
Self::CoinAveragePriceHistorical { .. } => WsChannelName::CoinAveragePriceHistorical,
Self::CoinAveragePriceCandlesHistorical { .. } => {
WsChannelName::CoinAveragePriceCandlesHistorical
}
}
}
}
Loading

0 comments on commit 5d44b8b

Please sign in to comment.