Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RemoveUntraceableHeaders #3750

Merged
merged 5 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/node-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ node-related settings described in the table below.
| Relay | `bool` | `true` | Determines whether the server is forwarding its inventory. |
| Consensus | [Consensus Configuration](#Consensus-Configuration) | | Describes consensus (dBFT) configuration. See the [Consensus Configuration](#Consensus-Configuration) for details. |
| RemoveUntraceableBlocks | `bool`| `false` | Denotes whether old blocks should be removed from cache and database. If enabled, then only the last `MaxTraceableBlocks` are stored and accessible to smart contracts. Old MPT data is also deleted in accordance with `GarbageCollectionPeriod` setting. If enabled along with `P2PStateExchangeExtensions` protocol extension, then old blocks and MPT states will be removed up to the second latest state synchronisation point (see `StateSyncInterval`). |
| RemoveUntraceableHeaders | `bool`| `false` | Used only with RemoveUntraceableBlocks and makes node delete untraceable block headers as well. Notice that this is an experimental option, not recommended for production use. |
| RPC | [RPC Configuration](#RPC-Configuration) | | Describes [RPC subsystem](rpc.md) configuration. See the [RPC Configuration](#RPC-Configuration) for details. |
| SaveStorageBatch | `bool` | `false` | Enables storage batch saving before every persist. It is similar to StorageDump plugin for C# node. |
| SkipBlockVerification | `bool` | `false` | Allows to disable verification of received/processed blocks (including cryptographic checks). |
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/ledger_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ type Ledger struct {
KeepOnlyLatestState bool `yaml:"KeepOnlyLatestState"`
// RemoveUntraceableBlocks specifies if old data should be removed.
RemoveUntraceableBlocks bool `yaml:"RemoveUntraceableBlocks"`
// RemoveUntraceableHeaders is used in addition to RemoveUntraceableBlocks
// when headers need to be removed as well.
RemoveUntraceableHeaders bool `yaml:"RemoveUntraceableHeaders"`
// SaveStorageBatch enables storage batch saving before every persist.
SaveStorageBatch bool `yaml:"SaveStorageBatch"`
// SkipBlockVerification allows to disable verification of received
Expand Down
46 changes: 35 additions & 11 deletions pkg/core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"sync/atomic"
"time"

lru "github.com/hashicorp/golang-lru/v2"
json "github.com/nspcc-dev/go-ordered-json"
"github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/config/limits"
Expand Down Expand Up @@ -61,6 +62,14 @@
// HeaderVerificationGasLimit is the maximum amount of GAS for block header verification.
HeaderVerificationGasLimit = 3_00000000 // 3 GAS
defaultStateSyncInterval = 40000

// defaultBlockTimesCache should be sufficient for tryRunGC() to get in
// sync with storeBlock(). Most of the time they differ by some thousands of
// blocks and GC interval is more like 10K, so this is sufficient for 80K
// deviation and should be sufficient. If it's not, it's not a big issue
// either, the next cycle will still do the job (only transfers need this,
// MPT won't notice at all).
defaultBlockTimesCache = 8
)

// stateChangeStage denotes the stage of state modification process.
Expand Down Expand Up @@ -156,6 +165,11 @@
// Current persisted block count.
persistedHeight uint32

// Index->Timestamp cache for garbage collector. Headers can be gone
// by the time it runs, so we use a tiny little cache to sync block
// removal (performed in storeBlock()) with transfer/MPT GC (tryRunGC())
gcBlockTimes *lru.Cache[uint32, uint64]

// Stop synchronization mechanisms.
stopCh chan struct{}
runToExitCh chan struct{}
Expand Down Expand Up @@ -283,6 +297,9 @@
zap.Int("StateSyncInterval", cfg.StateSyncInterval))
}
}
if cfg.RemoveUntraceableHeaders && !cfg.RemoveUntraceableBlocks {
return nil, errors.New("RemoveUntraceableHeaders is enabled, but RemoveUntraceableBlocks is not")
}
if cfg.Hardforks == nil {
cfg.Hardforks = map[string]uint32{}
for _, hf := range config.StableHardforks {
Expand Down Expand Up @@ -321,6 +338,7 @@
contracts: *native.NewContracts(cfg.ProtocolConfiguration),
}

bc.gcBlockTimes, _ = lru.New[uint32, uint64](defaultBlockTimesCache) // Never errors for positive size
bc.stateRoot = stateroot.NewModule(cfg, bc.VerifyWitness, bc.log, bc.dao.Store)
bc.contracts.Designate.StateRootService = bc.stateRoot

Expand Down Expand Up @@ -603,7 +621,7 @@
// After current state is updated, we need to remove outdated state-related data if so.
// The only outdated data we might have is genesis-related data, so check it.
if p-bc.config.MaxTraceableBlocks > 0 {
err := cache.DeleteBlock(bc.GetHeaderHash(0))
_, err := cache.DeleteBlock(bc.GetHeaderHash(0), false)
if err != nil {
return fmt.Errorf("failed to remove outdated state data for the genesis block: %w", err)
}
Expand Down Expand Up @@ -797,7 +815,7 @@
keysCnt = new(int)
)
for i := height + 1; i <= currHeight; i++ {
err := upperCache.DeleteBlock(bc.GetHeaderHash(i))
_, err := upperCache.DeleteBlock(bc.GetHeaderHash(i), false)
if err != nil {
return fmt.Errorf("error while removing block %d: %w", i, err)
}
Expand Down Expand Up @@ -1141,8 +1159,6 @@
oldHeight /= bc.config.Ledger.GarbageCollectionPeriod
newHeight /= bc.config.Ledger.GarbageCollectionPeriod
if tgtBlock > int64(bc.config.Ledger.GarbageCollectionPeriod) && newHeight != oldHeight {
tgtBlock /= int64(bc.config.Ledger.GarbageCollectionPeriod)
tgtBlock *= int64(bc.config.Ledger.GarbageCollectionPeriod)
dur = bc.stateRoot.GC(uint32(tgtBlock), bc.store)
dur += bc.removeOldTransfers(uint32(tgtBlock))
}
Expand Down Expand Up @@ -1288,15 +1304,20 @@

func (bc *Blockchain) removeOldTransfers(index uint32) time.Duration {
bc.log.Info("starting transfer data garbage collection", zap.Uint32("index", index))
start := time.Now()
h, err := bc.GetHeader(bc.GetHeaderHash(index))
if err != nil {
var (
err error
kept int64
removed int64
start = time.Now()
ts, ok = bc.gcBlockTimes.Get(index)
)

if !ok {
dur := time.Since(start)
bc.log.Error("failed to find block header for transfer GC", zap.Duration("time", dur), zap.Error(err))
bc.log.Error("failed to get block timestamp transfer GC", zap.Duration("time", dur), zap.Uint32("index", index))
return dur
}
var removed, kept int64
var ts = h.Timestamp

prefixes := []byte{byte(storage.STNEP11Transfers), byte(storage.STNEP17Transfers)}

for i := range prefixes {
Expand Down Expand Up @@ -1622,7 +1643,10 @@
stop = start + 1
}
for index := start; index < stop; index++ {
err := kvcache.DeleteBlock(bc.GetHeaderHash(index))
ts, err := kvcache.DeleteBlock(bc.GetHeaderHash(index), bc.config.Ledger.RemoveUntraceableHeaders)
if bc.config.Ledger.RemoveUntraceableHeaders && index%bc.config.Ledger.GarbageCollectionPeriod == 0 {
_ = bc.gcBlockTimes.Add(index, ts)
}

Check warning on line 1649 in pkg/core/blockchain.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/blockchain.go#L1648-L1649

Added lines #L1648 - L1649 were not covered by tests
if err != nil {
bc.log.Warn("error while removing old block",
zap.Uint32("index", index),
Expand Down
41 changes: 28 additions & 13 deletions pkg/core/blockchain_core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func TestRemoveOldTransfers(t *testing.T) {

_, err = bc.dao.Persist()
require.NoError(t, err)
_ = bc.gcBlockTimes.Add(0, h.Timestamp)
_ = bc.removeOldTransfers(0)

for i := range uint32(2) {
Expand Down Expand Up @@ -139,6 +140,33 @@ func TestRemoveOldTransfers(t *testing.T) {
}
}

func checkNewBlockchainErr(t *testing.T, cfg func(c *config.Config), store storage.Store, errText string) {
unitTestNetCfg, err := config.Load("../../config", testchain.Network())
require.NoError(t, err)
cfg(&unitTestNetCfg)
log := zaptest.NewLogger(t)
_, err = NewBlockchain(store, unitTestNetCfg.Blockchain(), log)
if len(errText) != 0 {
require.Error(t, err)
require.True(t, strings.Contains(err.Error(), errText))
} else {
require.NoError(t, err)
}
}

func TestNewBlockchainIncosistencies(t *testing.T) {
t.Run("untraceable blocks/headers", func(t *testing.T) {
checkNewBlockchainErr(t, func(c *config.Config) {
c.ApplicationConfiguration.RemoveUntraceableHeaders = true
}, storage.NewMemoryStore(), "RemoveUntraceableHeaders is enabled, but RemoveUntraceableBlocks is not")
})
t.Run("state exchange without state root", func(t *testing.T) {
checkNewBlockchainErr(t, func(c *config.Config) {
c.ProtocolConfiguration.P2PStateExchangeExtensions = true
}, storage.NewMemoryStore(), "P2PStatesExchangeExtensions are enabled, but StateRootInHeader is off")
})
}

func TestBlockchain_InitWithIncompleteStateJump(t *testing.T) {
var (
stateSyncInterval = 4
Expand Down Expand Up @@ -186,19 +214,6 @@ func TestBlockchain_InitWithIncompleteStateJump(t *testing.T) {
_, err := batch.Persist()
require.NoError(t, err)

checkNewBlockchainErr := func(t *testing.T, cfg func(c *config.Config), store storage.Store, errText string) {
unitTestNetCfg, err := config.Load("../../config", testchain.Network())
require.NoError(t, err)
cfg(&unitTestNetCfg)
log := zaptest.NewLogger(t)
_, err = NewBlockchain(store, unitTestNetCfg.Blockchain(), log)
if len(errText) != 0 {
require.Error(t, err)
require.True(t, strings.Contains(err.Error(), errText))
} else {
require.NoError(t, err)
}
}
boltCfg := func(c *config.Config) {
spountCfg(c)
c.ApplicationConfiguration.KeepOnlyLatestState = true
Expand Down
23 changes: 14 additions & 9 deletions pkg/core/dao/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,17 +765,22 @@
}

// DeleteBlock removes the block from dao. It's not atomic, so make sure you're
// using private MemCached instance here.
func (dao *Simple) DeleteBlock(h util.Uint256) error {
// using private MemCached instance here. It returns block timestamp for GC
// convenience.
func (dao *Simple) DeleteBlock(h util.Uint256, dropHeader bool) (uint64, error) {
key := dao.makeExecutableKey(h)

b, err := dao.getBlock(key)
if err != nil {
return err
return 0, err

Check warning on line 775 in pkg/core/dao/dao.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/dao/dao.go#L775

Added line #L775 was not covered by tests
}
err = dao.storeHeader(key, &b.Header)
if err != nil {
return err
if !dropHeader {
err = dao.storeHeader(key, &b.Header)
if err != nil {
return 0, err
}

Check warning on line 781 in pkg/core/dao/dao.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/dao/dao.go#L780-L781

Added lines #L780 - L781 were not covered by tests
} else {
dao.Store.Delete(key)
}

for _, tx := range b.Transactions {
Expand All @@ -787,7 +792,7 @@

v, err := dao.Store.Get(key)
if err != nil {
return fmt.Errorf("failed to retrieve conflict record stub for %s (height %d, conflict %s): %w", tx.Hash().StringLE(), b.Index, hash.StringLE(), err)
return 0, fmt.Errorf("failed to retrieve conflict record stub for %s (height %d, conflict %s): %w", tx.Hash().StringLE(), b.Index, hash.StringLE(), err)

Check warning on line 795 in pkg/core/dao/dao.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/dao/dao.go#L795

Added line #L795 was not covered by tests
}
// It might be a block since we allow transactions to have block hash in the Conflicts attribute.
if v[0] != storage.ExecTransaction {
Expand All @@ -805,7 +810,7 @@
sKey := append(key, s.Account.BytesBE()...)
v, err := dao.Store.Get(sKey)
if err != nil {
return fmt.Errorf("failed to retrieve conflict record for %s (height %d, conflict %s, signer %s): %w", tx.Hash().StringLE(), b.Index, hash.StringLE(), address.Uint160ToString(s.Account), err)
return 0, fmt.Errorf("failed to retrieve conflict record for %s (height %d, conflict %s, signer %s): %w", tx.Hash().StringLE(), b.Index, hash.StringLE(), address.Uint160ToString(s.Account), err)

Check warning on line 813 in pkg/core/dao/dao.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/dao/dao.go#L813

Added line #L813 was not covered by tests
}
index = binary.LittleEndian.Uint32(v[1:])
if index == b.Index {
Expand All @@ -815,7 +820,7 @@
}
}

return nil
return b.Timestamp, nil
}

// PurgeHeader completely removes specified header from dao. It differs from
Expand Down
14 changes: 14 additions & 0 deletions pkg/core/dao/dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func TestPutGetBlock(t *testing.T) {
dao := NewSimple(storage.NewMemoryStore(), false)
b := &block.Block{
Header: block.Header{
Timestamp: 42,
Script: transaction.Witness{
VerificationScript: []byte{byte(opcode.PUSH1)},
InvocationScript: []byte{byte(opcode.NOP)},
Expand Down Expand Up @@ -107,6 +108,19 @@ func TestPutGetBlock(t *testing.T) {
require.Equal(t, 2, len(gotAppExecResult))
require.Equal(t, *appExecResult1, gotAppExecResult[0])
require.Equal(t, *appExecResult2, gotAppExecResult[1])

ts, err := dao.DeleteBlock(hash, false)
require.NoError(t, err)
require.Equal(t, uint64(42), ts)
gotBlock, err = dao.GetBlock(hash) // It's just a header, but it's still there.
require.NoError(t, err)
require.NotNil(t, gotBlock)

ts, err = dao.DeleteBlock(hash, true)
require.NoError(t, err)
require.Equal(t, uint64(42), ts)
_, err = dao.GetBlock(hash)
require.Error(t, err)
}

func TestGetVersion_NoVersion(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1120,7 +1120,7 @@ func (s *Server) handleGetHeadersCmd(p Peer, gh *payload.GetBlockByIndex) error
}
header, err := s.chain.GetHeader(hash)
if err != nil {
break
continue
}
resp.Hdrs = append(resp.Hdrs, header)
}
Expand Down
Loading