diff --git a/rust/worker/src/execution/operators/brute_force_knn.rs b/rust/worker/src/execution/operators/brute_force_knn.rs index a549a2968da..ab1e0f5f2ec 100644 --- a/rust/worker/src/execution/operators/brute_force_knn.rs +++ b/rust/worker/src/execution/operators/brute_force_knn.rs @@ -1,8 +1,7 @@ use crate::execution::operator::Operator; use crate::execution::operators::normalize_vectors::normalize; use crate::segment::record_segment::RecordSegmentReader; -use crate::segment::LogMaterializer; -use crate::segment::LogMaterializerError; +use crate::segment::{materialize_logs, LogMaterializerError}; use async_trait::async_trait; use chroma_blockstore::provider::BlockfileProvider; use chroma_distance::DistanceFunction; @@ -134,9 +133,7 @@ impl Operator for Brute } } }; - let log_materializer = LogMaterializer::new(record_segment_reader, input.log.clone(), None); - let logs = match log_materializer - .materialize() + let logs = match materialize_logs(&record_segment_reader, &input.log, None) .instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs")) .await { diff --git a/rust/worker/src/execution/operators/count_records.rs b/rust/worker/src/execution/operators/count_records.rs index ef3d4607d61..cf67764c2dd 100644 --- a/rust/worker/src/execution/operators/count_records.rs +++ b/rust/worker/src/execution/operators/count_records.rs @@ -199,9 +199,9 @@ impl Operator for CountRecordsOperator { #[cfg(test)] mod tests { + use crate::segment::materialize_logs; use crate::segment::record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError}; use crate::segment::types::SegmentFlusher; - use crate::segment::LogMaterializer; use crate::{ execution::{ operator::Operator, @@ -293,9 +293,7 @@ mod tests { } } }; - let materializer = LogMaterializer::new(record_segment_reader, data, None); - let mat_records = materializer - .materialize() + let mat_records = materialize_logs(&record_segment_reader, &data, None) .instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs")) .await .expect("Log materialization failed"); diff --git a/rust/worker/src/execution/operators/filter.rs b/rust/worker/src/execution/operators/filter.rs index 7148a8bfd22..635636b1137 100644 --- a/rust/worker/src/execution/operators/filter.rs +++ b/rust/worker/src/execution/operators/filter.rs @@ -19,9 +19,10 @@ use tracing::{trace, Instrument, Span}; use crate::{ execution::operator::Operator, segment::{ + materialize_logs, metadata_segment::{MetadataSegmentError, MetadataSegmentReader}, record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError}, - LogMaterializer, LogMaterializerError, MaterializedLogRecord, + LogMaterializerError, MaterializedLogRecord, }, }; @@ -416,10 +417,8 @@ impl Operator for FilterOperator { } Err(e) => Err(*e), }?; - let materializer = - LogMaterializer::new(record_segment_reader.clone(), input.logs.clone(), None); - let materialized_logs = materializer - .materialize() + let cloned_record_segment_reader = record_segment_reader.clone(); + let materialized_logs = materialize_logs(&cloned_record_segment_reader, &input.logs, None) .instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs")) .await?; let metadata_log_reader = MetadataLogReader::new(&materialized_logs); diff --git a/rust/worker/src/execution/operators/get_vectors_operator.rs b/rust/worker/src/execution/operators/get_vectors_operator.rs index 4cd652d2f83..fb06e267c9d 100644 --- a/rust/worker/src/execution/operators/get_vectors_operator.rs +++ b/rust/worker/src/execution/operators/get_vectors_operator.rs @@ -1,8 +1,9 @@ use crate::{ execution::operator::Operator, segment::{ + materialize_logs, record_segment::{self, RecordSegmentReader}, - LogMaterializer, LogMaterializerError, + LogMaterializerError, }, }; use async_trait::async_trait; @@ -120,17 +121,13 @@ impl Operator for GetVectorsO }, }; // Step 1: Materialize the logs. - let materializer = LogMaterializer::new( - record_segment_reader.clone(), - input.log_records.clone(), - None, - ); - let mat_records = match materializer.materialize().await { - Ok(records) => records, - Err(e) => { - return Err(GetVectorsOperatorError::LogMaterialization(e)); - } - }; + let mat_records = + match materialize_logs(&record_segment_reader, &input.log_records, None).await { + Ok(records) => records, + Err(e) => { + return Err(GetVectorsOperatorError::LogMaterialization(e)); + } + }; // Search the log records for the user ids let mut remaining_search_user_ids: HashSet = diff --git a/rust/worker/src/execution/operators/hnsw_knn.rs b/rust/worker/src/execution/operators/hnsw_knn.rs index fbdbf11ce3a..c5e57467d5d 100644 --- a/rust/worker/src/execution/operators/hnsw_knn.rs +++ b/rust/worker/src/execution/operators/hnsw_knn.rs @@ -1,5 +1,5 @@ use crate::segment::record_segment::RecordSegmentReaderCreationError; -use crate::segment::{LogMaterializer, LogMaterializerError, MaterializedLogRecord}; +use crate::segment::{materialize_logs, LogMaterializerError, MaterializedLogRecord}; use crate::{ execution::operator::Operator, segment::{ @@ -147,13 +147,8 @@ impl Operator for HnswKnnOperator { } }, }; - let log_materializer = LogMaterializer::new( - Some(record_segment_reader.clone()), - input.logs.clone(), - None, - ); - let logs = match log_materializer - .materialize() + let some_reader = Some(record_segment_reader.clone()); + let logs = match materialize_logs(&some_reader, &input.logs, None) .instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs")) .await { diff --git a/rust/worker/src/execution/operators/knn_log.rs b/rust/worker/src/execution/operators/knn_log.rs index f854d126f68..aee11654529 100644 --- a/rust/worker/src/execution/operators/knn_log.rs +++ b/rust/worker/src/execution/operators/knn_log.rs @@ -10,8 +10,9 @@ use tonic::async_trait; use crate::{ execution::operator::Operator, segment::{ + materialize_logs, record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError}, - LogMaterializer, LogMaterializerError, + LogMaterializerError, }, }; @@ -72,8 +73,7 @@ impl Operator for KnnOperator { Err(e) => Err(*e), }?; - let materializer = LogMaterializer::new(record_segment_reader, input.logs.clone(), None); - let logs = materializer.materialize().await?; + let logs = materialize_logs(&record_segment_reader, &input.logs, None).await?; let target_vector; let target_embedding = if let DistanceFunction::Cosine = input.distance_function { diff --git a/rust/worker/src/execution/operators/limit.rs b/rust/worker/src/execution/operators/limit.rs index 200a6fc787d..b5709dcddbd 100644 --- a/rust/worker/src/execution/operators/limit.rs +++ b/rust/worker/src/execution/operators/limit.rs @@ -11,8 +11,9 @@ use tracing::{trace, Instrument, Span}; use crate::{ execution::operator::Operator, segment::{ + materialize_logs, record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError}, - LogMaterializer, LogMaterializerError, + LogMaterializerError, }, }; @@ -213,10 +214,7 @@ impl Operator for LimitOperator { let mut materialized_log_offset_ids = match &input.log_offset_ids { SignedRoaringBitmap::Include(rbm) => rbm.clone(), SignedRoaringBitmap::Exclude(rbm) => { - let materializer = - LogMaterializer::new(record_segment_reader.clone(), input.logs.clone(), None); - let materialized_logs = materializer - .materialize() + let materialized_logs = materialize_logs(&record_segment_reader, &input.logs, None) .instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs")) .await?; diff --git a/rust/worker/src/execution/operators/prefetch_record.rs b/rust/worker/src/execution/operators/prefetch_record.rs index f42494b8fd3..b97594d778b 100644 --- a/rust/worker/src/execution/operators/prefetch_record.rs +++ b/rust/worker/src/execution/operators/prefetch_record.rs @@ -10,8 +10,9 @@ use tracing::{trace, Instrument, Span}; use crate::{ execution::operator::Operator, segment::{ + materialize_logs, record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError}, - LogMaterializer, LogMaterializerError, + LogMaterializerError, }, }; @@ -84,13 +85,8 @@ impl Operator for PrefetchRecordOpera Err(e) => return Err((*e).into()), }; - let materializer = LogMaterializer::new( - Some(record_segment_reader.clone()), - input.logs.clone(), - None, - ); - let materialized_logs = materializer - .materialize() + let some_reader = Some(record_segment_reader.clone()); + let materialized_logs = materialize_logs(&some_reader, &input.logs, None) .instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs")) .await?; diff --git a/rust/worker/src/execution/operators/projection.rs b/rust/worker/src/execution/operators/projection.rs index b9342aea3ad..f02f89acbfe 100644 --- a/rust/worker/src/execution/operators/projection.rs +++ b/rust/worker/src/execution/operators/projection.rs @@ -10,8 +10,9 @@ use tracing::{error, trace, Instrument, Span}; use crate::{ execution::operator::Operator, segment::{ + materialize_logs, record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError}, - LogMaterializer, LogMaterializerError, + LogMaterializerError, }, }; @@ -104,10 +105,8 @@ impl Operator for ProjectionOperator { } Err(e) => Err(*e), }?; - let materializer = - LogMaterializer::new(record_segment_reader.clone(), input.logs.clone(), None); - let materialized_logs = materializer - .materialize() + + let materialized_logs = materialize_logs(&record_segment_reader, &input.logs, None) .instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs")) .await?; diff --git a/rust/worker/src/execution/operators/write_segments.rs b/rust/worker/src/execution/operators/write_segments.rs index 1d125a41ed3..508cee2d383 100644 --- a/rust/worker/src/execution/operators/write_segments.rs +++ b/rust/worker/src/execution/operators/write_segments.rs @@ -1,9 +1,9 @@ +use crate::segment::materialize_logs; use crate::segment::metadata_segment::MetadataSegmentError; use crate::segment::metadata_segment::MetadataSegmentWriter; use crate::segment::record_segment::ApplyMaterializedLogError; use crate::segment::record_segment::RecordSegmentReader; use crate::segment::record_segment::RecordSegmentReaderCreationError; -use crate::segment::LogMaterializer; use crate::segment::LogMaterializerError; use crate::segment::SegmentWriter; use crate::{ @@ -160,16 +160,14 @@ impl Operator for WriteSegmentsOperator }; } }; - let materializer = LogMaterializer::new( - record_segment_reader, - input.chunk.clone(), - Some(input.next_offset_id.clone()), - ); // Materialize the logs. - let res = match materializer - .materialize() - .instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs")) - .await + let res = match materialize_logs( + &record_segment_reader, + &input.chunk, + Some(input.next_offset_id.clone()), + ) + .instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs")) + .await { Ok(records) => records, Err(e) => { diff --git a/rust/worker/src/segment/metadata_segment.rs b/rust/worker/src/segment/metadata_segment.rs index 5f43e540482..c5410479cf9 100644 --- a/rust/worker/src/segment/metadata_segment.rs +++ b/rust/worker/src/segment/metadata_segment.rs @@ -1075,11 +1075,12 @@ mod test { #![allow(deprecated)] use crate::segment::{ + materialize_logs, metadata_segment::{MetadataSegmentReader, MetadataSegmentWriter}, record_segment::{ RecordSegmentReader, RecordSegmentReaderCreationError, RecordSegmentWriter, }, - LogMaterializer, SegmentFlusher, SegmentWriter, + SegmentFlusher, SegmentWriter, }; use chroma_blockstore::{ arrow::{config::TEST_MAX_BLOCK_SIZE_BYTES, provider::ArrowBlockfileProvider}, @@ -1193,9 +1194,7 @@ mod test { } } }; - let materializer = LogMaterializer::new(record_segment_reader, data, None); - let mat_records = materializer - .materialize() + let mat_records = materialize_logs(&record_segment_reader, &data, None) .await .expect("Log materialization failed"); metadata_writer @@ -1265,9 +1264,8 @@ mod test { MetadataSegmentWriter::from_segment(&metadata_segment, &blockfile_provider) .await .expect("Error creating segment writer"); - let materializer = LogMaterializer::new(Some(record_segment_reader), data, None); - let mat_records = materializer - .materialize() + let some_reader = Some(record_segment_reader); + let mat_records = materialize_logs(&some_reader, &data, None) .await .expect("Log materialization failed"); metadata_writer @@ -1347,9 +1345,8 @@ mod test { MetadataSegmentWriter::from_segment(&metadata_segment, &blockfile_provider) .await .expect("Error creating segment writer"); - let materializer = LogMaterializer::new(Some(record_segment_reader), data, None); - let mat_records = materializer - .materialize() + let some_reader = Some(record_segment_reader); + let mat_records = materialize_logs(&some_reader, &data, None) .await .expect("Log materialization failed"); metadata_writer @@ -1487,9 +1484,7 @@ mod test { } } }; - let materializer = LogMaterializer::new(record_segment_reader, data, None); - let mat_records = materializer - .materialize() + let mat_records = materialize_logs(&record_segment_reader, &data, None) .await .expect("Log materialization failed"); metadata_writer @@ -1566,9 +1561,8 @@ mod test { MetadataSegmentWriter::from_segment(&metadata_segment, &blockfile_provider) .await .expect("Error creating segment writer"); - let materializer = LogMaterializer::new(Some(record_segment_reader), data, None); - let mat_records = materializer - .materialize() + let some_reader = Some(record_segment_reader); + let mat_records = materialize_logs(&some_reader, &data, None) .await .expect("Log materialization failed"); metadata_writer @@ -1740,9 +1734,7 @@ mod test { } } }; - let materializer = LogMaterializer::new(record_segment_reader, data, None); - let mat_records = materializer - .materialize() + let mat_records = materialize_logs(&record_segment_reader, &data, None) .await .expect("Log materialization failed"); metadata_writer @@ -1801,9 +1793,8 @@ mod test { MetadataSegmentWriter::from_segment(&metadata_segment, &blockfile_provider) .await .expect("Error creating segment writer"); - let materializer = LogMaterializer::new(Some(record_segment_reader), data, None); - let mat_records = materializer - .materialize() + let some_reader = Some(record_segment_reader); + let mat_records = materialize_logs(&some_reader, &data, None) .await .expect("Log materialization failed"); metadata_writer @@ -1962,9 +1953,7 @@ mod test { } } }; - let materializer = LogMaterializer::new(record_segment_reader, data, None); - let mat_records = materializer - .materialize() + let mat_records = materialize_logs(&record_segment_reader, &data, None) .await .expect("Log materialization failed"); metadata_writer @@ -2021,9 +2010,8 @@ mod test { MetadataSegmentWriter::from_segment(&metadata_segment, &blockfile_provider) .await .expect("Error creating segment writer"); - let materializer = LogMaterializer::new(Some(record_segment_reader), data, None); - let mat_records = materializer - .materialize() + let some_reader = Some(record_segment_reader); + let mat_records = materialize_logs(&some_reader, &data, None) .await .expect("Log materialization failed"); metadata_writer diff --git a/rust/worker/src/segment/record_segment.rs b/rust/worker/src/segment/record_segment.rs index 32e550ec82c..379cc918749 100644 --- a/rust/worker/src/segment/record_segment.rs +++ b/rust/worker/src/segment/record_segment.rs @@ -955,7 +955,7 @@ mod tests { use crate::{ log::test::{upsert_generator, LogGenerator}, segment::{ - record_segment::MAX_OFFSET_ID, test::TestSegment, LogMaterializer, SegmentWriter, + materialize_logs, record_segment::MAX_OFFSET_ID, test::TestSegment, SegmentWriter, }, }; @@ -999,10 +999,12 @@ mod tests { .stack_size(stack_size) .spawn(move || { let log_chunk = Chunk::new(batch.into()); - let materializer = - LogMaterializer::new(None, log_chunk, Some(curr_offset_id)); - let materialized_logs = future::block_on(materializer.materialize()) - .expect("Should be able to materialize log"); + let materialized_logs = future::block_on(materialize_logs( + &None, + &log_chunk, + Some(curr_offset_id), + )) + .expect("Should be able to materialize log"); future::block_on( record_writer.apply_materialized_log_chunk(materialized_logs), ) diff --git a/rust/worker/src/segment/test.rs b/rust/worker/src/segment/test.rs index c70952be0b4..25b9e8cbb90 100644 --- a/rust/worker/src/segment/test.rs +++ b/rust/worker/src/segment/test.rs @@ -9,7 +9,7 @@ use chroma_types::{ use crate::log::test::{LogGenerator, TEST_EMBEDDING_DIMENSION}; use super::{ - metadata_segment::MetadataSegmentWriter, record_segment::RecordSegmentWriter, LogMaterializer, + materialize_logs, metadata_segment::MetadataSegmentWriter, record_segment::RecordSegmentWriter, SegmentFlusher, SegmentWriter, }; @@ -24,12 +24,13 @@ pub struct TestSegment { impl TestSegment { // WARN: The size of the log chunk should not be too large async fn compact_log(&mut self, logs: Chunk, next_offset: usize) { - let materializer = - LogMaterializer::new(None, logs, Some(AtomicU32::new(next_offset as u32).into())); - let materialized_logs = materializer - .materialize() - .await - .expect("Should be able to materialize log."); + let materialized_logs = materialize_logs( + &None, + &logs, + Some(AtomicU32::new(next_offset as u32).into()), + ) + .await + .expect("Should be able to materialize log."); let mut metadata_writer = MetadataSegmentWriter::from_segment(&self.metadata_segment, &self.blockfile_provider) diff --git a/rust/worker/src/segment/types.rs b/rust/worker/src/segment/types.rs index f81194d86ba..11e1845741f 100644 --- a/rust/worker/src/segment/types.rs +++ b/rust/worker/src/segment/types.rs @@ -408,109 +408,87 @@ impl<'referred_data> TryFrom<(&'referred_data OperationRecord, u32, &'referred_d } } -pub struct LogMaterializer<'me> { +pub async fn materialize_logs<'me>( // Is None when record segment is uninitialized. - pub(crate) record_segment_reader: Option>, - pub(crate) logs: Chunk, + record_segment_reader: &'me Option>, + logs: &'me Chunk, // Is None for readers. In that case, the materializer reads // the current maximum from the record segment and uses that // for materializing. Writers pass this value to the materializer // because they need to share this across all log partitions. - pub(crate) next_offset_id: Option>, -} - -impl<'me> LogMaterializer<'me> { - pub fn new( - record_segment_reader: Option>, - logs: Chunk, - next_offset_id: Option>, - ) -> Self { - Self { - record_segment_reader, - logs, - next_offset_id, - } - } - pub async fn materialize( - &'me self, - ) -> Result>, LogMaterializerError> { - // Trace the total_len since len() iterates over the entire chunk - // and we don't want to do that just to trace the length. - tracing::info!( - "Total length of logs in materializer: {}", - self.logs.total_len() - ); - let next_offset_id; - match self.next_offset_id.as_ref() { - Some(next_oid) => { - next_offset_id = next_oid.clone(); + next_offset_id: Option>, +) -> Result>, LogMaterializerError> { + // Trace the total_len since len() iterates over the entire chunk + // and we don't want to do that just to trace the length. + tracing::info!("Total length of logs in materializer: {}", logs.total_len()); + // The offset ID that should be used for the next record + let next_offset_id = match next_offset_id.as_ref() { + Some(next_offset_id) => next_offset_id.clone(), + None => { + match record_segment_reader.as_ref() { + Some(reader) => { + let offset_id = reader.get_current_max_offset_id(); + offset_id.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + offset_id + } + // This means that the segment is uninitialized so counting starts from 1. + None => Arc::new(AtomicU32::new(1)), } - None => { - match self.record_segment_reader.as_ref() { - Some(reader) => { - next_offset_id = reader.get_current_max_offset_id(); - next_offset_id.fetch_add(1, std::sync::atomic::Ordering::SeqCst); - } - // This means that the segment is uninitialized so counting starts - // from 1. - None => { - next_offset_id = Arc::new(AtomicU32::new(1)); + } + }; + + // Populate entries that are present in the record segment. + let mut existing_id_to_materialized: HashMap<&str, MaterializedLogRecord> = HashMap::new(); + let mut new_id_to_materialized: HashMap<&str, MaterializedLogRecord> = HashMap::new(); + if let Some(reader) = &record_segment_reader { + async { + for (log_record, _) in logs.iter() { + let exists = match reader + .data_exists_for_user_id(log_record.record.id.as_str()) + .await + { + Ok(res) => res, + Err(e) => { + return Err(LogMaterializerError::RecordSegment(e)); } }; - } - } - // Populate entries that are present in the record segment. - let mut existing_id_to_materialized: HashMap<&str, MaterializedLogRecord> = HashMap::new(); - let mut new_id_to_materialized: HashMap<&str, MaterializedLogRecord> = HashMap::new(); - if let Some(reader) = &self.record_segment_reader { - async { - for (log_record, _) in self.logs.iter() { - let exists = match reader - .data_exists_for_user_id(log_record.record.id.as_str()) + if exists { + match reader + .get_data_and_offset_id_for_user_id(log_record.record.id.as_str()) .await { - Ok(res) => res, + Ok(Some((data_record, offset_id))) => { + existing_id_to_materialized.insert( + log_record.record.id.as_str(), + MaterializedLogRecord::from((data_record, offset_id)), + ); + } + Ok(None) => { + return Err(LogMaterializerError::RecordSegment(Box::new( + RecordSegmentReaderCreationError::UserRecordNotFound(format!( + "not found: {}", + log_record.record.id, + )), + ) + as _)); + } Err(e) => { return Err(LogMaterializerError::RecordSegment(e)); } - }; - if exists { - match reader - .get_data_and_offset_id_for_user_id(log_record.record.id.as_str()) - .await - { - Ok(Some((data_record, offset_id))) => { - existing_id_to_materialized.insert( - log_record.record.id.as_str(), - MaterializedLogRecord::from((data_record, offset_id)), - ); - } - Ok(None) => { - return Err(LogMaterializerError::RecordSegment(Box::new( - RecordSegmentReaderCreationError::UserRecordNotFound(format!( - "not found: {}", - log_record.record.id, - )), - ) - as _)); - } - Err(e) => { - return Err(LogMaterializerError::RecordSegment(e)); - } - } } } - Ok(()) } - .instrument( - tracing::info_span!(parent: Span::current(), "Materialization read from storage"), - ) - .await?; + Ok(()) } - // Populate updates to these and fresh records that are being - // inserted for the first time. - async { - for (log_record, _) in self.logs.iter() { + .instrument( + tracing::info_span!(parent: Span::current(), "Materialization read from storage"), + ) + .await?; + } + // Populate updates to these and fresh records that are being + // inserted for the first time. + async { + for (log_record, _) in logs.iter() { match log_record.record.operation { Operation::Add => { // If this is an add of a record present in the segment then add @@ -768,20 +746,19 @@ impl<'me> LogMaterializer<'me> { } Ok(()) }.instrument(tracing::info_span!(parent: Span::current(), "Materialization main iteration")).await?; - let mut res = vec![]; - for (_key, value) in existing_id_to_materialized { - // Ignore records that only had invalid ADDS on the log. - if value.final_operation == MaterializedLogOperation::Initial { - continue; - } - res.push(value); - } - for (_key, value) in new_id_to_materialized { - res.push(value); + let mut res = vec![]; + for (_key, value) in existing_id_to_materialized { + // Ignore records that only had invalid ADDS on the log. + if value.final_operation == MaterializedLogOperation::Initial { + continue; } - res.sort_by(|x, y| x.offset_id.cmp(&y.offset_id)); - Ok(Chunk::new(res.into())) + res.push(value); + } + for (_key, value) in new_id_to_materialized { + res.push(value); } + res.sort_by(|x, y| x.offset_id.cmp(&y.offset_id)); + Ok(Chunk::new(res.into())) } // This needs to be public for testing @@ -907,9 +884,7 @@ mod tests { } } }; - let materializer = LogMaterializer::new(record_segment_reader, data, None); - let mat_records = materializer - .materialize() + let mat_records = materialize_logs(&record_segment_reader, &data, None) .await .expect("Log materialization failed"); metadata_writer @@ -971,13 +946,8 @@ mod tests { let reader = RecordSegmentReader::from_segment(&record_segment, &blockfile_provider) .await .expect("Error creating segment reader"); - let materializer = LogMaterializer { - record_segment_reader: Some(reader), - logs: data, - next_offset_id: None, - }; - let res = materializer - .materialize() + let some_reader = Some(reader); + let res = materialize_logs(&some_reader, &data, None) .await .expect("Error materializing logs"); let mut res_vec = vec![]; @@ -1206,9 +1176,7 @@ mod tests { } } }; - let materializer = LogMaterializer::new(record_segment_reader, data, None); - let mat_records = materializer - .materialize() + let mat_records = materialize_logs(&record_segment_reader, &data, None) .await .expect("Log materialization failed"); metadata_writer @@ -1257,13 +1225,8 @@ mod tests { let reader = RecordSegmentReader::from_segment(&record_segment, &blockfile_provider) .await .expect("Error creating segment reader"); - let materializer = LogMaterializer { - record_segment_reader: Some(reader), - logs: data, - next_offset_id: None, - }; - let res = materializer - .materialize() + let some_reader = Some(reader); + let res = materialize_logs(&some_reader, &data, None) .await .expect("Error materializing logs"); let mut res_vec = vec![]; @@ -1497,9 +1460,7 @@ mod tests { } } }; - let materializer = LogMaterializer::new(record_segment_reader, data, None); - let mat_records = materializer - .materialize() + let mat_records = materialize_logs(&record_segment_reader, &data, None) .await .expect("Log materialization failed"); metadata_writer @@ -1572,13 +1533,8 @@ mod tests { let reader = RecordSegmentReader::from_segment(&record_segment, &blockfile_provider) .await .expect("Error creating segment reader"); - let materializer = LogMaterializer { - record_segment_reader: Some(reader), - logs: data, - next_offset_id: None, - }; - let res = materializer - .materialize() + let some_reader = Some(reader); + let res = materialize_logs(&some_reader, &data, None) .await .expect("Error materializing logs"); let mut res_vec = vec![]; @@ -1807,9 +1763,7 @@ mod tests { } } }; - let materializer = LogMaterializer::new(record_segment_reader, data, None); - let mat_records = materializer - .materialize() + let mat_records = materialize_logs(&record_segment_reader, &data, None) .await .expect("Log materialization failed"); segment_writer @@ -1870,13 +1824,8 @@ mod tests { let reader = RecordSegmentReader::from_segment(&record_segment, &blockfile_provider) .await .expect("Error creating segment reader"); - let materializer = LogMaterializer { - record_segment_reader: Some(reader), - logs: data, - next_offset_id: None, - }; - let res = materializer - .materialize() + let some_reader = Some(reader); + let res = materialize_logs(&some_reader, &data, None) .await .expect("Error materializing logs"); assert_eq!(3, res.len());