Skip to content

Commit

Permalink
Fix block zero; load genesis file and use it for getGenesisHash and…
Browse files Browse the repository at this point in the history
… block 0 timestamp. (#69)

* Fix error checking

* Handle block zero for mainnet

* Fix slot 1

* Check sooner if it's block zero

* Include genesis reading/parsing code from https://github.com/firedancer-io/radiance

* Handle genesis file

* Handle getGenesisHash

* Fix prefetching

* Cleanup logs
  • Loading branch information
gagliardetto authored Nov 17, 2023
1 parent f1f3cb7 commit e192640
Show file tree
Hide file tree
Showing 16 changed files with 919 additions and 47 deletions.
6 changes: 6 additions & 0 deletions cmd-rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,12 @@ func newCmd_rpc() *cli.Command {
EpochSearchConcurrency: epochSearchConcurrency,
})

defer func() {
if err := multi.Close(); err != nil {
klog.Errorf("error closing multi-epoch: %s", err.Error())
}
}()

for _, epoch := range epochs {
if err := multi.AddEpoch(epoch.Epoch(), epoch); err != nil {
return cli.Exit(fmt.Sprintf("failed to add epoch %d: %s", epoch.Epoch(), err.Error()), 1)
Expand Down
18 changes: 18 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ type Config struct {
URI URI `json:"uri" yaml:"uri"`
} `json:"sig_exists" yaml:"sig_exists"`
} `json:"indexes" yaml:"indexes"`
Genesis struct {
URI URI `json:"uri" yaml:"uri"`
} `json:"genesis" yaml:"genesis"`
}

func (c *Config) ConfigFilepath() string {
Expand Down Expand Up @@ -296,5 +299,20 @@ func (c *Config) Validate() error {
}
}
}
{
// if epoch is 0, then the genesis URI must be set:
if *c.Epoch == 0 {
if c.Genesis.URI.IsZero() {
return fmt.Errorf("epoch is 0, but genesis.uri is not set")
}
if !c.Genesis.URI.IsValid() {
return fmt.Errorf("genesis.uri is invalid")
}
// only support local genesis files for now:
if !c.Genesis.URI.IsLocal() {
return fmt.Errorf("genesis.uri must be a local file")
}
}
}
return nil
}
69 changes: 54 additions & 15 deletions epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"bufio"
"bytes"
"context"
"crypto/rand"
"encoding/binary"
Expand All @@ -12,6 +13,7 @@ import (

"github.com/gagliardetto/solana-go"
"github.com/ipfs/go-cid"
carv1 "github.com/ipld/go-car"
"github.com/ipld/go-car/util"
carv2 "github.com/ipld/go-car/v2"
"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -22,6 +24,7 @@ import (
"github.com/rpcpool/yellowstone-faithful/gsfa"
"github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode"
"github.com/rpcpool/yellowstone-faithful/iplddecoders"
"github.com/rpcpool/yellowstone-faithful/radiance/genesis"
"github.com/urfave/cli/v2"
"k8s.io/klog/v2"
)
Expand All @@ -30,20 +33,22 @@ type Epoch struct {
epoch uint64
isFilecoinMode bool // true if the epoch is in Filecoin mode (i.e. Lassie mode)
config *Config
// genesis:
genesis *GenesisContainer
// contains indexes and block data for the epoch
lassieFetcher *lassieWrapper
localCarReader *carv2.Reader
remoteCarReader ReaderAtCloser
remoteCarHeaderSize uint64
cidToOffsetIndex *compactindex.DB
slotToCidIndex *compactindex36.DB
sigToCidIndex *compactindex36.DB
sigExists *bucketteer.Reader
gsfaReader *gsfa.GsfaReader
cidToNodeCache *cache.Cache // TODO: prevent OOM
onClose []func() error
slotToCidCache *cache.Cache
cidToOffsetCache *cache.Cache
lassieFetcher *lassieWrapper
localCarReader *carv2.Reader
remoteCarReader ReaderAtCloser
carHeaderSize uint64
cidToOffsetIndex *compactindex.DB
slotToCidIndex *compactindex36.DB
sigToCidIndex *compactindex36.DB
sigExists *bucketteer.Reader
gsfaReader *gsfa.GsfaReader
cidToNodeCache *cache.Cache // TODO: prevent OOM
onClose []func() error
slotToCidCache *cache.Cache
cidToOffsetCache *cache.Cache
}

func (r *Epoch) getSlotToCidFromCache(slot uint64) (cid.Cid, error, bool) {
Expand Down Expand Up @@ -92,6 +97,10 @@ func (e *Epoch) Close() error {
return errors.Join(multiErr...)
}

func (e *Epoch) GetGenesis() *GenesisContainer {
return e.genesis
}

func NewEpochFromConfig(config *Config, c *cli.Context) (*Epoch, error) {
if config == nil {
return nil, fmt.Errorf("config must not be nil")
Expand All @@ -105,6 +114,19 @@ func NewEpochFromConfig(config *Config, c *cli.Context) (*Epoch, error) {
config: config,
onClose: make([]func() error, 0),
}
{
// if epoch is 0, then try loading the genesis from the config:
if *config.Epoch == 0 {
genesisConfig, ha, err := genesis.ReadGenesisFromFile(string(config.Genesis.URI))
if err != nil {
return nil, fmt.Errorf("failed to read genesis: %w", err)
}
ep.genesis = &GenesisContainer{
Hash: solana.HashFromBytes(ha[:]),
Config: genesisConfig,
}
}
}

if isCarMode {
// The CAR-mode requires a cid-to-offset index.
Expand Down Expand Up @@ -207,7 +229,7 @@ func NewEpochFromConfig(config *Config, c *cli.Context) (*Epoch, error) {
ep.localCarReader = localCarReader
ep.remoteCarReader = remoteCarReader
if remoteCarReader != nil {
// read 10 bytes from the CAR file to get the header size
// determine the header size so that we know where the data starts:
headerSizeBuf, err := readSectionFromReaderAt(remoteCarReader, 0, 10)
if err != nil {
return nil, fmt.Errorf("failed to read CAR header: %w", err)
Expand All @@ -217,7 +239,24 @@ func NewEpochFromConfig(config *Config, c *cli.Context) (*Epoch, error) {
if n <= 0 {
return nil, fmt.Errorf("failed to decode CAR header size")
}
ep.remoteCarHeaderSize = uint64(n) + headerSize
ep.carHeaderSize = uint64(n) + headerSize
}
if localCarReader != nil {
// determine the header size so that we know where the data starts:
dr, err := localCarReader.DataReader()
if err != nil {
return nil, fmt.Errorf("failed to get local CAR data reader: %w", err)
}
header, err := readHeader(dr)
if err != nil {
return nil, fmt.Errorf("failed to read local CAR header: %w", err)
}
var buf bytes.Buffer
if err = carv1.WriteHeader(header, &buf); err != nil {
return nil, fmt.Errorf("failed to encode local CAR header: %w", err)
}
headerSize := uint64(buf.Len())
ep.carHeaderSize = headerSize
}
}
{
Expand Down
12 changes: 12 additions & 0 deletions genesis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package main

import (
"github.com/gagliardetto/solana-go"
"github.com/rpcpool/yellowstone-faithful/radiance/genesis"
)

type GenesisContainer struct {
Hash solana.Hash
// The genesis config.
Config *genesis.Genesis
}
88 changes: 57 additions & 31 deletions multiepoch-getBlock.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,14 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex
{
prefetcherFromCar := func() error {
parentIsInPreviousEpoch := CalcEpochForSlot(uint64(block.Meta.Parent_slot)) != CalcEpochForSlot(slot)
if slot == 0 {
parentIsInPreviousEpoch = true
}
if slot > 1 && block.Meta.Parent_slot == 0 {
parentIsInPreviousEpoch = true
}

var blockCid, parentCid cid.Cid
var blockCid, parentBlockCid cid.Cid
wg := new(errgroup.Group)
wg.Go(func() (err error) {
blockCid, err = epochHandler.FindCidFromSlot(ctx, slot)
Expand All @@ -84,7 +90,7 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex
if parentIsInPreviousEpoch {
return nil
}
parentCid, err = epochHandler.FindCidFromSlot(ctx, uint64(block.Meta.Parent_slot))
parentBlockCid, err = epochHandler.FindCidFromSlot(ctx, uint64(block.Meta.Parent_slot))
if err != nil {
return err
}
Expand All @@ -94,7 +100,17 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex
if err != nil {
return err
}
klog.Infof("%s -> %s", parentCid, blockCid)
if slot == 0 {
klog.Infof("car start to slot(0)::%s", blockCid)
} else {
klog.Infof(
"slot(%d)::%s to slot(%d)::%s",
uint64(block.Meta.Parent_slot),
parentBlockCid,
slot,
blockCid,
)
}
{
var blockOffset, parentOffset uint64
wg := new(errgroup.Group)
Expand All @@ -108,13 +124,12 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex
wg.Go(func() (err error) {
if parentIsInPreviousEpoch {
// get car file header size
parentOffset = epochHandler.remoteCarHeaderSize
parentOffset = epochHandler.carHeaderSize
return nil
}
parentOffset, err = epochHandler.FindOffsetFromCid(ctx, parentCid)
parentOffset, err = epochHandler.FindOffsetFromCid(ctx, parentBlockCid)
if err != nil {
// If the parent is not found, it (probably) means that it's outside of the car file.
parentOffset = epochHandler.remoteCarHeaderSize
return err
}
return nil
})
Expand All @@ -125,41 +140,27 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex

length := blockOffset - parentOffset
MiB := uint64(1024 * 1024)
maxSize := MiB * 100
if length > maxSize {
length = maxSize
maxPrefetchSize := MiB * 10 // let's cap prefetching size
if length > maxPrefetchSize {
length = maxPrefetchSize
}

idealEntrySize := uint64(36190)
var start uint64
if parentIsInPreviousEpoch {
start = parentOffset
} else {
if parentOffset > idealEntrySize {
start = parentOffset - idealEntrySize
} else {
start = parentOffset
}
length += idealEntrySize
}
start := parentOffset

klog.Infof("prefetching CAR: start=%d length=%d (parent_offset=%d)", start, length, parentOffset)
carSection, err := epochHandler.ReadAtFromCar(ctx, start, length)
if err != nil {
return err
}
dr := bytes.NewReader(carSection)
if !parentIsInPreviousEpoch {
dr.Seek(int64(idealEntrySize), io.SeekStart)
}
br := bufio.NewReader(dr)

gotCid, data, err := util.ReadNode(br)
if err != nil {
return fmt.Errorf("failed to read first node: %w", err)
}
if !parentIsInPreviousEpoch && !gotCid.Equals(parentCid) {
return fmt.Errorf("CID mismatch: expected %s, got %s", parentCid, gotCid)
if !parentIsInPreviousEpoch && !gotCid.Equals(parentBlockCid) {
return fmt.Errorf("CID mismatch: expected %s, got %s", parentBlockCid, gotCid)
}
epochHandler.putNodeInCache(gotCid, data)

Expand Down Expand Up @@ -225,8 +226,6 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex
klog.Errorf("failed to decode Transaction %s: %v", tcid, err)
return nil
}
// NOTE: this messes up the order of transactions,
// but we sort them later anyway.
mu.Lock()
allTransactionNodes[entryIndex][txI] = txNode
mu.Unlock()
Expand Down Expand Up @@ -388,6 +387,7 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex
allTransactions = append(allTransactions, txResp)
}
}

sort.Slice(allTransactions, func(i, j int) bool {
return allTransactions[i].Position < allTransactions[j].Position
})
Expand All @@ -399,6 +399,21 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex
blockResp.ParentSlot = uint64(block.Meta.Parent_slot)
blockResp.Rewards = rewards

if slot == 0 {
genesis := epochHandler.GetGenesis()
if genesis != nil {
blockZeroBlocktime := uint64(genesis.Config.CreationTime.Unix())
blockResp.BlockTime = &blockZeroBlocktime
}
blockResp.ParentSlot = uint64(0)

zeroBlockHeight := uint64(0)
blockResp.BlockHeight = &zeroBlockHeight

blockZeroBlockHash := lastEntryHash.String()
blockResp.PreviousBlockhash = &blockZeroBlockHash // NOTE: this is what solana RPC does. Should it be nil instead? Or should it be the genesis hash?
}

{
blockHeight, ok := block.GetBlockHeight()
if ok {
Expand All @@ -408,7 +423,7 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex
{
// get parent slot
parentSlot := uint64(block.Meta.Parent_slot)
if parentSlot != 0 && CalcEpochForSlot(parentSlot) == epochNumber {
if (parentSlot != 0 || slot == 1) && CalcEpochForSlot(parentSlot) == epochNumber {
// NOTE: if the parent is in the same epoch, we can get it from the same epoch handler as the block;
// otherwise, we need to get it from the previous epoch (TODO: implement this)
parentBlock, err := epochHandler.GetBlock(WithSubrapghPrefetch(ctx, false), parentSlot)
Expand All @@ -432,11 +447,22 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex
blockResp.PreviousBlockhash = &parentEntryHash
}
} else {
klog.Infof("parent slot is in a different epoch, not implemented yet (can't get previousBlockhash)")
if slot != 0 {
klog.Infof("parent slot is in a different epoch, not implemented yet (can't get previousBlockhash)")
}
}
}
tim.time("get parent block")

{
if len(blockResp.Transactions) == 0 {
blockResp.Transactions = make([]GetTransactionResponse, 0)
}
if blockResp.Rewards == nil || len(blockResp.Rewards.([]any)) == 0 {
blockResp.Rewards = make([]any, 0)
}
}

err = conn.Reply(
ctx,
req.ID,
Expand Down
Loading

0 comments on commit e192640

Please sign in to comment.