diff --git a/src/main.rs b/src/main.rs index af2aff8..90009e8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -158,54 +158,57 @@ async fn refresh_eve_scout_system_relations( ) -> Result<(), ReplicationError> { drop_system_connections(&graph, "Thera").await?; drop_system_connections(&graph, "Turnur").await?; - let signatures = get_public_signatures(client.clone()).await?; - let wormhole_saves: Vec<_> = signatures + + let mut set = JoinSet::new(); + + get_public_signatures(client.clone()) + .await? .iter() .filter(|sig| sig.signature_type == "wormhole") - .map(|wormhole| tokio::spawn(save_wormhole(graph.clone(), wormhole.clone()))) - .collect(); - futures::future::try_join_all(wormhole_saves).await?; - Ok(()) + .for_each(|wormhole| { + set.spawn(save_wormhole(graph.clone(), wormhole.clone())); + }); + + error_if_any_member_has_error(&mut set) + .await + .unwrap() + .map_err(TargetError) } async fn pull_all_stargates(client: Client, graph: Arc) -> Result<(), ReplicationError> { - let saved_system_ids = get_all_system_ids(graph.clone()).await?; - let stargate_pulls: Vec<_> = saved_system_ids + let mut set = JoinSet::new(); + + get_all_system_ids(graph.clone()) + .await? .iter() - .map(|&system_id| { - tokio::spawn(pull_system_stargates( + .for_each(|&system_id| { + set.spawn(pull_system_stargates( client.clone(), graph.clone(), system_id, - )) - }) - .collect(); - futures::future::try_join_all(stargate_pulls).await?; - Ok(()) + )); + }); + + error_if_any_member_has_error(&mut set).await.unwrap() } async fn pull_all_systems(client: Client, graph: Arc) -> Result<(), ReplicationError> { - let system_ids = get_system_ids(&client).await.unwrap(); - println!("Received {} system ids from ESI", system_ids.len()); - let mut set = JoinSet::new(); - system_ids.iter().for_each(|&system_id| { - println!("Spawning task to pull {} system if its missing", system_id); - set.spawn(pull_system_if_missing( - client.clone(), - graph.clone(), - system_id, - )); - }); - - while let Some(res) = set.join_next().await { - if let Err(e) = res.unwrap() { - return Err(e); - } - } + get_system_ids(&client) + .await + .unwrap() + .iter() + .for_each(|&system_id| { + println!("Spawning task to pull {} system if its missing", system_id); + set.spawn(pull_system_if_missing( + client.clone(), + graph.clone(), + system_id, + )); + }); - Ok(()) + error_if_any_member_has_error(&mut set).await.unwrap() } async fn pull_system_if_missing( @@ -214,9 +217,8 @@ async fn pull_system_if_missing( system_id: i64, ) -> Result<(), ReplicationError> { println!("Checking if system_id {} exists in the database", system_id); - let result = system_id_exists(graph.clone(), system_id).await; - match result { + match system_id_exists(graph.clone(), system_id).await { Ok(exists) => { if exists { println!( @@ -304,76 +306,96 @@ async fn pull_system_stargates( graph: Arc, system_id: i64, ) -> Result<(), ReplicationError> { - let system = get_system(graph.clone(), system_id).await?; - let stargate_pulls: Vec<_> = system + let mut set = JoinSet::new(); + + get_system(graph.clone(), system_id) + .await? .unwrap() .stargates .iter() - .map(|&stargate_id| { - tokio::spawn(pull_stargate_if_missing( + .for_each(|&stargate_id| { + set.spawn(pull_stargate_if_missing( client.clone(), graph.clone(), stargate_id, - )) - }) - .collect(); - futures::future::try_join_all(stargate_pulls).await?; - Ok(()) + )); + }); + + error_if_any_member_has_error(&mut set).await.unwrap() +} + +async fn error_if_any_member_has_error( + set: &mut JoinSet>, +) -> Option> { + while let Some(res) = set.join_next().await { + if let Err(e) = res.unwrap() { + return Some(Err(e)); + } + } + None } async fn pull_system_kills(client: Client, graph: Arc) -> Result { let response = get_system_kills(&client).await?; let galaxy_kills: i32 = response.system_kills.iter().map(|s| s.ship_kills).sum(); - let kill_saves: Vec<_> = response - .system_kills - .iter() - .map(|system_kill| { - tokio::spawn(set_last_hour_system_kills( - graph.clone(), - system_kill.system_id, - system_kill.ship_kills, - )) - }) - .collect(); - futures::future::try_join_all(kill_saves).await?; - Ok(galaxy_kills) + + let mut set = JoinSet::new(); + + response.system_kills.iter().for_each(|system_kill| { + set.spawn(set_last_hour_system_kills( + graph.clone(), + system_kill.system_id, + system_kill.ship_kills, + )); + }); + + error_if_any_member_has_error(&mut set) + .await + .unwrap() + .map_err(TargetError) + .map(|_| galaxy_kills) } async fn pull_system_jumps(client: Client, graph: Arc) -> Result { let response = get_system_jumps(&client).await?; let galaxy_jumps: i32 = response.system_jumps.iter().map(|s| s.ship_jumps).sum(); - let jump_saves: Vec<_> = response - .system_jumps - .iter() - .map(|system_jump| { - tokio::spawn(set_last_hour_system_jumps( - graph.clone(), - system_jump.system_id, - system_jump.ship_jumps, - )) - }) - .collect(); - futures::future::try_join_all(jump_saves).await?; - Ok(galaxy_jumps) + + let mut set = JoinSet::new(); + + response.system_jumps.iter().for_each(|system_jump| { + set.spawn(set_last_hour_system_jumps( + graph.clone(), + system_jump.system_id, + system_jump.ship_jumps, + )); + }); + + error_if_any_member_has_error(&mut set) + .await + .unwrap() + .map_err(TargetError) + .map(|_| galaxy_jumps) } async fn pull_system_risk(client: Client, graph: Arc) -> Result<(), ReplicationError> { let galaxy_kills = pull_system_kills(client.clone(), graph.clone()).await?; let galaxy_jumps = pull_system_jumps(client.clone(), graph.clone()).await?; let system_ids = get_all_system_ids(graph.clone()).await?; - let risk_saves: Vec<_> = system_ids - .iter() - .map(|&system_id| { - tokio::spawn(set_system_jump_risk( - graph.clone(), - system_id, - galaxy_jumps, - galaxy_kills, - )) - }) - .collect(); - futures::future::try_join_all(risk_saves).await?; - Ok(()) + let mut set = JoinSet::new(); + + system_ids.iter().for_each(|&system_id| { + set.spawn(set_system_jump_risk( + graph.clone(), + system_id, + galaxy_jumps, + galaxy_kills, + )); + }); + + error_if_any_member_has_error(&mut set) + .await + .unwrap() + .map_err(TargetError) } async fn pull_stargate_if_missing(