
Commit e198444

Make data_record owned
1 parent e72a222 commit e198444

7 files changed: +63 −34 lines changed

rust/types/src/data_record.rs

Lines changed: 27 additions & 0 deletions
@@ -10,6 +10,29 @@ pub struct DataRecord<'a> {
     pub document: Option<&'a str>,
 }

+#[derive(Debug, Clone)]
+pub struct OwnedDataRecord {
+    pub id: String,
+    pub embedding: Vec<f32>,
+    pub metadata: Option<Metadata>,
+    pub document: Option<String>,
+}
+
+impl<'a> From<&DataRecord<'a>> for OwnedDataRecord {
+    fn from(data_record: &DataRecord<'a>) -> Self {
+        let id = data_record.id.to_string();
+        let embedding = data_record.embedding.to_vec();
+        let metadata = data_record.metadata.clone();
+        let document = data_record.document.map(|doc| doc.to_string());
+        OwnedDataRecord {
+            id,
+            embedding,
+            metadata,
+            document,
+        }
+    }
+}
+
 impl DataRecord<'_> {
     pub fn get_size(&self) -> usize {
         let id_size = self.id.len();
@@ -28,4 +51,8 @@ impl DataRecord<'_> {
         };
         id_size + embedding_size + metadata_size + document_size
     }
+
+    pub fn to_owned(&self) -> OwnedDataRecord {
+        self.into()
+    }
 }
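For context, here is a minimal, self-contained sketch of how the new owned type is meant to be used. The struct shapes follow the diff above, but the `metadata` field is elided and the types are stand-ins rather than the real chroma_types crate: calling `to_owned()` detaches a borrowed `DataRecord` from the segment data it references.

// Sketch: converting a borrowed DataRecord into the new OwnedDataRecord.
// The `metadata` field is omitted so the snippet compiles on its own.
#[derive(Debug, Clone)]
pub struct DataRecord<'a> {
    pub id: &'a str,
    pub embedding: &'a [f32],
    pub document: Option<&'a str>,
}

#[derive(Debug, Clone)]
pub struct OwnedDataRecord {
    pub id: String,
    pub embedding: Vec<f32>,
    pub document: Option<String>,
}

impl<'a> From<&DataRecord<'a>> for OwnedDataRecord {
    fn from(r: &DataRecord<'a>) -> Self {
        OwnedDataRecord {
            id: r.id.to_string(),
            embedding: r.embedding.to_vec(),
            document: r.document.map(str::to_string),
        }
    }
}

impl DataRecord<'_> {
    // Deep copy; mirrors the `to_owned` added in this commit.
    pub fn to_owned(&self) -> OwnedDataRecord {
        self.into()
    }
}

fn main() {
    let embedding = vec![1.0_f32, 2.0, 3.0];
    let borrowed = DataRecord {
        id: "id1",
        embedding: &embedding,
        document: Some("doc1"),
    };
    // The owned copy carries its own String/Vec buffers, so it can outlive
    // the buffers that `borrowed` points into.
    let owned: OwnedDataRecord = borrowed.to_owned();
    drop(embedding);
    println!("{:?}", owned);
}

The trade-off is an extra allocation and copy per record; in exchange, the `'referred_data` lifetime no longer has to thread through `MaterializedLogRecord` and `SegmentWriter` (see the rust/worker diffs below).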

rust/worker/src/execution/operators/filter.rs

Lines changed: 1 addition & 1 deletion
@@ -106,7 +106,7 @@ pub(crate) struct MetadataLogReader<'me> {
 }

 impl<'me> MetadataLogReader<'me> {
-    pub(crate) fn new(logs: &'me Chunk<MaterializedLogRecord<'me>>) -> Self {
+    pub(crate) fn new(logs: &'me Chunk<MaterializedLogRecord>) -> Self {
         let mut compact_metadata: HashMap<_, BTreeMap<&MetadataValue, RoaringBitmap>> =
             HashMap::new();
         let mut document = HashMap::new();

rust/worker/src/execution/operators/hnsw_knn.rs

Lines changed: 1 addition & 1 deletion
@@ -65,7 +65,7 @@ impl ChromaError for HnswKnnOperatorError {
 impl HnswKnnOperator {
     async fn get_disallowed_ids<'referred_data>(
         &self,
-        logs: Chunk<MaterializedLogRecord<'_>>,
+        logs: Chunk<MaterializedLogRecord>,
         record_segment_reader: &RecordSegmentReader<'_>,
     ) -> Result<Vec<u32>, Box<dyn ChromaError>> {
         let mut disallowed_ids = Vec::new();

rust/worker/src/segment/distributed_hnsw_segment.rs

Lines changed: 2 additions & 2 deletions
@@ -238,10 +238,10 @@ impl DistributedHNSWSegmentWriter {
     }
 }

-impl<'a> SegmentWriter<'a> for DistributedHNSWSegmentWriter {
+impl SegmentWriter for DistributedHNSWSegmentWriter {
     async fn apply_materialized_log_chunk(
         &self,
-        records: chroma_types::Chunk<super::MaterializedLogRecord<'a>>,
+        records: chroma_types::Chunk<super::MaterializedLogRecord>,
     ) -> Result<(), ApplyMaterializedLogError> {
         for (record, _) in records.iter() {
             match record.final_operation {

rust/worker/src/segment/metadata_segment.rs

Lines changed: 7 additions & 3 deletions
@@ -530,15 +530,19 @@ impl<'me> MetadataSegmentWriter<'me> {
     }
 }

-impl<'log_records> SegmentWriter<'log_records> for MetadataSegmentWriter<'_> {
+impl SegmentWriter for MetadataSegmentWriter<'_> {
     async fn apply_materialized_log_chunk(
         &self,
-        records: Chunk<MaterializedLogRecord<'log_records>>,
+        records: Chunk<MaterializedLogRecord>,
     ) -> Result<(), ApplyMaterializedLogError> {
         let mut count = 0u64;
         let full_text_writer_batch = records.iter().filter_map(|record| {
             let offset_id = record.0.offset_id;
-            let old_document = record.0.data_record.as_ref().and_then(|r| r.document);
+            let old_document = record
+                .0
+                .data_record
+                .as_ref()
+                .and_then(|r| r.document.as_ref().map(|d| d.as_str()));
             let new_document = &record.0.final_document;

             if matches!(
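Aside: the new chained expression above just borrows an `Option<String>` as an `Option<&str>`. A tiny standalone sketch of the equivalence (the shorter `as_deref` form appears in the types.rs diff further down):

fn main() {
    let doc: Option<String> = Some("doc1".to_string());

    // Form used in the diff above: Option<String> -> Option<&str>.
    let a: Option<&str> = doc.as_ref().map(|d| d.as_str());
    // Equivalent standard-library shorthand.
    let b: Option<&str> = doc.as_deref();

    assert_eq!(a, b);
}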

rust/worker/src/segment/record_segment.rs

Lines changed: 6 additions & 6 deletions
@@ -61,9 +61,9 @@ pub enum RecordSegmentWriterCreationError {
 }

 impl RecordSegmentWriter {
-    async fn construct_and_set_data_record<'a>(
+    async fn construct_and_set_data_record(
         &self,
-        mat_record: &MaterializedLogRecord<'a>,
+        mat_record: &MaterializedLogRecord,
         user_id: &str,
         offset_id: u32,
     ) -> Result<(), ApplyMaterializedLogError> {
@@ -337,10 +337,10 @@ impl ChromaError for ApplyMaterializedLogError {
     }
 }

-impl<'a> SegmentWriter<'a> for RecordSegmentWriter {
+impl SegmentWriter for RecordSegmentWriter {
     async fn apply_materialized_log_chunk(
         &self,
-        records: Chunk<MaterializedLogRecord<'a>>,
+        records: Chunk<MaterializedLogRecord>,
     ) -> Result<(), ApplyMaterializedLogError> {
         // The max new offset id introduced by materialized logs is initialized as zero
         // Since offset id should start from 1, we use this to indicate no new offset id
@@ -423,7 +423,7 @@ impl<'a> SegmentWriter<'a> for RecordSegmentWriter {
                     match self
                         .construct_and_set_data_record(
                             log_record,
-                            log_record.data_record.as_ref().unwrap().id,
+                            &log_record.data_record.as_ref().unwrap().id,
                             log_record.offset_id,
                         )
                         .await
@@ -440,7 +440,7 @@ impl<'a> SegmentWriter<'a> for RecordSegmentWriter {
                         .user_id_to_id
                         .as_ref()
                         .unwrap()
-                        .delete::<&str, u32>("", log_record.data_record.as_ref().unwrap().id)
+                        .delete::<&str, u32>("", &log_record.data_record.as_ref().unwrap().id)
                         .await
                     {
                         Ok(()) => (),
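Aside: the added `&` in the two call sites above is enough because `id` is now a `String`, and `&String` deref-coerces to `&str` wherever a `&str` is expected. A minimal sketch (the function name and value here are illustrative, not from the source):

fn takes_user_id(user_id: &str) -> usize {
    user_id.len()
}

fn main() {
    let id: String = "id1".to_string();
    // `&id` is a `&String`; it coerces to `&str` at the call site, which is
    // why only an extra `&` was needed once the field became owned.
    assert_eq!(takes_user_id(&id), 3);
}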

rust/worker/src/segment/types.rs

Lines changed: 19 additions & 21 deletions
@@ -3,7 +3,7 @@ use chroma_error::{ChromaError, ErrorCodes};
 use chroma_types::{
     Chunk, DataRecord, DeletedMetadata, LogRecord, MaterializedLogOperation, Metadata,
     MetadataDelta, MetadataValue, MetadataValueConversionError, Operation, OperationRecord,
-    UpdateMetadata, UpdateMetadataValue,
+    OwnedDataRecord, UpdateMetadata, UpdateMetadataValue,
 };
 use std::collections::{HashMap, HashSet};
 use std::sync::atomic::AtomicU32;
@@ -113,10 +113,10 @@ impl ChromaError for LogMaterializerError {
 }

 #[derive(Debug, Clone)]
-pub struct MaterializedLogRecord<'referred_data> {
+pub struct MaterializedLogRecord {
     // This is the data record read from the record segment for this id.
     // None if the record exists only in the log.
-    pub(crate) data_record: Option<DataRecord<'referred_data>>,
+    pub(crate) data_record: Option<OwnedDataRecord>,
     // If present in the record segment then it is the offset id
     // in the record segment at which the record was found.
     // If not present in the segment then it is the offset id
@@ -157,7 +157,7 @@ pub struct MaterializedLogRecord<'referred_data> {
     pub(crate) final_embedding: Option<Vec<f32>>,
 }

-impl<'referred_data> MaterializedLogRecord<'referred_data> {
+impl MaterializedLogRecord {
     // Performs a deep copy of the document so only use it if really
     // needed. If you only need a reference then use merged_document_ref
     // defined below.
@@ -170,7 +170,7 @@ impl<'referred_data> MaterializedLogRecord<'referred_data> {
         return match self.final_document.clone() {
             Some(doc) => Some(doc),
             None => match self.data_record.as_ref() {
-                Some(data_record) => data_record.document.map(|doc| doc.to_string()),
+                Some(data_record) => data_record.document.clone(),
                 None => None,
             },
         };
@@ -185,10 +185,7 @@ impl<'referred_data> MaterializedLogRecord<'referred_data> {
         return match &self.final_document {
             Some(doc) => Some(doc),
             None => match self.data_record.as_ref() {
-                Some(data_record) => match data_record.document {
-                    Some(doc) => Some(doc),
-                    None => None,
-                },
+                Some(data_record) => data_record.document.as_deref(),
                 None => None,
             },
         };
@@ -211,7 +208,7 @@ impl<'referred_data> MaterializedLogRecord<'referred_data> {
         match &self.user_id {
             Some(id) => id.as_str(),
             None => match &self.data_record {
-                Some(data_record) => data_record.id,
+                Some(data_record) => &data_record.id,
                 None => panic!("Expected at least one user id to be set"),
             },
         }
@@ -247,7 +244,7 @@ impl<'referred_data> MaterializedLogRecord<'referred_data> {
         final_metadata
     }

-    pub(crate) fn metadata_delta(&'referred_data self) -> MetadataDelta<'referred_data> {
+    pub(crate) fn metadata_delta(&self) -> MetadataDelta<'_> {
         let mut metadata_delta = MetadataDelta::new();
         let mut base_metadata: HashMap<&str, &MetadataValue> = HashMap::new();
         if let Some(data_record) = &self.data_record {
@@ -327,21 +324,19 @@ impl<'referred_data> MaterializedLogRecord<'referred_data> {
         return match &self.final_embedding {
             Some(embed) => embed,
             None => match self.data_record.as_ref() {
-                Some(data_record) => data_record.embedding,
+                Some(data_record) => &data_record.embedding,
                 None => panic!("Expected at least one source of embedding"),
             },
         };
     }
 }

-impl<'referred_data> From<(DataRecord<'referred_data>, u32)>
-    for MaterializedLogRecord<'referred_data>
-{
+impl<'referred_data> From<(DataRecord<'referred_data>, u32)> for MaterializedLogRecord {
     fn from(data_record_info: (DataRecord<'referred_data>, u32)) -> Self {
         let data_record = data_record_info.0;
         let offset_id = data_record_info.1;
         Self {
-            data_record: Some(data_record),
+            data_record: Some(data_record.to_owned()),
             offset_id,
             user_id: None,
             final_operation: MaterializedLogOperation::Initial,
@@ -357,7 +352,7 @@ impl<'referred_data> From<(DataRecord<'referred_data>, u32)>
 // in the log (OperationRecord), offset id in storage where it will be stored (u32)
 // and user id (str).
 impl<'referred_data> TryFrom<(&'referred_data OperationRecord, u32, &'referred_data str)>
-    for MaterializedLogRecord<'referred_data>
+    for MaterializedLogRecord
 {
     type Error = LogMaterializerError;

@@ -414,7 +409,7 @@ pub async fn materialize_logs<'me>(
     // for materializing. Writers pass this value to the materializer
     // because they need to share this across all log partitions.
     next_offset_id: Option<Arc<AtomicU32>>,
-) -> Result<Chunk<MaterializedLogRecord<'me>>, LogMaterializerError> {
+) -> Result<Chunk<MaterializedLogRecord>, LogMaterializerError> {
     // Trace the total_len since len() iterates over the entire chunk
     // and we don't want to do that just to trace the length.
     tracing::info!("Total length of logs in materializer: {}", logs.total_len());
@@ -748,10 +743,10 @@ pub async fn materialize_logs<'me>(

 // This needs to be public for testing
 #[allow(async_fn_in_trait)]
-pub trait SegmentWriter<'a> {
+pub trait SegmentWriter {
     async fn apply_materialized_log_chunk(
         &self,
-        records: Chunk<MaterializedLogRecord<'a>>,
+        records: Chunk<MaterializedLogRecord>,
     ) -> Result<(), ApplyMaterializedLogError>;
     async fn commit(self) -> Result<impl SegmentFlusher, Box<dyn ChromaError>>;
 }
@@ -1880,7 +1875,10 @@ mod tests {
         assert_eq!(hello_found, 1);
         assert_eq!(hello_again_found, 1);
         assert!(log.data_record.is_some());
-        assert_eq!(log.data_record.as_ref().unwrap().document, Some("doc1"));
+        assert_eq!(
+            log.data_record.as_ref().unwrap().document,
+            Some("doc1".to_string())
+        );
         assert_eq!(
             log.data_record.as_ref().unwrap().embedding,
             vec![1.0, 2.0, 3.0].as_slice()
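The net effect of the types.rs changes, sketched with simplified stand-ins (not the real chroma types, and the trait is shown without async so the example stands alone): once `MaterializedLogRecord` owns its `data_record`, neither the record nor `SegmentWriter` needs a lifetime parameter, so a materialized chunk is `'static` and can, for example, be moved into another thread or task.

// Simplified stand-ins for Chunk / MaterializedLogRecord / SegmentWriter,
// only to illustrate the lifetime removal; not the real chroma_types API.
use std::sync::Arc;

#[derive(Debug, Clone)]
struct OwnedDataRecord {
    id: String,
    embedding: Vec<f32>,
}

#[derive(Debug, Clone)]
struct MaterializedLogRecord {
    // Owned, so no 'referred_data lifetime parameter is needed anymore.
    data_record: Option<OwnedDataRecord>,
    offset_id: u32,
}

#[derive(Clone)]
struct Chunk<T>(Arc<Vec<T>>);

// Before this commit the trait was `SegmentWriter<'a>` over
// Chunk<MaterializedLogRecord<'a>>; now it is lifetime-free.
trait SegmentWriter {
    fn apply_materialized_log_chunk(&self, records: Chunk<MaterializedLogRecord>);
}

struct PrintingWriter;

impl SegmentWriter for PrintingWriter {
    fn apply_materialized_log_chunk(&self, records: Chunk<MaterializedLogRecord>) {
        for record in records.0.iter() {
            println!("offset {} -> {:?}", record.offset_id, record.data_record);
        }
    }
}

fn main() {
    let records = Chunk(Arc::new(vec![MaterializedLogRecord {
        data_record: Some(OwnedDataRecord {
            id: "id1".to_string(),
            embedding: vec![1.0, 2.0, 3.0],
        }),
        offset_id: 1,
    }]));

    // Because the chunk owns its data it is `'static`, so it can be moved
    // across threads/tasks without borrowing from a segment reader.
    let handle = std::thread::spawn(move || {
        PrintingWriter.apply_materialized_log_chunk(records)
    });
    handle.join().unwrap();
}

This is what lets the implementations in distributed_hnsw_segment.rs, metadata_segment.rs, and record_segment.rs drop their `<'a>` / `<'log_records>` parameters in the diffs above.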
