Skip to content

Commit

Permalink
Merge #1204: Overhaul core Tracker: extract torrents context (part 2)
Browse files Browse the repository at this point in the history
046578d refactor: [#1203] use directly the InMemoryTorrentRepository (Jose Celano)
0f1b2fb refactor: [#1203] use InMemoryTorrentRepository directly in core tracker tests (Jose Celano)
2ac68f6 refactor: [#1203] move test (Jose Celano)
94673d6 refactor: [#1203] inline methods in core tracker (Jose Celano)

Pull request description:

  This is part 2 of the refactor initiated [here](#1202).

  The core `Tracker` after this refactor:

  ```rust
  pub struct Tracker {
      /// The tracker configuration.
      config: Core,

      /// The service to check is a torrent is whitelisted.
      whitelist_authorization: Arc<whitelist::authorization::Authorization>,

      /// The in-memory torrents repository.
      in_memory_torrent_repository: Arc<InMemoryTorrentRepository>,

      /// The persistent torrents repository.
      db_torrent_repository: Arc<DatabasePersistentTorrentRepository>,
  }
  ```

ACKs for top commit:
  josecelano:
    ACK 046578d

Tree-SHA512: db32af87815375ca570b81392c392dae0b602d12f54543c413d34f6a8dcb69f7e2f56a56e5d552a271d7fe2e4f751277d9f6e204caee297a2e3333f6a7b77e24
  • Loading branch information
josecelano committed Jan 24, 2025
2 parents a4277a7 + 046578d commit 731fd01
Show file tree
Hide file tree
Showing 17 changed files with 306 additions and 207 deletions.
2 changes: 1 addition & 1 deletion src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ pub async fn start(config: &Configuration, app_container: &AppContainer) -> Vec<
if let Some(http_api_config) = &config.http_api {
if let Some(job) = tracker_apis::start_job(
http_api_config,
app_container.tracker.clone(),
app_container.in_memory_torrent_repository.clone(),
app_container.keys_handler.clone(),
app_container.whitelist_manager.clone(),
app_container.ban_service.clone(),
Expand Down
14 changes: 6 additions & 8 deletions src/bootstrap/jobs/tracker_apis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use super::make_rust_tls;
use crate::core::authentication::handler::KeysHandler;
use crate::core::statistics::event::sender::Sender;
use crate::core::statistics::repository::Repository;
use crate::core::torrent::repository::in_memory::InMemoryTorrentRepository;
use crate::core::whitelist::manager::WhiteListManager;
use crate::core::{self};
use crate::servers::apis::server::{ApiServer, Launcher};
use crate::servers::apis::Version;
use crate::servers::registar::ServiceRegistrationForm;
Expand Down Expand Up @@ -63,7 +63,6 @@ pub struct ApiServerJobStarted();
#[allow(clippy::too_many_arguments)]
#[instrument(skip(
config,
tracker,
keys_handler,
whitelist_manager,
ban_service,
Expand All @@ -73,7 +72,7 @@ pub struct ApiServerJobStarted();
))]
pub async fn start_job(
config: &HttpApi,
tracker: Arc<core::Tracker>,
in_memory_torrent_repository: Arc<InMemoryTorrentRepository>,
keys_handler: Arc<KeysHandler>,
whitelist_manager: Arc<WhiteListManager>,
ban_service: Arc<RwLock<BanService>>,
Expand All @@ -95,7 +94,7 @@ pub async fn start_job(
start_v1(
bind_to,
tls,
tracker.clone(),
in_memory_torrent_repository.clone(),
keys_handler.clone(),
whitelist_manager.clone(),
ban_service.clone(),
Expand All @@ -114,7 +113,6 @@ pub async fn start_job(
#[instrument(skip(
socket,
tls,
tracker,
keys_handler,
whitelist_manager,
ban_service,
Expand All @@ -126,7 +124,7 @@ pub async fn start_job(
async fn start_v1(
socket: SocketAddr,
tls: Option<RustlsConfig>,
tracker: Arc<core::Tracker>,
in_memory_torrent_repository: Arc<InMemoryTorrentRepository>,
keys_handler: Arc<KeysHandler>,
whitelist_manager: Arc<WhiteListManager>,
ban_service: Arc<RwLock<BanService>>,
Expand All @@ -137,7 +135,7 @@ async fn start_v1(
) -> JoinHandle<()> {
let server = ApiServer::new(Launcher::new(socket, tls))
.start(
tracker,
in_memory_torrent_repository,
keys_handler,
whitelist_manager,
stats_event_sender,
Expand Down Expand Up @@ -179,7 +177,7 @@ mod tests {

start_job(
config,
app_container.tracker,
app_container.in_memory_torrent_repository,
app_container.keys_handler,
app_container.whitelist_manager,
app_container.ban_service,
Expand Down
127 changes: 53 additions & 74 deletions src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,6 @@ use torrust_tracker_configuration::{AnnouncePolicy, Core, TORRENT_PEERS_LIMIT};
use torrust_tracker_primitives::core::{AnnounceData, ScrapeData};
use torrust_tracker_primitives::peer;
use torrust_tracker_primitives::swarm_metadata::SwarmMetadata;
use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics;

/// The domain layer tracker service.
///
Expand All @@ -475,7 +474,7 @@ pub struct Tracker {
config: Core,

/// The service to check is a torrent is whitelisted.
pub whitelist_authorization: Arc<whitelist::authorization::Authorization>,
whitelist_authorization: Arc<whitelist::authorization::Authorization>,

/// The in-memory torrents repository.
in_memory_torrent_repository: Arc<InMemoryTorrentRepository>,
Expand Down Expand Up @@ -619,7 +618,9 @@ impl Tracker {

let stats = self.upsert_peer_and_get_stats(info_hash, peer);

let peers = self.get_peers_for(info_hash, peer, peers_wanted.limit());
let peers = self
.in_memory_torrent_repository
.get_peers_for(info_hash, peer, peers_wanted.limit());

AnnounceData {
peers,
Expand All @@ -638,7 +639,7 @@ impl Tracker {

for info_hash in info_hashes {
let swarm_metadata = match self.whitelist_authorization.authorize(info_hash).await {
Ok(()) => self.get_swarm_metadata(info_hash),
Ok(()) => self.in_memory_torrent_repository.get_swarm_metadata(info_hash),
Err(_) => SwarmMetadata::zeroed(),
};
scrape_data.add_file(info_hash, swarm_metadata);
Expand Down Expand Up @@ -684,40 +685,6 @@ impl Tracker {
drop(self.db_torrent_repository.save(&info_hash, completed));
}
}

/// It returns the data for a `scrape` response.
fn get_swarm_metadata(&self, info_hash: &InfoHash) -> SwarmMetadata {
self.in_memory_torrent_repository.get_swarm_metadata(info_hash)
}

/// # Context: Tracker
///
/// Get torrent peers for a given torrent and client.
///
/// It filters out the client making the request.
fn get_peers_for(&self, info_hash: &InfoHash, peer: &peer::Peer, limit: usize) -> Vec<Arc<peer::Peer>> {
self.in_memory_torrent_repository.get_peers_for(info_hash, peer, limit)
}

/// # Context: Tracker
///
/// Get torrent peers for a given torrent.
#[must_use]
pub fn get_torrent_peers(&self, info_hash: &InfoHash) -> Vec<Arc<peer::Peer>> {
self.in_memory_torrent_repository.get_torrent_peers(info_hash)
}

/// It calculates and returns the general `Tracker`
/// [`TorrentsMetrics`]
///
/// # Context: Tracker
///
/// # Panics
/// Panics if unable to get the torrent metrics.
#[must_use]
pub fn get_torrents_metrics(&self) -> TorrentsMetrics {
self.in_memory_torrent_repository.get_torrents_metrics()
}
}

#[must_use]
Expand All @@ -742,15 +709,17 @@ mod tests {
use bittorrent_primitives::info_hash::fixture::gen_seeded_infohash;
use bittorrent_primitives::info_hash::InfoHash;
use torrust_tracker_configuration::TORRENT_PEERS_LIMIT;
use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics;
use torrust_tracker_primitives::DurationSinceUnixEpoch;
use torrust_tracker_test_helpers::configuration;

use crate::app_test::initialize_tracker_dependencies;
use crate::core::peer::Peer;
use crate::core::services::{initialize_tracker, initialize_whitelist_manager};
use crate::core::torrent::manager::TorrentsManager;
use crate::core::torrent::repository::in_memory::InMemoryTorrentRepository;
use crate::core::whitelist::manager::WhiteListManager;
use crate::core::{whitelist, TorrentsMetrics, Tracker};
use crate::core::{whitelist, Tracker};

fn public_tracker() -> Tracker {
let config = configuration::ephemeral_public();
Expand All @@ -773,6 +742,29 @@ mod tests {
)
}

fn public_tracker_and_in_memory_torrents_repository() -> (Arc<Tracker>, Arc<InMemoryTorrentRepository>) {
let config = configuration::ephemeral_public();

let (
_database,
_in_memory_whitelist,
whitelist_authorization,
_authentication_service,
in_memory_torrent_repository,
db_torrent_repository,
_torrents_manager,
) = initialize_tracker_dependencies(&config);

let tracker = Arc::new(initialize_tracker(
&config,
&whitelist_authorization,
&in_memory_torrent_repository,
&db_torrent_repository,
));

(tracker, in_memory_torrent_repository)
}

fn whitelisted_tracker() -> (Tracker, Arc<whitelist::authorization::Authorization>, Arc<WhiteListManager>) {
let config = configuration::ephemeral_listed();

Expand All @@ -798,7 +790,7 @@ mod tests {
(tracker, whitelist_authorization, whitelist_manager)
}

pub fn tracker_persisting_torrents_in_database() -> (Tracker, Arc<TorrentsManager>) {
pub fn tracker_persisting_torrents_in_database() -> (Tracker, Arc<TorrentsManager>, Arc<InMemoryTorrentRepository>) {
let mut config = configuration::ephemeral_listed();
config.core.tracker_policy.persistent_torrent_completed_stat = true;

Expand All @@ -819,7 +811,7 @@ mod tests {
&db_torrent_repository,
);

(tracker, torrents_manager)
(tracker, torrents_manager, in_memory_torrent_repository)
}

fn sample_info_hash() -> InfoHash {
Expand Down Expand Up @@ -906,33 +898,16 @@ mod tests {
}
}

#[tokio::test]
async fn should_collect_torrent_metrics() {
let tracker = public_tracker();

let torrents_metrics = tracker.get_torrents_metrics();

assert_eq!(
torrents_metrics,
TorrentsMetrics {
complete: 0,
downloaded: 0,
incomplete: 0,
torrents: 0
}
);
}

#[tokio::test]
async fn it_should_return_the_peers_for_a_given_torrent() {
let tracker = public_tracker();
let (tracker, in_memory_torrent_repository) = public_tracker_and_in_memory_torrents_repository();

let info_hash = sample_info_hash();
let peer = sample_peer();

let _ = tracker.upsert_peer_and_get_stats(&info_hash, &peer);

let peers = tracker.get_torrent_peers(&info_hash);
let peers = in_memory_torrent_repository.get_torrent_peers(&info_hash);

assert_eq!(peers, vec![Arc::new(peer)]);
}
Expand All @@ -957,7 +932,7 @@ mod tests {

#[tokio::test]
async fn it_should_return_74_peers_at_the_most_for_a_given_torrent() {
let tracker = public_tracker();
let (tracker, in_memory_torrent_repository) = public_tracker_and_in_memory_torrents_repository();

let info_hash = sample_info_hash();

Expand All @@ -975,7 +950,7 @@ mod tests {
let _ = tracker.upsert_peer_and_get_stats(&info_hash, &peer);
}

let peers = tracker.get_torrent_peers(&info_hash);
let peers = in_memory_torrent_repository.get_torrent_peers(&info_hash);

assert_eq!(peers.len(), 74);
}
Expand All @@ -989,7 +964,9 @@ mod tests {

let _ = tracker.upsert_peer_and_get_stats(&info_hash, &peer);

let peers = tracker.get_peers_for(&info_hash, &peer, TORRENT_PEERS_LIMIT);
let peers = tracker
.in_memory_torrent_repository
.get_peers_for(&info_hash, &peer, TORRENT_PEERS_LIMIT);

assert_eq!(peers, vec![]);
}
Expand Down Expand Up @@ -1019,18 +996,20 @@ mod tests {
let _ = tracker.upsert_peer_and_get_stats(&info_hash, &peer);
}

let peers = tracker.get_peers_for(&info_hash, &excluded_peer, TORRENT_PEERS_LIMIT);
let peers = tracker
.in_memory_torrent_repository
.get_peers_for(&info_hash, &excluded_peer, TORRENT_PEERS_LIMIT);

assert_eq!(peers.len(), 74);
}

#[tokio::test]
async fn it_should_return_the_torrent_metrics() {
let tracker = public_tracker();
let (tracker, in_memory_torrent_repository) = public_tracker_and_in_memory_torrents_repository();

let _ = tracker.upsert_peer_and_get_stats(&sample_info_hash(), &leecher());

let torrent_metrics = tracker.get_torrents_metrics();
let torrent_metrics = in_memory_torrent_repository.get_torrents_metrics();

assert_eq!(
torrent_metrics,
Expand All @@ -1045,7 +1024,7 @@ mod tests {

#[tokio::test]
async fn it_should_get_many_the_torrent_metrics() {
let tracker = public_tracker();
let (tracker, in_memory_torrent_repository) = public_tracker_and_in_memory_torrents_repository();

let start_time = std::time::Instant::now();
for i in 0..1_000_000 {
Expand All @@ -1054,7 +1033,7 @@ mod tests {
let result_a = start_time.elapsed();

let start_time = std::time::Instant::now();
let torrent_metrics = tracker.get_torrents_metrics();
let torrent_metrics = in_memory_torrent_repository.get_torrents_metrics();
let result_b = start_time.elapsed();

assert_eq!(
Expand Down Expand Up @@ -1346,24 +1325,24 @@ mod tests {

#[tokio::test]
async fn it_should_authorize_the_announce_and_scrape_actions_on_whitelisted_torrents() {
let (tracker, _whitelist_authorization, whitelist_manager) = whitelisted_tracker();
let (_tracker, whitelist_authorization, whitelist_manager) = whitelisted_tracker();

let info_hash = sample_info_hash();

let result = whitelist_manager.add_torrent_to_whitelist(&info_hash).await;
assert!(result.is_ok());

let result = tracker.whitelist_authorization.authorize(&info_hash).await;
let result = whitelist_authorization.authorize(&info_hash).await;
assert!(result.is_ok());
}

#[tokio::test]
async fn it_should_not_authorize_the_announce_and_scrape_actions_on_not_whitelisted_torrents() {
let (tracker, _whitelist_authorization, _whitelist_manager) = whitelisted_tracker();
let (_tracker, whitelist_authorization, _whitelist_manager) = whitelisted_tracker();

let info_hash = sample_info_hash();

let result = tracker.whitelist_authorization.authorize(&info_hash).await;
let result = whitelist_authorization.authorize(&info_hash).await;
assert!(result.is_err());
}
}
Expand Down Expand Up @@ -1479,7 +1458,7 @@ mod tests {

#[tokio::test]
async fn it_should_persist_the_number_of_completed_peers_for_all_torrents_into_the_database() {
let (tracker, torrents_manager) = tracker_persisting_torrents_in_database();
let (tracker, torrents_manager, in_memory_torrent_repository) = tracker_persisting_torrents_in_database();

let info_hash = sample_info_hash();

Expand All @@ -1494,7 +1473,7 @@ mod tests {
assert_eq!(swarm_stats.downloaded, 1);

// Remove the newly updated torrent from memory
let _unused = tracker.in_memory_torrent_repository.remove(&info_hash);
let _unused = in_memory_torrent_repository.remove(&info_hash);

torrents_manager.load_torrents_from_database().unwrap();

Expand Down
Loading

0 comments on commit 731fd01

Please sign in to comment.