Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for block pruning #116

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 83 additions & 42 deletions chain/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,43 +13,32 @@
)

type supplementedBlock struct {
Block types.Block
Header *types.BlockHeader

Check failure on line 16 in chain/db.go

View workflow job for this annotation

GitHub Actions / test / test (1.23, macos-latest)

undefined: types.BlockHeader
Copy link
Member

@n8maninger n8maninger Nov 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to see a way to detect and trigger a resync for changes like this. Currently, we panic and require the user to manually delete the consensus database. That's really bad UX, particularly for users that are doing automatic updates. Simplest solution would probably be to try to decode the supplementedBlock when we open the database. If that fails: log an error, close, erase, and reopen.

Longer term, it would be nice to have an actual migration path like the other databases. However, that may be less important if we don't need to store any of this junk after the v2 require height.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed. In this case, though, compatibility is preserved: we always encode as v3, but we can decode either v2 or v3. That's possible because the Header field is optional and gets filled in lazily as needed.

Block *types.Block
Supplement *consensus.V1BlockSupplement
}

func (sb supplementedBlock) EncodeTo(e *types.Encoder) {
e.WriteUint8(2)
(types.V2Block)(sb.Block).EncodeTo(e)
e.WriteBool(sb.Supplement != nil)
if sb.Supplement != nil {
sb.Supplement.EncodeTo(e)
}
e.WriteUint8(3)
types.EncodePtr(e, sb.Header)
types.EncodePtr(e, (*types.V2Block)(sb.Block))
types.EncodePtr(e, sb.Supplement)
}

func (sb *supplementedBlock) DecodeFrom(d *types.Decoder) {
if v := d.ReadUint8(); v != 2 {
d.SetErr(fmt.Errorf("incompatible version (%d)", v))
}
(*types.V2Block)(&sb.Block).DecodeFrom(d)
if d.ReadBool() {
sb.Supplement = new(consensus.V1BlockSupplement)
sb.Supplement.DecodeFrom(d)
}
}

// helper type for decoding just the header information from a block
type supplementedHeader struct {
ParentID types.BlockID
Timestamp time.Time
}

func (sh *supplementedHeader) DecodeFrom(d *types.Decoder) {
if v := d.ReadUint8(); v != 2 {
switch v := d.ReadUint8(); v {
case 2:
sb.Header = nil
sb.Block = new(types.Block)
(*types.V2Block)(sb.Block).DecodeFrom(d)
types.DecodePtr(d, &sb.Supplement)
case 3:
types.DecodePtr(d, &sb.Header)
types.DecodePtrCast[types.V2Block](d, &sb.Block)

Check failure on line 37 in chain/db.go

View workflow job for this annotation

GitHub Actions / test / test (1.23, macos-latest)

undefined: types.DecodePtrCast
types.DecodePtr(d, &sb.Supplement)
default:
d.SetErr(fmt.Errorf("incompatible version (%d)", v))
}
sh.ParentID.DecodeFrom(d)
_ = d.ReadUint64() // nonce
sh.Timestamp = d.ReadTime()
}

type versionedState struct {
Expand Down Expand Up @@ -304,21 +293,62 @@
db.bucket(bStates).put(cs.Index.ID[:], versionedState{cs})
}

func (db *DBStore) getBlock(id types.BlockID) (b types.Block, bs *consensus.V1BlockSupplement, _ bool) {
func (db *DBStore) getBlock(id types.BlockID) (bh types.BlockHeader, b *types.Block, bs *consensus.V1BlockSupplement, _ bool) {

Check failure on line 296 in chain/db.go

View workflow job for this annotation

GitHub Actions / test / test (1.23, macos-latest)

undefined: types.BlockHeader
var sb supplementedBlock
ok := db.bucket(bBlocks).get(id[:], &sb)
return sb.Block, sb.Supplement, ok
if sb.Header == nil {
sb.Header = new(types.BlockHeader)

Check failure on line 300 in chain/db.go

View workflow job for this annotation

GitHub Actions / test / test (1.23, macos-latest)

undefined: types.BlockHeader
*sb.Header = sb.Block.Header()

Check failure on line 301 in chain/db.go

View workflow job for this annotation

GitHub Actions / test / test (1.23, macos-latest)

sb.Block.Header undefined (type *types.Block has no field or method Header)
}
return *sb.Header, sb.Block, sb.Supplement, ok
}

func (db *DBStore) putBlock(b types.Block, bs *consensus.V1BlockSupplement) {
id := b.ID()
db.bucket(bBlocks).put(id[:], supplementedBlock{b, bs})
func (db *DBStore) putBlock(bh types.BlockHeader, b *types.Block, bs *consensus.V1BlockSupplement) {

Check failure on line 306 in chain/db.go

View workflow job for this annotation

GitHub Actions / test / test (1.23, macos-latest)

undefined: types.BlockHeader
id := bh.ID()
db.bucket(bBlocks).put(id[:], supplementedBlock{&bh, b, bs})
}

func (db *DBStore) getBlockHeader(id types.BlockID) (parentID types.BlockID, timestamp time.Time, _ bool) {
var sh supplementedHeader
ok := db.bucket(bBlocks).get(id[:], &sh)
return sh.ParentID, sh.Timestamp, ok
func (db *DBStore) getAncestorInfo(id types.BlockID) (parentID types.BlockID, timestamp time.Time, ok bool) {
ok = db.bucket(bBlocks).get(id[:], types.DecoderFunc(func(d *types.Decoder) {
v := d.ReadUint8()
if v != 2 && v != 3 {
d.SetErr(fmt.Errorf("incompatible version (%d)", v))
}
// kinda cursed; don't worry about it
if v == 3 {
if !d.ReadBool() {
d.ReadBool()
}
}
parentID.DecodeFrom(d)
_ = d.ReadUint64() // nonce
timestamp = d.ReadTime()
}))
return
}

func (db *DBStore) getBlockHeader(id types.BlockID) (bh types.BlockHeader, ok bool) {

Check failure on line 330 in chain/db.go

View workflow job for this annotation

GitHub Actions / test / test (1.23, macos-latest)

undefined: types.BlockHeader
ok = db.bucket(bBlocks).get(id[:], types.DecoderFunc(func(d *types.Decoder) {
v := d.ReadUint8()
if v != 2 && v != 3 {
d.SetErr(fmt.Errorf("incompatible version (%d)", v))
return
}
if v == 3 {
bhp := &bh
types.DecodePtr(d, &bhp)
if bhp != nil {
return
} else if !d.ReadBool() {
d.SetErr(errors.New("neither header nor block present"))
return
}
}
var b types.Block
(*types.V2Block)(&b).DecodeFrom(d)
bh = b.Header()

Check failure on line 349 in chain/db.go

View workflow job for this annotation

GitHub Actions / test / test (1.23, macos-latest)

b.Header undefined (type types.Block has no field or method Header)
}))
return
}

func (db *DBStore) treeKey(row, col uint64) []byte {
Expand Down Expand Up @@ -628,9 +658,9 @@
}
break
}
ancestorID, _, _ = db.getBlockHeader(ancestorID)
ancestorID, _, _ = db.getAncestorInfo(ancestorID)
}
_, t, ok = db.getBlockHeader(ancestorID)
_, t, ok = db.getAncestorInfo(ancestorID)
return
}

Expand All @@ -646,12 +676,23 @@

// Block implements Store.
func (db *DBStore) Block(id types.BlockID) (types.Block, *consensus.V1BlockSupplement, bool) {
return db.getBlock(id)
_, b, bs, ok := db.getBlock(id)
if !ok || b == nil {
return types.Block{}, nil, false
}
return *b, bs, ok
}

// AddBlock implements Store.
func (db *DBStore) AddBlock(b types.Block, bs *consensus.V1BlockSupplement) {
db.putBlock(b, bs)
db.putBlock(b.Header(), &b, bs)

Check failure on line 688 in chain/db.go

View workflow job for this annotation

GitHub Actions / test / test (1.23, macos-latest)

b.Header undefined (type types.Block has no field or method Header)
}

// PruneBlock implements Store.
func (db *DBStore) PruneBlock(id types.BlockID) {
if bh, _, _, ok := db.getBlock(id); ok {
db.putBlock(bh, nil, nil)
}
}

func (db *DBStore) shouldFlush() bool {
Expand Down Expand Up @@ -743,7 +784,7 @@
dbs.putState(genesisState)
bs := consensus.V1BlockSupplement{Transactions: make([]consensus.V1TransactionSupplement, len(genesisBlock.Transactions))}
cs, cau := consensus.ApplyBlock(genesisState, genesisBlock, bs, time.Time{})
dbs.putBlock(genesisBlock, &bs)
dbs.putBlock(genesisBlock.Header(), &genesisBlock, &bs)
dbs.putState(cs)
dbs.ApplyBlock(cs, cau)
if err := dbs.Flush(); err != nil {
Expand Down
13 changes: 12 additions & 1 deletion chain/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type Store interface {

Block(id types.BlockID) (types.Block, *consensus.V1BlockSupplement, bool)
AddBlock(b types.Block, bs *consensus.V1BlockSupplement)
PruneBlock(id types.BlockID)
State(id types.BlockID) (consensus.State, bool)
AddState(cs consensus.State)
AncestorTimestamp(id types.BlockID) (time.Time, bool)
Expand Down Expand Up @@ -74,12 +75,15 @@ func blockAndChild(s Store, id types.BlockID) (types.Block, *consensus.V1BlockSu
// A Manager tracks multiple blockchains and identifies the best valid
// chain.
type Manager struct {
log *zap.Logger
store Store
tipState consensus.State
onReorg map[[16]byte]func(types.ChainIndex)
invalidBlocks map[types.BlockID]error

// configuration options
log *zap.Logger
pruneTarget uint64

txpool struct {
txns []types.Transaction
v2txns []types.V2Transaction
Expand Down Expand Up @@ -314,6 +318,13 @@ func (m *Manager) applyTip(index types.ChainIndex) error {
m.store.ApplyBlock(cs, cau)
m.applyPoolUpdate(cau, cs)
m.tipState = cs

if m.pruneTarget != 0 && cs.Index.Height > m.pruneTarget {
//m.log.Info("pruning block", zap.Uint64("height", cs.Index.Height-m.pruneTarget))
if index, ok := m.store.BestIndex(cs.Index.Height - m.pruneTarget); ok {
m.store.PruneBlock(index.ID)
}
}
return nil
}

Expand Down
7 changes: 7 additions & 0 deletions chain/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,10 @@ func WithLog(l *zap.Logger) ManagerOption {
m.log = l
}
}

// WithPruneTarget sets the target number of blocks to store.
func WithPruneTarget(n uint64) ManagerOption {
return func(m *Manager) {
m.pruneTarget = n
}
}
21 changes: 5 additions & 16 deletions miner.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package coreutils

import (
"encoding/binary"
"time"

"go.sia.tech/core/consensus"
Expand All @@ -11,27 +10,17 @@ import (

// FindBlockNonce attempts to find a nonce for b that meets the PoW target.
func FindBlockNonce(cs consensus.State, b *types.Block, timeout time.Duration) bool {
b.Nonce = 0
buf := make([]byte, 32+8+8+32)
binary.LittleEndian.PutUint64(buf[32:], b.Nonce)
binary.LittleEndian.PutUint64(buf[40:], uint64(b.Timestamp.Unix()))
if b.V2 != nil {
copy(buf[:32], "sia/id/block|")
copy(buf[48:], b.V2.Commitment[:])
} else {
root := b.MerkleRoot()
copy(buf[:32], b.ParentID[:])
copy(buf[48:], root[:])
}
bh := b.Header()
bh.Nonce = 0
factor := cs.NonceFactor()
startBlock := time.Now()
for types.BlockID(types.HashBytes(buf)).CmpWork(cs.ChildTarget) < 0 {
b.Nonce += factor
binary.LittleEndian.PutUint64(buf[32:], b.Nonce)
for bh.ID().CmpWork(cs.ChildTarget) < 0 {
bh.Nonce += factor
if time.Since(startBlock) > timeout {
return false
}
}
b.Nonce = bh.Nonce
return true
}

Expand Down
2 changes: 1 addition & 1 deletion rhp/v4/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func startTestNode(tb testing.TB, n *consensus.Network, genesis types.Block) (*c
}
tb.Cleanup(func() { syncerListener.Close() })

s := syncer.New(syncerListener, cm, testutil.NewMemPeerStore(), gateway.Header{
s := syncer.New(syncerListener, cm, testutil.NewEphemeralPeerStore(), gateway.Header{
GenesisID: genesis.ID(),
UniqueID: gateway.GenerateUniqueID(),
NetAddress: "localhost:1234",
Expand Down
14 changes: 7 additions & 7 deletions syncer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (p *Peer) SendBlock(id types.BlockID, timeout time.Duration) (types.Block,
}

// RelayHeader relays a header to the peer.
func (p *Peer) RelayHeader(h gateway.BlockHeader, timeout time.Duration) error {
func (p *Peer) RelayHeader(h types.BlockHeader, timeout time.Duration) error {
return p.callRPC(&gateway.RPCRelayHeader{Header: h}, timeout)
}

Expand Down Expand Up @@ -182,8 +182,8 @@ func (p *Peer) SendCheckpoint(index types.ChainIndex, timeout time.Duration) (ty
}

// RelayV2Header relays a v2 block header to the peer.
func (p *Peer) RelayV2Header(h gateway.V2BlockHeader, timeout time.Duration) error {
return p.callRPC(&gateway.RPCRelayV2Header{Header: h}, timeout)
func (p *Peer) RelayV2Header(bh types.BlockHeader, timeout time.Duration) error {
return p.callRPC(&gateway.RPCRelayV2Header{Header: bh}, timeout)
}

// RelayV2BlockOutline relays a v2 block outline to the peer.
Expand Down Expand Up @@ -381,17 +381,17 @@ func (s *Syncer) handleRPC(id types.Specifier, stream *gateway.Stream, origin *P
if err := stream.ReadRequest(r); err != nil {
return err
}
cs, ok := s.cm.State(r.Header.Parent.ID)
cs, ok := s.cm.State(r.Header.ParentID)
if !ok {
s.resync(origin, fmt.Sprintf("peer relayed a v2 header with unknown parent (%v)", r.Header.Parent.ID))
s.resync(origin, fmt.Sprintf("peer relayed a v2 header with unknown parent (%v)", r.Header.ParentID))
return nil
}
bid := r.Header.ID(cs)
bid := r.Header.ID()
if _, ok := s.cm.State(bid); ok {
return nil // already seen
} else if bid.CmpWork(cs.ChildTarget) < 0 {
return s.ban(origin, errors.New("peer sent v2 header with insufficient work"))
} else if r.Header.Parent != s.cm.Tip() {
} else if r.Header.ParentID != s.cm.Tip().ID {
// block extends a sidechain, which peer (if honest) believes to be the
// heaviest chain
s.resync(origin, "peer relayed a v2 header that does not attach to our tip")
Expand Down
14 changes: 8 additions & 6 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func (s *Syncer) runPeer(p *Peer) error {
}
}

func (s *Syncer) relayHeader(h gateway.BlockHeader, origin *Peer) {
func (s *Syncer) relayHeader(h types.BlockHeader, origin *Peer) {
s.mu.Lock()
defer s.mu.Unlock()
for _, p := range s.peers {
Expand All @@ -323,7 +323,7 @@ func (s *Syncer) relayTransactionSet(txns []types.Transaction, origin *Peer) {
}
}

func (s *Syncer) relayV2Header(bh gateway.V2BlockHeader, origin *Peer) {
func (s *Syncer) relayV2Header(bh types.BlockHeader, origin *Peer) {
s.mu.Lock()
defer s.mu.Unlock()
for _, p := range s.peers {
Expand Down Expand Up @@ -533,9 +533,9 @@ func (s *Syncer) peerLoop(ctx context.Context) error {

ctx, cancel := context.WithTimeout(ctx, s.config.ConnectTimeout)
if _, err := s.Connect(ctx, p); err != nil {
log.Debug("connected to peer", zap.String("peer", p))
} else {
log.Debug("failed to connect to peer", zap.String("peer", p), zap.Error(err))
} else {
log.Debug("connected to peer", zap.String("peer", p))
}
cancel()
lastTried[p] = time.Now()
Expand Down Expand Up @@ -723,10 +723,12 @@ func (s *Syncer) Connect(ctx context.Context, addr string) (*Peer, error) {
}

// BroadcastHeader broadcasts a header to all peers.
func (s *Syncer) BroadcastHeader(h gateway.BlockHeader) { s.relayHeader(h, nil) }
func (s *Syncer) BroadcastHeader(bh types.BlockHeader) { s.relayHeader(bh, nil) }

// BroadcastV2Header broadcasts a v2 header to all peers.
func (s *Syncer) BroadcastV2Header(h gateway.V2BlockHeader) { s.relayV2Header(h, nil) }
func (s *Syncer) BroadcastV2Header(bh types.BlockHeader) {
s.relayV2Header(bh, nil)
}

// BroadcastV2BlockOutline broadcasts a v2 block outline to all peers.
func (s *Syncer) BroadcastV2BlockOutline(b gateway.V2BlockOutline) { s.relayV2BlockOutline(b, nil) }
Expand Down
11 changes: 3 additions & 8 deletions syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ func TestSyncer(t *testing.T) {
}
defer l2.Close()

s1 := syncer.New(l1, cm1, testutil.NewMemPeerStore(), gateway.Header{
s1 := syncer.New(l1, cm1, testutil.NewEphemeralPeerStore(), gateway.Header{
GenesisID: genesis.ID(),
UniqueID: gateway.GenerateUniqueID(),
NetAddress: l1.Addr().String(),
}, syncer.WithLogger(log.Named("syncer1")))
defer s1.Close()
go s1.Run(context.Background())

s2 := syncer.New(l2, cm2, testutil.NewMemPeerStore(), gateway.Header{
s2 := syncer.New(l2, cm2, testutil.NewEphemeralPeerStore(), gateway.Header{
GenesisID: genesis.ID(),
UniqueID: gateway.GenerateUniqueID(),
NetAddress: l2.Addr().String(),
Expand Down Expand Up @@ -80,12 +80,7 @@ func TestSyncer(t *testing.T) {
}

// broadcast the tip from s1 to s2
s1.BroadcastHeader(gateway.BlockHeader{
ParentID: b.ParentID,
Nonce: b.Nonce,
Timestamp: b.Timestamp,
MerkleRoot: b.MerkleRoot(),
})
s1.BroadcastHeader(b.Header())

for i := 0; i < 100; i++ {
if cm1.Tip() == cm2.Tip() {
Expand Down
Loading
Loading