Skip to content

[PERF]: parallelize applying log to segment types #3134

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ opentelemetry-otlp = "0.26"
opentelemetry_sdk = { version = "0.26", features = ["rt-tokio"] }
regex = "1.10.5"
figment = { version = "0.10.12", features = ["env", "yaml", "test"] }
ouroboros = "0.18.4"

[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = "0.6"
Expand Down
4 changes: 0 additions & 4 deletions rust/worker/src/compactor/compaction_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ use futures::stream::FuturesUnordered;
use futures::StreamExt;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::atomic::AtomicU32;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tracing::instrument;
Expand Down Expand Up @@ -123,8 +121,6 @@ impl CompactionManager {
self.hnsw_index_provider.clone(),
dispatcher,
None,
None,
Arc::new(AtomicU32::new(0)),
self.max_compaction_size,
self.max_partition_size,
);
Expand Down
106 changes: 106 additions & 0 deletions rust/worker/src/execution/operators/apply_log_to_segment_writer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
use super::materialize_logs::MaterializeLogOutput;
use crate::execution::operator::Operator;
use crate::segment::metadata_segment::MetadataSegmentError;
use crate::segment::record_segment::ApplyMaterializedLogError;
use crate::segment::record_segment::RecordSegmentReaderCreationError;
use crate::segment::LogMaterializerError;
use crate::segment::SegmentWriter;
use async_trait::async_trait;
use chroma_error::ChromaError;
use chroma_error::ErrorCodes;
use std::sync::Arc;
use thiserror::Error;
use tracing::Instrument;

/// Errors reported by `ApplyLogToSegmentWriterOperator`.
///
/// Every variant wraps an error type from the segment layer and gets a `From`
/// conversion for it through `#[from]`.
#[derive(Error, Debug)]
pub enum ApplyLogToSegmentWriterOperatorError {
/// Wraps a `RecordSegmentReaderCreationError`.
// NOTE(review): the `run` implementation in this file never constructs this
// variant explicitly — confirm whether it is still needed.
#[error("Preparation for log materialization failed {0}")]
LogMaterializationPreparationError(#[from] RecordSegmentReaderCreationError),
/// Wraps a `LogMaterializerError`.
// NOTE(review): also not constructed explicitly by `run` in this file.
#[error("Log materialization failed {0}")]
LogMaterializationError(#[from] LogMaterializerError),
/// Wraps an `ApplyMaterializedLogError`; this is the variant `run` returns
/// when applying the materialized chunk to the segment writer fails.
#[error("Materialized logs failed to apply {0}")]
ApplyMaterializedLogsError(#[from] ApplyMaterializedLogError),
/// Wraps a `MetadataSegmentError`.
// NOTE(review): the message prefix is identical to `ApplyMaterializedLogsError`,
// so the two are only distinguishable by the wrapped error's own text.
#[error("Materialized logs failed to apply {0}")]
ApplyMaterializedLogsErrorMetadataSegment(#[from] MetadataSegmentError),
}

impl ChromaError for ApplyLogToSegmentWriterOperatorError {
    /// Returns the error code of the wrapped error; this type adds no codes of
    /// its own and simply delegates to whichever inner error it carries.
    fn code(&self) -> ErrorCodes {
        match self {
            Self::LogMaterializationPreparationError(inner) => inner.code(),
            Self::LogMaterializationError(inner) => inner.code(),
            Self::ApplyMaterializedLogsError(inner) => inner.code(),
            Self::ApplyMaterializedLogsErrorMetadataSegment(inner) => inner.code(),
        }
    }
}

/// Operator that applies an already-materialized log chunk to a segment
/// writer. The struct has no fields, so it carries no state of its own.
#[derive(Debug)]
pub struct ApplyLogToSegmentWriterOperator {}

impl ApplyLogToSegmentWriterOperator {
    /// Creates a new operator, already boxed.
    pub fn new() -> Box<Self> {
        Box::new(Self {})
    }
}

/// Input for `ApplyLogToSegmentWriterOperator`: the writer to update together
/// with the shared materialization output whose records will be applied to it.
#[derive(Debug)]
pub struct ApplyLogToSegmentWriterInput<Writer: SegmentWriter> {
    /// Segment writer that receives the materialized records.
    segment_writer: Writer,
    /// Reference-counted handle to the materialization output.
    materialize_log_output: Arc<MaterializeLogOutput>,
}

impl<Writer: SegmentWriter> ApplyLogToSegmentWriterInput<Writer> {
    /// Bundles a segment writer with the materialized log output to apply.
    pub fn new(segment_writer: Writer, materialize_log_output: Arc<MaterializeLogOutput>) -> Self {
        Self {
            segment_writer,
            materialize_log_output,
        }
    }
}

/// Output of `ApplyLogToSegmentWriterOperator`. It has no fields, so a
/// returned value only signals that `run` completed without an error.
#[derive(Debug)]
pub struct ApplyLogToSegmentWriterOutput {}

#[async_trait]
impl<Writer: SegmentWriter + Send + Sync + Clone>
    Operator<ApplyLogToSegmentWriterInput<Writer>, ApplyLogToSegmentWriterOutput>
    for ApplyLogToSegmentWriterOperator
{
    type Error = ApplyLogToSegmentWriterOperatorError;

    /// Static name identifying this operator.
    fn get_name(&self) -> &'static str {
        "ApplyLogToSegmentWriterOperator"
    }

    /// Applies the materialized records carried by `input` to its segment
    /// writer.
    ///
    /// The apply call is wrapped in a trace span whose `otel.name` and
    /// `segment` fields include the writer's name.
    ///
    /// # Errors
    ///
    /// Returns `ApplyLogToSegmentWriterOperatorError::ApplyMaterializedLogsError`
    /// when the writer fails to apply the chunk.
    async fn run(
        &self,
        input: &ApplyLogToSegmentWriterInput<Writer>,
    ) -> Result<ApplyLogToSegmentWriterOutput, Self::Error> {
        let materialized_chunk = input.materialize_log_output.get_materialized_records();

        // Apply materialized records. The error is wrapped explicitly so the
        // resulting variant does not depend on `From` inference at the `?`.
        input
            .segment_writer
            .apply_materialized_log_chunk(materialized_chunk.clone())
            .instrument(tracing::trace_span!(
                "Apply materialized logs",
                otel.name = format!(
                    "Apply materialized logs to segment writer {}",
                    input.segment_writer.get_name()
                ),
                segment = input.segment_writer.get_name()
            ))
            .await
            .map_err(ApplyLogToSegmentWriterOperatorError::ApplyMaterializedLogsError)?;

        Ok(ApplyLogToSegmentWriterOutput {})
    }
}
7 changes: 2 additions & 5 deletions rust/worker/src/execution/operators/brute_force_knn.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use crate::execution::operator::Operator;
use crate::execution::operators::normalize_vectors::normalize;
use crate::segment::record_segment::RecordSegmentReader;
use crate::segment::LogMaterializer;
use crate::segment::LogMaterializerError;
use crate::segment::{materialize_logs, LogMaterializerError};
use async_trait::async_trait;
use chroma_blockstore::provider::BlockfileProvider;
use chroma_distance::DistanceFunction;
Expand Down Expand Up @@ -134,9 +133,7 @@ impl Operator<BruteForceKnnOperatorInput, BruteForceKnnOperatorOutput> for Brute
}
}
};
let log_materializer = LogMaterializer::new(record_segment_reader, input.log.clone(), None);
let logs = match log_materializer
.materialize()
let logs = match materialize_logs(&record_segment_reader, &input.log, None)
.instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs"))
.await
{
Expand Down
14 changes: 10 additions & 4 deletions rust/worker/src/execution/operators/count_records.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ impl Operator<CountRecordsInput, CountRecordsOutput> for CountRecordsOperator {
RecordSegmentReaderCreationError::BlockfileOpenError(_) => {
return Err(CountRecordsError::RecordSegmentCreateError(*e));
}
RecordSegmentReaderCreationError::BlockfileReadError(_) => {
return Err(CountRecordsError::RecordSegmentCreateError(*e));
}
RecordSegmentReaderCreationError::InvalidNumberOfFiles => {
return Err(CountRecordsError::RecordSegmentCreateError(*e));
}
Expand Down Expand Up @@ -199,9 +202,9 @@ impl Operator<CountRecordsInput, CountRecordsOutput> for CountRecordsOperator {

#[cfg(test)]
mod tests {
use crate::segment::materialize_logs;
use crate::segment::record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError};
use crate::segment::types::SegmentFlusher;
use crate::segment::LogMaterializer;
use crate::{
execution::{
operator::Operator,
Expand Down Expand Up @@ -281,6 +284,11 @@ mod tests {
"Error creating record segment reader. Blockfile open error."
);
}
RecordSegmentReaderCreationError::BlockfileReadError(_) => {
panic!(
"Error creating record segment reader. Blockfile read error."
);
}
RecordSegmentReaderCreationError::InvalidNumberOfFiles => {
panic!("Error creating record segment reader. Invalid number of files.");
}
Expand All @@ -293,9 +301,7 @@ mod tests {
}
}
};
let materializer = LogMaterializer::new(record_segment_reader, data, None);
let mat_records = materializer
.materialize()
let mat_records = materialize_logs(&record_segment_reader, &data, None)
.instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs"))
.await
.expect("Log materialization failed");
Expand Down
9 changes: 4 additions & 5 deletions rust/worker/src/execution/operators/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ use tracing::{trace, Instrument, Span};
use crate::{
execution::operator::Operator,
segment::{
materialize_logs,
metadata_segment::{MetadataSegmentError, MetadataSegmentReader},
record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError},
LogMaterializer, LogMaterializerError, MaterializedLogRecord,
LogMaterializerError, MaterializedLogRecord,
},
};

Expand Down Expand Up @@ -416,10 +417,8 @@ impl Operator<FilterInput, FilterOutput> for FilterOperator {
}
Err(e) => Err(*e),
}?;
let materializer =
LogMaterializer::new(record_segment_reader.clone(), input.logs.clone(), None);
let materialized_logs = materializer
.materialize()
let cloned_record_segment_reader = record_segment_reader.clone();
let materialized_logs = materialize_logs(&cloned_record_segment_reader, &input.logs, None)
.instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs"))
.await?;
let metadata_log_reader = MetadataLogReader::new(&materialized_logs);
Expand Down
24 changes: 12 additions & 12 deletions rust/worker/src/execution/operators/get_vectors_operator.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::{
execution::operator::Operator,
segment::{
materialize_logs,
record_segment::{self, RecordSegmentReader},
LogMaterializer, LogMaterializerError,
LogMaterializerError,
},
};
use async_trait::async_trait;
Expand Down Expand Up @@ -108,6 +109,9 @@ impl Operator<GetVectorsOperatorInput, GetVectorsOperatorOutput> for GetVectorsO
record_segment::RecordSegmentReaderCreationError::BlockfileOpenError(_) => {
return Err(GetVectorsOperatorError::RecordSegmentReaderCreation(*e))
}
record_segment::RecordSegmentReaderCreationError::BlockfileReadError(_) => {
return Err(GetVectorsOperatorError::RecordSegmentReaderCreation(*e))
}
record_segment::RecordSegmentReaderCreationError::InvalidNumberOfFiles => {
return Err(GetVectorsOperatorError::RecordSegmentReaderCreation(*e))
}
Expand All @@ -120,17 +124,13 @@ impl Operator<GetVectorsOperatorInput, GetVectorsOperatorOutput> for GetVectorsO
},
};
// Step 1: Materialize the logs.
let materializer = LogMaterializer::new(
record_segment_reader.clone(),
input.log_records.clone(),
None,
);
let mat_records = match materializer.materialize().await {
Ok(records) => records,
Err(e) => {
return Err(GetVectorsOperatorError::LogMaterialization(e));
}
};
let mat_records =
match materialize_logs(&record_segment_reader, &input.log_records, None).await {
Ok(records) => records,
Err(e) => {
return Err(GetVectorsOperatorError::LogMaterialization(e));
}
};

// Search the log records for the user ids
let mut remaining_search_user_ids: HashSet<String> =
Expand Down
11 changes: 3 additions & 8 deletions rust/worker/src/execution/operators/hnsw_knn.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::segment::record_segment::RecordSegmentReaderCreationError;
use crate::segment::{LogMaterializer, LogMaterializerError, MaterializedLogRecord};
use crate::segment::{materialize_logs, LogMaterializerError, MaterializedLogRecord};
use crate::{
execution::operator::Operator,
segment::{
Expand Down Expand Up @@ -147,13 +147,8 @@ impl Operator<HnswKnnOperatorInput, HnswKnnOperatorOutput> for HnswKnnOperator {
}
},
};
let log_materializer = LogMaterializer::new(
Some(record_segment_reader.clone()),
input.logs.clone(),
None,
);
let logs = match log_materializer
.materialize()
let some_reader = Some(record_segment_reader.clone());
let logs = match materialize_logs(&some_reader, &input.logs, None)
.instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs"))
.await
{
Expand Down
6 changes: 3 additions & 3 deletions rust/worker/src/execution/operators/knn_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ use tonic::async_trait;
use crate::{
execution::operator::Operator,
segment::{
materialize_logs,
record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError},
LogMaterializer, LogMaterializerError,
LogMaterializerError,
},
};

Expand Down Expand Up @@ -72,8 +73,7 @@ impl Operator<KnnLogInput, KnnLogOutput> for KnnOperator {
Err(e) => Err(*e),
}?;

let materializer = LogMaterializer::new(record_segment_reader, input.logs.clone(), None);
let logs = materializer.materialize().await?;
let logs = materialize_logs(&record_segment_reader, &input.logs, None).await?;

let target_vector;
let target_embedding = if let DistanceFunction::Cosine = input.distance_function {
Expand Down
Loading
Loading