Skip to content

Commit

Permalink
Merge pull request #162 from ZhongFuze/graphql/domain-search
Browse files Browse the repository at this point in the history
[!] Fix #160 changing fetching mode `join_all`
  • Loading branch information
nykma authored Aug 12, 2024
2 parents a094794 + f0a5c22 commit ac2a2a9
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 79 deletions.
7 changes: 7 additions & 0 deletions src/config/tdb/migrations/LoadingJob_SocialGraph.gsql
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,13 @@ CREATE OR REPLACE QUERY delete_domain_collection(VERTEX<DomainCollection> p) FOR
DELETE s FROM vertex2delete:s;
}

CREATE OR REPLACE QUERY clear_domain_search_cache() FOR GRAPH SocialGraph {
vertex2delete = SELECT v FROM DomainCollection:v;
PRINT vertex2delete.size();
DELETE e FROM vertex2delete:s-((PartOfCollection>):e)-Identities:tgt;
DELETE s FROM vertex2delete:s;
}

CREATE OR REPLACE QUERY domain_available_search(STRING id) FOR GRAPH SocialGraph {
TYPEDEF TUPLE< STRING platform, STRING name, STRING tld, STRING status, BOOL availability, DATETIME expired_at > DomainResult;
SetAccum<DomainResult> @@domain_result;
Expand Down
8 changes: 4 additions & 4 deletions src/controller/tigergraphql/resolve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
vertex::{DomainCollection, IdentityRecord},
},
upstream::{
fetch_all, fetch_all_domains, trim_name, Chain, ContractCategory, DataFetcher, DataSource,
fetch_all, fetch_domains, trim_name, Chain, ContractCategory, DataFetcher, DataSource,
DomainNameSystem, DomainStatus, Platform, Target,
},
util::make_http_client,
Expand Down Expand Up @@ -173,13 +173,13 @@ impl ResolveQuery {
// Check name if exists in storage
match DomainCollection::domain_available_search(&client, &process_name).await? {
None => {
let fetch_result = fetch_all_domains(&process_name).await;
let fetch_result = fetch_domains(&process_name).await;
if fetch_result.is_err() {
event!(
Level::WARN,
process_name,
err = fetch_result.unwrap_err().to_string(),
"Failed to fetch_all_domains"
"Failed to fetch_domains"
);
}
match DomainCollection::domain_available_search(&client, &process_name).await? {
Expand All @@ -198,7 +198,7 @@ impl ResolveQuery {
// Delete and Refetch in the background
sleep(Duration::from_secs(10)).await;
delete_domain_collection(&client, &process_name).await?;
fetch_all_domains(&name).await?;
fetch_domains(&name).await?;
Ok::<_, Error>(())
});
}
Expand Down
88 changes: 24 additions & 64 deletions src/upstream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ use async_trait::async_trait;
use futures::{future::join_all, StreamExt};
use std::{collections::HashSet, sync::Arc};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tracing::{event, info, warn, Level};

pub(crate) use types::vec_string_to_vec_datasource;
Expand Down Expand Up @@ -381,70 +380,31 @@ pub async fn batch_fetch_upstream(
Ok((up_next, all_edges))
}

pub async fn fetch_all_domains(name: &str) -> Result<(), Error> {
let mut handles: Vec<JoinHandle<Result<EdgeList, Error>>> = Vec::new();

handles.push(tokio::spawn({
let name = name.to_string();
async move { TheGraph::domain_search(&name).await }
}));

handles.push(tokio::spawn({
let name = name.to_string();
async move { Farcaster::domain_search(&name).await }
}));

handles.push(tokio::spawn({
let name = name.to_string();
async move { LensV2::domain_search(&name).await }
}));

handles.push(tokio::spawn({
let name = name.to_string();
async move { DotBit::domain_search(&name).await }
}));

handles.push(tokio::spawn({
let name = name.to_string();
async move { UnstoppableDomains::domain_search(&name).await }
}));

handles.push(tokio::spawn({
let name = name.to_string();
async move { Genome::domain_search(&name).await }
}));

handles.push(tokio::spawn({
let name = name.to_string();
async move { Crossbell::domain_search(&name).await }
}));

handles.push(tokio::spawn({
let name = name.to_string();
async move { Solana::domain_search(&name).await }
}));

handles.push(tokio::spawn({
let name = name.to_string();
async move { Clusters::domain_search(&name).await }
}));

handles.push(tokio::spawn({
let name = name.to_string();
async move { SpaceIdV3::domain_search(&name).await }
}));

event!(Level::INFO, "DomainSearch Pushed all tasks...");

let mut all_edges: EdgeList = Vec::new();

for handle in handles {
match handle.await {
Ok(Ok(edges)) => all_edges.extend(edges),
Ok(Err(err)) => warn!("Error happened when fetching name({}): {}", name, err),
Err(join_err) => warn!("Task failed to join: {}", join_err),
pub async fn fetch_domains(name: &str) -> Result<(), Error> {
let all_edges: EdgeList = join_all(vec![
TheGraph::domain_search(name), // ens
Farcaster::domain_search(name), // farcaster
LensV2::domain_search(name), // lens
DotBit::domain_search(name), // dotbit
UnstoppableDomains::domain_search(name), // unstoppabledomains
Genome::domain_search(name), // gnosis
Crossbell::domain_search(name), // crossbell
Solana::domain_search(name), // sns
Clusters::domain_search(name), // clusters
SpaceIdV3::domain_search(name), // space_id
])
.await
.into_iter()
.flat_map(|res| {
match res {
Ok(edges) => edges,
Err(err) => {
warn!("Error happened when fetching name({}): {}", name, err);
vec![] // Don't break the procedure
}
}
}
})
.collect();

// Upsert all edges after fetching completes
let gsql_cli = make_http_client();
Expand Down
15 changes: 4 additions & 11 deletions src/upstream/tests.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::error::Error;
use crate::upstream::{
batch_fetch_upstream, fetch_all, fetch_all_domains, fetch_one, Chain, ContractCategory,
Platform, Target,
batch_fetch_upstream, fetch_all, fetch_domains, fetch_one, Chain, ContractCategory, Platform,
Target,
};

#[tokio::test]
Expand Down Expand Up @@ -125,16 +125,9 @@ async fn test_fetch_all_ens() -> Result<(), Error> {
Ok(())
}

// #[tokio::test]
// async fn test_fetch_domains() -> Result<(), Error> {
// let name = "vitalik";
// fetch_domains(name).await?;
// Ok(())
// }

#[tokio::test]
async fn test_fetch_all_domains() -> Result<(), Error> {
async fn test_fetch_domains() -> Result<(), Error> {
let name = "vitalik";
fetch_all_domains(name).await?;
fetch_domains(name).await?;
Ok(())
}

0 comments on commit ac2a2a9

Please sign in to comment.