@@ -12,7 +12,7 @@ use byteorder::{WriteBytesExt, BE};
12
12
use lzzzz:: lz4:: { self , ACC_LEVEL_DEFAULT } ;
13
13
use parking_lot:: Mutex ;
14
14
use rayon:: {
15
- iter:: { IndexedParallelIterator , IntoParallelIterator , ParallelIterator } ,
15
+ iter:: { Either , IndexedParallelIterator , IntoParallelIterator , ParallelIterator } ,
16
16
scope,
17
17
} ;
18
18
use smallvec:: SmallVec ;
@@ -40,6 +40,10 @@ struct ThreadLocalState<K: StoreKey + Send, const FAMILIES: usize> {
40
40
new_blob_files : Vec < ( u32 , File ) > ,
41
41
}
42
42
43
+ const COLLECTOR_SHARDS : usize = 4 ;
44
+ const COLLECTOR_SHARD_SHIFT : usize =
45
+ u64:: BITS as usize - COLLECTOR_SHARDS . trailing_zeros ( ) as usize ;
46
+
43
47
/// The result of a `WriteBatch::finish` operation.
44
48
pub ( crate ) struct FinishResult {
45
49
pub ( crate ) sequence_number : u32 ,
@@ -49,6 +53,14 @@ pub(crate) struct FinishResult {
49
53
pub ( crate ) new_blob_files : Vec < ( u32 , File ) > ,
50
54
}
51
55
56
+ enum GlobalCollectorState < K : StoreKey + Send > {
57
+ /// Initial state. Single collector. Once the collector is full, we switch to sharded mode.
58
+ Unsharded ( Collector < K > ) ,
59
+ /// Sharded mode.
60
+ /// We use multiple collectors, and select one based on the first bits of the key hash.
61
+ Sharded ( [ Collector < K > ; COLLECTOR_SHARDS ] ) ,
62
+ }
63
+
52
64
/// A write batch.
53
65
pub struct WriteBatch < K : StoreKey + Send , const FAMILIES : usize > {
54
66
/// The database path
@@ -58,7 +70,7 @@ pub struct WriteBatch<K: StoreKey + Send, const FAMILIES: usize> {
58
70
/// The thread local state.
59
71
thread_locals : ThreadLocal < UnsafeCell < ThreadLocalState < K , FAMILIES > > > ,
60
72
/// Collectors in use. The thread local collectors flush into these when they are full.
61
- collectors : [ Mutex < Collector < K > > ; FAMILIES ] ,
73
+ collectors : [ Mutex < GlobalCollectorState < K > > ; FAMILIES ] ,
62
74
/// The list of new SST files that have been created.
63
75
/// Tuple of (sequence number, file).
64
76
new_sst_files : Mutex < Vec < ( u32 , File ) > > ,
@@ -78,7 +90,8 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
78
90
path,
79
91
current_sequence_number : AtomicU32 :: new ( current) ,
80
92
thread_locals : ThreadLocal :: new ( ) ,
81
- collectors : [ ( ) ; FAMILIES ] . map ( |_| Mutex :: new ( Collector :: new ( ) ) ) ,
93
+ collectors : [ ( ) ; FAMILIES ]
94
+ . map ( |_| Mutex :: new ( GlobalCollectorState :: Unsharded ( Collector :: new ( ) ) ) ) ,
82
95
new_sst_files : Mutex :: new ( Vec :: new ( ) ) ,
83
96
idle_collectors : Mutex :: new ( Vec :: new ( ) ) ,
84
97
idle_thread_local_collectors : Mutex :: new ( Vec :: new ( ) ) ,
@@ -131,17 +144,39 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
131
144
) -> Result < ( ) > {
132
145
let mut full_collectors = SmallVec :: < [ _ ; 2 ] > :: new ( ) ;
133
146
{
134
- let mut global_collector = self . collectors [ usize_from_u32 ( family) ] . lock ( ) ;
147
+ let mut global_collector_state = self . collectors [ usize_from_u32 ( family) ] . lock ( ) ;
135
148
for entry in collector. drain ( ) {
136
- global_collector. add_entry ( entry) ;
137
- if global_collector. is_full ( ) {
138
- full_collectors. push ( replace (
139
- & mut * global_collector,
140
- self . idle_collectors
141
- . lock ( )
142
- . pop ( )
143
- . unwrap_or_else ( || Collector :: new ( ) ) ,
144
- ) ) ;
149
+ match & mut * global_collector_state {
150
+ GlobalCollectorState :: Unsharded ( collector) => {
151
+ collector. add_entry ( entry) ;
152
+ if collector. is_full ( ) {
153
+ // When full, split the entries into shards.
154
+ let mut shards: [ Collector < K > ; 4 ] =
155
+ [ ( ) ; COLLECTOR_SHARDS ] . map ( |_| Collector :: new ( ) ) ;
156
+ for entry in collector. drain ( ) {
157
+ let shard = ( entry. key . hash >> COLLECTOR_SHARD_SHIFT ) as usize ;
158
+ shards[ shard] . add_entry ( entry) ;
159
+ }
160
+ // There is a rare edge case where all entries are in the same shard,
161
+ // and the collector is full after the split.
162
+ for collector in shards. iter_mut ( ) {
163
+ if collector. is_full ( ) {
164
+ full_collectors
165
+ . push ( replace ( & mut * collector, self . get_new_collector ( ) ) ) ;
166
+ }
167
+ }
168
+ * global_collector_state = GlobalCollectorState :: Sharded ( shards) ;
169
+ }
170
+ }
171
+ GlobalCollectorState :: Sharded ( shards) => {
172
+ let shard = ( entry. key . hash >> COLLECTOR_SHARD_SHIFT ) as usize ;
173
+ let collector = & mut shards[ shard] ;
174
+ collector. add_entry ( entry) ;
175
+ if collector. is_full ( ) {
176
+ full_collectors
177
+ . push ( replace ( & mut * collector, self . get_new_collector ( ) ) ) ;
178
+ }
179
+ }
145
180
}
146
181
}
147
182
}
@@ -155,6 +190,13 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
155
190
Ok ( ( ) )
156
191
}
157
192
193
+ fn get_new_collector ( & self ) -> Collector < K > {
194
+ self . idle_collectors
195
+ . lock ( )
196
+ . pop ( )
197
+ . unwrap_or_else ( || Collector :: new ( ) )
198
+ }
199
+
158
200
/// Puts a key-value pair into the write batch.
159
201
pub fn put ( & self , family : u32 , key : K , value : ValueBuffer < ' _ > ) -> Result < ( ) > {
160
202
let state = self . thread_local_state ( ) ;
@@ -217,23 +259,27 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
217
259
let mut new_sst_files = take ( self . new_sst_files . get_mut ( ) ) ;
218
260
let shared_new_sst_files = Mutex :: new ( & mut new_sst_files) ;
219
261
220
- let collectors = replace (
221
- & mut self . collectors ,
222
- [ ( ) ; FAMILIES ] . map ( |_| {
223
- Mutex :: new (
224
- self . idle_collectors
225
- . lock ( )
226
- . pop ( )
227
- . unwrap_or_else ( || Collector :: new ( ) ) ,
228
- )
229
- } ) ,
230
- ) ;
262
+ let new_collectors = [ ( ) ; FAMILIES ]
263
+ . map ( |_| Mutex :: new ( GlobalCollectorState :: Unsharded ( self . get_new_collector ( ) ) ) ) ;
264
+ let collectors = replace ( & mut self . collectors , new_collectors) ;
231
265
collectors
232
266
. into_par_iter ( )
233
267
. enumerate ( )
234
- . try_for_each ( |( family, collector) | {
268
+ . flat_map ( |( family, state) | {
269
+ let collector = state. into_inner ( ) ;
270
+ match collector {
271
+ GlobalCollectorState :: Unsharded ( collector) => {
272
+ Either :: Left ( [ ( family, collector) ] . into_par_iter ( ) )
273
+ }
274
+ GlobalCollectorState :: Sharded ( shards) => Either :: Right (
275
+ shards
276
+ . into_par_iter ( )
277
+ . map ( move |collector| ( family, collector) ) ,
278
+ ) ,
279
+ }
280
+ } )
281
+ . try_for_each ( |( family, mut collector) | {
235
282
let family = family as u32 ;
236
- let mut collector = collector. into_inner ( ) ;
237
283
if !collector. is_empty ( ) {
238
284
let sst = self . create_sst_file ( family, collector. sorted ( ) ) ?;
239
285
collector. clear ( ) ;
0 commit comments