Skip to content

Commit db8bf19

Browse files
committed
[CLN]: make materialization function rather than struct
1 parent 337fe73 commit db8bf19

14 files changed

+160
-248
lines changed

rust/worker/src/execution/operators/brute_force_knn.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
use crate::execution::operator::Operator;
22
use crate::execution::operators::normalize_vectors::normalize;
33
use crate::segment::record_segment::RecordSegmentReader;
4-
use crate::segment::LogMaterializer;
5-
use crate::segment::LogMaterializerError;
4+
use crate::segment::{materialize_logs, LogMaterializerError};
65
use async_trait::async_trait;
76
use chroma_blockstore::provider::BlockfileProvider;
87
use chroma_distance::DistanceFunction;
@@ -134,9 +133,7 @@ impl Operator<BruteForceKnnOperatorInput, BruteForceKnnOperatorOutput> for Brute
134133
}
135134
}
136135
};
137-
let log_materializer = LogMaterializer::new(record_segment_reader, input.log.clone(), None);
138-
let logs = match log_materializer
139-
.materialize()
136+
let logs = match materialize_logs(&record_segment_reader, &input.log, None)
140137
.instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs"))
141138
.await
142139
{

rust/worker/src/execution/operators/count_records.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -199,9 +199,9 @@ impl Operator<CountRecordsInput, CountRecordsOutput> for CountRecordsOperator {
199199

200200
#[cfg(test)]
201201
mod tests {
202+
use crate::segment::materialize_logs;
202203
use crate::segment::record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError};
203204
use crate::segment::types::SegmentFlusher;
204-
use crate::segment::LogMaterializer;
205205
use crate::{
206206
execution::{
207207
operator::Operator,
@@ -293,9 +293,7 @@ mod tests {
293293
}
294294
}
295295
};
296-
let materializer = LogMaterializer::new(record_segment_reader, data, None);
297-
let mat_records = materializer
298-
.materialize()
296+
let mat_records = materialize_logs(&record_segment_reader, &data, None)
299297
.instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs"))
300298
.await
301299
.expect("Log materialization failed");

rust/worker/src/execution/operators/filter.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@ use tracing::{trace, Instrument, Span};
1919
use crate::{
2020
execution::operator::Operator,
2121
segment::{
22+
materialize_logs,
2223
metadata_segment::{MetadataSegmentError, MetadataSegmentReader},
2324
record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError},
24-
LogMaterializer, LogMaterializerError, MaterializedLogRecord,
25+
LogMaterializerError, MaterializedLogRecord,
2526
},
2627
};
2728

@@ -416,10 +417,8 @@ impl Operator<FilterInput, FilterOutput> for FilterOperator {
416417
}
417418
Err(e) => Err(*e),
418419
}?;
419-
let materializer =
420-
LogMaterializer::new(record_segment_reader.clone(), input.logs.clone(), None);
421-
let materialized_logs = materializer
422-
.materialize()
420+
let cloned_record_segment_reader = record_segment_reader.clone();
421+
let materialized_logs = materialize_logs(&cloned_record_segment_reader, &input.logs, None)
423422
.instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs"))
424423
.await?;
425424
let metadata_log_reader = MetadataLogReader::new(&materialized_logs);

rust/worker/src/execution/operators/get_vectors_operator.rs

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
use crate::{
22
execution::operator::Operator,
33
segment::{
4+
materialize_logs,
45
record_segment::{self, RecordSegmentReader},
5-
LogMaterializer, LogMaterializerError,
6+
LogMaterializerError,
67
},
78
};
89
use async_trait::async_trait;
@@ -120,17 +121,13 @@ impl Operator<GetVectorsOperatorInput, GetVectorsOperatorOutput> for GetVectorsO
120121
},
121122
};
122123
// Step 1: Materialize the logs.
123-
let materializer = LogMaterializer::new(
124-
record_segment_reader.clone(),
125-
input.log_records.clone(),
126-
None,
127-
);
128-
let mat_records = match materializer.materialize().await {
129-
Ok(records) => records,
130-
Err(e) => {
131-
return Err(GetVectorsOperatorError::LogMaterialization(e));
132-
}
133-
};
124+
let mat_records =
125+
match materialize_logs(&record_segment_reader, &input.log_records, None).await {
126+
Ok(records) => records,
127+
Err(e) => {
128+
return Err(GetVectorsOperatorError::LogMaterialization(e));
129+
}
130+
};
134131

135132
// Search the log records for the user ids
136133
let mut remaining_search_user_ids: HashSet<String> =

rust/worker/src/execution/operators/hnsw_knn.rs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::segment::record_segment::RecordSegmentReaderCreationError;
2-
use crate::segment::{LogMaterializer, LogMaterializerError, MaterializedLogRecord};
2+
use crate::segment::{materialize_logs, LogMaterializerError, MaterializedLogRecord};
33
use crate::{
44
execution::operator::Operator,
55
segment::{
@@ -147,13 +147,8 @@ impl Operator<HnswKnnOperatorInput, HnswKnnOperatorOutput> for HnswKnnOperator {
147147
}
148148
},
149149
};
150-
let log_materializer = LogMaterializer::new(
151-
Some(record_segment_reader.clone()),
152-
input.logs.clone(),
153-
None,
154-
);
155-
let logs = match log_materializer
156-
.materialize()
150+
let some_reader = Some(record_segment_reader.clone());
151+
let logs = match materialize_logs(&some_reader, &input.logs, None)
157152
.instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs"))
158153
.await
159154
{

rust/worker/src/execution/operators/knn_log.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@ use tonic::async_trait;
1010
use crate::{
1111
execution::operator::Operator,
1212
segment::{
13+
materialize_logs,
1314
record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError},
14-
LogMaterializer, LogMaterializerError,
15+
LogMaterializerError,
1516
},
1617
};
1718

@@ -72,8 +73,7 @@ impl Operator<KnnLogInput, KnnLogOutput> for KnnOperator {
7273
Err(e) => Err(*e),
7374
}?;
7475

75-
let materializer = LogMaterializer::new(record_segment_reader, input.logs.clone(), None);
76-
let logs = materializer.materialize().await?;
76+
let logs = materialize_logs(&record_segment_reader, &input.logs, None).await?;
7777

7878
let target_vector;
7979
let target_embedding = if let DistanceFunction::Cosine = input.distance_function {

rust/worker/src/execution/operators/limit.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@ use tracing::{trace, Instrument, Span};
1111
use crate::{
1212
execution::operator::Operator,
1313
segment::{
14+
materialize_logs,
1415
record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError},
15-
LogMaterializer, LogMaterializerError,
16+
LogMaterializerError,
1617
},
1718
};
1819

@@ -213,10 +214,7 @@ impl Operator<LimitInput, LimitOutput> for LimitOperator {
213214
let mut materialized_log_offset_ids = match &input.log_offset_ids {
214215
SignedRoaringBitmap::Include(rbm) => rbm.clone(),
215216
SignedRoaringBitmap::Exclude(rbm) => {
216-
let materializer =
217-
LogMaterializer::new(record_segment_reader.clone(), input.logs.clone(), None);
218-
let materialized_logs = materializer
219-
.materialize()
217+
let materialized_logs = materialize_logs(&record_segment_reader, &input.logs, None)
220218
.instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs"))
221219
.await?;
222220

rust/worker/src/execution/operators/prefetch_record.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@ use tracing::{trace, Instrument, Span};
1010
use crate::{
1111
execution::operator::Operator,
1212
segment::{
13+
materialize_logs,
1314
record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError},
14-
LogMaterializer, LogMaterializerError,
15+
LogMaterializerError,
1516
},
1617
};
1718

@@ -84,13 +85,8 @@ impl Operator<PrefetchRecordInput, PrefetchRecordOutput> for PrefetchRecordOpera
8485
Err(e) => return Err((*e).into()),
8586
};
8687

87-
let materializer = LogMaterializer::new(
88-
Some(record_segment_reader.clone()),
89-
input.logs.clone(),
90-
None,
91-
);
92-
let materialized_logs = materializer
93-
.materialize()
88+
let some_reader = Some(record_segment_reader.clone());
89+
let materialized_logs = materialize_logs(&some_reader, &input.logs, None)
9490
.instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs"))
9591
.await?;
9692

rust/worker/src/execution/operators/projection.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@ use tracing::{error, trace, Instrument, Span};
1010
use crate::{
1111
execution::operator::Operator,
1212
segment::{
13+
materialize_logs,
1314
record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError},
14-
LogMaterializer, LogMaterializerError,
15+
LogMaterializerError,
1516
},
1617
};
1718

@@ -104,10 +105,8 @@ impl Operator<ProjectionInput, ProjectionOutput> for ProjectionOperator {
104105
}
105106
Err(e) => Err(*e),
106107
}?;
107-
let materializer =
108-
LogMaterializer::new(record_segment_reader.clone(), input.logs.clone(), None);
109-
let materialized_logs = materializer
110-
.materialize()
108+
109+
let materialized_logs = materialize_logs(&record_segment_reader, &input.logs, None)
111110
.instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs"))
112111
.await?;
113112

rust/worker/src/execution/operators/write_segments.rs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1+
use crate::segment::materialize_logs;
12
use crate::segment::metadata_segment::MetadataSegmentError;
23
use crate::segment::metadata_segment::MetadataSegmentWriter;
34
use crate::segment::record_segment::ApplyMaterializedLogError;
45
use crate::segment::record_segment::RecordSegmentReader;
56
use crate::segment::record_segment::RecordSegmentReaderCreationError;
6-
use crate::segment::LogMaterializer;
77
use crate::segment::LogMaterializerError;
88
use crate::segment::SegmentWriter;
99
use crate::{
@@ -160,16 +160,14 @@ impl Operator<WriteSegmentsInput, WriteSegmentsOutput> for WriteSegmentsOperator
160160
};
161161
}
162162
};
163-
let materializer = LogMaterializer::new(
164-
record_segment_reader,
165-
input.chunk.clone(),
166-
Some(input.next_offset_id.clone()),
167-
);
168163
// Materialize the logs.
169-
let res = match materializer
170-
.materialize()
171-
.instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs"))
172-
.await
164+
let res = match materialize_logs(
165+
&record_segment_reader,
166+
&input.chunk,
167+
Some(input.next_offset_id.clone()),
168+
)
169+
.instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs"))
170+
.await
173171
{
174172
Ok(records) => records,
175173
Err(e) => {

rust/worker/src/segment/metadata_segment.rs

Lines changed: 16 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1075,11 +1075,12 @@ mod test {
10751075
#![allow(deprecated)]
10761076

10771077
use crate::segment::{
1078+
materialize_logs,
10781079
metadata_segment::{MetadataSegmentReader, MetadataSegmentWriter},
10791080
record_segment::{
10801081
RecordSegmentReader, RecordSegmentReaderCreationError, RecordSegmentWriter,
10811082
},
1082-
LogMaterializer, SegmentFlusher, SegmentWriter,
1083+
SegmentFlusher, SegmentWriter,
10831084
};
10841085
use chroma_blockstore::{
10851086
arrow::{config::TEST_MAX_BLOCK_SIZE_BYTES, provider::ArrowBlockfileProvider},
@@ -1193,9 +1194,7 @@ mod test {
11931194
}
11941195
}
11951196
};
1196-
let materializer = LogMaterializer::new(record_segment_reader, data, None);
1197-
let mat_records = materializer
1198-
.materialize()
1197+
let mat_records = materialize_logs(&record_segment_reader, &data, None)
11991198
.await
12001199
.expect("Log materialization failed");
12011200
metadata_writer
@@ -1265,9 +1264,8 @@ mod test {
12651264
MetadataSegmentWriter::from_segment(&metadata_segment, &blockfile_provider)
12661265
.await
12671266
.expect("Error creating segment writer");
1268-
let materializer = LogMaterializer::new(Some(record_segment_reader), data, None);
1269-
let mat_records = materializer
1270-
.materialize()
1267+
let some_reader = Some(record_segment_reader);
1268+
let mat_records = materialize_logs(&some_reader, &data, None)
12711269
.await
12721270
.expect("Log materialization failed");
12731271
metadata_writer
@@ -1347,9 +1345,8 @@ mod test {
13471345
MetadataSegmentWriter::from_segment(&metadata_segment, &blockfile_provider)
13481346
.await
13491347
.expect("Error creating segment writer");
1350-
let materializer = LogMaterializer::new(Some(record_segment_reader), data, None);
1351-
let mat_records = materializer
1352-
.materialize()
1348+
let some_reader = Some(record_segment_reader);
1349+
let mat_records = materialize_logs(&some_reader, &data, None)
13531350
.await
13541351
.expect("Log materialization failed");
13551352
metadata_writer
@@ -1487,9 +1484,7 @@ mod test {
14871484
}
14881485
}
14891486
};
1490-
let materializer = LogMaterializer::new(record_segment_reader, data, None);
1491-
let mat_records = materializer
1492-
.materialize()
1487+
let mat_records = materialize_logs(&record_segment_reader, &data, None)
14931488
.await
14941489
.expect("Log materialization failed");
14951490
metadata_writer
@@ -1566,9 +1561,8 @@ mod test {
15661561
MetadataSegmentWriter::from_segment(&metadata_segment, &blockfile_provider)
15671562
.await
15681563
.expect("Error creating segment writer");
1569-
let materializer = LogMaterializer::new(Some(record_segment_reader), data, None);
1570-
let mat_records = materializer
1571-
.materialize()
1564+
let some_reader = Some(record_segment_reader);
1565+
let mat_records = materialize_logs(&some_reader, &data, None)
15721566
.await
15731567
.expect("Log materialization failed");
15741568
metadata_writer
@@ -1740,9 +1734,7 @@ mod test {
17401734
}
17411735
}
17421736
};
1743-
let materializer = LogMaterializer::new(record_segment_reader, data, None);
1744-
let mat_records = materializer
1745-
.materialize()
1737+
let mat_records = materialize_logs(&record_segment_reader, &data, None)
17461738
.await
17471739
.expect("Log materialization failed");
17481740
metadata_writer
@@ -1801,9 +1793,8 @@ mod test {
18011793
MetadataSegmentWriter::from_segment(&metadata_segment, &blockfile_provider)
18021794
.await
18031795
.expect("Error creating segment writer");
1804-
let materializer = LogMaterializer::new(Some(record_segment_reader), data, None);
1805-
let mat_records = materializer
1806-
.materialize()
1796+
let some_reader = Some(record_segment_reader);
1797+
let mat_records = materialize_logs(&some_reader, &data, None)
18071798
.await
18081799
.expect("Log materialization failed");
18091800
metadata_writer
@@ -1962,9 +1953,7 @@ mod test {
19621953
}
19631954
}
19641955
};
1965-
let materializer = LogMaterializer::new(record_segment_reader, data, None);
1966-
let mat_records = materializer
1967-
.materialize()
1956+
let mat_records = materialize_logs(&record_segment_reader, &data, None)
19681957
.await
19691958
.expect("Log materialization failed");
19701959
metadata_writer
@@ -2021,9 +2010,8 @@ mod test {
20212010
MetadataSegmentWriter::from_segment(&metadata_segment, &blockfile_provider)
20222011
.await
20232012
.expect("Error creating segment writer");
2024-
let materializer = LogMaterializer::new(Some(record_segment_reader), data, None);
2025-
let mat_records = materializer
2026-
.materialize()
2013+
let some_reader = Some(record_segment_reader);
2014+
let mat_records = materialize_logs(&some_reader, &data, None)
20272015
.await
20282016
.expect("Log materialization failed");
20292017
metadata_writer

0 commit comments

Comments
 (0)