feat(ledger): support LMDB as a blockstore database backend #352

Open · wants to merge 32 commits into base: main

Commits (32)
17070a7
feat(ledger): lmdb wip
dnut Oct 25, 2024
427fce7
feat(ledger): wip lmdb
dnut Oct 30, 2024
f1fd0cf
feat(ledger): write last lmdb methods, but still doesn't compile
dnut Nov 1, 2024
3fc08c7
feat(ledger): lmdb compiles. tests fail
dnut Nov 1, 2024
3f0fe86
feat(ledger): get all the tests working for lmdb and add some more
dnut Nov 1, 2024
4c9a312
fix(ledger): remove unused
dnut Nov 1, 2024
8be72a0
fix(ledger): misc bugs found when testing as blockstore db
dnut Nov 1, 2024
d03475c
feat(ledger): customizable blockstore database backend
dnut Nov 1, 2024
d4682f1
fix(ledger): hashmapdb compile error when used as blockstore database
dnut Nov 1, 2024
e1a261c
test(ledger): migrate new database tests to normal zig tests
dnut Nov 4, 2024
6f3bb7b
refactor(build.zig): use switch instead of ifs for database dependency
dnut Nov 5, 2024
876eb2d
refactor(ledger): extract out import to top
dnut Nov 5, 2024
79427fc
refactor(ledger): remove c pointers from lmdb
dnut Nov 5, 2024
1c22591
refactor(ledger): use flags from c import
dnut Nov 5, 2024
cd49700
fix(ledger): lmdb error conversion needs platform specific logic
dnut Nov 5, 2024
c01b74f
refactor(ledger): remove unnecessary ptrCast from lmdb
dnut Nov 5, 2024
54975b2
refactor(ledger): rename lmdb "result" to "maybeError"
dnut Nov 5, 2024
11cd6cc
fix(ledger): strings should be 0-terminated
dnut Nov 5, 2024
e9d84f9
docs(ledger): document txn-based allocator for lmdb
dnut Nov 5, 2024
2ed42a0
refactor(ledger): rename ret to returnOutput and add docs
dnut Nov 5, 2024
038f9e5
fix(ledger): reusing hashmap write batch after execution is a bug, an…
dnut Nov 5, 2024
1808660
fix(ledger): leak in purgeSlots
dnut Nov 5, 2024
8add233
fix(ledger): serializeAlloc for raw bytes should not use bincode
dnut Nov 5, 2024
dbe786a
fix(ledger): deinit shred bytes in test
dnut Nov 5, 2024
e1d5fdc
fix(ledger): write batch copy/pointer mismanagement
dnut Nov 5, 2024
4710c13
fix(ledger): memory leaks in tests
dnut Nov 5, 2024
25bd099
ci: explicitly test ledger databases in github workflow, and don't re…
dnut Nov 6, 2024
ac6ba87
fix(ledger): memory bugs in the shred inserter
dnut Nov 6, 2024
385aa53
test(ledger): improve deleteRange test
dnut Nov 6, 2024
fd14806
fix(ledger): close lmdb env on deinit, and don't abort write txns
dnut Nov 6, 2024
42e7608
test(ledger): run testDatabase(hashmap) tests no matter what
dnut Nov 6, 2024
53a9ea9
fix(ledger): allocator misuse in BytesRef
dnut Nov 6, 2024
Files changed
5 changes: 4 additions & 1 deletion .github/workflows/check.yml
@@ -54,7 +54,10 @@ jobs:
version: 0.13.0

- name: test
run: zig build test -Denable-tsan=true
run: |
zig build test -Denable-tsan=true
zig build test -Denable-tsan=true -Dblockstore-db=lmdb -Dfilter=ledger
zig build test -Denable-tsan=true -Dblockstore-db=hashmap -Dfilter=ledger

kcov_test:
strategy:
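For local development, the same backend matrix can be run directly; these commands mirror what the workflow now executes, with `-Dfilter=ledger` restricting the run to the ledger tests:

```sh
zig build test -Denable-tsan=true
zig build test -Denable-tsan=true -Dblockstore-db=lmdb -Dfilter=ledger
zig build test -Denable-tsan=true -Dblockstore-db=hashmap -Dfilter=ledger
```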
42 changes: 38 additions & 4 deletions build.zig
@@ -10,6 +10,11 @@ pub fn build(b: *Build) void {
const filters = b.option([]const []const u8, "filter", "List of filters, used for example to filter unit tests by name"); // specified as a series like `-Dfilter="filter1" -Dfilter="filter2"`
const enable_tsan = b.option(bool, "enable-tsan", "Enable TSan for the test suite");
const no_run = b.option(bool, "no-run", "Do not run the selected step and install it") orelse false;
const blockstore_db = b.option(BlockstoreDB, "blockstore-db", "Blockstore database backend") orelse .rocksdb;

// Build options
const build_options = b.addOptions();
build_options.addOption(BlockstoreDB, "blockstore_db", blockstore_db);

// CLI build steps
const sig_step = b.step("run", "Run the sig executable");
@@ -42,6 +47,9 @@ pub fn build(b: *Build) void {
const rocksdb_dep = b.dependency("rocksdb", dep_opts);
const rocksdb_mod = rocksdb_dep.module("rocksdb-bindings");

const lmdb_dep = b.dependency("lmdb", dep_opts);
const lmdb_mod = lmdb_dep.module("lmdb");

const pretty_table_dep = b.dependency("prettytable", dep_opts);
const pretty_table_mod = pretty_table_dep.module("prettytable");

@@ -55,7 +63,12 @@
sig_mod.addImport("httpz", httpz_mod);
sig_mod.addImport("zstd", zstd_mod);
sig_mod.addImport("curl", curl_mod);
sig_mod.addImport("rocksdb", rocksdb_mod);
switch (blockstore_db) {
.rocksdb => sig_mod.addImport("rocksdb", rocksdb_mod),
.lmdb => sig_mod.addImport("lmdb", lmdb_mod),
.hashmap => {},
}
sig_mod.addOptions("build-options", build_options);

// main executable
const sig_exe = b.addExecutable(.{
@@ -72,7 +85,12 @@
sig_exe.root_module.addImport("zig-cli", zig_cli_module);
sig_exe.root_module.addImport("zig-network", zig_network_module);
sig_exe.root_module.addImport("zstd", zstd_mod);
sig_exe.root_module.addImport("rocksdb", rocksdb_mod);
switch (blockstore_db) {
.rocksdb => sig_exe.root_module.addImport("rocksdb", rocksdb_mod),
.lmdb => sig_exe.root_module.addImport("lmdb", lmdb_mod),
.hashmap => {},
}
sig_exe.root_module.addOptions("build-options", build_options);
sig_exe.linkLibC();

const main_exe_run = b.addRunArtifact(sig_exe);
@@ -110,7 +128,12 @@
unit_tests_exe.root_module.addImport("httpz", httpz_mod);
unit_tests_exe.root_module.addImport("zig-network", zig_network_module);
unit_tests_exe.root_module.addImport("zstd", zstd_mod);
unit_tests_exe.root_module.addImport("rocksdb", rocksdb_mod);
switch (blockstore_db) {
.rocksdb => unit_tests_exe.root_module.addImport("rocksdb", rocksdb_mod),
.lmdb => unit_tests_exe.root_module.addImport("lmdb", lmdb_mod),
.hashmap => {},
}
unit_tests_exe.root_module.addOptions("build-options", build_options);
unit_tests_exe.linkLibC();

const unit_tests_exe_run = b.addRunArtifact(unit_tests_exe);
@@ -150,8 +173,13 @@
benchmark_exe.root_module.addImport("zig-network", zig_network_module);
benchmark_exe.root_module.addImport("httpz", httpz_mod);
benchmark_exe.root_module.addImport("zstd", zstd_mod);
benchmark_exe.root_module.addImport("rocksdb", rocksdb_mod);
benchmark_exe.root_module.addImport("prettytable", pretty_table_mod);
switch (blockstore_db) {
.rocksdb => benchmark_exe.root_module.addImport("rocksdb", rocksdb_mod),
.lmdb => benchmark_exe.root_module.addImport("lmdb", lmdb_mod),
.hashmap => {},
}
benchmark_exe.root_module.addOptions("build-options", build_options);
benchmark_exe.linkLibC();

const benchmark_exe_run = b.addRunArtifact(benchmark_exe);
@@ -195,3 +223,9 @@ fn makeZlsNotInstallAnythingDuringBuildOnSave(b: *Build) void {
artifact.generated_bin = null;
}
}

const BlockstoreDB = enum {
rocksdb,
lmdb,
hashmap,
};
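A note on the pattern above: since `BlockstoreDB` is an enum and every import site switches on it exhaustively, a future fourth backend will not compile until each executable handles it. Downstream code reads the selection through the options module registered as "build-options"; a minimal consumption sketch, using only names declared in this diff:

```zig
// Minimal sketch: "build-options" is the import name passed to addOptions()
// above, and blockstore_db is the option added to it.
const build_options = @import("build-options");

pub fn usingRocksDb() bool {
    // Resolved at compile time; the chosen enum value is baked into the binary.
    return build_options.blockstore_db == .rocksdb;
}
```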
4 changes: 4 additions & 0 deletions build.zig.zon
@@ -35,6 +35,10 @@
.url = "https://github.com/Syndica/rocksdb-zig/archive/9c09659a5e41f226b6b8f3fa21149247eb26dfae.tar.gz",
.hash = "1220aeb80b2f8bb48c131ef306fe48ddfcb537210c5f77742e921cbf40fc4c19b41e",
},
.lmdb = .{
.url = "https://github.com/Syndica/lmdb-zig/archive/ffa8d332cbe85f05b0e6d20db8764801bc16d1e1.tar.gz",
.hash = "1220e27a4e98d4f93b1f29c7c38985a4818e6cd135525379e23087c20c8de4a3034b",
},
.prettytable = .{
.url = "https://github.com/dying-will-bullet/prettytable-zig/archive/46b6ad9b5970def35fa43c9613cd244f28862fa9.tar.gz",
.hash = "122098d444c9c7112c66481e7655bb5389829c67e04b280a029200545e1971187443",
2 changes: 1 addition & 1 deletion src/ledger/benchmarks.zig
@@ -348,7 +348,7 @@ pub const BenchmarkLedgerSlow = struct {
// connect the chain
parent_slot = slot;
}
try db.commit(write_batch);
try db.commit(&write_batch);

var timer = try sig.time.Timer.start();
const is_connected = try reader.slotRangeConnected(1, slot_per_epoch);
7 changes: 6 additions & 1 deletion src/ledger/blockstore.zig
@@ -1,6 +1,11 @@
const build_options = @import("build-options");
const ledger = @import("lib.zig");

pub const BlockstoreDB = ledger.database.RocksDB(&ledger.schema.list);
pub const BlockstoreDB = switch (build_options.blockstore_db) {
.rocksdb => ledger.database.RocksDB(&ledger.schema.list),
.hashmap => ledger.database.SharedHashMapDB(&ledger.schema.list),
.lmdb => ledger.database.LMDB(&ledger.schema.list),
};

test BlockstoreDB {
ledger.database.assertIsDatabase(BlockstoreDB);
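The `assertIsDatabase` call above is what keeps the three backends interchangeable: each must expose the same declarations, verified at compile time rather than through a runtime vtable. The idea can be sketched as follows (a simplified, hypothetical illustration, not the actual implementation in `ledger.database`):

```zig
// Hypothetical sketch of a comptime interface check: reject any type that
// lacks a declaration the blockstore code relies on.
fn assertIsDatabase(comptime DB: type) void {
    inline for (.{ "open", "initWriteBatch", "commit", "deinit" }) |decl| {
        if (!@hasDecl(DB, decl)) {
            @compileError(@typeName(DB) ++ " is missing `" ++ decl ++ "`");
        }
    }
}
```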
8 changes: 5 additions & 3 deletions src/ledger/cleanup_service.zig
@@ -204,6 +204,7 @@ fn findSlotsToClean(
/// analog to [`run_purge_with_stats`](https://github.com/anza-xyz/agave/blob/26692e666454d340a6691e2483194934e6a8ddfc/ledger/src/blockstore/blockstore_purge.rs#L202)
pub fn purgeSlots(db: *BlockstoreDB, from_slot: Slot, to_slot: Slot) !bool {
var write_batch = try db.initWriteBatch();
defer write_batch.deinit();

// the methods used below are exclusive [from_slot, to_slot), so we add 1 to purge inclusive
const purge_to_slot = to_slot + 1;
@@ -212,7 +213,7 @@ pub fn purgeSlots(db: *BlockstoreDB, from_slot: Slot, to_slot: Slot) !bool {
writePurgeRange(&write_batch, from_slot, purge_to_slot) catch {
did_purge = false;
};
try db.commit(write_batch);
try db.commit(&write_batch);

if (did_purge and from_slot == 0) {
try purgeFilesInRange(db, from_slot, purge_to_slot);
@@ -372,7 +373,7 @@ test "findSlotsToClean" {
defer write_batch.deinit();
try write_batch.put(ledger.schema.schema.slot_meta, lowest_slot_meta.slot, lowest_slot_meta);
try write_batch.put(ledger.schema.schema.slot_meta, highest_slot_meta.slot, highest_slot_meta);
try db.commit(write_batch);
try db.commit(&write_batch);
}

const r = try findSlotsToClean(&reader, 0, 100);
@@ -421,6 +422,7 @@ test "purgeSlots" {

// write another type
var write_batch = try db.initWriteBatch();
defer write_batch.deinit();
for (0..roots.len + 1) |i| {
const merkle_root_meta = sig.ledger.shred.ErasureSetId{
.erasure_set_index = i,
@@ -434,7 +436,7 @@

try write_batch.put(schema.merkle_root_meta, merkle_root_meta, merkle_meta);
}
try db.commit(write_batch);
try db.commit(&write_batch);

// purge the range [0, 5]
const did_purge2 = try purgeSlots(&db, 0, 5);
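Taken together, the changes in this file settle the write-batch lifecycle: the batch is always deinited (fixing the `purgeSlots` leak from commit 1808660), and `commit` now takes the batch by pointer. A minimal sketch of the corrected pattern, assuming a `SlotMeta` value for the `slot_meta` column family and simplified error handling:

```zig
// Sketch of the post-PR write-batch lifecycle; db, schema, and the column
// family come from this diff, the SlotMeta type name is assumed.
fn storeSlotMeta(db: *BlockstoreDB, slot: Slot, meta: SlotMeta) !void {
    var write_batch = try db.initWriteBatch();
    defer write_batch.deinit(); // always runs, even if commit fails

    try write_batch.put(schema.slot_meta, slot, meta);
    try db.commit(&write_batch); // by pointer, so the batch can mark itself executed
}
```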
35 changes: 24 additions & 11 deletions src/ledger/database/hashmap.zig
@@ -1,6 +1,7 @@
const std = @import("std");
const sig = @import("../../sig.zig");
const database = @import("lib.zig");
const build_options = @import("build-options");

const Allocator = std.mem.Allocator;
const DefaultRwLock = std.Thread.RwLock.DefaultRwLock;
@@ -26,9 +27,10 @@ pub fn SharedHashMapDB(comptime column_families: []const ColumnFamily) type {

pub fn open(
allocator: Allocator,
_: Logger,
logger: Logger,
_: []const u8,
) Allocator.Error!Self {
logger.info().log("Initializing SharedHashMapDB");
var maps = try allocator.alloc(SharedHashMap, column_families.len);
errdefer {
for (maps) |*m| m.deinit();
@@ -111,7 +113,7 @@ pub fn SharedHashMapDB(comptime column_families: []const ColumnFamily) type {
const ret = try self.allocator.alloc(u8, val_bytes.len);
@memcpy(ret, val_bytes);
return .{
.allocator = self.allocator,
.deinitializer = BytesRef.Deinitializer.fromAllocator(self.allocator),
.data = ret,
};
}
@@ -164,7 +166,7 @@

/// Atomicity may be violated if there is insufficient
/// memory to complete a PUT.
pub fn commit(self: *Self, batch: WriteBatch) Allocator.Error!void {
pub fn commit(self: *Self, batch: *WriteBatch) Allocator.Error!void {
self.transaction_lock.lock();
defer self.transaction_lock.unlock();

@@ -178,9 +180,17 @@
const cf_index, const key = delete_ix;
self.maps[cf_index].delete(self.allocator, key);
},
.delete_range => {
// TODO: also add to database tests
@panic("not implemented");
.delete_range => |delete_range_ix| {
const cf_index, const start, const end = delete_range_ix;
const keys, _ = self.maps[cf_index].map.range(start, end);
const to_delete = try batch.allocator.alloc([]const u8, keys.len);
defer batch.allocator.free(to_delete);
for (keys, 0..) |key, i| {
to_delete[i] = key;
}
for (to_delete) |delete_key| {
self.maps[cf_index].delete(self.allocator, delete_key);
}
},
}
}
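One subtlety in the new `delete_range` branch above: the keys are copied out of the `range` result before any deletion, because removing entries while that view is live could invalidate it. The same snapshot-then-delete pattern in a self-contained form (a standalone example using the standard library, not this PR's map type):

```zig
const std = @import("std");

// Snapshot-then-delete: collect the keys first, since removing entries while
// an iterator over the map is live would invalidate that iterator.
fn deletePrefixed(
    allocator: std.mem.Allocator,
    map: *std.StringHashMap(u64),
    prefix: []const u8,
) !void {
    var to_delete = std.ArrayList([]const u8).init(allocator);
    defer to_delete.deinit();

    var it = map.keyIterator();
    while (it.next()) |key| {
        if (std.mem.startsWith(u8, key.*, prefix)) try to_delete.append(key.*);
    }
    for (to_delete.items) |key| _ = map.remove(key);
}
```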
@@ -232,6 +242,7 @@ pub fn SharedHashMapDB(comptime column_families: []const ColumnFamily) type {
key: cf.Key,
value: cf.Value,
) anyerror!void {
std.debug.assert(!self.executed.*);
const k_bytes = try key_serializer.serializeAlloc(self.allocator, key);
errdefer self.allocator.free(k_bytes);
const v_bytes = try value_serializer.serializeAlloc(self.allocator, value);
@@ -247,6 +258,7 @@ pub fn SharedHashMapDB(comptime column_families: []const ColumnFamily) type {
comptime cf: ColumnFamily,
key: cf.Key,
) anyerror!void {
std.debug.assert(!self.executed.*);
const k_bytes = try key_serializer.serializeAlloc(self.allocator, key);
errdefer self.allocator.free(k_bytes);
return try self.instructions.append(
@@ -261,12 +273,13 @@ pub fn SharedHashMapDB(comptime column_families: []const ColumnFamily) type {
start: cf.Key,
end: cf.Key,
) anyerror!void {
std.debug.assert(!self.executed.*);
const start_bytes = try key_serializer.serializeAlloc(self.allocator, start);
errdefer self.allocator.free(start_bytes);
const end_bytes = try key_serializer.serializeAlloc(self.allocator, end);
errdefer self.allocator.free(end_bytes);
const cf_index = cf.find(column_families);
self.instructions.append(
try self.instructions.append(
self.allocator,
.{ .delete_range = .{ cf_index, start_bytes, end_bytes } },
);
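The three `std.debug.assert(!self.executed.*)` guards added above make the bug fixed in commit 038f9e5 loud: in Debug builds, touching a batch after it has been committed fails immediately instead of silently misbehaving. A hypothetical misuse the assertion now catches (assumes a db, a column family `cf`, and keys in scope):

```zig
var batch = try db.initWriteBatch();
defer batch.deinit();
try batch.put(cf, key_a, value_a);
try db.commit(&batch);
try batch.put(cf, key_b, value_b); // Debug build: assertion failure, batch already executed
```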
@@ -351,19 +364,19 @@ pub fn SharedHashMapDB(comptime column_families: []const ColumnFamily) type {

pub fn nextKey(self: *@This()) anyerror!?cf.Key {
const index = self.nextIndex() orelse return null;
return key_serializer.deserialize(cf.Key, self.allocator, self.keys[index]);
return try key_serializer.deserialize(cf.Key, self.allocator, self.keys[index]);
}

pub fn nextValue(self: *@This()) anyerror!?cf.Value {
const index = self.nextIndex() orelse return null;
return value_serializer.deserialize(cf.Value, self.allocator, self.vals[index]);
return try value_serializer.deserialize(cf.Value, self.allocator, self.vals[index]);
}

pub fn nextBytes(self: *@This()) error{}!?[2]BytesRef {
const index = self.nextIndex() orelse return null;
return .{
.{ .allocator = null, .data = self.keys[index] },
.{ .allocator = null, .data = self.vals[index] },
.{ .deinitializer = null, .data = self.keys[index] },
.{ .deinitializer = null, .data = self.vals[index] },
};
}

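Finally, the `BytesRef` changes in this file replace its raw `allocator` field with an optional `deinitializer`, so a reference records exactly how (or whether) its bytes should be freed; `null` now unambiguously means the bytes are borrowed. A rough sketch of the shape implied by the diff (only `deinitializer`, `data`, and `Deinitializer.fromAllocator` actually appear here; the rest is assumed for illustration):

```zig
const std = @import("std");

// Rough sketch; the real definition lives elsewhere in the ledger code.
pub const BytesRef = struct {
    deinitializer: ?Deinitializer,
    data: []const u8,

    pub const Deinitializer = union(enum) {
        allocator: std.mem.Allocator,

        pub fn fromAllocator(allocator: std.mem.Allocator) Deinitializer {
            return .{ .allocator = allocator };
        }
    };

    pub fn deinit(self: BytesRef) void {
        const d = self.deinitializer orelse return; // null means borrowed bytes
        switch (d) {
            .allocator => |a| a.free(self.data),
        }
    }
};
```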