From 3929494c00cf6c530950ead1b249139641a77928 Mon Sep 17 00:00:00 2001 From: Nasr Date: Thu, 19 Sep 2024 16:50:43 -0400 Subject: [PATCH 1/4] refactor-opt(torii-grpc): subscriptions no db fetch --- crates/torii/core/src/query_queue.rs | 9 ++- crates/torii/core/src/sql.rs | 2 +- .../grpc/src/server/subscriptions/entity.rs | 57 +++--------------- .../src/server/subscriptions/event_message.rs | 58 +++---------------- 4 files changed, 23 insertions(+), 103 deletions(-) diff --git a/crates/torii/core/src/query_queue.rs b/crates/torii/core/src/query_queue.rs index 589035ca4e..3362a77caf 100644 --- a/crates/torii/core/src/query_queue.rs +++ b/crates/torii/core/src/query_queue.rs @@ -1,7 +1,7 @@ use std::collections::VecDeque; use anyhow::{Context, Result}; -use dojo_types::schema::Ty; +use dojo_types::schema::{Struct, Ty}; use sqlx::{FromRow, Pool, Sqlite}; use starknet::core::types::Felt; @@ -42,7 +42,7 @@ pub struct DeleteEntityQuery { pub entity_id: String, pub event_id: String, pub block_timestamp: String, - pub entity: Ty, + pub ty: Ty, } #[derive(Debug, Clone)] @@ -115,7 +115,10 @@ impl QueryQueue { .fetch_one(&mut *tx) .await?; let mut entity_updated = EntityUpdated::from_row(&row)?; - entity_updated.updated_model = Some(entity.entity); + entity_updated.updated_model = Some(Ty::Struct(Struct { + name: entity.ty.name(), + children: vec![], + })); let count = sqlx::query_scalar::<_, i64>( "SELECT count(*) FROM entity_model WHERE entity_id = ?", diff --git a/crates/torii/core/src/sql.rs b/crates/torii/core/src/sql.rs index ccca4f4c7d..249a3c4fef 100644 --- a/crates/torii/core/src/sql.rs +++ b/crates/torii/core/src/sql.rs @@ -361,7 +361,7 @@ impl Sql { entity_id: entity_id.clone(), event_id: event_id.to_string(), block_timestamp: utc_dt_string_from_timestamp(block_timestamp), - entity: entity.clone(), + ty: entity.clone(), }), ); diff --git a/crates/torii/grpc/src/server/subscriptions/entity.rs b/crates/torii/grpc/src/server/subscriptions/entity.rs index 7c03d096a7..3217d88992 100644 --- a/crates/torii/grpc/src/server/subscriptions/entity.rs +++ b/crates/torii/grpc/src/server/subscriptions/entity.rs @@ -8,13 +8,10 @@ use std::task::{Context, Poll}; use futures::Stream; use futures_util::StreamExt; use rand::Rng; -use sqlx::{Pool, Sqlite}; use starknet::core::types::Felt; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::RwLock; -use torii_core::cache::ModelCache; use torii_core::error::{Error, ParseError}; -use torii_core::model::build_sql_query; use torii_core::simple_broker::SimpleBroker; use torii_core::sql::FELT_DELIMITER; use torii_core::types::Entity; @@ -22,7 +19,6 @@ use tracing::{error, trace}; use crate::proto; use crate::proto::world::SubscribeEntityResponse; -use crate::server::map_row_to_entity; use crate::types::{EntityKeysClause, PatternMatching}; pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::entity"; @@ -81,30 +77,22 @@ impl EntityManager { #[must_use = "Service does nothing unless polled"] #[allow(missing_debug_implementations)] pub struct Service { - pool: Pool, subs_manager: Arc, - model_cache: Arc, simple_broker: Pin + Send>>, } impl Service { pub fn new( - pool: Pool, subs_manager: Arc, - model_cache: Arc, ) -> Self { Self { - pool, subs_manager, - model_cache, simple_broker: Box::pin(SimpleBroker::::subscribe()), } } async fn publish_updates( subs: Arc, - cache: Arc, - pool: Pool, entity: &Entity, ) -> Result<(), Error> { let mut closed_stream = Vec::new(); @@ -204,41 +192,14 @@ impl Service { continue; } - let models_query = r#" - SELECT group_concat(entity_model.model_id) as model_ids - FROM entities - JOIN entity_model ON entities.id = entity_model.entity_id - WHERE entities.id = ? - GROUP BY entities.id - "#; - let (model_ids,): (String,) = - sqlx::query_as(models_query).bind(&entity.id).fetch_one(&pool).await?; - let model_ids: Vec = model_ids - .split(',') - .map(Felt::from_str) - .collect::>() - .map_err(ParseError::FromStr)?; - let schemas = cache.models(&model_ids).await?.into_iter().map(|m| m.schema).collect(); - - let (entity_query, arrays_queries, _) = build_sql_query( - &schemas, - "entities", - "entity_id", - Some("entities.id = ?"), - Some("entities.id = ?"), - None, - None, - )?; - - let row = sqlx::query(&entity_query).bind(&entity.id).fetch_one(&pool).await?; - let mut arrays_rows = HashMap::new(); - for (name, query) in arrays_queries { - let row = sqlx::query(&query).bind(&entity.id).fetch_all(&pool).await?; - arrays_rows.insert(name, row); - } - + let model = entity.updated_model.as_ref().unwrap().as_struct().unwrap().clone(); let resp = proto::world::SubscribeEntityResponse { - entity: Some(map_row_to_entity(&row, &arrays_rows, schemas.clone())?), + entity: Some(proto::types::Entity { + hashed_keys: hashed.to_bytes_be().to_vec(), + models: vec![ + model.into() + ], + }), subscription_id: *idx, }; @@ -264,10 +225,8 @@ impl Future for Service { while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) { let subs = Arc::clone(&pin.subs_manager); - let cache = Arc::clone(&pin.model_cache); - let pool = pin.pool.clone(); tokio::spawn(async move { - if let Err(e) = Service::publish_updates(subs, cache, pool, &entity).await { + if let Err(e) = Service::publish_updates(subs, &entity).await { error!(target = LOG_TARGET, error = %e, "Publishing entity update."); } }); diff --git a/crates/torii/grpc/src/server/subscriptions/event_message.rs b/crates/torii/grpc/src/server/subscriptions/event_message.rs index fabd5510aa..c08d4f08a5 100644 --- a/crates/torii/grpc/src/server/subscriptions/event_message.rs +++ b/crates/torii/grpc/src/server/subscriptions/event_message.rs @@ -8,13 +8,10 @@ use std::task::{Context, Poll}; use futures::Stream; use futures_util::StreamExt; use rand::Rng; -use sqlx::{Pool, Sqlite}; use starknet::core::types::Felt; use tokio::sync::mpsc::{channel, Receiver}; use tokio::sync::RwLock; -use torii_core::cache::ModelCache; use torii_core::error::{Error, ParseError}; -use torii_core::model::build_sql_query; use torii_core::simple_broker::SimpleBroker; use torii_core::sql::FELT_DELIMITER; use torii_core::types::EventMessage; @@ -23,7 +20,6 @@ use tracing::{error, trace}; use super::entity::EntitiesSubscriber; use crate::proto; use crate::proto::world::SubscribeEntityResponse; -use crate::server::map_row_to_entity; use crate::types::{EntityKeysClause, PatternMatching}; pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::event_message"; @@ -75,30 +71,22 @@ impl EventMessageManager { #[must_use = "Service does nothing unless polled"] #[allow(missing_debug_implementations)] pub struct Service { - pool: Pool, subs_manager: Arc, - model_cache: Arc, simple_broker: Pin + Send>>, } impl Service { pub fn new( - pool: Pool, subs_manager: Arc, - model_cache: Arc, ) -> Self { Self { - pool, subs_manager, - model_cache, simple_broker: Box::pin(SimpleBroker::::subscribe()), } } async fn publish_updates( subs: Arc, - cache: Arc, - pool: Pool, entity: &EventMessage, ) -> Result<(), Error> { let mut closed_stream = Vec::new(); @@ -182,42 +170,14 @@ impl Service { continue; } - // publish all updates if ids is empty or only ids that are subscribed to - let models_query = r#" - SELECT group_concat(event_model.model_id) as model_ids - FROM event_messages - JOIN event_model ON event_messages.id = event_model.entity_id - WHERE event_messages.id = ? - GROUP BY event_messages.id - "#; - let (model_ids,): (String,) = - sqlx::query_as(models_query).bind(&entity.id).fetch_one(&pool).await?; - let model_ids: Vec = model_ids - .split(',') - .map(Felt::from_str) - .collect::>() - .map_err(ParseError::FromStr)?; - let schemas = cache.models(&model_ids).await?.into_iter().map(|m| m.schema).collect(); - - let (entity_query, arrays_queries, _) = build_sql_query( - &schemas, - "event_messages", - "event_message_id", - Some("event_messages.id = ?"), - Some("event_messages.id = ?"), - None, - None, - )?; - - let row = sqlx::query(&entity_query).bind(&entity.id).fetch_one(&pool).await?; - let mut arrays_rows = HashMap::new(); - for (name, query) in arrays_queries { - let rows = sqlx::query(&query).bind(&entity.id).fetch_all(&pool).await?; - arrays_rows.insert(name, rows); - } - + let model = entity.updated_model.as_ref().unwrap().as_struct().unwrap().clone(); let resp = proto::world::SubscribeEntityResponse { - entity: Some(map_row_to_entity(&row, &arrays_rows, schemas.clone())?), + entity: Some(proto::types::Entity { + hashed_keys: hashed.to_bytes_be().to_vec(), + models: vec![ + model.into() + ], + }), subscription_id: *idx, }; @@ -243,10 +203,8 @@ impl Future for Service { while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) { let subs = Arc::clone(&pin.subs_manager); - let cache = Arc::clone(&pin.model_cache); - let pool = pin.pool.clone(); tokio::spawn(async move { - if let Err(e) = Service::publish_updates(subs, cache, pool, &entity).await { + if let Err(e) = Service::publish_updates(subs, &entity).await { error!(target = LOG_TARGET, error = %e, "Publishing entity update."); } }); From 51d507bddddd12627a28663529a84fe1ab8df363 Mon Sep 17 00:00:00 2001 From: Nasr Date: Thu, 19 Sep 2024 16:55:20 -0400 Subject: [PATCH 2/4] update grpc --- crates/torii/grpc/src/server/mod.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/crates/torii/grpc/src/server/mod.rs b/crates/torii/grpc/src/server/mod.rs index 3b497c6def..ffd4059621 100644 --- a/crates/torii/grpc/src/server/mod.rs +++ b/crates/torii/grpc/src/server/mod.rs @@ -107,15 +107,11 @@ impl DojoWorld { )); tokio::task::spawn(subscriptions::entity::Service::new( - pool.clone(), Arc::clone(&entity_manager), - Arc::clone(&model_cache), )); tokio::task::spawn(subscriptions::event_message::Service::new( - pool.clone(), Arc::clone(&event_message_manager), - Arc::clone(&model_cache), )); tokio::task::spawn(subscriptions::event::Service::new(Arc::clone(&event_manager))); From f795c3af6634c3a8c1bd26ed13667e29d065abbe Mon Sep 17 00:00:00 2001 From: Nasr Date: Thu, 19 Sep 2024 16:55:32 -0400 Subject: [PATCH 3/4] fmt --- crates/torii/core/src/query_queue.rs | 6 ++---- crates/torii/grpc/src/server/mod.rs | 10 ++++------ .../grpc/src/server/subscriptions/entity.rs | 18 ++++-------------- .../src/server/subscriptions/event_message.rs | 13 +++---------- 4 files changed, 13 insertions(+), 34 deletions(-) diff --git a/crates/torii/core/src/query_queue.rs b/crates/torii/core/src/query_queue.rs index 3362a77caf..5dfad77113 100644 --- a/crates/torii/core/src/query_queue.rs +++ b/crates/torii/core/src/query_queue.rs @@ -115,10 +115,8 @@ impl QueryQueue { .fetch_one(&mut *tx) .await?; let mut entity_updated = EntityUpdated::from_row(&row)?; - entity_updated.updated_model = Some(Ty::Struct(Struct { - name: entity.ty.name(), - children: vec![], - })); + entity_updated.updated_model = + Some(Ty::Struct(Struct { name: entity.ty.name(), children: vec![] })); let count = sqlx::query_scalar::<_, i64>( "SELECT count(*) FROM entity_model WHERE entity_id = ?", diff --git a/crates/torii/grpc/src/server/mod.rs b/crates/torii/grpc/src/server/mod.rs index ffd4059621..4dd245d03a 100644 --- a/crates/torii/grpc/src/server/mod.rs +++ b/crates/torii/grpc/src/server/mod.rs @@ -106,13 +106,11 @@ impl DojoWorld { Arc::clone(&state_diff_manager), )); - tokio::task::spawn(subscriptions::entity::Service::new( - Arc::clone(&entity_manager), - )); + tokio::task::spawn(subscriptions::entity::Service::new(Arc::clone(&entity_manager))); - tokio::task::spawn(subscriptions::event_message::Service::new( - Arc::clone(&event_message_manager), - )); + tokio::task::spawn(subscriptions::event_message::Service::new(Arc::clone( + &event_message_manager, + ))); tokio::task::spawn(subscriptions::event::Service::new(Arc::clone(&event_manager))); diff --git a/crates/torii/grpc/src/server/subscriptions/entity.rs b/crates/torii/grpc/src/server/subscriptions/entity.rs index 3217d88992..6511e218ec 100644 --- a/crates/torii/grpc/src/server/subscriptions/entity.rs +++ b/crates/torii/grpc/src/server/subscriptions/entity.rs @@ -82,19 +82,11 @@ pub struct Service { } impl Service { - pub fn new( - subs_manager: Arc, - ) -> Self { - Self { - subs_manager, - simple_broker: Box::pin(SimpleBroker::::subscribe()), - } + pub fn new(subs_manager: Arc) -> Self { + Self { subs_manager, simple_broker: Box::pin(SimpleBroker::::subscribe()) } } - async fn publish_updates( - subs: Arc, - entity: &Entity, - ) -> Result<(), Error> { + async fn publish_updates(subs: Arc, entity: &Entity) -> Result<(), Error> { let mut closed_stream = Vec::new(); let hashed = Felt::from_str(&entity.id).map_err(ParseError::FromStr)?; let keys = entity @@ -196,9 +188,7 @@ impl Service { let resp = proto::world::SubscribeEntityResponse { entity: Some(proto::types::Entity { hashed_keys: hashed.to_bytes_be().to_vec(), - models: vec![ - model.into() - ], + models: vec![model.into()], }), subscription_id: *idx, }; diff --git a/crates/torii/grpc/src/server/subscriptions/event_message.rs b/crates/torii/grpc/src/server/subscriptions/event_message.rs index c08d4f08a5..d2260b0044 100644 --- a/crates/torii/grpc/src/server/subscriptions/event_message.rs +++ b/crates/torii/grpc/src/server/subscriptions/event_message.rs @@ -76,13 +76,8 @@ pub struct Service { } impl Service { - pub fn new( - subs_manager: Arc, - ) -> Self { - Self { - subs_manager, - simple_broker: Box::pin(SimpleBroker::::subscribe()), - } + pub fn new(subs_manager: Arc) -> Self { + Self { subs_manager, simple_broker: Box::pin(SimpleBroker::::subscribe()) } } async fn publish_updates( @@ -174,9 +169,7 @@ impl Service { let resp = proto::world::SubscribeEntityResponse { entity: Some(proto::types::Entity { hashed_keys: hashed.to_bytes_be().to_vec(), - models: vec![ - model.into() - ], + models: vec![model.into()], }), subscription_id: *idx, }; From d67dc7558597553a25868e804e38ac463cf97ed1 Mon Sep 17 00:00:00 2001 From: Nasr Date: Thu, 19 Sep 2024 16:59:04 -0400 Subject: [PATCH 4/4] cmt --- crates/torii/grpc/src/server/subscriptions/entity.rs | 1 + crates/torii/grpc/src/server/subscriptions/event_message.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/crates/torii/grpc/src/server/subscriptions/entity.rs b/crates/torii/grpc/src/server/subscriptions/entity.rs index 6511e218ec..d7b03ae26e 100644 --- a/crates/torii/grpc/src/server/subscriptions/entity.rs +++ b/crates/torii/grpc/src/server/subscriptions/entity.rs @@ -184,6 +184,7 @@ impl Service { continue; } + // This should NEVER be None let model = entity.updated_model.as_ref().unwrap().as_struct().unwrap().clone(); let resp = proto::world::SubscribeEntityResponse { entity: Some(proto::types::Entity { diff --git a/crates/torii/grpc/src/server/subscriptions/event_message.rs b/crates/torii/grpc/src/server/subscriptions/event_message.rs index d2260b0044..9bac36fb84 100644 --- a/crates/torii/grpc/src/server/subscriptions/event_message.rs +++ b/crates/torii/grpc/src/server/subscriptions/event_message.rs @@ -165,6 +165,7 @@ impl Service { continue; } + // This should NEVER be None let model = entity.updated_model.as_ref().unwrap().as_struct().unwrap().clone(); let resp = proto::world::SubscribeEntityResponse { entity: Some(proto::types::Entity {