
opt(torii-grpc): channel based approach for entity updates #2453

Closed
Wants to merge 3 commits

Conversation

@Larkooo (Collaborator) commented Sep 19, 2024

Summary by CodeRabbit

  • New Features

    • Introduced asynchronous mechanisms for handling entity and event updates, improving responsiveness and efficiency.
    • Added channels for processing updates at regular intervals, enhancing scalability.
    • Implemented a structured approach for managing event message updates, ensuring timely publishing.
  • Bug Fixes

    • Improved the decoupling of update logic from polling mechanisms, reducing potential blocking issues.


coderabbitai bot commented Sep 19, 2024

Walkthrough

Ohayo, sensei! The changes in this pull request enhance the Service struct across multiple managers by introducing asynchronous mechanisms for handling updates related to entities, events, and event messages. Each manager now utilizes a Sender to facilitate the sending of updates through channels, with a dedicated asynchronous task processing these updates at regular intervals. The modifications decouple the update logic from the polling mechanisms, improving the overall structure and efficiency of the update handling.

Changes

File Path | Change Summary
crates/torii/grpc/src/server/subscriptions/entity.rs | Introduced Sender<Entity> for asynchronous entity updates; modified publish_updates visibility.
crates/torii/grpc/src/server/subscriptions/event.rs | Added Sender<Event> for asynchronous event updates; modified new constructor and added process_updates.
crates/torii/grpc/src/server/subscriptions/event_message.rs | Added Sender<EventMessage> for asynchronous event message updates; introduced process_updates.


@coderabbitai coderabbitai bot left a comment
Actionable comments posted: 9

Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

Commits

Files that changed from the base of the PR: commits between 5777c00 and 5cd1fa5.

Files selected for processing (3)
  • crates/torii/grpc/src/server/subscriptions/entity.rs (5 hunks)
  • crates/torii/grpc/src/server/subscriptions/event.rs (3 hunks)
  • crates/torii/grpc/src/server/subscriptions/event_message.rs (4 hunks)
Additional comments not posted (4)
crates/torii/grpc/src/server/subscriptions/event.rs (2)

68-68: Ohayo sensei!

Addition of update_sender field is appropriate.

The inclusion of update_sender in the Service struct correctly facilitates event transmission through the channel.


83-102: Ohayo sensei!

Asynchronous event processing is well-implemented.

The process_updates function efficiently batches and processes events at regular intervals, enhancing performance.

crates/torii/grpc/src/server/subscriptions/event_message.rs (1)

90-94: Ohayo, sensei! Ensure proper initialization of Service fields

I noticed that in the new method, the fields pool, subs_manager, and model_cache are no longer stored within the Service struct after introducing update_sender. Please verify that removing these fields doesn't affect other parts of the Service implementation that might rely on them.

crates/torii/grpc/src/server/subscriptions/entity.rs (1)

Line range hint 138-172: Ohayo sensei! Exposing internal method publish_updates as public API.

The publish_updates function has been changed from private to public. Making it public exposes internal mechanics and could lead to misuse from other modules.

Please verify if publish_updates needs to be public. If it's only used internally, consider keeping it private to maintain encapsulation.

- pub async fn publish_updates(
+ async fn publish_updates(

Comment on lines +73 to +81
let (update_sender, update_receiver) = channel(100);
let service =
Self { simple_broker: Box::pin(SimpleBroker::<Event>::subscribe()), update_sender };

// Spawn a task to process event updates
tokio::spawn(Self::process_updates(Arc::clone(&subs_manager), update_receiver));

service
}

Ohayo sensei!

Consider handling the JoinHandle from tokio::spawn.

In the new function, when spawning the process_updates task, the returned JoinHandle is not stored or managed. If you need to control or monitor the spawned task (such as for graceful shutdown or error handling), consider keeping the JoinHandle.

Comment on lines +185 to 191
 let sender = pin.update_sender.clone();
 tokio::spawn(async move {
-    if let Err(e) = Service::publish_updates(subs, &event).await {
-        error!(target = LOG_TARGET, error = %e, "Publishing events update.");
+    if let Err(e) = sender.send(event).await {
+        error!(target = LOG_TARGET, error = %e, "Sending event update to channel.");
     }
 });
 }

Ohayo sensei!

Potential issue with unbounded task spawning in poll method.

In the poll method, spawning a new task for each event to send it via the channel may lead to excessive task creation if events are frequent. This could impact performance.

Consider refactoring to avoid spawning a task per event. One approach is to use try_send on the update_sender without spawning a task. However, be mindful of handling cases where the channel buffer is full. Alternatively, accumulate events and send them in batches if possible.

Comment on lines +96 to +103
// Spawn a task to process event message updates
tokio::spawn(Self::process_updates(
Arc::clone(&subs_manager),
Arc::clone(&model_cache),
pool,
update_receiver,
));


Handle potential errors when spawning process_updates task

When spawning the process_updates asynchronous task, any errors that occur within the task might go unnoticed. Consider adding error handling or logging within the process_updates function to ensure that any unexpected errors are captured and addressed.

Comment on lines +107 to +130
async fn process_updates(
subs: Arc<EventMessageManager>,
cache: Arc<ModelCache>,
pool: Pool<Sqlite>,
mut update_receiver: Receiver<EventMessage>,
) {
let mut interval = interval(Duration::from_millis(100));
let mut pending_updates = Vec::new();

loop {
tokio::select! {
_ = interval.tick() => {
if !pending_updates.is_empty() {
for event_message in pending_updates.drain(..) {
if let Err(e) = Self::publish_updates(Arc::clone(&subs), Arc::clone(&cache), pool.clone(), &event_message).await {
error!(target = LOG_TARGET, error = %e, "Publishing event message update.");
}
}
}
}
Some(event_message) = update_receiver.recv() => {
pending_updates.push(event_message);
}
}

Potential accumulation of pending updates may affect timing

The process_updates function processes pending_updates every 100 milliseconds. If publish_updates takes a significant amount of time, there might be a delay in processing new event messages, leading to a backlog. To improve efficiency, consider processing updates concurrently or optimizing the publishing logic.

Comment on lines +121 to +124
if let Err(e) = Self::publish_updates(Arc::clone(&subs), Arc::clone(&cache), pool.clone(), &event_message).await {
error!(target = LOG_TARGET, error = %e, "Publishing event message update.");
}
}

Improve error handling in publish_updates

In the process_updates loop, when calling publish_updates, errors are logged but not handled further. If an error occurs, it might affect the processing of subsequent updates. Consider implementing retry logic or more comprehensive error handling to ensure reliability.

Comment on lines +280 to +284
 while let Poll::Ready(Some(event_message)) = pin.simple_broker.poll_next_unpin(cx) {
     let sender = pin.update_sender.clone();
     tokio::spawn(async move {
-        if let Err(e) = Service::publish_updates(subs, cache, pool, &entity).await {
-            error!(target = LOG_TARGET, error = %e, "Publishing entity update.");
+        if let Err(e) = sender.send(event_message).await {
+            error!(target = LOG_TARGET, error = %e, "Sending event message update to channel.");

Avoid spawning a new task for each event message to enhance performance

Currently, a new asynchronous task is spawned for each event message received in the poll implementation. This could introduce overhead under high throughput. To improve performance, consider sending the event message directly without spawning a new task.

Apply this diff to refactor the code:

 while let Poll::Ready(Some(event_message)) = pin.simple_broker.poll_next_unpin(cx) {
-    let sender = pin.update_sender.clone();
-    tokio::spawn(async move {
-        if let Err(e) = sender.send(event_message).await {
-            error!(target = LOG_TARGET, error = %e, "Sending event message update to channel.");
-        }
-    });
+    if let Err(e) = pin.update_sender.try_send(event_message) {
+        error!(target = LOG_TARGET, error = %e, "Sending event message update to channel.");
+    }
 }

This modification replaces the spawned task with a direct try_send, eliminating unnecessary task creation and reducing overhead.

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.


Comment on lines +96 to +109
let (update_sender, update_receiver) = channel(100);
let service =
    Self { simple_broker: Box::pin(SimpleBroker::<Entity>::subscribe()), update_sender };

// Spawn a task to process entity updates
tokio::spawn(Self::process_updates(
    Arc::clone(&subs_manager),
    Arc::clone(&model_cache),
    pool,
    update_receiver,
));

service
}

Ohayo sensei! Potential unbounded task spawning without backpressure control.

In the new method, the process_updates task is spawned to handle entity updates. However, since it's consuming from a channel with a buffer size of 100 (channel(100)), there's a risk of the update_sender becoming saturated if entities are produced faster than they are processed, which could lead to unbounded memory growth.

Consider implementing backpressure or increasing the channel's capacity to prevent potential bottlenecks. You might also monitor the channel's usage to adjust its size dynamically based on load.

Comment on lines +296 to +303
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
    let pin = self.get_mut();

    while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) {
        let sender = pin.update_sender.clone();
        tokio::spawn(async move {
            if let Err(e) = sender.send(entity).await {
                error!(target = LOG_TARGET, error = %e, "Sending entity update to channel.");

Ohayo sensei! Avoid spawning tasks inside a loop in the poll method.

In the poll method, you're spawning a new task for each entity polled from simple_broker. This could lead to a high number of spawned tasks if entities arrive rapidly, impacting performance.

Consider processing the entities directly without spawning new tasks:

while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) {
-     let sender = pin.update_sender.clone();
-     tokio::spawn(async move {
-         if let Err(e) = sender.send(entity).await {
-             error!(target = LOG_TARGET, error = %e, "Sending entity update to channel.");
-         }
-     });
+     if let Err(e) = pin.update_sender.try_send(entity) {
+         error!(target = LOG_TARGET, error = %e, "Sending entity update to channel.");
+     }
}

Alternatively, if asynchronous sending is required, batch the entities and process them in a controlled manner.


Comment on lines +111 to 136
async fn process_updates(
subs: Arc<EntityManager>,
cache: Arc<ModelCache>,
pool: Pool<Sqlite>,
mut update_receiver: Receiver<Entity>,
) {
let mut interval = interval(Duration::from_millis(100));
let mut pending_updates = Vec::new();

loop {
tokio::select! {
_ = interval.tick() => {
if !pending_updates.is_empty() {
for entity in pending_updates.drain(..) {
if let Err(e) = Self::publish_updates(Arc::clone(&subs), Arc::clone(&cache), pool.clone(), &entity).await {
error!(target = LOG_TARGET, error = %e, "Publishing entity update.");
}
}
}
}
Some(entity) = update_receiver.recv() => {
pending_updates.push(entity);
}
}
}
}

Ohayo sensei! Inefficient processing in process_updates function.

In the process_updates function, you're processing updates every 100 milliseconds using an interval. If the publish_updates function takes longer than the interval duration, it could cause overlapping executions or delays.

Consider adjusting the design to process updates as they arrive or ensuring that publish_updates completes before the next interval tick. Alternatively, you could use a mechanism like tokio::sync::Notify to trigger processing when new entities are added.

Apply this diff to process updates immediately upon receiving them:

- let mut interval = interval(Duration::from_millis(100));
  loop {
-     tokio::select! {
-         _ = interval.tick() => {
-             if !pending_updates.is_empty() {
-                 // Process pending updates
-             }
-         }
-         Some(entity) = update_receiver.recv() => {
-             pending_updates.push(entity);
-         }
-     }
+     if let Some(entity) = update_receiver.recv().await {
+         if let Err(e) = Self::publish_updates(
+             Arc::clone(&subs),
+             Arc::clone(&cache),
+             pool.clone(),
+             &entity
+         ).await {
+             error!(target = LOG_TARGET, error = %e, "Publishing entity update.");
+         }
+     } else {
+         break; // Channel closed
+     }
  }


codecov bot commented Sep 19, 2024

Codecov Report

Attention: Patch coverage is 87.17949% with 10 lines in your changes missing coverage. Please review.

Project coverage is 68.42%. Comparing base (5777c00) to head (5cd1fa5).
Report is 1 commit behind head on main.

Files with missing lines Patch % Lines
...ates/torii/grpc/src/server/subscriptions/entity.rs 86.20% 4 Missing ⚠️
...rates/torii/grpc/src/server/subscriptions/event.rs 84.21% 3 Missing ⚠️
...rii/grpc/src/server/subscriptions/event_message.rs 90.00% 3 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2453      +/-   ##
==========================================
+ Coverage   68.37%   68.42%   +0.04%     
==========================================
  Files         365      365              
  Lines       47973    48023      +50     
==========================================
+ Hits        32801    32859      +58     
+ Misses      15172    15164       -8     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@Larkooo (Collaborator, Author) commented Sep 19, 2024

Closing this in favor of #2455

@Larkooo Larkooo closed this Sep 19, 2024
Labels: none yet
Projects: none yet
1 participant