1
1
use std:: collections:: BTreeMap ;
2
2
use std:: fmt:: Display ;
3
- use std:: ops:: { Deref , Range } ;
3
+ use std:: ops:: { Deref , Range , RangeBounds } ;
4
4
use std:: sync:: { Arc , RwLock } ;
5
5
use std:: task:: Poll ;
6
6
7
7
use async_trait:: async_trait;
8
8
use futures:: channel:: mpsc;
9
9
use futures:: { SinkExt , Stream , StreamExt } ;
10
+ use range_union_find:: RangeUnionFind ;
10
11
use vortex_buffer:: { Buffer , ByteBuffer } ;
11
12
use vortex_error:: { VortexExpect , VortexResult , vortex_err} ;
12
13
@@ -119,6 +120,7 @@ impl SegmentCollector {
119
120
RowRangePruner {
120
121
store : self . store . clone ( ) ,
121
122
cancellations_tx,
123
+ excluded_ranges : Default :: default ( ) ,
122
124
} ,
123
125
SegmentStream {
124
126
store : self . store ,
@@ -134,18 +136,29 @@ impl SegmentCollector {
134
136
pub struct RowRangePruner {
135
137
store : Arc < RwLock < SegmentStore > > ,
136
138
cancellations_tx : mpsc:: UnboundedSender < SegmentId > ,
139
+ excluded_ranges : Arc < RwLock < RangeUnionFind < u64 > > > ,
137
140
}
138
141
139
142
impl RowRangePruner {
140
143
// Remove all segments fully encompassed by the given row range. Removals
141
144
// of each matching segment is notified to the cancellation channel.
142
145
pub async fn remove ( & mut self , to_exclude : Range < u64 > ) -> VortexResult < ( ) > {
146
+ let to_exclude = {
147
+ let mut excluded_ranges = self . excluded_ranges . write ( ) . vortex_expect ( "poisoned lock" ) ;
148
+ excluded_ranges
149
+ . insert_range ( & to_exclude)
150
+ . map_err ( |e| vortex_err ! ( "invalid range: {e}" ) ) ?;
151
+ excluded_ranges
152
+ . find_range_with_element ( & to_exclude. start )
153
+ . map_err ( |_| vortex_err ! ( "can not find range just inserted" ) ) ?
154
+ } ;
155
+
143
156
let cancelled_segments: Vec < _ > = {
144
157
let mut store = self . store . write ( ) . vortex_expect ( "poisoned lock" ) ;
145
158
let to_remove: Vec < _ > = store
146
159
. keys ( )
147
- . skip_while ( |key| key. row_start < to_exclude . start )
148
- . take_while ( |key| key. row_end <= to_exclude . end )
160
+ . skip_while ( |key| !to_exclude . contains ( & key. row_start ) )
161
+ . take_while ( |key| to_exclude . contains ( & key. row_end ) )
149
162
. copied ( )
150
163
. collect ( ) ;
151
164
to_remove
0 commit comments