diff --git a/README.md b/README.md index d824923..acb0f76 100644 --- a/README.md +++ b/README.md @@ -6,11 +6,14 @@ An application for finding optimal routes between systems in EVE Online. Make sure you have Docker engine installed, then run `docker compose up`. This should build a container for Neo4j, install the graph-data-science plugin, and also build a container for the eve-graph app. +On start, eve-graph will attempt to synchronize systems and stargates with ESI before accepting requests. If routing +isn't working properly, inspect the logs for the api container. + ### Collecting data -You need to exercise the system refresh, stargate refresh, and wormhole refresh endpoints to hydrate the database -with data on first run. Also, every time you restart the database, the in memory "graph" of data that the gds plugin -uses will need to be rebuilt, calling to refresh wormholes also refreshes this "graph" (and you should call to refresh -wormholes regularly). +When you first start eve-graph, you will need to exercise the wormhole refresh endpoint to hydrate the database +with ephemeral J-space connections. Also, every time you restart the database, the in-memory "graph" of data that the gds +plugin uses will need to be rebuilt; calling to refresh wormholes also refreshes this "graph" (and you should call to +refresh wormholes regularly). 
### Finding the shortest route If you want to find the shortest route between two systems, say Cleyd and Jita, simply issue a get request to diff --git a/src/database.rs b/src/database.rs index a01e975..3a2a18c 100644 --- a/src/database.rs +++ b/src/database.rs @@ -8,39 +8,33 @@ use serde::{Deserialize, Serialize}; use crate::eve_scout::EveScoutSignature; pub async fn get_graph_client_with_retry(max_retries: usize) -> Result, Error> { + println!("Connecting to Neo4j"); let neo4j_container_name = "neo4j"; let uri = format!("bolt://{}:7687", neo4j_container_name); let user = "neo4j"; let pass = "neo4jneo4j"; - for attempt in 1..=max_retries { - println!("Trying to build a Neo4j graph client, attempt {}", attempt); + for _attempt in 1..=max_retries { match Graph::new(uri.clone(), user, pass).await { Ok(graph) => { let graph = Arc::new(graph); match check_neo4j_health(graph.clone()).await { Ok(_) => { - println!("Successfully built a healthy Neo4j graph client"); + println!("Connected to Neo4j"); return Ok(graph); } - Err(err) => { - println!("Neo4j isn't ready yet: {}", err); - } + Err(err) => {} }; } - Err(err) => println!("Failed to build a Neo4j graph client: {}", err), + Err(err) => {} }; let seconds = 5; - println!( - "Waiting {} seconds before trying to build another Neo4j graph client", - seconds - ); tokio::time::sleep(Duration::from_secs(seconds)).await; } println!( - "Failed to get graph client after max of {} attempts", + "Failed to connect to Neo4j after the allowed {} attempts", max_retries ); Err(ConnectionError) @@ -50,7 +44,7 @@ async fn check_neo4j_health(graph: Arc) -> Result<(), Error> { let test_query = "MATCH (n) RETURN n LIMIT 1"; let mut result = match graph.execute(query(test_query)).await { Ok(row_stream) => row_stream, - Err(err) => { + Err(_err) => { return Err(ConnectionError); } }; @@ -175,6 +169,27 @@ pub async fn get_all_system_ids(graph: Arc) -> Result, Error> { Ok(system_ids) } +pub async fn get_saved_system_count(graph: &Arc) -> 
Result { + let get_system_count = "MATCH (s:System) RETURN COUNT(s) as count"; + let mut result = graph.execute(query(get_system_count)).await?; + let row = result.next().await?; + + match row { + None => Ok(0), + Some(row) => row.get::("count").map_err(DeserializationError), + } +} +pub async fn get_saved_stargate_count(graph: &Arc) -> Result { + let get_stargate_count = "MATCH (sg:Stargate) RETURN COUNT(sg) as count"; + let mut result = graph.execute(query(get_stargate_count)).await?; + let row = result.next().await?; + + match row { + None => Ok(0), + Some(row) => row.get::("count").map_err(DeserializationError), + } +} + #[derive(Debug, Serialize, Deserialize)] pub struct Stargate { pub destination_stargate_id: i64, @@ -559,3 +574,23 @@ pub async fn find_safest_route( None => Ok(None), } } + +pub async fn remove_duplicate_systems(graph: Arc) -> Result<(), Error> { + let remove_duplicates = " + MATCH (s:System) + WITH s.system_id AS systemId, COLLECT(s) AS duplicates, COUNT(*) AS count + WHERE count > 1 + FOREACH (duplicate IN TAIL(duplicates) | DETACH DELETE duplicate)"; + + graph.run(query(remove_duplicates)).await +} + +pub async fn remove_duplicate_stargates(graph: Arc) -> Result<(), Error> { + let remove_duplicates = " + MATCH (s:Stargate) + WITH s.stargate_id AS stargateId, COLLECT(s) AS duplicates, COUNT(*) AS count + WHERE count > 1 + FOREACH (duplicate IN TAIL(duplicates) | DETACH DELETE duplicate)"; + + graph.run(query(remove_duplicates)).await +} diff --git a/src/main.rs b/src/main.rs index 64f5146..2b3d57c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,4 @@ +use std::cmp::Ordering; use std::convert::Infallible; use std::sync::Arc; @@ -72,6 +73,17 @@ async fn main() { .or(stargates_routes) .recover(handle_rejection); + match synchronize_esi_systems(client.clone(), graph.clone()).await { + Ok(_) => { + // Stargate sync relies on systems being saved + match synchronize_esi_stargates(client.clone(), graph.clone()).await { + Ok(_) => {} + Err(err) 
=> println!("Stargate synchronization failed {}", err), + } + } + Err(err) => println!("System synchronization failed {}", err), + } + println!("Serving routes on 8008"); warp::serve(service_routes).run(([0, 0, 0, 0], 8008)).await; } @@ -207,6 +219,7 @@ async fn refresh_eve_scout_system_relations( } async fn pull_all_stargates(client: Client, graph: Arc) -> Result<(), ReplicationError> { + println!("Pulling all stargates from ESI"); let mut set = JoinSet::new(); get_all_system_ids(graph.clone()) @@ -223,7 +236,74 @@ async fn pull_all_stargates(client: Client, graph: Arc) -> Result<(), Rep error_if_any_member_has_error(&mut set).await.unwrap() } +const EXPECTED_ESI_SYSTEM_COUNT: i64 = 8436; +async fn synchronize_esi_systems( + client: Client, + graph: Arc, +) -> Result<(), ReplicationError> { + println!("Synchronizing systems with ESI"); + let mut saved_count = get_saved_system_count(&graph).await?; + let max_attempts = 5; + + for _attempt in 1..=max_attempts { + match saved_count.cmp(&EXPECTED_ESI_SYSTEM_COUNT) { + Ordering::Less => { + pull_all_systems(client.clone(), graph.clone()).await?; + saved_count = get_saved_system_count(&graph).await?; + } + Ordering::Equal => { + println!("Systems synchronized"); + return Ok(()); + } + Ordering::Greater => { + println!("Database has more systems than expected, removing any duplicates"); + remove_duplicate_systems(graph.clone()).await?; + } + } + } + + println!( + "Failed to synchronize saved system count {} to expected count {}", + saved_count, EXPECTED_ESI_SYSTEM_COUNT + ); + Ok(()) +} + +const EXPECTED_ESI_STARGATE_COUNT: i64 = 13776; +async fn synchronize_esi_stargates( + client: Client, + graph: Arc, +) -> Result<(), ReplicationError> { + println!("Synchronizing stargates with ESI"); + let mut saved_count = get_saved_stargate_count(&graph).await?; + let max_attempts = 5; + + for _attempt in 1..=max_attempts { + match saved_count.cmp(&EXPECTED_ESI_STARGATE_COUNT) { + Ordering::Less => { + 
pull_all_stargates(client.clone(), graph.clone()).await?; + saved_count = get_saved_stargate_count(&graph).await?; + } + Ordering::Equal => { + println!("Stargates synchronized"); + return Ok(()); + } + Ordering::Greater => { + println!("Database has more stargates than expected, removing any duplicates"); + remove_duplicate_stargates(graph.clone()).await?; + } + } + } + + println!( + "Failed to synchronize saved stargate count {} to expected count {}", + saved_count, EXPECTED_ESI_STARGATE_COUNT + ); + Ok(()) +} + async fn pull_all_systems(client: Client, graph: Arc) -> Result<(), ReplicationError> { + println!("Pulling all systems from ESI"); let mut set = JoinSet::new(); get_system_ids(&client) @@ -231,7 +311,6 @@ async fn pull_all_systems(client: Client, graph: Arc) -> Result<(), Repli .unwrap() .iter() .for_each(|&system_id| { - println!("Spawning task to pull {} system if its missing", system_id); set.spawn(pull_system_if_missing( client.clone(), graph.clone(), @@ -247,23 +326,14 @@ async fn pull_system_if_missing( graph: Arc, system_id: i64, ) -> Result<(), ReplicationError> { - println!("Checking if system_id {} exists in the database", system_id); - match system_id_exists(graph.clone(), system_id).await { Ok(exists) => { if !exists { - println!( - "System {} does not already exist in the database", - system_id - ); pull_system(client, graph.clone(), system_id).await?; } Ok(()) } - Err(_) => { - println!("Error checking if system_id {} exists", system_id); - Err(TargetError(Error::ConnectionError)) - } + Err(_) => Err(TargetError(Error::ConnectionError)), } } @@ -309,10 +379,8 @@ async fn pull_system( graph: Arc, system_id: i64, ) -> Result<(), ReplicationError> { - println!("Getting system {} from ESI", system_id); let system_response = get_system_details(&client, system_id).await?; let system = System::from(system_response); - println!("Saving system {} to database", system_id); save_system(&graph, &system).await.map_err(TargetError) }