Skip to content

Commit

Permalink
Merge remote-tracking branch 'gagliardetto/multiepoch' into multiepoch
Browse files Browse the repository at this point in the history
  • Loading branch information
linuskendall committed Sep 6, 2023
2 parents c2354c3 + c2bf5a2 commit 348ee2c
Show file tree
Hide file tree
Showing 32 changed files with 1,397 additions and 963 deletions.
38 changes: 38 additions & 0 deletions bucketteer/bucketteer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package bucketteer

import (
"sort"

"github.com/cespare/xxhash/v2"
)

var _Magic = [8]byte{'b', 'u', 'c', 'k', 'e', 't', 't', 'e'}

func Magic() [8]byte {
return _Magic
}

const Version = uint64(1)

func sortWithCompare[T any](a []T, compare func(i, j int) int) {
sort.Slice(a, func(i, j int) bool {
return compare(i, j) < 0
})
sorted := make([]T, len(a))
eytzinger(a, sorted, 0, 1)
copy(a, sorted)
}

func eytzinger[T any](in, out []T, i, k int) int {
if k <= len(in) {
i = eytzinger(in, out, i, 2*k)
out[k-1] = in[i]
i++
i = eytzinger(in, out, i, 2*k+1)
}
return i
}

func Hash(sig [64]byte) uint64 {
return xxhash.Sum64(sig[:])
}
188 changes: 188 additions & 0 deletions bucketteer/bucketteer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
package bucketteer

import (
"os"
"path/filepath"
"testing"

bin "github.com/gagliardetto/binary"
"github.com/stretchr/testify/require"
"golang.org/x/exp/mmap"
)

func TestBucketteer(t *testing.T) {
path := filepath.Join(t.TempDir(), "test-bucketteer")
wr, err := NewWriter(path)
require.NoError(t, err)
firstSig := [64]byte{1, 2, 3, 4}
wr.Put(firstSig)

if !wr.Has(firstSig) {
t.Fatal("expected to have firstSig")
}
{
sig := [64]byte{1, 2, 3, 5}
require.False(t, wr.Has(sig))
wr.Put(sig)
require.True(t, wr.Has(sig))
}
{
sig := [64]byte{1, 2, 3, 6}
require.False(t, wr.Has(sig))
wr.Put(sig)
require.True(t, wr.Has(sig))
}
{
sig := [64]byte{22, 2, 3, 6}
require.False(t, wr.Has(sig))
wr.Put(sig)
require.True(t, wr.Has(sig))
}
{
sig := [64]byte{99, 2, 3, 6}
require.False(t, wr.Has(sig))
wr.Put(sig)
require.True(t, wr.Has(sig))
}
require.Equal(t, 3, len(wr.prefixToHashes))
{
gotSize, err := wr.Seal(map[string]string{
"epoch": "test",
})
require.NoError(t, err)
require.NoError(t, wr.Close())
realSize, err := getFizeSize(path)
require.NoError(t, err)
require.Equal(t, realSize, gotSize)

fileContent, err := os.ReadFile(path)
require.NoError(t, err)

reader := bin.NewBorshDecoder(fileContent)

// read header size:
headerSize, err := reader.ReadUint32(bin.LE)
require.NoError(t, err)
require.Equal(t, uint32(8+8+8+(8+(4+5)+(4+4))+(3*(2+8))), headerSize)

// magic:
{
magicBuf := [8]byte{}
_, err := reader.Read(magicBuf[:])
require.NoError(t, err)
require.Equal(t, _Magic, magicBuf)
}
// version:
{
got, err := reader.ReadUint64(bin.LE)
require.NoError(t, err)
require.Equal(t, Version, got)
}
{
// read meta:
numMeta, err := reader.ReadUint64(bin.LE)
require.NoError(t, err)
require.Equal(t, uint64(1), numMeta)

key, err := reader.ReadString()
require.NoError(t, err)
require.Equal(t, "epoch", key)

value, err := reader.ReadString()
require.NoError(t, err)
require.Equal(t, "test", value)
}
// numPrefixes:
numPrefixes, err := reader.ReadUint64(bin.LE)
require.NoError(t, err)
require.Equal(t, uint64(3), numPrefixes)
// prefix -> offset:
prefixToOffset := make(map[[2]byte]uint64)
{
for i := 0; i < int(numPrefixes); i++ {
var prefix [2]byte
_, err := reader.Read(prefix[:])
require.NoError(t, err)
offset, err := reader.ReadUint64(bin.LE)
require.NoError(t, err)
prefixToOffset[prefix] = offset
}
}
{
require.Equal(t,
map[[2]uint8]uint64{
{0x1, 0x2}: 0x0,
{0x16, 0x2}: 0x1c,
{0x63, 0x2}: 0x28,
}, prefixToOffset)
}
contentBuf, err := reader.ReadNBytes(reader.Remaining())
require.NoError(t, err)
require.Equal(t,
[]byte{
0x3, 0x0, 0x0, 0x0, // num entries
0x49, 0xd7, 0xaf, 0x9e, 0x94, 0x4d, 0x9a, 0x6f,
0x2f, 0x12, 0xdb, 0x5b, 0x1, 0x62, 0xae, 0x1a,
0x3b, 0xb6, 0x71, 0x5f, 0x4, 0x4f, 0x36, 0xf2,
0x1, 0x0, 0x0, 0x0, // num entries
0x58, 0xe1, 0x9d, 0xde, 0x7c, 0xfb, 0xeb, 0x5a,
0x1, 0x0, 0x0, 0x0, // num entries
0x4c, 0xbd, 0xa3, 0xed, 0xd3, 0x8b, 0xa8, 0x44,
},
contentBuf,
)
contentReader := bin.NewBorshDecoder(contentBuf)
{
for prefix, offset := range prefixToOffset {
// Now read the bucket:
{
err := contentReader.SetPosition(uint(offset))
require.NoError(t, err)
numHashes, err := contentReader.ReadUint32(bin.LE)
require.NoError(t, err)
switch prefix {
case [2]byte{1, 2}:
require.Equal(t, uint32(3), numHashes)
case [2]byte{22, 2}:
require.Equal(t, uint32(1), numHashes)
case [2]byte{99, 2}:
require.Equal(t, uint32(1), numHashes)
}

for i := 0; i < int(numHashes); i++ {
hash, err := contentReader.ReadUint64(bin.LE)
require.NoError(t, err)
found := false
for _, h := range wr.prefixToHashes[prefix] {
if h == hash {
found = true
break
}
}
require.True(t, found)
}
}
}
}
{
// read temp file:
require.NoError(t, err)
mmr, err := mmap.Open(path)
require.NoError(t, err)
defer mmr.Close()
reader, err := NewReader(mmr)
require.NoError(t, err)
ok, err := reader.Has(firstSig)
require.NoError(t, err)
require.True(t, ok)
}
}
}

func getFizeSize(path string) (int64, error) {
info, err := os.Stat(path)
if err != nil {
return 0, err
}
return info.Size(), nil
}
146 changes: 146 additions & 0 deletions bucketteer/example/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package main

import (
"crypto/rand"
"flag"
"fmt"
"os"
"time"

"github.com/davecgh/go-spew/spew"
"github.com/dustin/go-humanize"
"github.com/rpcpool/yellowstone-faithful/bucketteer"
"golang.org/x/exp/mmap"
)

func main() {
startedAt := time.Now()
defer func() {
fmt.Printf("took: %v\n", time.Since(startedAt))
}()
var numItemsToInsert int
flag.IntVar(&numItemsToInsert, "num", 1_000_000, "num")
flag.Parse()

file := flag.Arg(0) // "bucketteer.bin"
if file == "" {
panic("no file specified")
}

samples := make([][64]byte, 0)
if !fileExistsAndIsNotEmpty(file) {
fmt.Println("File does not exist or is empty, creating it...")
fmt.Println("Items to insert:", humanize.Comma(int64(numItemsToInsert)))
totalWriteStartedAt := time.Now()
buWr, err := bucketteer.NewWriter(file)
if err != nil {
panic(err)
}
defer buWr.Close()
tookBatch := time.Duration(0)
for i := 1; i <= numItemsToInsert; i++ {
sig := newRandomSignature()
startedSet := time.Now()
buWr.Put(sig)
tookBatch += time.Since(startedSet)
if i%100_000 == 0 {
fmt.Print(".")
samples = append(samples, sig)
}
if i%1_000_000 == 0 {
fmt.Print(humanize.Comma(int64(i)))
fmt.Printf(
" · took: %v (%s per item)\n",
tookBatch,
tookBatch/time.Duration(1_000_000),
)
tookBatch = 0
}
}

fmt.Println("writing to file...")
writeStartedAt := time.Now()
_, err = buWr.Seal(nil)
if err != nil {
panic(err)
}
fmt.Println("writing to file took:", time.Since(writeStartedAt))
fmt.Println("total write took:", time.Since(totalWriteStartedAt))
}
mmr, err := mmap.Open(file)
if err != nil {
panic(err)
}
defer mmr.Close()
buRd, err := bucketteer.NewReader(mmr)
if err != nil {
panic(err)
}
spew.Dump(buRd.Meta())
if len(samples) > 0 {
fmt.Println("testing search with samples from the inserted signatures...")
tookBatch := time.Duration(0)
for _, sig := range samples {
startedSearch := time.Now()
found, err := buRd.Has(sig)
if err != nil {
panic(err)
}
if !found {
panic("not found")
}
tookBatch += time.Since(startedSearch)
}
fmt.Println("\n"+" num samples:", len(samples))
fmt.Println(" search took:", tookBatch)
fmt.Println("avg search took:", tookBatch/time.Duration(len(samples)))
}
if true {
// now search for random signatures that are not in the Bucketteer:
numSearches := 100_000_000
fmt.Println(
"testing search for random signatures that are not in the Bucketteer (numSearches:",
humanize.Comma(int64(numSearches)),
")...",
)
tookBatch := time.Duration(0)
for i := 1; i <= numSearches; i++ {
sig := newRandomSignature()
startedSearch := time.Now()
found, err := buRd.Has(sig)
if err != nil {
panic(err)
}
if found {
panic("found")
}
tookBatch += time.Since(startedSearch)
if i%100_000 == 0 {
fmt.Print(".")
}
}
fmt.Println("\n"+" num candidates:", humanize.Comma(int64(numSearches)))
fmt.Println(" search took:", tookBatch)
fmt.Println("avg search took:", tookBatch/time.Duration(numSearches))
}
}

func newRandomSignature() [64]byte {
var sig [64]byte
rand.Read(sig[:])
return sig
}

func fileExistsAndIsNotEmpty(path string) bool {
info, err := os.Stat(path)
if os.IsNotExist(err) {
return false
}
if err != nil {
panic(err)
}
if info.Size() == 0 {
return false
}
return true
}
Loading

0 comments on commit 348ee2c

Please sign in to comment.