How to send generated readers across threads #479

Open
f1recracker opened this issue Jan 27, 2024 · 3 comments

@f1recracker

f1recracker commented Jan 27, 2024

Hello,

Thank you for this project! Also, apologies if this is a duplicate: I've found similar questions, but I'm not sure we're solving the same problem.

So I'm writing a program that consists of two threads:

  • Thread 1 deserializes data that's read from a socket, and stores some state in a shared collection. Note that the state written need not be the root of the message.
  • Thread 2 reads data from the shared collection.

I've been able to build an example of this using Rust-native structs, but I can't figure out how to switch to capnproto-generated code. Here's a minimal example using the addressbook schema.

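use std::sync::Arc;
use std::time::Duration;

use capnp::serialize_packed;
// Generated from the addressbook example schema (addressbook.capnp).
use crate::addressbook_capnp::address_book;

struct People {
    id: u32,
    // ...
}

pub fn main() {
    let result_map = Arc::new(dashmap::DashMap::<String, People>::new());

    // Thread 1: deserialize a message and store some state in the shared map.
    let producer = {
        let result_map = result_map.clone();
        std::thread::spawn(move || {
            let mut buffer = vec![];
            {
                // `build_address_book()` (not shown) returns a `capnp::message::Builder`.
                let builder = build_address_book();
                serialize_packed::write_message(&mut buffer, &builder).unwrap();
            }
            let reader = serialize_packed::read_message(
                std::io::Cursor::new(buffer),
                capnp::message::ReaderOptions::new(),
            )
            .unwrap();
            let address_book = reader.get_root::<address_book::Reader>().unwrap();
            let _people = address_book.get_people().unwrap().get(0);
            // How to avoid using a rust-native struct here and use `_people`?
            result_map.insert("foo".into(), People { id: _people.get_id() });
        })
    };

    // Thread 2: poll the shared map until the entry appears.
    let consumer = {
        let result_map = result_map.clone();
        std::thread::spawn(move || loop {
            match result_map.get("foo") {
                Some(person) => {
                    println!("Person: {:?}", person.id);
                    break;
                }
                None => std::thread::sleep(Duration::from_secs(1)),
            }
        })
    };

    // Join both threads so `main` does not return before the consumer prints.
    producer.join().unwrap();
    consumer.join().unwrap();
}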
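The consumer in the example polls the map once per second. As a std-only sketch of the same producer/consumer shape, swapping `dashmap` and the sleep loop for a `Mutex<HashMap>` plus a `Condvar` (a substitution, not something from the original), the consumer can block until the key appears. `SharedMap`, `wait_for`, and `run` are hypothetical names, and the stored value is still an owned copy, which is exactly the limitation the question is about.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Owned copy of the fields the consumer needs (mirrors `People` in the question).
#[derive(Debug, Clone, PartialEq)]
struct People {
    id: u32,
}

/// A shared map plus a condition variable that is signalled on every insert.
#[derive(Default)]
struct SharedMap {
    map: Mutex<HashMap<String, People>>,
    inserted: Condvar,
}

impl SharedMap {
    fn insert(&self, key: &str, value: People) {
        self.map.lock().unwrap().insert(key.to_string(), value);
        // Wake every waiting consumer; each re-checks for its own key.
        self.inserted.notify_all();
    }

    /// Blocks until `key` is present, then returns a clone of its value.
    fn wait_for(&self, key: &str) -> People {
        let guard = self.map.lock().unwrap();
        let guard = self
            .inserted
            .wait_while(guard, |m| !m.contains_key(key))
            .unwrap();
        guard[key].clone()
    }
}

fn run() -> People {
    let shared = Arc::new(SharedMap::default());

    let consumer = {
        let shared = Arc::clone(&shared);
        thread::spawn(move || shared.wait_for("foo"))
    };
    let producer = {
        let shared = Arc::clone(&shared);
        // In the real program this value would be copied out of a capnp reader.
        thread::spawn(move || shared.insert("foo", People { id: 7 }))
    };

    producer.join().unwrap();
    consumer.join().unwrap()
}

fn main() {
    println!("Person: {:?}", run().id);
}
```

`wait_while` checks the predicate before sleeping, so it does not matter whether the producer or the consumer runs first.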

Specifically I have two questions:

  • How do I pass generated readers (and any associated buffers) around, e.g. in collections, so that they can be sent between threads?

  • Does this access pattern make sense in the Cap'n Proto world? Essentially, I'm trying to manage the lifecycle of the buffer and its associated Readers / Builders so that they provide views over an Arc<RwLock<_>>-wrapped buffer.

Thank you!

@dwrensha
Member

You can't put _people into result_map because the lifetime of _people is scoped to the borrow at

let address_book = reader.get_root::<address_book::Reader>().unwrap();

I don't know of a good way to avoid copying the data in this situation.
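To make the lifetime problem concrete without depending on capnp, here is a std-only analogue. `PersonView<'a>` plays the role of a generated reader borrowing the message buffer; its byte layout (a little-endian `u32` id in the first four bytes) is invented for this sketch, as are `copy_out` and `store_and_read`. The only way to hand the data to a `'static` consumer is to copy it out, as the `People` struct in the question does.

```rust
use std::collections::HashMap;
use std::thread;

/// Borrowed view into a message buffer, standing in for a generated
/// `person::Reader<'a>`: it cannot outlive the buffer it points into.
#[derive(Clone, Copy)]
struct PersonView<'a> {
    bytes: &'a [u8],
}

impl PersonView<'_> {
    /// Hypothetical layout: the first 4 bytes hold a little-endian `u32` id.
    fn get_id(&self) -> u32 {
        u32::from_le_bytes(self.bytes[..4].try_into().unwrap())
    }
}

/// Owned copy with no borrowed data, like the `People` struct in the question.
#[derive(Debug, PartialEq)]
struct People {
    id: u32,
}

/// Copies the needed fields out of the view, ending the borrow of the buffer.
fn copy_out(view: PersonView<'_>) -> People {
    People { id: view.get_id() }
}

/// Builds a map of owned values, then hands it to another thread.
fn store_and_read(buffer: &[u8]) -> u32 {
    let view = PersonView { bytes: buffer };
    let mut map = HashMap::new();
    // Inserting `view` itself would make `map` borrow `buffer`, and
    // `thread::spawn` (which requires `'static`) would then reject it.
    map.insert("foo".to_string(), copy_out(view));
    thread::spawn(move || map["foo"].id).join().unwrap()
}

fn main() {
    let buffer = 42u32.to_le_bytes().to_vec();
    println!("id = {}", store_and_read(&buffer));
}
```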

One thing you can do is have your shared hashmap store values of type message::TypedBuilder<person::Owned, HeapAllocator>. You would fill each such value via set_root(), which copies the data but should be fast.

/// Strongly typed variant of the [Builder]
///
/// Generic type parameters:
/// - `T` - type of the capnp message which this builder is specialized on. Please see
///   [module documentation](self) for more info about builder type specialization.
/// - `A` - type of allocator
#[cfg(feature = "alloc")]
pub struct TypedBuilder<T, A = HeapAllocator>
where
    T: Owned,
    A: Allocator,
{
    marker: ::core::marker::PhantomData<T>,
    message: Builder<A>,
}

To get this approach to perform optimally, you'll probably also need to adjust the segment size of the allocator given to message::TypedBuilder, so that it's just big enough to store _people. The _people.total_size() method should help with this.
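The sizing rule can be written down as a small helper. In the Cap'n Proto encoding, the first word of the first segment is the root pointer, so a segment that holds a copied root struct needs one word more than the struct itself. This sketch assumes `total_size()` reports `word_count` in 8-byte words, excluding that pointer; `first_segment_words` here is a hypothetical helper, and its result is what one would pass to the allocator's first-segment setting (assumed to be `HeapAllocator::new().first_segment_words(..)` in capnp-rust).

```rust
/// Size of one Cap'n Proto word in bytes.
const BYTES_PER_WORD: u64 = 8;

/// Hypothetical helper: given the `word_count` reported for a struct
/// (assumed to exclude the pointer that refers to it), return a first-segment
/// size that fits the struct plus the one-word root pointer, or `None` if the
/// result does not fit in a `u32`.
fn first_segment_words(word_count: u64) -> Option<u32> {
    let with_root_pointer = word_count.checked_add(1)?;
    u32::try_from(with_root_pointer).ok()
}

fn main() {
    let word_count = 12; // illustrative value only
    let words = first_segment_words(word_count).expect("too large for one segment");
    println!("{} words = {} bytes", words, u64::from(words) * BYTES_PER_WORD);
}
```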

@f1recracker
Author

f1recracker commented Jan 30, 2024

Thank you! This was indeed helpful. I also had to wrap the TypedBuilder in an Arc<Mutex<_>> to make it Sync. As expected, this has some overhead compared to Rust-native structs.
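Why the wrapper is needed can be shown with a std-only stand-in. `Mutex<T>` is `Sync` whenever `T: Send`, so wrapping a value that is `Send` but not `Sync` in `Arc<Mutex<_>>` makes it shareable across threads. The sketch assumes the builder behaves like `NotSync` below (a hypothetical type whose `RefCell` field makes it `!Sync`), as the comment implies; `share_across_threads` is likewise illustrative.

```rust
use std::cell::RefCell;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for `TypedBuilder`: the `RefCell` makes it `Send`
/// but not `Sync`, so `Arc<NotSync>` alone could not be shared across threads.
struct NotSync {
    ids: RefCell<Vec<u32>>,
}

fn share_across_threads() -> Vec<u32> {
    // `Mutex<T>` is `Sync` whenever `T: Send`, which makes `Arc<Mutex<T>>` shareable.
    let shared = Arc::new(Mutex::new(NotSync { ids: RefCell::new(Vec::new()) }));

    let handles: Vec<_> = (0..4u32)
        .map(|i| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || {
                // Only one thread at a time can reach the `RefCell`.
                let guard = shared.lock().unwrap();
                guard.ids.borrow_mut().push(i);
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }

    // Threads may push in any order, so sort before returning.
    let mut ids = shared.lock().unwrap().ids.borrow().clone();
    ids.sort_unstable();
    ids
}

fn main() {
    println!("{:?}", share_across_threads());
}
```

`RwLock<T>` would not be a drop-in alternative here: it is only `Sync` when `T: Send + Sync`. The `Mutex`, which serializes even read-only access, is therefore part of the overhead mentioned in the comment.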

I think I have a clearer vision of what I was originally hoping to achieve. I might create a SharedReader for write-once-read-many use cases that contains:

  • An Arc'd buffer of the full message. I'm trying to do away with a mutex and make the buffer immutable.
  • A "pointer" (e.g. address_book->people->[0] would reference the first person).

This is roughly the api I have in mind:

// Producer 
let builder = build_address_book();
let mut buffer = Vec::new();
serialize_packed::write_message(&mut buffer, &builder).unwrap();
let buffer = Arc::new(buffer);

let person_0_ptr = addressbook::root_ptr().get_people().get(0);
let person_0_ref = SharedReader::new(buffer.clone(), person_0_ptr); // Send + Sync + Clone

let person_1_id_ptr = addressbook::root_ptr().get_people().get(1).get_id();
let person_1_id_ref = SharedReader::new(buffer.clone(), person_1_id_ptr); // Send + Sync + Clone

// Consumer 1
let person = person_0_ref.value();
foo(person.get_name(), person.get_id());

// Consumer 2
let person = person_1_id_ref.value();
...

I have a working implementation, albeit without the pointers, so clients need to navigate to the objects they're interested in themselves. From a separation-of-concerns perspective, however, I'd like to set this at the producer level.

I'm going to try to hack something together after looking at your repository.

Also, out of curiosity, do you think this might be a good addition to capnp's API?
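The proposed SharedReader can be approximated with only the standard library, which at least shows that the ownership story works: store an `Arc<[u8]>` of the immutable message plus a small `Copy` "pointer", and re-derive the borrowed view inside `value()` instead of storing it. Everything here is hypothetical: the flat little-endian encoding stands in for a real Cap'n Proto message, and `Ptr`, `PersonPtr`, `PersonIdPtr`, and `resolve` are illustrative names, not capnp APIs. With real generated code, `resolve` would presumably read the root from the buffer and walk the same path (`get_people()?.get(0)`) on every call. Also, a packed buffer (as produced by `serialize_packed::write_message`) must be unpacked before it can be read, so a zero-copy version would hold unpacked bytes.

```rust
use std::sync::Arc;
use std::thread;

/// A "pointer" into an immutable message that can re-derive a borrowed view.
trait Ptr {
    type View<'a>;
    fn resolve<'a>(&self, buf: &'a [u8]) -> Self::View<'a>;
}

/// Hypothetical encoding for this sketch only: a flat list of people,
/// each stored as a 4-byte little-endian `u32` id.
const PERSON_SIZE: usize = 4;

/// Borrowed view of one person, standing in for `person::Reader<'a>`.
#[derive(Clone, Copy)]
struct PersonView<'a> {
    bytes: &'a [u8],
}

impl PersonView<'_> {
    fn get_id(&self) -> u32 {
        u32::from_le_bytes(self.bytes[..4].try_into().unwrap())
    }
}

/// Points at `people[index]`.
#[derive(Clone, Copy)]
struct PersonPtr {
    index: usize,
}

impl Ptr for PersonPtr {
    type View<'a> = PersonView<'a>;
    fn resolve<'a>(&self, buf: &'a [u8]) -> PersonView<'a> {
        let start = self.index * PERSON_SIZE;
        PersonView { bytes: &buf[start..start + PERSON_SIZE] }
    }
}

/// Points at `people[index].id`; its view is a plain `u32`.
#[derive(Clone, Copy)]
struct PersonIdPtr {
    index: usize,
}

impl Ptr for PersonIdPtr {
    type View<'a> = u32;
    fn resolve<'a>(&self, buf: &'a [u8]) -> u32 {
        PersonPtr { index: self.index }.resolve(buf).get_id()
    }
}

/// Immutable shared buffer plus a pointer into it. No lock is needed because
/// nothing mutates the buffer, so this is `Send + Sync + Clone` when `P` is.
#[derive(Clone)]
struct SharedReader<P> {
    buffer: Arc<[u8]>,
    ptr: P,
}

impl<P: Ptr> SharedReader<P> {
    fn new(buffer: Arc<[u8]>, ptr: P) -> Self {
        SharedReader { buffer, ptr }
    }

    /// Re-resolves the pointer on every call; the view borrows `self`.
    fn value(&self) -> P::View<'_> {
        self.ptr.resolve(&self.buffer)
    }
}

/// Producer encodes people with ids 10 and 11; two consumer threads read them.
fn demo() -> (u32, u32) {
    let buffer: Arc<[u8]> = [10u32, 11]
        .iter()
        .flat_map(|id| id.to_le_bytes())
        .collect::<Vec<u8>>()
        .into();
    let person_0_ref = SharedReader::new(Arc::clone(&buffer), PersonPtr { index: 0 });
    let person_1_id_ref = SharedReader::new(buffer, PersonIdPtr { index: 1 });

    let c1 = thread::spawn(move || person_0_ref.value().get_id());
    let c2 = thread::spawn(move || person_1_id_ref.value());
    (c1.join().unwrap(), c2.join().unwrap())
}

fn main() {
    println!("{:?}", demo());
}
```

The design choice is to keep the producer in charge of *where* the data lives (the pointer) while each consumer only ever sees the view type that pointer selects, at the cost of resolving the path again on each access.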

@tv42

tv42 commented Feb 1, 2024

I believe this is a duplicate of #256 (though this one got a maintainer response!).
