feat(accountsdb): improved struct/var naming (#320)
* fix: change recycle_fba init to be more specific

* move disk allocator to inside AccountIndex.init

* change to new allocator config for the index init

* naming: rename reference memory

* move bin logic to `ShardedPubkeyRefMap` and rename to shards

* test fixes

* improve hash eql

* small import change

* fix: ReferenceAllocator struct

* fix: const slices

* fix: remove inlines
0xNineteen authored Oct 21, 2024
1 parent e01e5a9 commit fbe858e
Showing 14 changed files with 582 additions and 499 deletions.
407 changes: 180 additions & 227 deletions src/accountsdb/db.zig

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions src/accountsdb/fuzz.zig
@@ -96,9 +96,9 @@ pub fn run(seed: u64, args: *std.process.ArgIterator) !void {
logger,
snapshot_dir,
.{
.number_of_index_bins = sig.accounts_db.db.ACCOUNT_INDEX_BINS,
.number_of_index_shards = sig.accounts_db.db.ACCOUNT_INDEX_SHARDS,
.use_disk_index = use_disk,
// TODO: other things we can fuzz (number of bins, ...)
// TODO: other things we can fuzz (number of shards, ...)
},
null,
);
547 changes: 325 additions & 222 deletions src/accountsdb/index.zig

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/accountsdb/lib.zig
@@ -22,4 +22,4 @@ pub const StatusCache = snapshots.StatusCache;
pub const downloadSnapshotsFromGossip = download.downloadSnapshotsFromGossip;
pub const parallelUnpackZstdTarBall = snapshots.parallelUnpackZstdTarBall;

pub const ACCOUNT_INDEX_BINS = db.ACCOUNT_INDEX_BINS;
pub const ACCOUNT_INDEX_SHARDS = db.ACCOUNT_INDEX_SHARDS;
14 changes: 7 additions & 7 deletions src/accountsdb/readme.md
@@ -152,9 +152,9 @@ with deletion. This allows us to *not* use a lock per-account-file.

## account index

The account index shards pubkeys across multiple bins where each pubkey is associated with a specific bin based on the pubkey’s first N bits.
The account index shards pubkeys across multiple shards where each pubkey is associated with a specific shard based on the pubkey’s first N bits.
This allows for parallel read/write access to the database (locking only
a single bin for each lookup vs the entire struct).
a single shard for each lookup vs the entire struct).

![](imgs/2024-04-24-15-10-09.png)
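as a rough illustration of the sharding scheme (a sketch with hypothetical names, not the actual code in `index.zig`), a shard can be picked from the leading bits of a pubkey like so:

```zig
const std = @import("std");

/// sketch only: derive a shard index from the leading bits of a 32-byte pubkey,
/// assuming the shard count is a power of two greater than one.
fn shardIndex(pubkey: [32]u8, n_shards: u64) u64 {
    std.debug.assert(n_shards > 1 and std.math.isPowerOfTwo(n_shards));
    const n_bits = std.math.log2_int(u64, n_shards); // 1..63
    // read the first 8 bytes as a big-endian integer so the most significant
    // bits of the pubkey decide the shard
    const prefix = std.mem.readInt(u64, pubkey[0..8], .big);
    const shift: u6 = @intCast(64 - @as(u7, n_bits));
    return prefix >> shift;
}

test "shard index stays in range" {
    var pubkey: [32]u8 = undefined;
    std.crypto.random.bytes(&pubkey);
    try std.testing.expect(shardIndex(pubkey, 8192) < 8192);
}
```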

@@ -279,14 +279,14 @@ across multiple threads (part1 of the diagram below) - this means each thread:
- reads and mmaps every account file
- creates and populates an `ArrayList(AccountRef)` with every account it
parses from the account files
- populates their own sharded index by binning the pubkeys and populating
- populates their own sharded index by sharding the pubkeys and populating
the hashmap with the `*AccountRef`s

the result is N threads (`--n-threads-snapshot-load` decides the value for N) each with their own account index, which we now need
to combine. to combine indexes we merge index bins in parallel across threads.
to combine. to combine indexes we merge index shards in parallel across threads.

for example, one thread will merge bins[0..10] another will merge bins [10..20],
... etc for all the bins across all the threads.
for example, one thread will merge shards[0..10] another will merge shards[10..20],
... etc for all the shards across all the threads.

this approach generates the index with zero locks
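
a minimal sketch of that range split (hypothetical helper, not the actual merge code in `db.zig`):

```zig
const std = @import("std");

/// sketch only: contiguous shard range for one thread; the last thread also
/// picks up any remainder so every shard is merged exactly once.
fn shardRange(thread_id: usize, n_threads: usize, n_shards: usize) struct { usize, usize } {
    const per_thread = n_shards / n_threads;
    const start = thread_id * per_thread;
    const end = if (thread_id == n_threads - 1) n_shards else start + per_thread;
    return .{ start, end };
}

test "ranges cover every shard exactly once" {
    var covered: usize = 0;
    for (0..4) |tid| {
        const start, const end = shardRange(tid, 4, 8192);
        covered += end - start;
    }
    try std.testing.expectEqual(@as(usize, 8192), covered);
}
```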

@@ -311,7 +311,7 @@ the goal of validating snapshots is to generate a merkle tree over all the accou
is `validateLoadFromSnapshot`.

we take the following approach:
- account hashes are collected in parallel across bins using `getHashesFromIndexMultiThread` - similar to how the index is generated
- account hashes are collected in parallel across shards using `getHashesFromIndexMultiThread` - similar to how the index is generated
- each thread will have a slice of hashes, and the root hash is computed over these nested slices using `NestedHashTree` (a toy sketch of the data shape follows below)

note: pubkeys are also sorted so results are consistent
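
a toy illustration of that shape only (one running hash over nested per-thread slices; the real code builds a merkle tree via `NestedHashTree`):

```zig
const std = @import("std");

/// toy only: fold every per-thread slice of account hashes into one digest.
fn rootOverNestedSlices(per_thread_hashes: []const []const [32]u8) [32]u8 {
    var hasher = std.crypto.hash.sha2.Sha256.init(.{});
    for (per_thread_hashes) |slice| {
        for (slice) |h| hasher.update(&h);
    }
    return hasher.finalResult();
}

test "root digest over nested slices" {
    const a = [_][32]u8{[_]u8{1} ** 32};
    const b = [_][32]u8{ [_]u8{2} ** 32, [_]u8{3} ** 32 };
    const nested = [_][]const [32]u8{ &a, &b };
    _ = rootOverNestedSlices(&nested);
}
```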
23 changes: 14 additions & 9 deletions src/accountsdb/swiss_map.zig
@@ -498,7 +498,12 @@ pub fn SwissMapUnmanaged(
}

test "swissmap resize" {
var map = SwissMap(sig.core.Pubkey, accounts_db.index.AccountRef, accounts_db.index.pubkey_hash, accounts_db.index.pubkey_eql).init(std.testing.allocator);
var map = SwissMap(
sig.core.Pubkey,
accounts_db.index.AccountRef,
accounts_db.index.ShardedPubkeyRefMap.hash,
accounts_db.index.ShardedPubkeyRefMap.eql,
).init(std.testing.allocator);
defer map.deinit();

try map.ensureTotalCapacity(100);
@@ -525,8 +530,8 @@ test "swissmap read/write/delete" {
var map = try SwissMap(
sig.core.Pubkey,
*accounts_db.index.AccountRef,
accounts_db.index.pubkey_hash,
accounts_db.index.pubkey_eql,
accounts_db.index.ShardedPubkeyRefMap.hash,
accounts_db.index.ShardedPubkeyRefMap.eql,
).initCapacity(allocator, n_accounts);
defer map.deinit();

@@ -576,8 +581,8 @@ test "swissmap read/write" {
var map = try SwissMap(
sig.core.Pubkey,
*accounts_db.index.AccountRef,
accounts_db.index.pubkey_hash,
accounts_db.index.pubkey_eql,
accounts_db.index.ShardedPubkeyRefMap.hash,
accounts_db.index.ShardedPubkeyRefMap.eql,
).initCapacity(allocator, n_accounts);
defer map.deinit();

@@ -649,8 +654,8 @@ pub const BenchmarkSwissMap = struct {
SwissMap(
sig.core.Pubkey,
*accounts_db.index.AccountRef,
accounts_db.index.pubkey_hash,
accounts_db.index.pubkey_eql,
accounts_db.index.ShardedPubkeyRefMap.hash,
accounts_db.index.ShardedPubkeyRefMap.eql,
),
allocator,
accounts,
@@ -663,11 +668,11 @@ pub const BenchmarkSwissMap = struct {
const InnerT = std.HashMap(sig.core.Pubkey, *accounts_db.index.AccountRef, struct {
pub fn hash(self: @This(), key: sig.core.Pubkey) u64 {
_ = self;
return accounts_db.index.pubkey_hash(key);
return accounts_db.index.ShardedPubkeyRefMap.hash(key);
}
pub fn eql(self: @This(), key1: sig.core.Pubkey, key2: sig.core.Pubkey) bool {
_ = self;
return accounts_db.index.pubkey_eql(key1, key2);
return accounts_db.index.ShardedPubkeyRefMap.eql(key1, key2);
}
}, std.hash_map.default_max_load_percentage);

2 changes: 1 addition & 1 deletion src/benchmarks.zig
@@ -38,7 +38,7 @@ pub fn main() !void {

if (std.mem.startsWith(u8, filter, "swissmap") or run_all_benchmarks) {
try benchmark(
@import("accountsdb/index.zig").BenchmarkSwissMap,
@import("accountsdb/swiss_map.zig").BenchmarkSwissMap,
max_time_per_bench,
.microseconds,
);
24 changes: 12 additions & 12 deletions src/cmd/cmd.zig
@@ -251,12 +251,12 @@ pub fn run() !void {
.value_name = "min_snapshot_download_speed_mb",
};

var number_of_index_bins_option = cli.Option{
var number_of_index_shards_option = cli.Option{
.long_name = "number-of-index-bins",
.help = "number of bins to shard the account index across",
.value_ref = cli.mkRef(&config.current.accounts_db.number_of_index_bins),
.help = "number of shards for the account index's pubkey_ref_map",
.value_ref = cli.mkRef(&config.current.accounts_db.number_of_index_shards),
.required = false,
.value_name = "number_of_index_bins",
.value_name = "number_of_index_shards",
};

var accounts_per_file_estimate = cli.Option{
@@ -375,7 +375,7 @@ pub fn run() !void {
&min_snapshot_download_speed_mb_option,
&force_new_snapshot_download_option,
&trusted_validators_option,
&number_of_index_bins_option,
&number_of_index_shards_option,
&genesis_file_path,
&accounts_per_file_estimate,
// geyser
@@ -471,7 +471,7 @@ pub fn run() !void {
&n_threads_snapshot_load_option,
&n_threads_snapshot_unpack_option,
&force_unpack_snapshot_option,
&number_of_index_bins_option,
&number_of_index_shards_option,
&genesis_file_path,
&accounts_per_file_estimate,
// geyser
@@ -549,7 +549,7 @@ pub fn run() !void {
&min_snapshot_download_speed_mb_option,
&force_new_snapshot_download_option,
&trusted_validators_option,
&number_of_index_bins_option,
&number_of_index_shards_option,
&genesis_file_path,
&accounts_per_file_estimate,
// general
@@ -927,10 +927,10 @@ fn createSnapshot() !void {
const slot = snapshot_result.snapshot_fields.full.bank_fields.slot;

var n_accounts_indexed: u64 = 0;
for (accounts_db.account_index.bins) |*bin_rw| {
const bin, var bin_lg = bin_rw.readWithLock();
defer bin_lg.unlock();
n_accounts_indexed += bin.count();
for (accounts_db.account_index.pubkey_ref_map.shards) |*shard_rw| {
const shard, var lock = shard_rw.readWithLock();
defer lock.unlock();
n_accounts_indexed += shard.count();
}
app_base.logger.info().logf("accountsdb: indexed {d} accounts", .{n_accounts_indexed});

@@ -1425,7 +1425,7 @@ fn loadSnapshot(
logger,
snapshot_dir,
.{
.number_of_index_bins = config.current.accounts_db.number_of_index_bins,
.number_of_index_shards = config.current.accounts_db.number_of_index_shards,
.use_disk_index = config.current.accounts_db.use_disk_index,
},
geyser_writer,
4 changes: 2 additions & 2 deletions src/cmd/config.zig
@@ -1,7 +1,7 @@
const std = @import("std");
const sig = @import("../sig.zig");

const ACCOUNT_INDEX_BINS = sig.accounts_db.db.ACCOUNT_INDEX_BINS;
const ACCOUNT_INDEX_SHARDS = sig.accounts_db.db.ACCOUNT_INDEX_SHARDS;
const ShredCollectorConfig = sig.shred_collector.ShredCollectorConfig;
const IpAddr = sig.net.IpAddr;
const LogLevel = sig.trace.Level;
@@ -110,7 +110,7 @@ pub const AccountsDBConfig = struct {
/// number of threads to unpack snapshot from .tar.zstd
num_threads_snapshot_unpack: u16 = 0,
/// number of shards to use across the index
number_of_index_bins: u64 = ACCOUNT_INDEX_BINS,
number_of_index_shards: u64 = ACCOUNT_INDEX_SHARDS,
/// use disk based index for accounts index
use_disk_index: bool = false,
/// force unpacking a fresh snapshot even if an accounts/ dir exists
4 changes: 3 additions & 1 deletion src/core/hash.zig
@@ -33,7 +33,9 @@ pub const Hash = extern struct {
}

pub fn eql(self: Hash, other: Hash) bool {
return self.order(&other) == .eq;
const xx: @Vector(size, u8) = self.data;
const yy: @Vector(size, u8) = other.data;
return @reduce(.And, xx == yy);
}

pub fn order(a: *const Hash, b: *const Hash) std.math.Order {
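for reference, the same vector-compare pattern in isolation (an illustration, not the repository's code): coercing fixed-size byte arrays to `@Vector`s lets a single `@reduce(.And, ...)` replace the byte-by-byte ordering check.

```zig
const std = @import("std");

// illustration of the pattern used above: compare two fixed-size byte arrays
// with one vector compare instead of looping (or ordering) byte by byte.
fn bytesEql(comptime n: usize, a: [n]u8, b: [n]u8) bool {
    const va: @Vector(n, u8) = a;
    const vb: @Vector(n, u8) = b;
    return @reduce(.And, va == vb);
}

test "vectorized byte equality" {
    const x = [_]u8{0xAA} ** 32;
    var y = x;
    try std.testing.expect(bytesEql(32, x, y));
    y[31] ^= 1;
    try std.testing.expect(!bytesEql(32, x, y));
}
```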
5 changes: 4 additions & 1 deletion src/geyser/core.zig
@@ -118,7 +118,10 @@ pub const GeyserWriter = struct {
const file = try openPipe(pipe_path);
const io_channel = try sig.sync.Channel([]u8).create(allocator);
const io_allocator_state = try allocator.create(RecycleFBA(.{}));
io_allocator_state.* = try RecycleFBA(.{}).init(allocator, io_fba_bytes);
io_allocator_state.* = try RecycleFBA(.{}).init(.{
.records_allocator = allocator,
.bytes_allocator = allocator,
}, io_fba_bytes);
const metrics = try GeyserWriterMetrics.init();

return .{
5 changes: 4 additions & 1 deletion src/geyser/main.zig
@@ -204,7 +204,10 @@ pub fn csvDump() !void {

// preallocate memory for csv rows
const recycle_fba = try allocator.create(sig.utils.allocators.RecycleFBA(.{ .thread_safe = true }));
recycle_fba.* = try sig.utils.allocators.RecycleFBA(.{ .thread_safe = true }).init(allocator, config.csv_buf_len);
recycle_fba.* = try sig.utils.allocators.RecycleFBA(.{ .thread_safe = true }).init(.{
.records_allocator = allocator,
.bytes_allocator = allocator,
}, config.csv_buf_len);
defer {
recycle_fba.deinit();
allocator.destroy(recycle_fba);
5 changes: 4 additions & 1 deletion src/trace/log.zig
@@ -128,7 +128,10 @@ pub const ChannelPrintLogger = struct {
pub fn init(config: Config) !*Self {
const max_buffer = config.max_buffer orelse return error.MaxBufferNotSet;
const recycle_fba = try config.allocator.create(RecycleFBA(.{}));
recycle_fba.* = try RecycleFBA(.{}).init(config.allocator, max_buffer);
recycle_fba.* = try RecycleFBA(.{}).init(.{
.records_allocator = config.allocator,
.bytes_allocator = config.allocator,
}, max_buffer);
const self = try config.allocator.create(Self);
self.* = .{
.allocator = config.allocator,
35 changes: 23 additions & 12 deletions src/utils/allocators.zig
@@ -9,32 +9,37 @@ pub fn RecycleFBA(config: struct {
return struct {
// this allocates the underlying memory + dynamic expansions
// (only used on init/deinit + arraylist expansion)
backing_allocator: std.mem.Allocator,
bytes_allocator: std.mem.Allocator,
// this does the data allocations (data is returned from alloc)
fba_allocator: std.heap.FixedBufferAllocator,
// recycling depot
records: std.ArrayList(Record),

// for thread safety
mux: std.Thread.Mutex = .{},

const Record = struct { is_free: bool, buf: [*]u8, len: u64 };
const AllocatorConfig = struct {
// used for the records array
records_allocator: std.mem.Allocator,
// used for the underlying memory for the allocations
bytes_allocator: std.mem.Allocator,
};
const Self = @This();

pub fn init(backing_allocator: std.mem.Allocator, n_bytes: u64) !Self {
const buf = try backing_allocator.alloc(u8, n_bytes);
pub fn init(allocator_config: AllocatorConfig, n_bytes: u64) !Self {
const buf = try allocator_config.bytes_allocator.alloc(u8, n_bytes);
const fba_allocator = std.heap.FixedBufferAllocator.init(buf);
const records = std.ArrayList(Record).init(backing_allocator);
const records = std.ArrayList(Record).init(allocator_config.records_allocator);

return .{
.backing_allocator = backing_allocator,
.bytes_allocator = allocator_config.bytes_allocator,
.fba_allocator = fba_allocator,
.records = records,
};
}

pub fn deinit(self: *Self) void {
self.backing_allocator.free(self.fba_allocator.buffer);
self.bytes_allocator.free(self.fba_allocator.buffer);
self.records.deinit();
}

@@ -172,7 +177,7 @@ pub fn RecycleFBA(config: struct {

/// collapses adjacent free records into a single record
pub fn tryCollapse(self: *Self) void {
var new_records = std.ArrayList(Record).init(self.backing_allocator);
var new_records = std.ArrayList(Record).init(self.bytes_allocator);
var last_was_free = false;

for (self.records.items) |record| {
@@ -441,8 +446,11 @@ pub const failing = struct {
};

test "recycle allocator: tryCollapse" {
const backing_allocator = std.testing.allocator;
var allocator = try RecycleFBA(.{}).init(backing_allocator, 200);
const bytes_allocator = std.testing.allocator;
var allocator = try RecycleFBA(.{}).init(.{
.records_allocator = bytes_allocator,
.bytes_allocator = bytes_allocator,
}, 200);
defer allocator.deinit();

// alloc a slice of 100 bytes
@@ -461,8 +469,11 @@ test "recycle allocator: tryCollapse" {
}

test "recycle allocator" {
const backing_allocator = std.testing.allocator;
var allocator = try RecycleFBA(.{}).init(backing_allocator, 1024);
const bytes_allocator = std.testing.allocator;
var allocator = try RecycleFBA(.{}).init(.{
.records_allocator = bytes_allocator,
.bytes_allocator = bytes_allocator,
}, 1024);
defer allocator.deinit();

// alloc a slice of 100 bytes
