Porting persistence layer to Rust #705
Comments
Hi @twitu, as always it's a pleasure reading your detailed write-up.

Interface layer

Your statement here is also exactly what I was thinking, as we're currently handling filtering in pure Python as it is anyway:

Consumer layer

Then as far as the options go, I understand things are not as simple as reading all structs in Rust, passing them back to Cython, and being done. Due to the flow of the batch processing, there are lifetime issues around the file handle in Rust. One thing I can say is we should try wherever possible at this point to avoid calling Python from Rust, as it really complicates the Rust libraries' interaction with the Python runtime. The hope is that as more of the system moves to Rust we'll have less and less need for Cython wrappers, and more Rust calling Rust. In this case we're looking at option 2, keeping the flow on the Python side for now. I think boxing the reader is completely reasonable and shouldn't matter performance-wise.

Moving forward
|
Yeah no problem, please go ahead with merging #699. I've updated the PR with the latest changes. I'll continue using

I missed one additional detail in the above comment related to interface layer projections. The problem with projections is that the projected values need to correspond to the fields of a struct. Suppose, for example, struct QuoteTick has 4 fields but only 3 of them are projected from parquet; then the decoder method has to fill in the 4th field with a default value. If it's not implemented in such a way, it will fail to construct quote ticks and the parquet reader will fail. More generally, any combination of projected fields is possible, but the decoder can only decode the fetched columns to the one structure defined in the trait implementation. This means valid projections are directly tied to the decoder implementation, which has to convert the projected columns into structs. This will only be a problem if users try to use the same parquet reader with different projections. Do you see something like this happening? What are the common use cases for projections currently?

I'll look at implementing option 2, i.e. ParquetReader with Pin<Box>, so that it can be passed around. On the Python side it will behave as an iterator over a list of Python objects (which are Rust structs converted to Python objects). @ghill2 do you want to take a look at implementing this wrapper as a Python iterator? It might look something like this.
|
Thanks @twitu I've merged #699 and will have a tinker to see how best to integrate with Cython from the Python catalog level. Will wait to hear from @ghill2 here or on Discord re the iterator design you mentioned. For now we don't need to handle projections, it could be a future feature if it's valuable to enough users. |
Thanks for the detailed study @twitu |
#718 completes the port and has examples, tests and patterns for the integration.

Consumer layer

The consumer layer problem was not difficult to solve and actually did not require the Pin<Box> complexity. I missed the fact that the underlying parquet reader takes a type that implements the required reader trait. The current solution is the same as option 2 without Pin<Box>.
Generic FFI

The more important challenge was to do FFI without creating a huge number of functions specializing readers and vectors for each possible data type. This is solved by dropping to the C level and using raw pointers. The technique is powerful and useful in multiple places, but still offers a degree of safety. The key idea is that a heap-allocated value can be coerced into a void pointer (i.e. a non-type-specific pointer), and its memory can be "leaked", as in we can tell Rust not to drop it. This makes it possible to pass the data as a pointer to the Python side. However, to ensure that the memory is not truly leaked, Python needs to pass the pointer back to Rust (after the work is done) so that it can be freed/dropped. Now comes the hairy part: when the pointer is returned to Rust, it does not know the type of the data the pointer is pointing to. Python also needs to pass a discriminant (an enum) so that Rust can match on it, coerce the void pointer to a pointer of the correct type, and then drop it.

These are the signatures involved:

#[repr(C)]
pub enum ParquetReaderType {
QuoteTick,
}
pub unsafe extern "C" fn parquet_reader_new(
file_path: PyObject,
reader_type: ParquetReaderType,
) -> *mut c_void;
pub unsafe extern "C" fn parquet_reader_drop(reader: *mut c_void, reader_type: ParquetReaderType);
pub unsafe extern "C" fn parquet_reader_next_chunk(
reader: *mut c_void,
reader_type: ParquetReaderType,
) -> CVec;
pub unsafe extern "C" fn parquet_reader_drop_chunk(chunk: CVec, reader_type: ParquetReaderType); The key idea being that Python tells Rust what type it needs, a void pointer is returned by the function, then Python type casts it to a suitable type since it knows what type is needed at that point. We can control the degree of unsafeness by allowing Python to pass Rust enums to tell it the type of the underlying pointer. This will locally restrict the setup of possible values to only a few types. In the case of The below diagram illustrates the control flow. sequenceDiagram
participant P as Python
participant R as Rust
P->>R: parquet_reader_new(ReaderType::QuoteTick)
R->>P: *mut c_void (pointer to leaked ParquetReader<QuoteTick>)
P->>R: parquet_reader_next_chunk(*mut c_void, ReaderType::QuoteTick)
R->>P: *mut c_void (pointer to leaked underlying QuoteTick vec)
P->>R: parquet_reader_drop_chunk(*mut c_void, ReaderType::QuoteTick)
P->>R: parquet_reader_drop(*mut c_void, ReaderType::QuoteTick)
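To make the leak/reclaim mechanics above concrete, here is a minimal self-contained sketch with a stand-in reader type (not the actual crate code): the constructor leaks a boxed value as a void pointer, and the drop function uses the discriminant to cast it back so it can be freed.

use std::ffi::c_void;

struct ParquetReader; // stand-in for ParquetReader<QuoteTick>

#[repr(C)]
pub enum ParquetReaderType {
    QuoteTick,
}

/// Leak a boxed reader and hand it to the caller as an untyped pointer.
fn reader_new() -> *mut c_void {
    Box::into_raw(Box::new(ParquetReader)) as *mut c_void
}

/// Reclaim the pointer; the discriminant tells us which concrete type to cast
/// back to so the Box can be rebuilt and dropped, freeing the memory.
unsafe fn reader_drop(ptr: *mut c_void, reader_type: ParquetReaderType) {
    match reader_type {
        ParquetReaderType::QuoteTick => {
            drop(Box::from_raw(ptr as *mut ParquetReader));
        }
    }
}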
|
Great work as always @twitu Ishan 👏 So my understanding is this efficient parquet reading and writing at the Rust level will be available for the standard built-in types initially. Users implementing their own custom data types would have to make do with the pure Python implementation for now, or implement the required trait themselves.

We'll move forward with the integration from the Python side soon. |
In recent changes we've made

Currently we specialize

The functionality for the reader and writer is exposed through unsafe FFI functions. The ones below are for Cython, but with a few changes they can work with Python too.

/// # Safety
/// - Assumes `file_path` is borrowed from a valid Python UTF-8 `str`.
#[no_mangle]
pub unsafe extern "C" fn parquet_reader_file_new(
file_path: *mut ffi::PyObject,
parquet_type: ParquetType,
chunk_size: usize,
// group_filter_arg: GroupFilterArg, TODO: Comment out for now
) -> *mut c_void
/// # Safety
/// - Assumes `data` is a valid CVec with an underlying byte buffer.
#[no_mangle]
pub unsafe extern "C" fn parquet_reader_buffer_new(
data: CVec,
parquet_type: ParquetType,
chunk_size: usize,
// group_filter_arg: GroupFilterArg, TODO: Comment out for now
) -> *mut c_void
/// # Safety
/// - Assumes `reader` is a valid `*mut ParquetReader<Struct>` where the struct
/// has a corresponding [ParquetType] enum.
#[no_mangle]
pub unsafe extern "C" fn parquet_reader_free(
reader: *mut c_void,
parquet_type: ParquetType,
reader_type: ParquetReaderType,
)
/// # Safety
/// - Assumes `reader` is a valid `*mut ParquetReader<Struct>` where the struct
/// has a corresponding ParquetType enum.
#[no_mangle]
pub unsafe extern "C" fn parquet_reader_next_chunk(
reader: *mut c_void,
parquet_type: ParquetType,
reader_type: ParquetReaderType,
) -> CVec
/// # Safety
/// - Assumes `chunk` contains a valid `ptr` pointer to a contiguous array.
#[no_mangle]
pub unsafe extern "C" fn parquet_reader_drop_chunk(chunk: CVec, parquet_type: ParquetType)
pub struct CVec {
/// Opaque pointer to a block of memory storing the elements. To access the
/// elements, cast it to the underlying type.
pub ptr: *mut c_void,
/// The number of elements in the block.
#[pyo3(get, set)]
pub len: usize,
/// The capacity of the vector from which it was allocated.
/// Used when deallocating the memory.
pub cap: usize,
}
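The `ptr`/`len`/`cap` invariants above can be illustrated with a small sketch (illustrative only, using the `CVec` struct above) of how a `Vec` is leaked into a `CVec` and later reclaimed so Rust can free it:

use std::ffi::c_void;

/// Leak a Vec<T> into a CVec so it can cross the FFI boundary.
fn vec_to_cvec<T>(mut data: Vec<T>) -> CVec {
    let ptr = data.as_mut_ptr() as *mut c_void;
    let len = data.len();
    let cap = data.capacity();
    std::mem::forget(data); // Rust must not drop it; the other side holds it now
    CVec { ptr, len, cap }
}

/// Reclaim the CVec with the original element type so the memory is freed.
/// Safety: `cvec` must have been produced by `vec_to_cvec::<T>`.
unsafe fn cvec_drop<T>(cvec: CVec) {
    let _ = Vec::from_raw_parts(cvec.ptr as *mut T, cvec.len, cvec.cap);
}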
/// `ParquetWriter` is generic over any writer; however, for FFI it only supports
/// byte-buffer writers. This is so that the byte buffer can be returned after
/// the writer is ended.
///
/// # Safety
/// - Assumes `file_path` is borrowed from a valid Python UTF-8 `str`.
/// - Assumes `metadata` is borrowed from a valid Python `dict`.
#[no_mangle]
pub unsafe extern "C" fn parquet_writer_new(
parquet_type: ParquetType,
metadata: *mut ffi::PyObject,
) -> *mut c_void
/// The writer is flushed, consumed and dropped, and the underlying writer is returned.
/// While this is generic, for FFI it only considers and returns a vector of bytes;
/// if the underlying writer is anything else it will fail.
///
/// # Safety
/// - Assumes `writer` is a valid `*mut ParquetWriter<Struct>` where the struct
/// has a corresponding `ParquetType` enum.
#[no_mangle]
pub unsafe extern "C" fn parquet_writer_flush(
writer: *mut c_void,
parquet_type: ParquetType,
) -> CVec
/// # Safety
/// - Assumes `writer` is a valid `*mut ParquetWriter<Struct>` where the struct
/// has a corresponding ParquetType enum.
/// - Assumes `data` is a non-null valid pointer to a contiguous block of
/// C-style structs with `len` number of elements.
#[no_mangle]
pub unsafe extern "C" fn parquet_writer_write(
writer: *mut c_void,
parquet_type: ParquetType,
data: *mut c_void,
len: usize,
)
Because these functions deal with pointers, they are inherently unsafe and have a few invariants to maintain.
Still, there are currently a few cases where the interaction with Cython/Python segfaults:

Both of these segfaults occur where Python and Rust interface with lists or buffers of items. We need a robust way to interconvert between them.
One good point of this approach is that it can easily be removed when the Rust <-> Python interface is no longer needed. |
The next step is to reduce the amount of Cython code by implementing the logic as a pyo3 module. We do this by moving the reader/writer logic and interface to Rust using pyo3. The current version has the interfacing for the reader and writer implemented in Cython, which we move to Rust as a pyo3 module. However, we still need a thin layer of Cython to convert the data passed from Rust: the data is a buffer of Rust structs which needs to be converted back to Cython objects, and this can be done only in Cython because it can directly interface with pointers.

There is one additional challenge. Since the control flow is in Python, we need to pass the buffer of structs through Python into Cython. However, there is no way to manipulate pointers in Python directly. Here's an illustration of the problem. The "HOW??" is solved using PyCapsules, which are opaque objects meant to transfer data between Cython extensions through Python.

These changes are implemented in this PR: #938 |
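A rough sketch of the capsule mechanics described above, using the raw CPython capsule API exposed through `pyo3::ffi` (names are illustrative and this is not the code from #938; both calls assume the GIL is held):

use std::ffi::c_void;
use std::os::raw::c_char;
use pyo3::ffi;

// The capsule stores this name pointer, so it must live as long as the capsule.
const CAPSULE_NAME: &[u8] = b"nautilus.chunk\0";

/// Wrap an opaque chunk pointer in a PyCapsule so plain Python code can carry
/// it from the pyo3 reader to the Cython layer. Ownership stays with Rust.
unsafe fn chunk_to_capsule(chunk: *mut c_void) -> *mut ffi::PyObject {
    ffi::PyCapsule_New(chunk, CAPSULE_NAME.as_ptr() as *const c_char, None)
}

/// Recover the pointer on the other side of the Python boundary.
unsafe fn capsule_to_chunk(capsule: *mut ffi::PyObject) -> *mut c_void {
    ffi::PyCapsule_GetPointer(capsule, CAPSULE_NAME.as_ptr() as *const c_char)
}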
Here's a high-level picture of the current state. First, what are the source and sink of data? The source is a bunch of partitioned parquet files and the sink is an engine that can consume a list of Python objects. We want to go from source to sink efficiently. The current Rust implementation of the parquet reader (using Cython) improves on the Python implementation by almost 25 times, and looks something like this.

But the reader currently only works for one file, so we're implementing an abstraction on top of it that can read from multiple files in #986. This catalog unifies/splices together the streams from multiple readers, filters based on multiple conditions, and feeds the result into the engine.

However, as the recent issue #939 showed, a major factor in performance is good filtering. The current implementation has limited filtering abilities: all the filtering is done after the parquet has been read, decoded and converted to Python objects. This is incredibly inefficient because of a lot of wasted IO operations and allocations. To make this comparably efficient we need to push the filtering down as much as possible, ideally down to the IO level where unneeded pages are not read. The Rust parquet reader supports this in a limited way by allowing filters on the ts_init field. However, we need to support a much richer query syntax that users are more familiar with. Here, as @limx0 pointed out, we should consider the additional maintenance burden as well, and since this is a solved problem we should consider leveraging other libraries like datafusion. This will require a change in the conversion layer. Currently the parquet reader implemented with arrow2 converts a |
So my understanding is: Incorporating a DataFusion type solution will improve the efficiency of query/filtering and reading bytes up from disk. We would then still stream these |
Yes, datafusion or similar libraries will help with fully featured query/filtering without additional maintenance burden. The Cython PyCapsule approach will have to stay for now, since we need to convert Rust structs to Python objects. The exact conversion will depend on what the library returns. For example, the arrow2 library returns a chunk of arrays (i.e. a row group of columns), which we're decoding in this trait. This might or might not have to change depending on the library.

pub trait DecodeFromChunk
where
Self: Sized,
{
fn decode(schema: &Schema, cols: Chunk<Box<dyn Array>>) -> Vec<Self>;
} |
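For illustration only, an implementation of this trait for a toy two-column struct might look roughly like this, assuming arrow2's `PrimitiveArray` API (the real structs and column layouts differ):

use arrow2::array::{Array, PrimitiveArray};
use arrow2::chunk::Chunk;
use arrow2::datatypes::Schema;

struct MyTick {
    price: i64,   // hypothetical fixed-point price column
    ts_init: u64, // event timestamp column
}

impl DecodeFromChunk for MyTick {
    fn decode(_schema: &Schema, cols: Chunk<Box<dyn Array>>) -> Vec<Self> {
        // Downcast each column to its concrete arrow2 array type.
        let price = cols.arrays()[0]
            .as_any()
            .downcast_ref::<PrimitiveArray<i64>>()
            .expect("column 0 should be i64");
        let ts_init = cols.arrays()[1]
            .as_any()
            .downcast_ref::<PrimitiveArray<u64>>()
            .expect("column 1 should be u64");
        // Zip the columns back together into rows of the struct.
        price
            .values_iter()
            .zip(ts_init.values_iter())
            .map(|(p, t)| MyTick { price: *p, ts_init: *t })
            .collect()
    }
}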
Yes, it goes through all files (to read the filter columns), although when partitioning is specified it should look inside that partition only, otherwise there is no reasonable purpose to partitions. I am not sure if we are missing a parameter during the query or if it is actually a bug with

I have done testing using DataFusion in Rust just now, and it tries to load all files in the main directory |
You're right, this is a sore point with the datafusion library. Push-down predicates aren't working well. However, the good thing is the maintainers are aware of this and are working on it. Here's the tracking issue for enabling push-down predicates - apache/datafusion#3463. There are some additional performance improvements being discussed here - apache/datafusion#4028. Hopefully this will get merged soon and we'll see performance on par with more mature implementations. |
Based on the guiding principles in #991, the catalog v2 is focusing on performance, usability and, most importantly, reduced maintenance burden. To that end this version aims to port the whole persistence read layer to Rust. Here's an overview starting from the consumer - the nautilus
This pipeline can be represented by this diagram. |
These are the structs for the query engine and query results implemented in #997. The structs also implement pyo3's pyclass and expose the relevant Python methods as well. The query engine takes a file path and retrieves all its records, or it takes a file and a custom SQL query to run on it. The queries are not executed immediately; once all the relevant queries have been registered, you can get the QueryResult.

pub struct PersistenceCatalog {
session_ctx: SessionContext,
batch_streams: Vec<Box<dyn Stream<Item = IntoIter<Data>> + Unpin>>,
}
impl PersistenceCatalog {
    // Query a file for all its records. The caller must specify `T` to indicate
    // what kind of data is expected from this query.
pub async fn add_file<T>(&mut self, table_name: &str, file_path: &str) -> Result<()>
where
T: DecodeDataFromRecordBatch + Into<Data>;
    // Query a file for all its records with a custom query.
    // The query should ensure the records are ordered by the
    // ts_init field in ascending order. The caller must specify `T` to indicate
    // what kind of data is expected from this query.
pub async fn add_file_with_query<T>(
&mut self,
table_name: &str,
file_path: &str,
sql_query: &str,
) -> Result<()>
where
T: DecodeDataFromRecordBatch + Into<Data>;
    // Return the query result for all the queries made so far.
pub fn get_query_result(&mut self) -> QueryResult;
}

The QueryResult is an iterator, and the queries are actually executed only when the next batch of data is pulled from it. Behind the scenes, QueryResult merges the results from all the queries into sorted batches of data using a heap-based k-stream merge.

pub struct QueryResult {
data: Box<dyn Stream<Item = Vec<Data>> + Unpin>,
}
impl Iterator for QueryResult {
type Item = Vec<Data>;
fn next(&mut self) -> Option<Vec<Data>>;
} |
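The heap-based k-stream merge mentioned above can be sketched roughly as follows; this illustrative version operates on `(ts_init, value)` pairs rather than the actual `Data` stream types:

use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge k iterators, each yielding (ts_init, value) pairs in ascending
/// ts_init order, into a single ascending sequence.
fn k_merge<T, I>(streams: Vec<I>) -> Vec<(u64, T)>
where
    I: Iterator<Item = (u64, T)>,
{
    let mut iters = streams;
    // Min-heap on ts_init: Reverse flips BinaryHeap's max-heap ordering.
    let mut heap: BinaryHeap<(Reverse<u64>, usize)> = BinaryHeap::new();
    let mut heads: Vec<Option<(u64, T)>> = Vec::with_capacity(iters.len());
    for (i, it) in iters.iter_mut().enumerate() {
        let head = it.next();
        if let Some((ts, _)) = &head {
            heap.push((Reverse(*ts), i));
        }
        heads.push(head);
    }
    let mut merged = Vec::new();
    while let Some((_, i)) = heap.pop() {
        // Emit the current head of stream i, then refill from that stream.
        if let Some(item) = heads[i].take() {
            merged.push(item);
        }
        heads[i] = iters[i].next();
        if let Some((ts, _)) = &heads[i] {
            heap.push((Reverse(*ts), i));
        }
    }
    merged
}

Feeding it several already-sorted streams yields one stream ascending by ts_init, which is the property the QueryResult iterator relies on per batch.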
Here's a test that demonstrates how this interface can be used. It can also be found in

use nautilus_model::data::tick::{Data, QuoteTick};
use nautilus_persistence::session::{PersistenceCatalog, QueryResult};
#[tokio::test]
async fn test_quote_ticks() {
let mut catalog = PersistenceCatalog::default();
catalog
.add_file::<QuoteTick>(
"quote_tick",
"../../tests/test_data/quote_tick_data.parquet",
)
.await
.unwrap();
catalog
.add_file::<QuoteTick>(
"quote_tick_2",
"../../tests/test_data/quote_tick_data.parquet",
)
.await
.unwrap();
let query_result: QueryResult<Data> = catalog.to_query_result();
// NOTE: is_sorted_by_key is unstable otherwise use
// ticks.is_sorted_by_key(|tick| tick.ts_init)
// https://github.com/rust-lang/rust/issues/53485
let is_ascending_by_init = |ticks: &Vec<Data>| {
for i in 1..ticks.len() {
// previous tick is more recent than current tick
// this is not ascending order
if ticks[i - 1].get_ts_init() > ticks[i].get_ts_init() {
return false;
}
}
true
};
let ticks: Vec<Data> = query_result.flatten().collect();
match &ticks[0] {
Data::Trade(_) => assert!(false),
Data::Quote(q) => assert_eq!("EUR/USD.SIM", q.instrument_id.to_string()),
}
assert_eq!(ticks.len(), 19000);
assert!(is_ascending_by_init(&ticks));
} |
Benchmarking results
The Python tests are expected to take slightly more time because of the overhead of initializing Python objects and acquiring the GIL. The multi-stream Python bench requires 8+ GB of RAM and could not be run. An appropriate measure for the catalog is its throughput, which can be measured in terms of ticks per second.
|
This issue is to track the discussion and PRs for this large-ish port of the persistence layer into Rust.
Parquet layer
In the parquet layer, the key component is the parquet reader/writer that reads/writes the arrow format, which gets converted to/from chunks of Rust structs. The current working example in #699 introduces the following traits and structs. They are parameterized over a lifetime (explained below) and the struct which they are decoding/encoding.
Interface layer
We need another layer on top of this to interface with the Python/Cython bindings. The current persistence layer allows passing queries that project certain columns and filter certain rows. Since the parquet file reader is now provided by the Rust arrow2 library, the Python projection and filter expressions will have to be converted in this interfacing layer so that the parquet layer (above) can understand them.
This is the function that creates a parquet file reader.
Python-style projections must be converted to an Option<&[usize]>, i.e. a slice containing the indexes of the columns that the reader should expose. This part is related to (WIP) Rust Catalog: Expression filter #700. @ghill2 you can consider adding this conversion function to your PR.

Filtering is more involved, as the reader needs to be passed a Rust function that returns true or false to accept or reject a row. The pyo3 docs offer no way of converting a Python function to a Rust function, and logically it sounds like a hard problem to interconvert functions between languages. The only way to do this is to decide on the set of required filter operations, which can be passed to Rust as enums and then converted into actual functions. @cjdsellers we'll need your help here to figure out if we can indeed narrow down such a set of operations.
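As an illustration of what such a restricted, enum-based set of filter operations could look like on the Rust side (hypothetical names, not part of the current PRs):

/// Hypothetical set of row-filter operations passed across the boundary as data.
pub enum FilterOp {
    TsInitGte(u64),
    TsInitLte(u64),
}

/// Convert the data description into an actual Rust predicate over ts_init.
pub fn to_predicate(op: FilterOp) -> impl Fn(u64) -> bool {
    move |ts_init| match op {
        FilterOp::TsInitGte(min) => ts_init >= min,
        FilterOp::TsInitLte(max) => ts_init <= max,
    }
}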
On the other hand, a simpler (less performant) solution is to simply fetch all the rows and filter them on the Python side. It's a valid workaround if there's no better solution. I think we can go with this approach for now, benchmark, and see the performance numbers before adding a layer of complexity.
Consumer layer
Next we come to the problem of actually consuming the vectors. This problem is specific to how ParquetReader will be used, and will dictate the control flow.

Option 1: Pass strategy to Rust
This problem arises because of the lifetime parameter 'a on ParquetReader. This happens because the underlying arrow2 parquet file reader takes a reference to a file, i.e. it can only exist in the scope where the file has been created. Now, since ParquetReader is consumed as an iterator, the consuming logic must also exist within the scope of the file. Here's a diagram to demonstrate. In code, at a high level it would look something like this.
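The original snippet isn't reproduced here, but a minimal sketch with stand-in types conveys the shape of option 1 - the strategy is passed into Rust and the read loop runs inside the scope that owns the file:

use std::fs::File;
use std::io::Result;

struct QuoteTick; // stand-in for the real struct

/// Stand-in reader: borrows the file, so it cannot outlive the file's scope.
struct ParquetReader<'a> {
    #[allow(dead_code)]
    file: &'a File,
}

impl<'a> Iterator for ParquetReader<'a> {
    type Item = Vec<QuoteTick>;
    fn next(&mut self) -> Option<Self::Item> {
        None // decoding elided in this sketch
    }
}

/// The strategy (consumer) is passed into Rust and runs inside the scope
/// that owns the file, so the reader's borrow is always valid.
fn consume<F: FnMut(Vec<QuoteTick>)>(path: &str, mut strategy: F) -> Result<()> {
    let file = File::open(path)?;
    let reader = ParquetReader { file: &file };
    for chunk in reader {
        strategy(chunk);
    }
    Ok(()) // reader and file are dropped together here
}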
This is the simplest approach and puts the entire control flow within Rust, once the file name and strategy have been passed to it.
Option 2: Pass parquet reader to Python
We want the control flow to stay on the Python side. In this case we'll need to use Pin to create a self-referential structure for ParquetReader. This is because we'll need to keep the file and the arrow2 file reader (which depends on the reference to the file) alive together. It'll look roughly like this.

This struct can then be wrapped in a Box and passed over FFI. In this approach we'll need to implement an FFI next method for ParquetReader that calls the internal next method of the iterator. It'll also need to convert the returned vector into something FFI-safe. Here @ghill2 has already done the vector conversion work in #699.

Non-exhaustive list of helpful reads:
Passing generics across FFI
Finally we come to the issue of polymorphic structures crossing the FFI boundary. Currently the generic functions need to be "mangled", i.e. uniquely named for each possible type parameter we want to support. The below example does not work; it will have to be uniquely named.
However, lifetimes are a special kind of generic parameter where mangling cannot help: we cannot send the ParquetReader across the FFI boundary without implementing option 2 given above.
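A small illustration of that constraint (illustrative names; the commented-out generic form is rejected because functions generic over types must be mangled, so each supported type ends up with its own uniquely named export):

// A generic function cannot be exported under a single unmangled symbol:
// each `T` needs its own monomorphized copy, so this form is rejected.
//
// #[no_mangle]
// pub extern "C" fn parquet_reader_new<T>() -> *mut ParquetReader<T> { ... }

// Instead, each supported type gets its own uniquely named export:
#[no_mangle]
pub extern "C" fn quote_tick_reader_new() -> *mut std::ffi::c_void {
    std::ptr::null_mut() // construction elided in this sketch
}

#[no_mangle]
pub extern "C" fn trade_tick_reader_new() -> *mut std::ffi::c_void {
    std::ptr::null_mut()
}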
#700 already has some work done on passing vectors of structs across FFI boundaries.
TL;DR
To complete the port, we need to figure out the following: