Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor-opt(torii-grpc): subscriptions no db fetch #2455

Merged
merged 4 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions crates/torii/core/src/query_queue.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::VecDeque;

use anyhow::{Context, Result};
use dojo_types::schema::Ty;
use dojo_types::schema::{Struct, Ty};
use sqlx::{FromRow, Pool, Sqlite};
use starknet::core::types::Felt;

Expand Down Expand Up @@ -42,7 +42,7 @@ pub struct DeleteEntityQuery {
pub entity_id: String,
pub event_id: String,
pub block_timestamp: String,
pub entity: Ty,
pub ty: Ty,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -115,7 +115,8 @@ impl QueryQueue {
.fetch_one(&mut *tx)
.await?;
let mut entity_updated = EntityUpdated::from_row(&row)?;
entity_updated.updated_model = Some(entity.entity);
entity_updated.updated_model =
Some(Ty::Struct(Struct { name: entity.ty.name(), children: vec![] }));

let count = sqlx::query_scalar::<_, i64>(
"SELECT count(*) FROM entity_model WHERE entity_id = ?",
Expand Down
2 changes: 1 addition & 1 deletion crates/torii/core/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ impl Sql {
entity_id: entity_id.clone(),
event_id: event_id.to_string(),
block_timestamp: utc_dt_string_from_timestamp(block_timestamp),
entity: entity.clone(),
ty: entity.clone(),
}),
);

Expand Down
14 changes: 4 additions & 10 deletions crates/torii/grpc/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,11 @@ impl DojoWorld {
Arc::clone(&state_diff_manager),
));

tokio::task::spawn(subscriptions::entity::Service::new(
pool.clone(),
Arc::clone(&entity_manager),
Arc::clone(&model_cache),
));
tokio::task::spawn(subscriptions::entity::Service::new(Arc::clone(&entity_manager)));

tokio::task::spawn(subscriptions::event_message::Service::new(
pool.clone(),
Arc::clone(&event_message_manager),
Arc::clone(&model_cache),
));
tokio::task::spawn(subscriptions::event_message::Service::new(Arc::clone(
&event_message_manager,
)));

tokio::task::spawn(subscriptions::event::Service::new(Arc::clone(&event_manager)));

Expand Down
70 changes: 10 additions & 60 deletions crates/torii/grpc/src/server/subscriptions/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,17 @@
use futures::Stream;
use futures_util::StreamExt;
use rand::Rng;
use sqlx::{Pool, Sqlite};
use starknet::core::types::Felt;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::RwLock;
use torii_core::cache::ModelCache;
use torii_core::error::{Error, ParseError};
use torii_core::model::build_sql_query;
use torii_core::simple_broker::SimpleBroker;
use torii_core::sql::FELT_DELIMITER;
use torii_core::types::Entity;
use tracing::{error, trace};

use crate::proto;
use crate::proto::world::SubscribeEntityResponse;
use crate::server::map_row_to_entity;
use crate::types::{EntityKeysClause, PatternMatching};

pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::entity";
Expand Down Expand Up @@ -81,32 +77,16 @@
#[must_use = "Service does nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct Service {
pool: Pool<Sqlite>,
subs_manager: Arc<EntityManager>,
model_cache: Arc<ModelCache>,
simple_broker: Pin<Box<dyn Stream<Item = Entity> + Send>>,
}

impl Service {
pub fn new(
pool: Pool<Sqlite>,
subs_manager: Arc<EntityManager>,
model_cache: Arc<ModelCache>,
) -> Self {
Self {
pool,
subs_manager,
model_cache,
simple_broker: Box::pin(SimpleBroker::<Entity>::subscribe()),
}
pub fn new(subs_manager: Arc<EntityManager>) -> Self {
Self { subs_manager, simple_broker: Box::pin(SimpleBroker::<Entity>::subscribe()) }
}

async fn publish_updates(
subs: Arc<EntityManager>,
cache: Arc<ModelCache>,
pool: Pool<Sqlite>,
entity: &Entity,
) -> Result<(), Error> {
async fn publish_updates(subs: Arc<EntityManager>, entity: &Entity) -> Result<(), Error> {

Check warning on line 89 in crates/torii/grpc/src/server/subscriptions/entity.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/subscriptions/entity.rs#L89

Added line #L89 was not covered by tests
let mut closed_stream = Vec::new();
let hashed = Felt::from_str(&entity.id).map_err(ParseError::FromStr)?;
let keys = entity
Expand Down Expand Up @@ -204,41 +184,13 @@
continue;
}

let models_query = r#"
SELECT group_concat(entity_model.model_id) as model_ids
FROM entities
JOIN entity_model ON entities.id = entity_model.entity_id
WHERE entities.id = ?
GROUP BY entities.id
"#;
let (model_ids,): (String,) =
sqlx::query_as(models_query).bind(&entity.id).fetch_one(&pool).await?;
let model_ids: Vec<Felt> = model_ids
.split(',')
.map(Felt::from_str)
.collect::<Result<_, _>>()
.map_err(ParseError::FromStr)?;
let schemas = cache.models(&model_ids).await?.into_iter().map(|m| m.schema).collect();

let (entity_query, arrays_queries, _) = build_sql_query(
&schemas,
"entities",
"entity_id",
Some("entities.id = ?"),
Some("entities.id = ?"),
None,
None,
)?;

let row = sqlx::query(&entity_query).bind(&entity.id).fetch_one(&pool).await?;
let mut arrays_rows = HashMap::new();
for (name, query) in arrays_queries {
let row = sqlx::query(&query).bind(&entity.id).fetch_all(&pool).await?;
arrays_rows.insert(name, row);
}

// This should NEVER be None
let model = entity.updated_model.as_ref().unwrap().as_struct().unwrap().clone();

Check warning on line 188 in crates/torii/grpc/src/server/subscriptions/entity.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/subscriptions/entity.rs#L187-L188

Added lines #L187 - L188 were not covered by tests
let resp = proto::world::SubscribeEntityResponse {
entity: Some(map_row_to_entity(&row, &arrays_rows, schemas.clone())?),
entity: Some(proto::types::Entity {
hashed_keys: hashed.to_bytes_be().to_vec(),
models: vec![model.into()],
}),

Check warning on line 193 in crates/torii/grpc/src/server/subscriptions/entity.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/subscriptions/entity.rs#L190-L193

Added lines #L190 - L193 were not covered by tests
subscription_id: *idx,
};

Expand All @@ -264,10 +216,8 @@

while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) {
let subs = Arc::clone(&pin.subs_manager);
let cache = Arc::clone(&pin.model_cache);
let pool = pin.pool.clone();
tokio::spawn(async move {
if let Err(e) = Service::publish_updates(subs, cache, pool, &entity).await {
if let Err(e) = Service::publish_updates(subs, &entity).await {

Check warning on line 220 in crates/torii/grpc/src/server/subscriptions/entity.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/subscriptions/entity.rs#L220

Added line #L220 was not covered by tests
error!(target = LOG_TARGET, error = %e, "Publishing entity update.");
}
});
Expand Down
66 changes: 9 additions & 57 deletions crates/torii/grpc/src/server/subscriptions/event_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,10 @@
use futures::Stream;
use futures_util::StreamExt;
use rand::Rng;
use sqlx::{Pool, Sqlite};
use starknet::core::types::Felt;
use tokio::sync::mpsc::{channel, Receiver};
use tokio::sync::RwLock;
use torii_core::cache::ModelCache;
use torii_core::error::{Error, ParseError};
use torii_core::model::build_sql_query;
use torii_core::simple_broker::SimpleBroker;
use torii_core::sql::FELT_DELIMITER;
use torii_core::types::EventMessage;
Expand All @@ -23,7 +20,6 @@
use super::entity::EntitiesSubscriber;
use crate::proto;
use crate::proto::world::SubscribeEntityResponse;
use crate::server::map_row_to_entity;
use crate::types::{EntityKeysClause, PatternMatching};

pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::event_message";
Expand Down Expand Up @@ -75,30 +71,17 @@
#[must_use = "Service does nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct Service {
pool: Pool<Sqlite>,
subs_manager: Arc<EventMessageManager>,
model_cache: Arc<ModelCache>,
simple_broker: Pin<Box<dyn Stream<Item = EventMessage> + Send>>,
}

impl Service {
pub fn new(
pool: Pool<Sqlite>,
subs_manager: Arc<EventMessageManager>,
model_cache: Arc<ModelCache>,
) -> Self {
Self {
pool,
subs_manager,
model_cache,
simple_broker: Box::pin(SimpleBroker::<EventMessage>::subscribe()),
}
pub fn new(subs_manager: Arc<EventMessageManager>) -> Self {
Self { subs_manager, simple_broker: Box::pin(SimpleBroker::<EventMessage>::subscribe()) }
}

async fn publish_updates(
subs: Arc<EventMessageManager>,
cache: Arc<ModelCache>,
pool: Pool<Sqlite>,
entity: &EventMessage,
) -> Result<(), Error> {
let mut closed_stream = Vec::new();
Expand Down Expand Up @@ -182,42 +165,13 @@
continue;
}

// publish all updates if ids is empty or only ids that are subscribed to
let models_query = r#"
SELECT group_concat(event_model.model_id) as model_ids
FROM event_messages
JOIN event_model ON event_messages.id = event_model.entity_id
WHERE event_messages.id = ?
GROUP BY event_messages.id
"#;
let (model_ids,): (String,) =
sqlx::query_as(models_query).bind(&entity.id).fetch_one(&pool).await?;
let model_ids: Vec<Felt> = model_ids
.split(',')
.map(Felt::from_str)
.collect::<Result<_, _>>()
.map_err(ParseError::FromStr)?;
let schemas = cache.models(&model_ids).await?.into_iter().map(|m| m.schema).collect();

let (entity_query, arrays_queries, _) = build_sql_query(
&schemas,
"event_messages",
"event_message_id",
Some("event_messages.id = ?"),
Some("event_messages.id = ?"),
None,
None,
)?;

let row = sqlx::query(&entity_query).bind(&entity.id).fetch_one(&pool).await?;
let mut arrays_rows = HashMap::new();
for (name, query) in arrays_queries {
let rows = sqlx::query(&query).bind(&entity.id).fetch_all(&pool).await?;
arrays_rows.insert(name, rows);
}

// This should NEVER be None
let model = entity.updated_model.as_ref().unwrap().as_struct().unwrap().clone();

Check warning on line 169 in crates/torii/grpc/src/server/subscriptions/event_message.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/subscriptions/event_message.rs#L168-L169

Added lines #L168 - L169 were not covered by tests
let resp = proto::world::SubscribeEntityResponse {
entity: Some(map_row_to_entity(&row, &arrays_rows, schemas.clone())?),
entity: Some(proto::types::Entity {
hashed_keys: hashed.to_bytes_be().to_vec(),
models: vec![model.into()],
}),

Check warning on line 174 in crates/torii/grpc/src/server/subscriptions/event_message.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/subscriptions/event_message.rs#L171-L174

Added lines #L171 - L174 were not covered by tests
Larkooo marked this conversation as resolved.
Show resolved Hide resolved
subscription_id: *idx,
};

Expand All @@ -243,10 +197,8 @@

while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) {
let subs = Arc::clone(&pin.subs_manager);
let cache = Arc::clone(&pin.model_cache);
let pool = pin.pool.clone();
tokio::spawn(async move {
if let Err(e) = Service::publish_updates(subs, cache, pool, &entity).await {
if let Err(e) = Service::publish_updates(subs, &entity).await {

Check warning on line 201 in crates/torii/grpc/src/server/subscriptions/event_message.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/subscriptions/event_message.rs#L201

Added line #L201 was not covered by tests
error!(target = LOG_TARGET, error = %e, "Publishing entity update.");
}
});
Expand Down
Loading