@@ -86,8 +86,7 @@ enum ExecutionState {
86
86
Pending ,
87
87
PullLogs ,
88
88
Partition ,
89
- Materialize ,
90
- ApplyingCommittingFlushing ,
89
+ MaterializeApplyCommitFlush ,
91
90
Register ,
92
91
}
93
92
@@ -108,8 +107,9 @@ pub struct CompactOrchestrator {
108
107
pulled_log_offset : Option < i64 > ,
109
108
// Dispatcher
110
109
dispatcher : ComponentHandle < Dispatcher > ,
111
- // number of write segments tasks // todo
110
+ // Tracks the total remaining number of ApplyLogToSegmentWriter tasks per segment
112
111
num_uncompleted_log_apply_tasks : HashMap < SegmentUuid , usize > ,
112
+ // Tracks the total remaining number of FlushSegmentWriter tasks
113
113
num_uncompleted_flush_tasks : usize ,
114
114
// Result Channel
115
115
result_channel :
@@ -124,6 +124,7 @@ pub struct CompactOrchestrator {
124
124
MetadataSegmentWriter < ' static > ,
125
125
) > ,
126
126
flush_results : Vec < SegmentFlushInfo > ,
127
+ // We track a parent span for each segment type so we can group all the spans for a given segment type (makes the resulting trace much easier to read)
127
128
segment_spans : HashMap < SegmentUuid , Span > ,
128
129
}
129
130
@@ -157,6 +158,8 @@ enum CompactionError {
157
158
SystemTimeError ( #[ from] std:: time:: SystemTimeError ) ,
158
159
#[ error( "Result channel dropped" ) ]
159
160
ResultChannelDropped ,
161
+ #[ error( "Invariant violation: {}" , . 0 ) ]
162
+ InvariantViolation ( & ' static str ) ,
160
163
}
161
164
162
165
impl ChromaError for CompactionError {
@@ -290,7 +293,7 @@ impl CompactOrchestrator {
290
293
> ,
291
294
ctx : & crate :: system:: ComponentContext < CompactOrchestrator > ,
292
295
) {
293
- self . state = ExecutionState :: Materialize ;
296
+ self . state = ExecutionState :: MaterializeApplyCommitFlush ;
294
297
295
298
let record_segment = match self . get_segment ( SegmentType :: BlockfileRecord ) . await {
296
299
Ok ( segment) => segment,
@@ -389,8 +392,16 @@ impl CompactOrchestrator {
389
392
}
390
393
391
394
fn get_segment_flusher_span ( & mut self , flusher : & ChromaSegmentFlusher ) -> Span {
392
- let span = self . segment_spans . get ( & flusher. get_id ( ) ) . unwrap ( ) ; // todo
393
- span. clone ( )
395
+ match self . segment_spans . get ( & flusher. get_id ( ) ) {
396
+ Some ( span) => span. clone ( ) ,
397
+ None => {
398
+ tracing:: error!(
399
+ "No span found for segment: {:?}. This should never happen because get_segment_writer_span() should have previously created a span." ,
400
+ flusher. get_name( )
401
+ ) ;
402
+ Span :: current ( )
403
+ }
404
+ }
394
405
}
395
406
396
407
async fn dispatch_apply_log_to_segment_writer_task (
@@ -452,7 +463,7 @@ impl CompactOrchestrator {
452
463
453
464
async fn dispatch_segment_flush (
454
465
& mut self ,
455
- segment_flusher : ChromaSegmentFlusher , // todo: rename
466
+ segment_flusher : ChromaSegmentFlusher ,
456
467
self_address : Box <
457
468
dyn ReceiverForMessage <
458
469
TaskResult < FlushSegmentWriterOutput , FlushSegmentWriterOperatorError > ,
@@ -806,10 +817,23 @@ impl
806
817
. entry ( message. segment_writer . get_id ( ) )
807
818
. and_modify ( |e| * e -= 1 ) ;
808
819
809
- let num_tasks_left = self
820
+ let num_tasks_left = match self
810
821
. num_uncompleted_log_apply_tasks
811
822
. get ( & message. segment_writer . get_id ( ) )
812
- . unwrap ( ) ; // todo
823
+ {
824
+ Some ( num_tasks_left) => num_tasks_left,
825
+ None => {
826
+ terminate_with_error (
827
+ self . result_channel . take ( ) ,
828
+ Box :: new ( CompactionError :: InvariantViolation (
829
+ "Segment writer not found" ,
830
+ ) ) ,
831
+ ctx,
832
+ ) ;
833
+ return ;
834
+ }
835
+ } ;
836
+
813
837
if * num_tasks_left == 0 {
814
838
self . dispatch_segment_writer_commit ( message. segment_writer , ctx. receiver ( ) )
815
839
. await ;
@@ -862,9 +886,8 @@ impl Handler<TaskResult<FlushSegmentWriterOutput, FlushSegmentWriterOperatorErro
862
886
let message = message. into_inner ( ) ;
863
887
match message {
864
888
Ok ( message) => {
865
- self . segment_spans
866
- . remove ( & message. flush_info . segment_id )
867
- . unwrap ( ) ; // todo
889
+ // Drops the span so that the end timestamp is accurate
890
+ let _ = self . segment_spans . remove ( & message. flush_info . segment_id ) ;
868
891
869
892
self . flush_results . push ( message. flush_info ) ;
870
893
self . num_uncompleted_flush_tasks -= 1 ;
0 commit comments