[ENH]: make log materialization result owned #3227

Closed · 6 commits

27 changes: 27 additions & 0 deletions rust/types/src/data_record.rs
@@ -10,6 +10,29 @@ pub struct DataRecord<'a> {
     pub document: Option<&'a str>,
 }
 
+#[derive(Debug, Clone)]
+pub struct OwnedDataRecord {
+    pub id: String,
+    pub embedding: Vec<f32>,
+    pub metadata: Option<Metadata>,
+    pub document: Option<String>,
+}
+
+impl<'a> From<&DataRecord<'a>> for OwnedDataRecord {
+    fn from(data_record: &DataRecord<'a>) -> Self {
+        let id = data_record.id.to_string();
+        let embedding = data_record.embedding.to_vec();
+        let metadata = data_record.metadata.clone();
+        let document = data_record.document.map(|doc| doc.to_string());
+        OwnedDataRecord {
+            id,
+            embedding,
+            metadata,
+            document,
+        }
+    }
+}
+
 impl DataRecord<'_> {
     pub fn get_size(&self) -> usize {
         let id_size = self.id.len();
@@ -28,4 +51,8 @@ impl DataRecord<'_> {
         };
         id_size + embedding_size + metadata_size + document_size
     }
+
+    pub fn to_owned(&self) -> OwnedDataRecord {
+        self.into()
+    }
 }
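
The new OwnedDataRecord holds its id, embedding, metadata, and document by value, so a caller can keep the data after the borrow backing a DataRecord ends. A minimal usage sketch, not part of the diff (it assumes both types are exported from the chroma_types crate, as the file path suggests):

// Hypothetical helper for illustration: detach a DataRecord from the buffers
// it borrows so the result can outlive the underlying segment read.
fn detach(record: &DataRecord<'_>) -> OwnedDataRecord {
    // to_owned() delegates to the From<&DataRecord<'a>> impl above, copying
    // id, embedding, metadata, and document into owned storage.
    record.to_owned()
}
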
2 changes: 1 addition & 1 deletion rust/worker/src/execution/operators/filter.rs
@@ -106,7 +106,7 @@ pub(crate) struct MetadataLogReader<'me> {
 }
 
 impl<'me> MetadataLogReader<'me> {
-    pub(crate) fn new(logs: &'me Chunk<MaterializedLogRecord<'me>>) -> Self {
+    pub(crate) fn new(logs: &'me Chunk<MaterializedLogRecord>) -> Self {
         let mut compact_metadata: HashMap<_, BTreeMap<&MetadataValue, RoaringBitmap>> =
             HashMap::new();
         let mut document = HashMap::new();
2 changes: 1 addition & 1 deletion rust/worker/src/execution/operators/hnsw_knn.rs
@@ -65,7 +65,7 @@ impl ChromaError for HnswKnnOperatorError {
 impl HnswKnnOperator {
     async fn get_disallowed_ids<'referred_data>(
         &self,
-        logs: Chunk<MaterializedLogRecord<'_>>,
+        logs: Chunk<MaterializedLogRecord>,
         record_segment_reader: &RecordSegmentReader<'_>,
     ) -> Result<Vec<u32>, Box<dyn ChromaError>> {
         let mut disallowed_ids = Vec::new();
4 changes: 2 additions & 2 deletions rust/worker/src/segment/distributed_hnsw_segment.rs
Expand Up @@ -238,10 +238,10 @@ impl DistributedHNSWSegmentWriter {
}
}

impl<'a> SegmentWriter<'a> for DistributedHNSWSegmentWriter {
impl SegmentWriter for DistributedHNSWSegmentWriter {
async fn apply_materialized_log_chunk(
&self,
records: chroma_types::Chunk<super::MaterializedLogRecord<'a>>,
records: chroma_types::Chunk<super::MaterializedLogRecord>,
) -> Result<(), ApplyMaterializedLogError> {
for (record, _) in records.iter() {
match record.final_operation {
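
Because materialized log records now own their payloads, SegmentWriter implementations no longer need a lifetime parameter tying them to the log chunk. The trait definition itself is not part of this diff; judging from the impl blocks in this PR, it presumably reduces to a signature along these lines (a sketch only, with the async style and the set of other methods assumed):

// Inferred sketch, not copied from the source tree.
pub(crate) trait SegmentWriter {
    async fn apply_materialized_log_chunk(
        &self,
        records: Chunk<MaterializedLogRecord>,
    ) -> Result<(), ApplyMaterializedLogError>;
    // ...any remaining methods are unchanged apart from the dropped lifetime.
}
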
12 changes: 8 additions & 4 deletions rust/worker/src/segment/metadata_segment.rs
@@ -530,16 +530,20 @@ impl<'me> MetadataSegmentWriter<'me> {
     }
 }
 
-impl<'log_records> SegmentWriter<'log_records> for MetadataSegmentWriter<'_> {
+impl SegmentWriter for MetadataSegmentWriter<'_> {
     async fn apply_materialized_log_chunk(
         &self,
-        records: Chunk<MaterializedLogRecord<'log_records>>,
+        records: Chunk<MaterializedLogRecord>,
     ) -> Result<(), ApplyMaterializedLogError> {
         let mut count = 0u64;
         let full_text_writer_batch = records.iter().filter_map(|record| {
             let offset_id = record.0.offset_id;
-            let old_document = record.0.data_record.as_ref().and_then(|r| r.document);
-            let new_document = record.0.final_document;
+            let old_document = record
+                .0
+                .data_record
+                .as_ref()
+                .and_then(|r| r.document.as_deref());
+            let new_document = &record.0.final_document;
 
             if matches!(
                 record.0.final_operation,
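
The as_deref() call is needed because data_record on the materialized record is now the owned variant, whose document field is an Option<String> rather than an Option<&str>, while the full-text writer batch still works with borrowed &str documents. The conversion in isolation:

fn document_as_str_demo() {
    // Option<String> -> Option<&str> without moving or cloning the string.
    let owned: Option<String> = Some("a document".to_string());
    let borrowed: Option<&str> = owned.as_deref();
    assert_eq!(borrowed, Some("a document"));
}
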
26 changes: 17 additions & 9 deletions rust/worker/src/segment/record_segment.rs
@@ -61,9 +61,9 @@ pub enum RecordSegmentWriterCreationError {
 }
 
 impl RecordSegmentWriter {
-    async fn construct_and_set_data_record<'a>(
+    async fn construct_and_set_data_record(
         &self,
-        mat_record: &MaterializedLogRecord<'a>,
+        mat_record: &MaterializedLogRecord,
         user_id: &str,
         offset_id: u32,
     ) -> Result<(), ApplyMaterializedLogError> {
@@ -337,10 +337,10 @@ impl ChromaError for ApplyMaterializedLogError {
     }
 }
 
-impl<'a> SegmentWriter<'a> for RecordSegmentWriter {
+impl SegmentWriter for RecordSegmentWriter {
     async fn apply_materialized_log_chunk(
         &self,
-        records: Chunk<MaterializedLogRecord<'a>>,
+        records: Chunk<MaterializedLogRecord>,
     ) -> Result<(), ApplyMaterializedLogError> {
         // The max new offset id introduced by materialized logs is initialized as zero
         // Since offset id should start from 1, we use this to indicate no new offset id
@@ -357,7 +357,11 @@ impl<'a> SegmentWriter<'a> for RecordSegmentWriter {
                     .user_id_to_id
                     .as_ref()
                     .unwrap()
-                    .set::<&str, u32>("", log_record.user_id.unwrap(), log_record.offset_id)
+                    .set::<&str, u32>(
+                        "",
+                        log_record.user_id.as_ref().unwrap(),
+                        log_record.offset_id,
+                    )
                     .await
                 {
                     Ok(()) => (),
@@ -370,7 +374,11 @@ impl<'a> SegmentWriter<'a> for RecordSegmentWriter {
                     .id_to_user_id
                     .as_ref()
                     .unwrap()
-                    .set::<u32, String>("", log_record.offset_id, log_record.user_id.unwrap().to_string())
+                    .set::<u32, String>(
+                        "",
+                        log_record.offset_id,
+                        log_record.user_id.clone().unwrap(),
+                    )
                     .await
                 {
                     Ok(()) => (),
@@ -382,7 +390,7 @@ impl<'a> SegmentWriter<'a> for RecordSegmentWriter {
                 match self
                     .construct_and_set_data_record(
                         log_record,
-                        log_record.user_id.unwrap(),
+                        log_record.user_id.as_ref().unwrap(),
                         log_record.offset_id,
                     )
                     .await
@@ -415,7 +423,7 @@ impl<'a> SegmentWriter<'a> for RecordSegmentWriter {
                 match self
                     .construct_and_set_data_record(
                         log_record,
-                        log_record.data_record.as_ref().unwrap().id,
+                        &log_record.data_record.as_ref().unwrap().id,
                         log_record.offset_id,
                     )
                     .await
@@ -432,7 +440,7 @@ impl<'a> SegmentWriter<'a> for RecordSegmentWriter {
                     .user_id_to_id
                     .as_ref()
                     .unwrap()
-                    .delete::<&str, u32>("", log_record.data_record.as_ref().unwrap().id)
+                    .delete::<&str, u32>("", &log_record.data_record.as_ref().unwrap().id)
                     .await
                 {
                     Ok(()) => (),
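
The record segment shows the same ownership shift for user_id, which is now an owned Option<String> on the materialized record: the writer borrows it with as_ref() where the blockfile key type is &str and clones it where the value type is an owned String. In miniature, with a hypothetical value and only the standard library:

fn owned_user_id_demo() {
    let user_id: Option<String> = Some("user-1".to_string());
    // Borrow for a &str-typed key; the &String deref-coerces to &str here.
    let key: &str = user_id.as_ref().unwrap();
    // Clone for a String-typed value, leaving user_id usable afterwards.
    let value: String = user_id.clone().unwrap();
    assert_eq!(key, value);
}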