Skip to content

Commit 02dd75b

Browse files
committed
Move materialization into operator
1 parent a501edd commit 02dd75b

File tree

5 files changed

+426
-328
lines changed

5 files changed

+426
-328
lines changed

rust/worker/src/compactor/compaction_manager.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@ use futures::stream::FuturesUnordered;
2222
use futures::StreamExt;
2323
use std::fmt::Debug;
2424
use std::fmt::Formatter;
25-
use std::sync::atomic::AtomicU32;
26-
use std::sync::Arc;
2725
use std::time::Duration;
2826
use thiserror::Error;
2927
use tracing::instrument;
@@ -123,8 +121,6 @@ impl CompactionManager {
123121
self.hnsw_index_provider.clone(),
124122
dispatcher,
125123
None,
126-
None,
127-
Arc::new(AtomicU32::new(1)),
128124
self.max_compaction_size,
129125
self.max_partition_size,
130126
);
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
use crate::execution::operator::Operator;
2+
use crate::segment::record_segment::RecordSegmentReaderCreationError;
3+
use crate::segment::{materialize_logs, record_segment::RecordSegmentReader};
4+
use crate::segment::{LogMaterializerError, MaterializedLogRecord};
5+
use async_trait::async_trait;
6+
use chroma_blockstore::provider::BlockfileProvider;
7+
use chroma_error::ChromaError;
8+
use chroma_types::{Chunk, LogRecord, Segment};
9+
use futures::TryFutureExt;
10+
use std::sync::atomic::AtomicU32;
11+
use std::sync::Arc;
12+
use thiserror::Error;
13+
14+
#[derive(Error, Debug)]
15+
pub enum MaterializeLogOperatorError {
16+
#[error("Could not create record segment reader: {0}")]
17+
RecordSegmentReaderCreationFailed(#[from] RecordSegmentReaderCreationError),
18+
#[error("Log materialization failed: {0}")]
19+
LogMaterializationFailed(#[from] LogMaterializerError),
20+
}
21+
22+
impl ChromaError for MaterializeLogOperatorError {
23+
fn code(&self) -> chroma_error::ErrorCodes {
24+
match self {
25+
MaterializeLogOperatorError::RecordSegmentReaderCreationFailed(e) => e.code(),
26+
MaterializeLogOperatorError::LogMaterializationFailed(e) => e.code(),
27+
}
28+
}
29+
}
30+
31+
#[derive(Debug)]
32+
pub struct MaterializeLogOperator {}
33+
34+
impl MaterializeLogOperator {
35+
pub fn new() -> Box<Self> {
36+
Box::new(MaterializeLogOperator {})
37+
}
38+
}
39+
40+
#[derive(Debug)]
41+
pub struct MaterializeLogInput {
42+
logs: Chunk<LogRecord>,
43+
provider: BlockfileProvider,
44+
record_segment: Segment,
45+
offset_id: Arc<AtomicU32>,
46+
}
47+
48+
impl MaterializeLogInput {
49+
pub fn new(
50+
logs: Chunk<LogRecord>,
51+
provider: BlockfileProvider,
52+
record_segment: Segment,
53+
offset_id: Arc<AtomicU32>,
54+
) -> Self {
55+
MaterializeLogInput {
56+
logs,
57+
provider,
58+
record_segment,
59+
offset_id,
60+
}
61+
}
62+
}
63+
64+
pub struct MaterializeLogOutput {
65+
pub(crate) result: Chunk<MaterializedLogRecord>,
66+
}
67+
68+
impl std::fmt::Debug for MaterializeLogOutput {
69+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70+
f.debug_struct("MaterializeLogOutput")
71+
.field("# of materialized records", &self.result.total_len())
72+
.finish()
73+
}
74+
}
75+
76+
#[async_trait]
77+
impl Operator<MaterializeLogInput, MaterializeLogOutput> for MaterializeLogOperator {
78+
type Error = MaterializeLogOperatorError;
79+
80+
async fn run(&self, input: &MaterializeLogInput) -> Result<MaterializeLogOutput, Self::Error> {
81+
tracing::debug!("Materializing {} log entries", input.logs.total_len());
82+
83+
let record_segment_reader =
84+
match RecordSegmentReader::from_segment(&input.record_segment, &input.provider).await {
85+
Ok(reader) => Some(reader),
86+
Err(e) => {
87+
match *e {
88+
// Uninitialized segment is fine and means that the record
89+
// segment is not yet initialized in storage.
90+
RecordSegmentReaderCreationError::UninitializedSegment => None,
91+
err => {
92+
tracing::error!("Error creating record segment reader: {:?}", err);
93+
return Err(
94+
MaterializeLogOperatorError::RecordSegmentReaderCreationFailed(err),
95+
);
96+
}
97+
}
98+
}
99+
};
100+
101+
let result = materialize_logs(
102+
&record_segment_reader,
103+
&input.logs,
104+
Some(input.offset_id.clone()),
105+
)
106+
.map_err(MaterializeLogOperatorError::LogMaterializationFailed)
107+
.await?;
108+
109+
Ok(MaterializeLogOutput { result })
110+
}
111+
}

rust/worker/src/execution/operators/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ pub(super) mod count_records;
33
pub(super) mod flush_s3;
44
pub(super) mod get_vectors_operator;
55
pub(super) mod hnsw_knn;
6+
pub(super) mod materialize_logs;
67
pub(super) mod merge_knn_results;
78
pub(super) mod normalize_vectors;
89
pub(super) mod partition;

0 commit comments

Comments
 (0)