@@ -9,6 +9,7 @@ use state_processing::{
9
9
use std:: borrow:: Cow ;
10
10
use std:: iter;
11
11
use std:: time:: Duration ;
12
+ use store:: metadata:: DataColumnInfo ;
12
13
use store:: { chunked_vector:: BlockRoots , AnchorInfo , BlobInfo , ChunkWriter , KeyValueStore } ;
13
14
use types:: { Hash256 , Slot } ;
14
15
@@ -66,6 +67,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
66
67
. get_anchor_info ( )
67
68
. ok_or ( HistoricalBlockError :: NoAnchorInfo ) ?;
68
69
let blob_info = self . store . get_blob_info ( ) ;
70
+ let data_column_info = self . store . get_data_column_info ( ) ;
69
71
70
72
// Take all blocks with slots less than the oldest block slot.
71
73
let num_relevant = blocks. partition_point ( |available_block| {
@@ -90,18 +92,27 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
90
92
return Ok ( 0 ) ;
91
93
}
92
94
93
- let n_blobs_lists_to_import = blocks_to_import
95
+ // Blobs are stored per block, and data columns are each stored individually
96
+ let n_blob_ops_per_block = if self . spec . is_peer_das_scheduled ( ) {
97
+ self . data_availability_checker . get_custody_columns_count ( )
98
+ } else {
99
+ 1
100
+ } ;
101
+
102
+ let blob_batch_size = blocks_to_import
94
103
. iter ( )
95
104
. filter ( |available_block| available_block. blobs ( ) . is_some ( ) )
96
- . count ( ) ;
105
+ . count ( )
106
+ . saturating_mul ( n_blob_ops_per_block) ;
97
107
98
108
let mut expected_block_root = anchor_info. oldest_block_parent ;
99
109
let mut prev_block_slot = anchor_info. oldest_block_slot ;
100
110
let mut chunk_writer =
101
111
ChunkWriter :: < BlockRoots , _ , _ > :: new ( & self . store . cold_db , prev_block_slot. as_usize ( ) ) ?;
102
112
let mut new_oldest_blob_slot = blob_info. oldest_blob_slot ;
113
+ let mut new_oldest_data_column_slot = data_column_info. oldest_data_column_slot ;
103
114
104
- let mut blob_batch = Vec :: with_capacity ( n_blobs_lists_to_import ) ;
115
+ let mut blob_batch = Vec :: with_capacity ( blob_batch_size ) ;
105
116
let mut cold_batch = Vec :: with_capacity ( blocks_to_import. len ( ) ) ;
106
117
let mut hot_batch = Vec :: with_capacity ( blocks_to_import. len ( ) ) ;
107
118
let mut signed_blocks = Vec :: with_capacity ( blocks_to_import. len ( ) ) ;
@@ -129,11 +140,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
129
140
. blobs_as_kv_store_ops ( & block_root, blobs, & mut blob_batch) ;
130
141
}
131
142
// Store the data columns too
132
- if let Some ( _data_columns) = maybe_data_columns {
133
- // TODO(das): depends on https://github.com/sigp/lighthouse/pull/6073
134
- // new_oldest_data_column_slot = Some(block.slot());
135
- // self.store
136
- // .data_columns_as_kv_store_ops(&block_root, data_columns, &mut blob_batch);
143
+ if let Some ( data_columns) = maybe_data_columns {
144
+ new_oldest_data_column_slot = Some ( block. slot ( ) ) ;
145
+ self . store
146
+ . data_columns_as_kv_store_ops ( & block_root, data_columns, & mut blob_batch) ;
137
147
}
138
148
139
149
// Store block roots, including at all skip slots in the freezer DB.
@@ -212,7 +222,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
212
222
self . store . hot_db . do_atomically ( hot_batch) ?;
213
223
self . store . cold_db . do_atomically ( cold_batch) ?;
214
224
215
- let mut anchor_and_blob_batch = Vec :: with_capacity ( 2 ) ;
225
+ let mut anchor_and_blob_batch = Vec :: with_capacity ( 3 ) ;
216
226
217
227
// Update the blob info.
218
228
if new_oldest_blob_slot != blob_info. oldest_blob_slot {
@@ -228,6 +238,19 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
228
238
}
229
239
}
230
240
241
+ // Update the data column info.
242
+ if new_oldest_data_column_slot != data_column_info. oldest_data_column_slot {
243
+ if let Some ( oldest_data_column_slot) = new_oldest_data_column_slot {
244
+ let new_data_column_info = DataColumnInfo {
245
+ oldest_data_column_slot : Some ( oldest_data_column_slot) ,
246
+ } ;
247
+ anchor_and_blob_batch. push (
248
+ self . store
249
+ . compare_and_set_data_column_info ( data_column_info, new_data_column_info) ?,
250
+ ) ;
251
+ }
252
+ }
253
+
231
254
// Update the anchor.
232
255
let new_anchor = AnchorInfo {
233
256
oldest_block_slot : prev_block_slot,
0 commit comments