diff --git a/src/accountsdb/db.zig b/src/accountsdb/db.zig
index bf8f07397..e9a99866a 100644
--- a/src/accountsdb/db.zig
+++ b/src/accountsdb/db.zig
@@ -132,7 +132,8 @@ pub const AccountsDB = struct {
     pub const FileMap = std.AutoArrayHashMapUnmanaged(FileId, AccountFile);

     pub const InitConfig = struct {
-        number_of_index_shards: usize,
+        max_number_of_accounts: u64,
+        number_of_index_shards: u64,
         use_disk_index: bool,
     };
@@ -153,7 +154,7 @@ pub const AccountsDB = struct {
             logger,
             index_config,
             config.number_of_index_shards,
-            0,
+            config.max_number_of_accounts,
         );
         errdefer account_index.deinit(true);
@@ -291,6 +292,10 @@ pub const AccountsDB = struct {
             bhs.accumulate(snapshot_manifest.bank_hash_info.stats);
         }

+        // prealloc the references
+        const n_accounts_estimate = n_account_files * accounts_per_file_estimate;
+        try self.account_index.reference_allocator.ensureCapacity(n_accounts_estimate * @sizeOf(AccountRef));
+
         var timer = try sig.time.Timer.start();

         // short path
         if (n_threads == 1) {
@@ -313,26 +318,28 @@ pub const AccountsDB = struct {
         }

         // setup the parallel indexing
-        const use_disk_index = self.config.use_disk_index;
         var loading_threads = try ArrayList(AccountsDB).initCapacity(
             self.allocator,
             n_parse_threads,
         );
+        var thread_config = self.config;
+        thread_config.max_number_of_accounts = 0; // reference memory is shared from the main index
+
         for (0..n_parse_threads) |_| {
             var thread_db = loading_threads.addOneAssumeCapacity();
             thread_db.* = try AccountsDB.init(
                 per_thread_allocator,
                 .noop, // dont spam the logs with init information (we set it after)
                 self.snapshot_dir,
-                self.config,
+                thread_config,
                 self.geyser_writer,
             );
             thread_db.logger = self.logger;

-            // set the disk allocator after init() doesnt create a new one
-            if (use_disk_index) {
-                thread_db.account_index.reference_allocator = self.account_index.reference_allocator;
-            }
+            // set the reference allocator to the main index:
+            // 1) delete the old ptr so we dont leak
+            per_thread_allocator.destroy(thread_db.account_index.reference_allocator);
+            // 2) set the new ptr to the main index
+            thread_db.account_index.reference_allocator = self.account_index.reference_allocator;
         }
         defer {
             // at this defer point, there are three memory components we care about
@@ -351,7 +358,6 @@ pub const AccountsDB = struct {
                 file_map.deinit(per_thread_allocator);

                 // NOTE: important `false` (ie, 1)
-                loading_thread.account_index.reference_allocator = .{ .ram = per_thread_allocator };
                 // dont destory the **disk** allocator (since its shared)
                 loading_thread.account_index.deinit(false);

                 const accounts_cache, var accounts_cache_lg = loading_thread.accounts_cache.writeWithLock();
@@ -440,15 +446,8 @@ pub const AccountsDB = struct {
         // allocate all the references in one shot with a wrapper allocator
         // without this large allocation, snapshot loading is very slow
         const n_accounts_estimate = n_account_files * accounts_per_file_est;
-        var references = try ArrayList(AccountRef).initCapacity(
-            self.account_index.reference_allocator.get(),
-            n_accounts_estimate,
-        );
-
-        const references_ptr = references.items.ptr;
-
-        const counting_alloc = try FreeCounterAllocator.init(self.allocator, references);
-        defer counting_alloc.deinitIfSafe();
+        const reference_allocator = self.account_index.reference_allocator; // TODO(fastload): fix
+        const references_buf = try reference_allocator.allocator().alloc(AccountRef, n_accounts_estimate);

         var timer = try sig.time.Timer.start();
         var progress_timer = try sig.time.Timer.start();
@@ -477,7 +476,7 @@ pub const AccountsDB = struct {
             }
         }

-        var total_n_accounts: u64 = 0;
+        var n_accounts_total: u64 = 0;
         for (
             file_info_map.keys()[file_map_start_index..file_map_end_index],
             file_info_map.values()[file_map_start_index..file_map_end_index],
@@ -487,40 +486,48 @@ pub const AccountsDB = struct {
             var accounts_file = blk: {
                 const file_name_bounded = sig.utils.fmt.boundedFmt("{d}.{d}", .{ slot, file_info.id.toInt() });

-                const accounts_file_file = accounts_dir.openFile(file_name_bounded.constSlice(), .{ .mode = .read_write }) catch |err| {
+                const file = accounts_dir.openFile(file_name_bounded.constSlice(), .{ .mode = .read_write }) catch |err| {
                     self.logger.err().logf("Failed to open accounts/{s}: {s}", .{ file_name_bounded.constSlice(), @errorName(err) });
                     return err;
                 };
-                errdefer accounts_file_file.close();
+                errdefer file.close();

-                break :blk AccountFile.init(accounts_file_file, file_info, slot) catch |err| {
+                break :blk AccountFile.init(file, file_info, slot) catch |err| {
                     self.logger.err().logf("failed to *open* AccountsFile {s}: {s}\n", .{ file_name_bounded.constSlice(), @errorName(err) });
                     return err;
                 };
             };
             errdefer accounts_file.deinit();

+            // index the account file
+            var slot_references = std.ArrayListUnmanaged(AccountRef).initBuffer(references_buf[n_accounts_total..]);
             indexAndValidateAccountFile(
                 &accounts_file,
                 self.account_index.pubkey_ref_map.shard_calculator,
                 shard_counts,
-                &references,
+                &slot_references,
                 geyser_slot_storage,
             ) catch |err| {
-                self.logger.err().logf("failed to *validate/index* AccountsFile: {d}.{d}: {s}\n", .{
-                    accounts_file.slot,
-                    accounts_file.id.toInt(),
-                    @errorName(err),
-                });
-                return err;
+                if (err == ValidateAccountFileError.OutOfReferenceMemory) {
+                    std.debug.panic("accounts-per-file-estimate too small ({d}), increase (using flag '-a') and try again...", .{accounts_per_file_est});
+                } else {
+                    self.logger.err().logf("failed to *validate/index* AccountsFile: {d}.{d}: {s}\n", .{
+                        accounts_file.slot,
+                        accounts_file.id.toInt(),
+                        @errorName(err),
+                    });
+                    return err;
+                }
             };
-
-            // NOTE: rn we dont support resizing because it invalidates pointers
-            // - something went wrong if we resized
-            if (references.items.ptr != references_ptr) {
-                std.debug.panic("accounts-per-file-estimate too small ({d}), increase (using flag '-a') and try again...", .{accounts_per_file_est});
+            const n_accounts_this_slot = accounts_file.number_of_accounts;
+            if (n_accounts_this_slot == 0) {
+                continue;
             }
+
+            // track slice of references per slot
+            n_accounts_total += n_accounts_this_slot;
+            slot_reference_map.putAssumeCapacityNoClobber(slot, slot_references.items[0..n_accounts_this_slot]);
+
+            // write to geyser
             if (geyser_is_enabled) {
                 var geyser_storage = geyser_slot_storage.?; // SAFE: will always be set if geyser_is_enabled
                 const geyser_writer = self.geyser_writer.?; // SAFE: will always be set if geyser_is_enabled
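The scheme above — one large references_buf, with a fixed-capacity list view carved off per slot — can be sketched standalone. Ref and the test below are illustrative stand-ins, not code from this patch:

    const std = @import("std");

    // standalone sketch: carve per-slot views out of one preallocated buffer.
    // `Ref` stands in for AccountRef; all names here are illustrative only.
    const Ref = struct { slot: u64, index: u64 };

    test "carve per-slot views out of one buffer" {
        const allocator = std.testing.allocator;

        // one large allocation, analogous to `references_buf`
        const buf = try allocator.alloc(Ref, 30);
        defer allocator.free(buf);

        var total: usize = 0;
        for (0..3) |slot| {
            // a view over the remaining buffer: appendAssumeCapacity can never
            // reallocate, so earlier per-slot slices stay valid
            var list = std.ArrayListUnmanaged(Ref).initBuffer(buf[total..]);
            for (0..10) |i| list.appendAssumeCapacity(.{ .slot = slot, .index = i });
            total += list.items.len; // list.items is the slice tracked per slot
        }
        try std.testing.expectEqual(@as(usize, 30), total);
    }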
@@ -538,23 +545,8 @@ pub const AccountsDB = struct {
                 try geyser_writer.writePayloadToPipe(data_versioned);
             }

-            if (accounts_file.number_of_accounts > 0) {
-                // the last `number_of_accounts` is associated with this file
-                const start_index = references.items.len - accounts_file.number_of_accounts;
-                const end_index = references.items.len;
-                const ref_slice = references.items[start_index..end_index];
-                const ref_list = ArrayList(AccountRef).fromOwnedSlice(
-                    // deinit allocator uses the counting allocator
-                    counting_alloc.allocator(),
-                    ref_slice,
-                );
-                counting_alloc.count += 1;
-                slot_reference_map.putAssumeCapacityNoClobber(slot, ref_list);
-            }
-            total_n_accounts += accounts_file.number_of_accounts;
-
+            // track file
             const file_id = file_info.id;
-
             file_map.putAssumeCapacityNoClobber(file_id, accounts_file);
             self.largest_file_id = FileId.max(self.largest_file_id, file_id);
             _ = self.largest_rooted_slot.fetchMax(slot, .release);
@@ -573,10 +565,15 @@ pub const AccountsDB = struct {
             }
         }

+        // free extra memory, if we overallocated (very likely)
+        if (n_accounts_total != references_buf.len) {
+            reference_allocator.freeUnusedSpace(std.mem.sliceAsBytes(references_buf[0..n_accounts_total]));
+        }
+
         // NOTE: this is good for debugging what to set `accounts_per_file_est` to
         if (print_progress) {
             self.logger.info().logf("accounts_per_file: actual vs estimated: {d} vs {d}", .{
-                total_n_accounts / n_account_files,
+                n_accounts_total / n_account_files,
                 accounts_per_file_est,
             });
         }
@@ -584,23 +581,18 @@ pub const AccountsDB = struct {
         // allocate enough memory
         try self.account_index.pubkey_ref_map.ensureTotalCapacity(shard_counts);

+        // index the references
         // PERF: can probs be faster if you sort the pubkeys first, and then you know
         // it will always be a search for a free spot, and not search for a match
         timer.reset();
-        var ref_count: usize = 0;
-        var slot_iter = slot_reference_map.keyIterator();
-        while (slot_iter.next()) |slot| {
-            const refs = slot_reference_map.get(slot.*).?;
-            for (refs.items) |*ref| {
-                _ = self.account_index.indexRefIfNotDuplicateSlotAssumeCapacity(ref);
-                ref_count += 1;
-            }
+        for (references_buf[0..n_accounts_total], 0..) |*ref, ref_count| {
+            _ = self.account_index.indexRefIfNotDuplicateSlotAssumeCapacity(ref);

             if (print_progress and progress_timer.read().asNanos() > DB_LOG_RATE.asNanos()) {
                 printTimeEstimate(
                     self.logger,
                     &timer,
-                    total_n_accounts,
+                    n_accounts_total,
                     ref_count,
                     "building index",
                     "thread0",
@@ -1759,11 +1751,7 @@ pub const AccountsDB = struct {
         }

         // update the references
-        var new_reference_block = try ArrayList(AccountRef).initCapacity(
-            self.account_index.reference_allocator.get(),
-            accounts_alive_count,
-        );
-
+        const new_reference_block = try self.account_index.reference_allocator.allocator().alloc(AccountRef, accounts_alive_count);
         account_iter.reset();
         var offset_index: u64 = 0;
         for (is_alive_flags.items) |is_alive| {
@@ -1784,7 +1772,7 @@ pub const AccountsDB = struct {
                 };

                 // copy + update the values
-                const new_ref_ptr = new_reference_block.addOneAssumeCapacity();
+                const new_ref_ptr = &new_reference_block[offset_index];
                 new_ref_ptr.* = ptr_to_ref_field.*.*;
                 new_ref_ptr.location.File = .{
                     .offset = offsets.items[offset_index],
@@ -1803,9 +1791,10 @@ pub const AccountsDB = struct {
             const slot_reference_map_entry = slot_reference_map.getEntry(slot) orelse {
                 std.debug.panic("missing corresponding reference memory for slot {d}\n", .{slot});
             };
+
+            // free the old reference memory
             // NOTE: this is ok because nothing points to this old reference memory
-            // deinit old block of reference memory
-            slot_reference_map_entry.value_ptr.deinit();
+            self.account_index.reference_allocator.allocator().free(slot_reference_map_entry.value_ptr.*);

             // point to new block
             slot_reference_map_entry.value_ptr.* = new_reference_block;
         }
@@ -1863,9 +1852,12 @@ pub const AccountsDB = struct {
         }

         // free the reference memory
-        self.account_index.freeReferenceBlock(slot) catch |err| switch (err) {
-            error.MemoryNotFound => std.debug.panic("memory block @ slot not found: {d}", .{slot}),
-        };
+        {
+            const slot_ref_map, var lock = self.account_index.slot_reference_map.writeWithLock();
+            defer lock.unlock();
+            const r = slot_ref_map.fetchRemove(slot) orelse std.debug.panic("slot reference map not found for slot: {d}", .{slot});
+            self.account_index.reference_allocator.allocator().free(r.value);
+        }

         // free the account memory
         for (accounts) |account| {
@@ -2098,11 +2090,8 @@ pub const AccountsDB = struct {
         defer self.allocator.free(shard_counts);
         @memset(shard_counts, 0);

-        var references = try ArrayList(AccountRef).initCapacity(
-            self.account_index.reference_allocator.get(),
-            n_accounts,
-        );
-
+        const reference_buf = try self.account_index.reference_allocator.allocator().alloc(AccountRef, n_accounts);
+        var references = std.ArrayListUnmanaged(AccountRef).initBuffer(reference_buf);
         try indexAndValidateAccountFile(
             account_file,
             self.account_index.pubkey_ref_map.shard_calculator,
@@ -2112,7 +2101,13 @@ pub const AccountsDB = struct {
             // to support geyser
             null,
         );
-        try self.account_index.putReferenceBlock(account_file.slot, references);
+
+        // track the slot's references
+        {
+            const slot_ref_map, var lock = self.account_index.slot_reference_map.writeWithLock();
+            defer lock.unlock();
+            try slot_ref_map.putNoClobber(account_file.slot, reference_buf);
+        }

         {
             const file_map, var file_map_lg = self.file_map.writeWithLock();
@@ -2120,7 +2115,7 @@ pub const AccountsDB = struct {

             try file_map.put(self.allocator, account_file.id, account_file.*);

-            // we update the bank hash stats while locking the file map to avoid
+            // NOTE: we update the bank hash stats while locking the file map to avoid
             // reading accounts from the file map and getting inaccurate/stale
             // bank hash stats.
             var account_iter = account_file.iterator();
@@ -2137,8 +2132,7 @@ pub const AccountsDB = struct {

         // allocate enough memory here
         try self.account_index.pubkey_ref_map.ensureTotalAdditionalCapacity(shard_counts);
-
-        // compute how many account_references for each pubkey
+        // index the accounts
         var accounts_dead_count: u64 = 0;
         for (references.items) |*ref| {
             const was_inserted = self.account_index.indexRefIfNotDuplicateSlotAssumeCapacity(ref);
@@ -2220,38 +2214,39 @@ pub const AccountsDB = struct {
         }
         try self.account_index.pubkey_ref_map.ensureTotalAdditionalCapacity(shard_counts);

-        // update index
+        // index the accounts
+        const reference_buf = try self.account_index.reference_allocator.allocator().alloc(AccountRef, accounts.len);
         var accounts_dead_count: u64 = 0;
-        var references = try ArrayList(AccountRef).initCapacity(
-            self.account_index.reference_allocator.get(),
-            accounts.len,
-        );
         for (0..accounts.len) |i| {
-            const ref_ptr = references.addOneAssumeCapacity();
-            ref_ptr.* = AccountRef{
+            reference_buf[i] = AccountRef{
                 .pubkey = pubkeys[i],
                 .slot = slot,
                 .location = .{ .UnrootedMap = .{ .index = i } },
             };

-            const was_inserted = self.account_index.indexRefIfNotDuplicateSlotAssumeCapacity(ref_ptr);
+            const was_inserted = self.account_index.indexRefIfNotDuplicateSlotAssumeCapacity(&reference_buf[i]);
             if (!was_inserted) {
                 self.logger.warn().logf(
                     "duplicate reference not inserted: slot: {d} pubkey: {s}",
-                    .{ ref_ptr.slot, ref_ptr.pubkey },
+                    .{ slot, pubkeys[i] },
                 );
                 accounts_dead_count += 1;
             }
-
             std.debug.assert(self.account_index.exists(&pubkeys[i], slot));
         }

+        // track the slot's references
+        {
+            const slot_ref_map, var lock = self.account_index.slot_reference_map.writeWithLock();
+            defer lock.unlock();
+            try slot_ref_map.putNoClobber(slot, reference_buf);
+        }
+
         if (accounts_dead_count != 0) {
             const dead_accounts, var dead_accounts_lg = self.dead_accounts_counter.writeWithLock();
             defer dead_accounts_lg.unlock();
             try dead_accounts.putNoClobber(slot, accounts_dead_count);
         }
-        try self.account_index.putReferenceBlock(slot, references);
     }

     /// Returns a pointer to the bank hash stats for the given slot, and a lock guard on the
@@ -2830,13 +2825,14 @@ pub const ValidateAccountFileError = error{
     ShardCountMismatch,
     InvalidAccountFileLength,
     OutOfMemory,
+    OutOfReferenceMemory,
 } || AccountInFile.ValidateError || GeyserTmpStorage.Error;

 pub fn indexAndValidateAccountFile(
     accounts_file: *AccountFile,
     shard_calculator: PubkeyShardCalculator,
     shard_counts: []usize,
-    account_refs: *ArrayList(AccountRef),
+    account_refs: *ArrayListUnmanaged(AccountRef),
     geyser_storage: ?*GeyserTmpStorage,
 ) ValidateAccountFileError!void {
     var offset: usize = 0;
@@ -2854,7 +2850,10 @@ pub fn indexAndValidateAccountFile(
             try storage.cloneAndTrack(account);
         }

-        try account_refs.append(.{
+        if (account_refs.capacity == account_refs.items.len) {
+            return error.OutOfReferenceMemory;
+        }
+        account_refs.appendAssumeCapacity(.{
             .pubkey = account.store_info.pubkey,
             .slot = accounts_file.slot,
             .location = .{
@@ -2879,88 +2878,6 @@ pub fn indexAndValidateAccountFile(
     accounts_file.number_of_accounts = number_of_accounts;
 }

-/// allocator which counts the number of times free is called. when count
-/// reaches 0, it will deinit the full arraylist. useful for when you want
-/// to allocate a large Arraylist and split it across multiple different
-/// ArrayLists -- alloc and resize are not implemented.
-///
-/// see `loadAndVerifyAccountsFiles` for an example of how to use this allocator
-const FreeCounterAllocator = struct {
-    /// optional heap allocator to deinit the ptr on deinit
-    self_allocator: std.mem.Allocator,
-    references: ArrayList(AccountRef),
-    count: usize,
-
-    const Self = @This();
-
-    pub fn init(self_allocator: std.mem.Allocator, references: ArrayList(AccountRef)) !*Self {
-        const self = try self_allocator.create(Self);
-        self.* = .{
-            .self_allocator = self_allocator,
-            .references = references,
-            .count = 0,
-        };
-        return self;
-    }
-
-    pub fn allocator(self: *Self) std.mem.Allocator {
-        return std.mem.Allocator{
-            .ptr = self,
-            .vtable = &.{
-                .alloc = alloc,
-                .resize = resize,
-                .free = free,
-            },
-        };
-    }
-
-    pub fn deinitIfSafe(self: *Self) void {
-        if (self.count == 0) {
-            self.deinit();
-        }
-    }
-
-    pub fn deinit(self: *Self) void {
-        // this shouldnt happen often but just in case
-        if (self.count != 0) {
-            std.debug.print(
-                "Reference Counting Allocator deinit with count = {} (!= 0)\n",
-                .{self.count},
-            );
-        }
-        self.references.deinit();
-        // free pointer
-        self.self_allocator.destroy(self);
-    }
-
-    pub fn alloc(ctx: *anyopaque, n: usize, log2_align: u8, return_address: usize) ?[*]u8 {
-        _ = ctx;
-        _ = n;
-        _ = log2_align;
-        _ = return_address;
-        @panic("not implemented");
-    }
-
-    pub fn resize(ctx: *anyopaque, buf: []u8, log2_align: u8, new_len: usize, return_address: usize) bool {
-        _ = ctx;
-        _ = buf;
-        _ = log2_align;
-        _ = new_len;
-        _ = return_address;
-        @panic("not implemented");
-    }
-
-    pub fn free(ctx: *anyopaque, buf: []u8, log2_align: u8, return_address: usize) void {
-        _ = buf;
-        _ = log2_align;
-        _ = return_address;
-
-        const self: *Self = @ptrCast(@alignCast(ctx));
-        self.count -|= 1;
-        self.deinitIfSafe();
-    }
-};
-
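The OutOfReferenceMemory guard reduces to a small pattern: with a fixed-buffer list, append can no longer grow, so exhaustion must surface as a typed error (the loader then panics with a hint to raise accounts-per-file-estimate). A minimal sketch, with a hypothetical appendRef helper:

    const std = @import("std");

    // sketch of the capacity guard added above; `appendRef` is a hypothetical name
    fn appendRef(refs: *std.ArrayListUnmanaged(u64), value: u64) error{OutOfReferenceMemory}!void {
        if (refs.capacity == refs.items.len) {
            return error.OutOfReferenceMemory;
        }
        refs.appendAssumeCapacity(value);
    }

    test "fixed-buffer exhaustion becomes a typed error" {
        var buf: [2]u64 = undefined;
        var refs = std.ArrayListUnmanaged(u64).initBuffer(&buf);
        try appendRef(&refs, 1);
        try appendRef(&refs, 2);
        try std.testing.expectError(error.OutOfReferenceMemory, appendRef(&refs, 3));
    }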
 /// All entries in `snapshot_fields.accounts_db_fields.file_map` must correspond to an entry in `file_map`,
 /// with the association defined by the file id (a field of the value of the former, the key of the latter).
 pub fn writeSnapshotTarWithFields(
@@ -3040,7 +2957,7 @@ fn testWriteSnapshotIncremental(
     var snap_fields = try SnapshotFields.decodeFromBincode(allocator, manifest_file.reader());
     defer snap_fields.deinit(allocator);

-    _ = try accounts_db.loadFromSnapshot(snap_fields.accounts_db_fields, 1, allocator, 1_500);
+    _ = try accounts_db.loadFromSnapshot(snap_fields.accounts_db_fields, 1, allocator, 500);

     const snapshot_gen_info = try accounts_db.generateIncrementalSnapshot(.{
         .target_slot = slot,
@@ -3091,10 +3008,17 @@ test "testWriteSnapshot" {
         try parallelUnpackZstdTarBall(allocator, .noop, archive_file, tmp_snap_dir, 4, false);
     }

-    var accounts_db = try AccountsDB.init(allocator, .noop, tmp_snap_dir, .{
-        .number_of_index_shards = ACCOUNT_INDEX_SHARDS,
-        .use_disk_index = false,
-    }, null);
+    var accounts_db = try AccountsDB.init(
+        allocator,
+        .noop,
+        tmp_snap_dir,
+        .{
+            .number_of_index_shards = ACCOUNT_INDEX_SHARDS,
+            .use_disk_index = false,
+            .max_number_of_accounts = 100_000,
+        },
+        null,
+    );
     defer accounts_db.deinit();

     try testWriteSnapshotFull(
@@ -3162,6 +3086,7 @@ fn loadTestAccountsDB(allocator: std.mem.Allocator, use_disk: bool, n_threads: u
     var accounts_db = try AccountsDB.init(allocator, logger, dir, .{
         .number_of_index_shards = 4,
         .use_disk_index = use_disk,
+        .max_number_of_accounts = 10_000,
     }, null);
     errdefer accounts_db.deinit();
@@ -3229,6 +3154,7 @@ test "geyser stream on load" {
         .{
             .number_of_index_shards = 4,
             .use_disk_index = false,
+            .max_number_of_accounts = 0,
         },
         geyser_writer,
     );
@@ -3241,7 +3167,7 @@ test "geyser stream on load" {
         snapshot.accounts_db_fields,
         1,
         allocator,
-        1_500,
+        300,
     );
 }
@@ -3399,6 +3325,7 @@ test "flushing slots works" {
     var accounts_db = try AccountsDB.init(allocator, logger, snapshot_dir, .{
         .number_of_index_shards = 4,
         .use_disk_index = false,
+        .max_number_of_accounts = 1_000,
     }, null);
     defer accounts_db.deinit();
@@ -3450,6 +3377,7 @@ test "purge accounts in cache works" {
     var accounts_db = try AccountsDB.init(allocator, logger, snapshot_dir, .{
         .number_of_index_shards = 4,
         .use_disk_index = false,
+        .max_number_of_accounts = 1_000,
     }, null);
     defer accounts_db.deinit();
@@ -3507,6 +3435,7 @@ test "clean to shrink account file works with zero-lamports" {
     var accounts_db = try AccountsDB.init(allocator, logger, snapshot_dir, .{
         .number_of_index_shards = 4,
         .use_disk_index = false,
+        .max_number_of_accounts = 1_000,
     }, null);
     defer accounts_db.deinit();
@@ -3583,6 +3512,7 @@ test "clean to shrink account file works" {
     var accounts_db = try AccountsDB.init(allocator, logger, snapshot_dir, .{
         .number_of_index_shards = 4,
         .use_disk_index = false,
+        .max_number_of_accounts = 1_000,
     }, null);
     defer accounts_db.deinit();
@@ -3651,6 +3581,7 @@ test "full clean account file works" {
     var accounts_db = try AccountsDB.init(allocator, logger, snapshot_dir, .{
         .number_of_index_shards = 4,
         .use_disk_index = false,
+        .max_number_of_accounts = 1_000,
     }, null);
     defer accounts_db.deinit();
@@ -3736,6 +3667,7 @@ test "shrink account file works" {
     var accounts_db = try AccountsDB.init(allocator, logger, snapshot_dir, .{
         .number_of_index_shards = 4,
         .use_disk_index = false,
+        .max_number_of_accounts = 1_000,
     }, null);
     defer accounts_db.deinit();
@@ -3819,7 +3751,7 @@ test "shrink account file works" {
         defer slot_reference_map_lg.unlock();

         const slot_mem = slot_reference_map.get(new_slot).?;
-        try std.testing.expect(slot_mem.items.len == accounts2.len);
+        try std.testing.expect(slot_mem.len == accounts2.len);
     }

     // test: files were shrunk
@@ -3864,7 +3796,7 @@ test "shrink account file works" {
         defer slot_reference_map_lg.unlock();

         const slot_mem = slot_reference_map.get(slot).?;
-        try std.testing.expectEqual(1, slot_mem.items.len);
+        try std.testing.expectEqual(1, slot_mem.len);
     }

     // last account ref should still be accessible
@@ -3948,6 +3880,7 @@ pub const BenchmarkAccountsDBSnapshotLoad = struct {
         var accounts_db = try AccountsDB.init(allocator, logger, snapshot_dir, .{
             .number_of_index_shards = 32,
             .use_disk_index = bench_args.use_disk,
+            .max_number_of_accounts = 1_000,
         }, null);
         defer accounts_db.deinit();
@@ -4128,6 +4061,7 @@ pub const BenchmarkAccountsDB = struct {
         var accounts_db: AccountsDB = try AccountsDB.init(allocator, logger, snapshot_dir, .{
             .number_of_index_shards = ACCOUNT_INDEX_SHARDS,
             .use_disk_index = bench_args.index == .disk,
+            .max_number_of_accounts = total_n_accounts,
         }, null);
         defer accounts_db.deinit();
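For sizing intuition: the ensureCapacity call in loadFromSnapshot reserves n_account_files * accounts_per_file_estimate references up front. A back-of-envelope sketch — the 48-byte AccountRef size is an assumption for illustration, not measured from index.zig:

    const std = @import("std");

    // back-of-envelope for the up-front reference reservation in loadFromSnapshot
    pub fn main() void {
        const n_account_files: u64 = 400_000; // "usually ~400K" per the config docs
        const accounts_per_file_estimate: u64 = 1_500; // default in config.zig
        const assumed_ref_size: u64 = 48; // placeholder for @sizeOf(AccountRef)

        const n_refs = n_account_files * accounts_per_file_estimate;
        const n_bytes = n_refs * assumed_ref_size;
        std.debug.print("{d} refs -> {d} GiB preallocated\n", .{ n_refs, n_bytes >> 30 });
    }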
diff --git a/src/accountsdb/fuzz.zig b/src/accountsdb/fuzz.zig
index 5cea57bc1..db745c0a8 100644
--- a/src/accountsdb/fuzz.zig
+++ b/src/accountsdb/fuzz.zig
@@ -98,6 +98,7 @@ pub fn run(seed: u64, args: *std.process.ArgIterator) !void {
         .{
             .number_of_index_shards = sig.accounts_db.db.ACCOUNT_INDEX_SHARDS,
             .use_disk_index = use_disk,
+            .max_number_of_accounts = 1_000_000,
             // TODO: other things we can fuzz (number of shards, ...)
         },
         null,
@@ -320,7 +321,7 @@ pub fn run(seed: u64, args: *std.process.ArgIterator) !void {
         var alt_accounts_db = try AccountsDB.init(allocator, .noop, alternative_snapshot_dir, accounts_db.config, null);
         defer alt_accounts_db.deinit();

-        _ = try alt_accounts_db.loadWithDefaults(allocator, &snapshot_fields, 1, true, 1_500);
+        _ = try alt_accounts_db.loadWithDefaults(allocator, &snapshot_fields, 1, true, 500);

         const maybe_inc_slot = if (snapshot_files.incremental_snapshot) |inc| inc.slot else null;
         logger.info().logf("loaded and validated snapshot at slot: {} (and inc snapshot @ slot {any})", .{ snapshot_info.slot, maybe_inc_slot });
     }
diff --git a/src/accountsdb/index.zig b/src/accountsdb/index.zig
index b77c2c37b..932f6bd33 100644
--- a/src/accountsdb/index.zig
+++ b/src/accountsdb/index.zig
@@ -8,6 +8,7 @@ const FileId = sig.accounts_db.accounts_file.FileId;
 const RwMux = sig.sync.RwMux;
 const SwissMap = sig.accounts_db.swiss_map.SwissMap;
 const DiskMemoryAllocator = sig.utils.allocators.DiskMemoryAllocator;
+const RecycleFBA = sig.utils.allocators.RecycleFBA;

 /// reference to an account (either in a file or in the unrooted_map)
 pub const AccountRef = struct {
@@ -41,21 +42,17 @@ pub const AccountRef = struct {
 /// Analogous to [AccountsIndex](https://github.com/anza-xyz/agave/blob/a6b2283142192c5360ad0f53bec1eb4a9fb36154/accounts-db/src/accounts_index.rs#L644)
 pub const AccountIndex = struct {
     allocator: std.mem.Allocator,
-
     /// map from Pubkey -> AccountRefHead
     pubkey_ref_map: ShardedPubkeyRefMap,
-
-    /// things for managing the AccountRef memory
-    reference_allocator: ReferenceAllocator,
     /// map from slot -> []AccountRef
     slot_reference_map: RwMux(SlotRefMap),
+    // manages a single up-front allocation of AccountRefs, recycled over the life
+    // of the program (ie, tracks the state of free/used AccountRefs)
+    reference_allocator: *RecycleFBA(.{}),
+    // the allocator which provides the bytes backing `reference_allocator` (ram or disk)
+    underlying_reference_allocator: ReferenceAllocator,

-    // TODO(fastload): add field []AccountRef which is a single allocation of a large array of AccountRefs
-    //                 reads can access this directly
-    // TODO(fastload): recycle_fba.init([]AccountRef) - this will manage the state of free/used AccountRefs
-
-    // TODO(fastload): change to []AccountRef
-    pub const SlotRefMap = std.AutoHashMap(Slot, std.ArrayList(AccountRef));
+    pub const SlotRefMap = std.AutoHashMap(Slot, []AccountRef);
     pub const AllocatorConfig = union(enum) {
         Ram: struct { allocator: std.mem.Allocator },
         Disk: struct { accountsdb_dir: std.fs.Dir },
@@ -72,7 +69,7 @@ pub const AccountIndex = struct {
         number_of_shards: usize,
         max_account_references: u64,
     ) !Self {
-        const reference_allocator: ReferenceAllocator = switch (allocator_config) {
+        const underlying_reference_allocator: ReferenceAllocator = switch (allocator_config) {
             .Ram => |ram| blk: {
                 logger.info().logf("using ram memory for account index", .{});
                 break :blk .{ .ram = ram.allocator };
@@ -86,13 +83,21 @@ pub const AccountIndex = struct {
                 break :blk .{ .disk = .{ .dma = disk_allocator, .ptr_allocator = allocator } };
             },
         };
-        _ = max_account_references; // TODO(fastload): use this
+
+        const reference_allocator = try RecycleFBA(.{}).create(
+            .{
+                .records_allocator = allocator,
+                .bytes_allocator = underlying_reference_allocator.get(),
+            },
+            max_account_references * @sizeOf(AccountRef),
+        );

         return .{
             .allocator = allocator,
-            .reference_allocator = reference_allocator,
             .pubkey_ref_map = try ShardedPubkeyRefMap.init(allocator, number_of_shards),
             .slot_reference_map = RwMux(SlotRefMap).init(SlotRefMap.init(allocator)),
+            .reference_allocator = reference_allocator,
+            .underlying_reference_allocator = underlying_reference_allocator,
         };
     }
@@ -102,31 +107,14 @@ pub const AccountIndex = struct {
         {
             const slot_reference_map, var slot_reference_map_lg = self.slot_reference_map.writeWithLock();
             defer slot_reference_map_lg.unlock();
-
-            if (free_memory) {
-                var iter = slot_reference_map.iterator();
-                while (iter.next()) |entry| {
-                    entry.value_ptr.deinit();
-                }
-            }
             slot_reference_map.deinit();
         }
-        self.reference_allocator.deinit();
-    }
-
-    pub fn putReferenceBlock(self: *Self, slot: Slot, references: std.ArrayList(AccountRef)) !void {
-        const slot_reference_map, var slot_reference_map_lg = self.slot_reference_map.writeWithLock();
-        defer slot_reference_map_lg.unlock();
-        try slot_reference_map.putNoClobber(slot, references);
-    }
-
-    pub fn freeReferenceBlock(self: *Self, slot: Slot) error{MemoryNotFound}!void {
-        const slot_reference_map, var slot_reference_map_lg = self.slot_reference_map.writeWithLock();
-        defer slot_reference_map_lg.unlock();
-
-        const removed_kv = slot_reference_map.fetchRemove(slot) orelse return error.MemoryNotFound;
-        removed_kv.value.deinit();
+        if (free_memory) {
+            self.reference_allocator.deinit();
+            self.allocator.destroy(self.reference_allocator);
+        }
+        self.underlying_reference_allocator.deinit();
     }

     pub const ReferenceParent = union(enum) {
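The new SlotRefMap shape is easy to exercise in isolation: plain slices keyed by slot, all owned by one allocator (the RecycleFBA in this patch; std.testing.allocator below for brevity):

    const std = @import("std");

    test "slot reference map stores raw slices" {
        const allocator = std.testing.allocator;

        // Slot -> []Ref, mirroring std.AutoHashMap(Slot, []AccountRef) above
        var slot_map = std.AutoHashMap(u64, []u64).init(allocator);
        defer slot_map.deinit();

        const refs = try allocator.alloc(u64, 4);
        try slot_map.putNoClobber(19, refs);

        // freeing a slot's references: remove the slice and hand it back to
        // the owning allocator (fetchRemove mirrors the new purge path above)
        const kv = slot_map.fetchRemove(19) orelse unreachable;
        allocator.free(kv.value);
    }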
diff --git a/src/cmd/cmd.zig b/src/cmd/cmd.zig
index a50a242c7..45d218c2e 100644
--- a/src/cmd/cmd.zig
+++ b/src/cmd/cmd.zig
@@ -268,6 +268,14 @@ pub fn run() !void {
         .value_name = "accounts_per_file_estimate",
     };

+    var max_number_of_accounts_option = cli.Option{
+        .long_name = "max-number-of-accounts",
+        .help = "maximum number of account references to preallocate for the accounts-db index",
+        .value_ref = cli.mkRef(&config.current.accounts_db.max_number_of_accounts),
+        .required = false,
+        .value_name = "max_number_of_accounts",
+    };
+
     // geyser options
     var enable_geyser_option = cli.Option{
         .long_name = "enable-geyser",
@@ -378,6 +386,7 @@ pub fn run() !void {
                     &number_of_index_shards_option,
                     &genesis_file_path,
                     &accounts_per_file_estimate,
+                    &max_number_of_accounts_option,
                     // geyser
                     &enable_geyser_option,
                     &geyser_pipe_path_option,
@@ -474,6 +483,7 @@ pub fn run() !void {
                     &number_of_index_shards_option,
                     &genesis_file_path,
                     &accounts_per_file_estimate,
+                    &max_number_of_accounts_option,
                     // geyser
                     &enable_geyser_option,
                     &geyser_pipe_path_option,
@@ -1427,6 +1437,7 @@ fn loadSnapshot(
         .{
             .number_of_index_shards = config.current.accounts_db.number_of_index_shards,
             .use_disk_index = config.current.accounts_db.use_disk_index,
+            .max_number_of_accounts = config.current.accounts_db.max_number_of_accounts,
         },
         geyser_writer,
     );
diff --git a/src/cmd/config.zig b/src/cmd/config.zig
index ebd3a281b..394ddea0a 100644
--- a/src/cmd/config.zig
+++ b/src/cmd/config.zig
@@ -121,6 +121,10 @@ pub const AccountsDBConfig = struct {
     force_new_snapshot_download: bool = false,
    /// estimate of the number of accounts per file (used for preallocation)
     accounts_per_file_estimate: u64 = 1_500,
+    /// maximum number of account references the account index preallocates. when loading from a
+    /// snapshot, the effective capacity is max(this, accounts_per_file_estimate * number_of_account_files).
+    /// NOTE: number_of_account_files is usually ~400K.
+    max_number_of_accounts: u64 = 1_000_000,
 };

 pub const GeyserConfig = struct {
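The capacity rule in the doc comment reduces to a max; a sketch of the arithmetic:

    const std = @import("std");

    // effective index capacity described above: start from max_number_of_accounts,
    // and let snapshot loading grow it (via reference_allocator.ensureCapacity)
    // when the per-file estimate is larger
    fn effectiveCapacity(max_number_of_accounts: u64, n_account_files: u64, accounts_per_file_estimate: u64) u64 {
        return @max(max_number_of_accounts, n_account_files * accounts_per_file_estimate);
    }

    test "effective capacity is a max" {
        try std.testing.expectEqual(@as(u64, 1_000_000), effectiveCapacity(1_000_000, 100, 500));
        try std.testing.expectEqual(@as(u64, 600_000_000), effectiveCapacity(1_000_000, 400_000, 1_500));
    }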
diff --git a/src/utils/allocators.zig b/src/utils/allocators.zig
index 5f54ddb07..bc5d4a527 100644
--- a/src/utils/allocators.zig
+++ b/src/utils/allocators.zig
@@ -38,6 +38,12 @@ pub fn RecycleFBA(config: struct {
         };
     }

+    pub fn create(allocator_config: AllocatorConfig, n_bytes: u64) !*Self {
+        const self = try allocator_config.records_allocator.create(Self);
+        self.* = try Self.init(allocator_config, n_bytes);
+        return self;
+    }
+
     pub fn deinit(self: *Self) void {
         self.bytes_allocator.free(self.fba_allocator.buffer);
         self.records.deinit();
@@ -62,7 +68,7 @@ pub fn RecycleFBA(config: struct {
         defer if (config.thread_safe) self.mux.unlock();

         if (n > self.fba_allocator.buffer.len) {
-            @panic("RecycleFBA.alloc: requested size too large, make the buffer larger");
+            std.debug.panic("RecycleFBA.alloc: requested size too large ({d} > {d}), make the buffer larger", .{ n, self.fba_allocator.buffer.len });
         }

         // check for a buf to recycle
@@ -73,6 +79,7 @@ pub fn RecycleFBA(config: struct {
         {
             if (item.is_free) {
                 item.is_free = false;
+                // TODO/PERF: if this is an overallocation, we could split it
                 return item.buf;
             } else {
                 // additional saftey check
@@ -87,7 +94,7 @@ pub fn RecycleFBA(config: struct {
         if (!is_possible_to_recycle) {
             // not enough memory to allocate and no possible recycles will be perma stuck
             // TODO(x19): loop this and have a comptime limit?
-            self.tryCollapse();
+            self.collapse();
             if (!self.isPossibleToAllocate(n, log2_align)) {
                 @panic("RecycleFBA.alloc: no possible recycles and not enough memory to allocate");
             }
@@ -156,6 +163,43 @@ pub fn RecycleFBA(config: struct {
         return false;
     }

+    pub fn ensureCapacity(self: *Self, n: u64) !void {
+        if (config.thread_safe) self.mux.lock();
+        defer if (config.thread_safe) self.mux.unlock();
+
+        const current_buf = self.fba_allocator.buffer;
+        if (current_buf.len >= n) return;
+
+        if (self.bytes_allocator.resize(current_buf, n)) {
+            // resized in place: widen the fba's view of its buffer
+            self.fba_allocator.buffer = current_buf.ptr[0..n];
+        } else {
+            const current_usage = self.fba_allocator.end_index;
+            if (current_usage != 0) return error.ResizeUsedAllocatorNotSupported;
+
+            // NOTE: this can be expensive on memory (if two large bufs)
+            const new_buf = try self.bytes_allocator.alloc(u8, n);
+            self.fba_allocator.buffer = new_buf;
+            self.bytes_allocator.free(current_buf);
+        }
+    }
+
+    /// frees the unused tail of a buf.
+    /// this is useful when a buf is initially overallocated and only a prefix ends up used.
+    pub fn freeUnusedSpace(self: *Self, valid_buf: []u8) void {
+        if (config.thread_safe) self.mux.lock();
+        defer if (config.thread_safe) self.mux.unlock();
+
+        for (self.records.items) |*record| {
+            if (record.buf == valid_buf.ptr) {
+                const unused_len = record.len - valid_buf.len;
+                if (unused_len > 0) {
+                    // shrink the live record to the valid prefix, and publish the
+                    // tail (starting at the first unused byte) as a free record
+                    record.len = valid_buf.len;
+                    const unused_buf_ptr = valid_buf.ptr + valid_buf.len;
+                    self.records.append(.{ .is_free = true, .buf = unused_buf_ptr, .len = unused_len }) catch {
+                        @panic("RecycleFBA.freeUnusedSpace: unable to append to records");
+                    };
+                }
+                // NOTE: stop here -- the append above may invalidate `records.items`
+                break;
+            }
+        }
+    }
+
     pub fn isPossibleToAllocate(self: *Self, n: u64, log2_align: u8) bool {
         // direct alloc check
         const fba_size_left = self.fba_allocator.buffer.len - self.fba_allocator.end_index;
@@ -176,8 +220,8 @@ pub fn RecycleFBA(config: struct {
     }

     /// collapses adjacent free records into a single record
-    pub fn tryCollapse(self: *Self) void {
-        var new_records = std.ArrayList(Record).init(self.bytes_allocator);
+    pub fn collapse(self: *Self) void {
+        var new_records = std.ArrayList(Record).init(self.records.allocator);

         var last_was_free = false;
         for (self.records.items) |record| {
@@ -187,12 +231,12 @@ pub fn RecycleFBA(config: struct {
             } else {
                 last_was_free = true;
                 new_records.append(record) catch {
-                    @panic("RecycleFBA.tryCollapse: unable to append to new_records");
+                    @panic("RecycleFBA.collapse: unable to append to new_records");
                 };
             }
         } else {
             new_records.append(record) catch {
-                @panic("RecycleFBA.tryCollapse: unable to append to new_records");
+                @panic("RecycleFBA.collapse: unable to append to new_records");
             };
             last_was_free = false;
         }
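The intended ensureCapacity behavior, as a usage sketch (the import below is an assumed path, not part of the patch):

    const std = @import("std");
    const RecycleFBA = @import("../utils/allocators.zig").RecycleFBA; // assumed import path

    test "ensureCapacity grows only while the allocator is unused" {
        const backing = std.testing.allocator;
        var recycle_fba = try RecycleFBA(.{}).init(.{
            .records_allocator = backing,
            .bytes_allocator = backing,
        }, 1024);
        defer recycle_fba.deinit();

        // nothing is allocated yet, so the whole buffer may be reallocated
        try recycle_fba.ensureCapacity(4096);

        const buf = try recycle_fba.allocator().alloc(u8, 100);
        defer recycle_fba.allocator().free(buf);

        // with live allocations, growth requires an in-place resize of the
        // backing buffer; otherwise ResizeUsedAllocatorNotSupported is returned
        recycle_fba.ensureCapacity(1 << 20) catch |err| {
            try std.testing.expectEqual(error.ResizeUsedAllocatorNotSupported, err);
        };
    }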
@@ -219,17 +263,17 @@
     pub const MmapRatio = u16;

     /// Metadata stored at the end of each allocation.
-    const Metadata = extern struct {
+    pub const Metadata = extern struct {
         file_index: u32,
         mmap_size: usize align(4),
     };

     /// Returns the aligned size with enough space for `size` and `Metadata` at the end.
-    inline fn alignedFileSize(size: usize) usize {
+    pub inline fn alignedFileSize(size: usize) usize {
         return std.mem.alignForward(usize, size + @sizeOf(Metadata), std.mem.page_size);
     }

-    inline fn alignedMmapSize(
+    pub inline fn alignedMmapSize(
         /// Must be `= alignedFileSize(size)`.
         aligned_file_size: usize,
         mmap_ratio: MmapRatio,
@@ -253,7 +297,7 @@
     /// mmaps at least enough memory to the file for `size`, the metadata, and optionally
     /// more based on the `mmap_ratio` field, in order to accommodate potential growth
     /// from `resize` calls.
-    fn alloc(ctx: *anyopaque, size: usize, log2_align: u8, return_address: usize) ?[*]u8 {
+    pub fn alloc(ctx: *anyopaque, size: usize, log2_align: u8, return_address: usize) ?[*]u8 {
         _ = return_address;
         const self: *Self = @ptrCast(@alignCast(ctx));
         std.debug.assert(self.mmap_ratio != 0);
@@ -301,7 +345,7 @@
     }

     /// Resizes the allocation within the bounds of the mmap'd address space if possible.
-    fn resize(
+    pub fn resize(
         ctx: *anyopaque,
         buf: []u8,
         log2_align: u8,
@@ -347,7 +391,7 @@
     }

     /// unmaps the memory and deletes the associated file.
-    fn free(ctx: *anyopaque, buf: []u8, log2_align: u8, return_address: usize) void {
+    pub fn free(ctx: *anyopaque, buf: []u8, log2_align: u8, return_address: usize) void {
         _ = return_address;
         const self: *Self = @ptrCast(@alignCast(ctx));
         std.debug.assert(self.mmap_ratio != 0);
@@ -371,15 +415,15 @@
         };
     }

-    fn logFailure(self: Self, err: anyerror, file_name: []const u8) void {
+    pub fn logFailure(self: Self, err: anyerror, file_name: []const u8) void {
         self.logger.err().logf("Disk Memory Allocator error: {s}, filepath: {s}", .{
             @errorName(err), sig.utils.fmt.tryRealPath(self.dir, file_name),
         });
     }

-    const file_name_max_len = sig.utils.fmt.boundedLenValue("bin_{d}", .{std.math.maxInt(u32)});
-    inline fn fileNameBounded(file_index: u32) std.BoundedArray(u8, file_name_max_len) {
-        return sig.utils.fmt.boundedFmt("bin_{d}", .{file_index});
+    const file_name_max_len = sig.utils.fmt.boundedLenValue("memory_{d}", .{std.math.maxInt(u32)});
+    pub inline fn fileNameBounded(file_index: u32) std.BoundedArray(u8, file_name_max_len) {
+        return sig.utils.fmt.boundedFmt("memory_{d}", .{file_index});
     }
 };
@@ -445,7 +489,29 @@ pub const failing = struct {
     }
 };

-test "recycle allocator: tryCollapse" {
+test "recycle allocator: freeUnused" {
+    const backing_allocator = std.testing.allocator;
+    var allocator = try RecycleFBA(.{}).init(.{
+        .records_allocator = backing_allocator,
+        .bytes_allocator = backing_allocator,
+    }, 200);
+    defer allocator.deinit();
+
+    // alloc a slice of 100 bytes
+    const bytes = try allocator.allocator().alloc(u8, 100);
+    defer allocator.allocator().free(bytes);
+
+    // free the unused space
+    allocator.freeUnusedSpace(bytes[0..50]);
+
+    // this should be ok now
+    const bytes2 = try allocator.allocator().alloc(u8, 50);
+    defer allocator.allocator().free(bytes2);
+
+    const expected_ptr: [*]u8 = @alignCast(@ptrCast(&bytes[50]));
+    try std.testing.expectEqual(expected_ptr, bytes2.ptr);
+}
+
+test "recycle allocator: collapse" {
     const bytes_allocator = std.testing.allocator;
     var allocator = try RecycleFBA(.{}).init(.{
         .records_allocator = bytes_allocator,
@@ -462,7 +528,7 @@
     allocator.allocator().free(bytes);
     allocator.allocator().free(bytes2);

-    allocator.tryCollapse();
+    allocator.collapse();
     // this should be ok now
     const bytes3 = try allocator.allocator().alloc(u8, 150);
     allocator.allocator().free(bytes3);
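The renamed on-disk files ("memory_{d}" instead of "bin_{d}") can now be asserted directly, since fileNameBounded is pub; a small sketch (import path assumed):

    const std = @import("std");
    const DiskMemoryAllocator = @import("../utils/allocators.zig").DiskMemoryAllocator; // assumed import path

    test "disk allocator file naming" {
        const name = DiskMemoryAllocator.fileNameBounded(7);
        try std.testing.expectEqualStrings("memory_7", name.constSlice());
    }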
@@ -559,8 +625,10 @@ test "disk allocator on arraylists" {
     };
     const dma = dma_state.allocator();

+    const file0 = "memory_0"; // DiskMemoryAllocator.fileNameBounded(0).constSlice();
+    const file1 = "memory_1"; // DiskMemoryAllocator.fileNameBounded(1).constSlice();
     {
-        try std.testing.expectError(error.FileNotFound, tmp_dir.access("bin_0", .{})); // this should not exist
+        try std.testing.expectError(error.FileNotFound, tmp_dir.access(file0, .{})); // this should not exist

         var disk_account_refs = try std.ArrayList(u8).initCapacity(dma, 1);
         defer disk_account_refs.deinit();

         try std.testing.expectEqual(19, disk_account_refs.items[0]);
         try std.testing.expectEqual(21, disk_account_refs.items[1]);

-        try tmp_dir.access("bin_0", .{}); // this should exist
-        try std.testing.expectError(error.FileNotFound, tmp_dir.access("bin_1", .{})); // this should not exist
+        try tmp_dir.access(file0, .{}); // this should exist
+        try std.testing.expectError(error.FileNotFound, tmp_dir.access(file1, .{})); // this should not exist

         const array_ptr = try dma.create([4096]u8);
         defer dma.destroy(array_ptr);
         @memset(array_ptr, 0);

-        try tmp_dir.access("bin_1", .{}); // this should now exist
+        try tmp_dir.access(file1, .{}); // this should now exist
     }

-    try std.testing.expectError(error.FileNotFound, tmp_dir.access("bin_0", .{}));
-    try std.testing.expectError(error.FileNotFound, tmp_dir.access("bin_1", .{}));
+    try std.testing.expectError(error.FileNotFound, tmp_dir.access(file0, .{}));
+    try std.testing.expectError(error.FileNotFound, tmp_dir.access(file1, .{}));
 }

 test "disk allocator large realloc" {