refactor: localstore transaction (#4626)
istae authored May 9, 2024
1 parent 1c71a72 commit 52c2475
Showing 119 changed files with 3,682 additions and 6,173 deletions.
2 changes: 1 addition & 1 deletion cmd/bee/cmd/cmd.go
@@ -244,7 +244,7 @@ func (c *command) setHomeDir() (err error) {

func (c *command) setAllFlags(cmd *cobra.Command) {
cmd.Flags().String(optionNameDataDir, filepath.Join(c.homeDir, ".bee"), "data directory")
cmd.Flags().Uint64(optionNameCacheCapacity, 1000000, fmt.Sprintf("cache capacity in chunks, multiply by %d to get approximate capacity in bytes", swarm.ChunkSize))
cmd.Flags().Uint64(optionNameCacheCapacity, 1_000_000, fmt.Sprintf("cache capacity in chunks, multiply by %d to get approximate capacity in bytes", swarm.ChunkSize))
cmd.Flags().Uint64(optionNameDBOpenFilesLimit, 200, "number of open files allowed by database")
cmd.Flags().Uint64(optionNameDBBlockCacheCapacity, 32*1024*1024, "size of block cache of the database in bytes")
cmd.Flags().Uint64(optionNameDBWriteBufferSize, 32*1024*1024, "size of the database write buffer in bytes")
85 changes: 82 additions & 3 deletions cmd/bee/cmd/db.go
@@ -20,8 +20,10 @@ import (

"github.com/ethersphere/bee/v2/pkg/node"
"github.com/ethersphere/bee/v2/pkg/postage"
"github.com/ethersphere/bee/v2/pkg/puller"
"github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storer"
"github.com/ethersphere/bee/v2/pkg/storer/migration"
"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/spf13/cobra"
)
@@ -46,6 +48,7 @@ func (c *command) initDBCmd() {
dbCompactCmd(cmd)
dbValidateCmd(cmd)
dbValidatePinsCmd(cmd)
dbRepairReserve(cmd)

c.root.AddCommand(cmd)
}
@@ -81,6 +84,7 @@ func dbInfoCmd(cmd *cobra.Command) {
RadiusSetter: noopRadiusSetter{},
Batchstore: new(postage.NoOpBatchStore),
ReserveCapacity: node.ReserveCapacity,
CacheCapacity: 1_000_000,
})
if err != nil {
return fmt.Errorf("localstore: %w", err)
@@ -227,6 +231,77 @@ func dbValidatePinsCmd(cmd *cobra.Command) {
cmd.AddCommand(c)
}

func dbRepairReserve(cmd *cobra.Command) {
c := &cobra.Command{
Use: "repair-reserve",
Short: "Repairs the reserve by resetting the binIDs and removes dangling entries.",
RunE: func(cmd *cobra.Command, args []string) (err error) {
v, err := cmd.Flags().GetString(optionNameVerbosity)
if err != nil {
return fmt.Errorf("get verbosity: %w", err)
}
v = strings.ToLower(v)
logger, err := newLogger(cmd, v)
if err != nil {
return fmt.Errorf("new logger: %w", err)
}

dataDir, err := cmd.Flags().GetString(optionNameDataDir)
if err != nil {
return fmt.Errorf("get data-dir: %w", err)
}
if dataDir == "" {
return errors.New("no data-dir provided")
}

logger.Warning("Repair will recreate the reserve entries based on the chunk availability in the chunkstore. The epoch time and bin IDs will be reset.")
logger.Warning("The pullsync peer sync intervals are reset so on the next run, the node will perform historical syncing.")
logger.Warning("This is a destructive process. If the process is stopped for any reason, the reserve may become corrupted.")
logger.Warning("To prevent permanent loss of data, data should be backed up before running the cmd.")
logger.Warning("You have another 10 seconds to change your mind and kill this process with CTRL-C...")
time.Sleep(10 * time.Second)
logger.Warning("proceeding with repair...")

d, err := cmd.Flags().GetDuration(optionNameSleepAfter)
if err != nil {
logger.Error(err, "getting sleep value failed")
}
defer func() { time.Sleep(d) }()

db, err := storer.New(cmd.Context(), path.Join(dataDir, "localstore"), &storer.Options{
Logger: logger,
RadiusSetter: noopRadiusSetter{},
Batchstore: new(postage.NoOpBatchStore),
ReserveCapacity: node.ReserveCapacity,
CacheCapacity: 1_000_000,
})
if err != nil {
return fmt.Errorf("localstore: %w", err)
}
defer db.Close()

err = migration.ReserveRepairer(db.Storage(), storage.ChunkType, logger)()
if err != nil {
return fmt.Errorf("repair: %w", err)
}

stateStore, _, err := node.InitStateStore(logger, dataDir, 1000)
if err != nil {
return fmt.Errorf("new statestore: %w", err)
}
defer stateStore.Close()

return stateStore.Iterate(puller.IntervalPrefix, func(key, val []byte) (stop bool, err error) {
return false, stateStore.Delete(string(key))
})
},
}
c.Flags().String(optionNameDataDir, "", "data directory")
c.Flags().String(optionNameVerbosity, "info", "verbosity level")
c.Flags().Duration(optionNameSleepAfter, time.Duration(0), "time to sleep after the operation finished")
cmd.AddCommand(c)
}

func dbValidateCmd(cmd *cobra.Command) {
c := &cobra.Command{
Use: "validate",
@@ -257,7 +332,7 @@ func dbValidateCmd(cmd *cobra.Command) {

localstorePath := path.Join(dataDir, "localstore")

err = storer.Validate(context.Background(), localstorePath, &storer.Options{
err = storer.ValidateRetrievalIndex(context.Background(), localstorePath, &storer.Options{
Logger: logger,
RadiusSetter: noopRadiusSetter{},
Batchstore: new(postage.NoOpBatchStore),
@@ -325,6 +400,7 @@ func dbExportReserveCmd(cmd *cobra.Command) {
RadiusSetter: noopRadiusSetter{},
Batchstore: new(postage.NoOpBatchStore),
ReserveCapacity: node.ReserveCapacity,
CacheCapacity: 1_000_000,
})
if err != nil {
return fmt.Errorf("localstore: %w", err)
@@ -406,7 +482,8 @@ func dbExportPinningCmd(cmd *cobra.Command) {
Logger: logger,
RadiusSetter: noopRadiusSetter{},
Batchstore: new(postage.NoOpBatchStore),
ReserveCapacity: 4_194_304,
ReserveCapacity: node.ReserveCapacity,
CacheCapacity: 1_000_000,
})
if err != nil {
return fmt.Errorf("localstore: %w", err)
@@ -516,6 +593,7 @@ func dbImportReserveCmd(cmd *cobra.Command) {
RadiusSetter: noopRadiusSetter{},
Batchstore: new(postage.NoOpBatchStore),
ReserveCapacity: node.ReserveCapacity,
CacheCapacity: 1_000_000,
})
if err != nil {
return fmt.Errorf("localstore: %w", err)
@@ -598,7 +676,8 @@ func dbImportPinningCmd(cmd *cobra.Command) {
Logger: logger,
RadiusSetter: noopRadiusSetter{},
Batchstore: new(postage.NoOpBatchStore),
ReserveCapacity: 4_194_304,
ReserveCapacity: node.ReserveCapacity,
CacheCapacity: 1_000_000,
})
if err != nil {
return fmt.Errorf("localstore: %w", err)
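A usage sketch for the new repair-reserve command. The test name and setup below are hypothetical, and the sketch assumes it lives alongside the newCommand and cmd.WithArgs helpers used in cmd/bee/cmd/db_test.go (shown below in this diff), with a localstore already populated under the data directory:

func TestDBRepairReserveSketch(t *testing.T) {
	t.Parallel()
	dir := t.TempDir()
	// The localstore under dir is assumed to have been populated beforehand,
	// e.g. by a prior import, before the repair is run against it.
	err := newCommand(t, cmd.WithArgs("db", "repair-reserve", "--data-dir", dir)).Execute()
	if err != nil {
		t.Fatal(err)
	}
}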
13 changes: 2 additions & 11 deletions cmd/bee/cmd/db_test.go
@@ -46,7 +46,6 @@ func TestDBExportImport(t *testing.T) {
if err != nil {
t.Fatal(err)
}
fmt.Println("put chunk: ", ch.Address().String())
chunks[ch.Address().String()] = 0
}
db1.Close()
@@ -115,7 +114,6 @@ func TestDBExportImportPinning(t *testing.T) {
if err != nil {
t.Fatal(err)
}
fmt.Println("collection ", rootAddr.String(), " put chunk: ", ch.Address().String())
chunks[ch.Address().String()] = 0
}
err = collection.Done(rootAddr)
@@ -125,16 +123,9 @@
pins[rootAddr.String()] = nil
}

addresses, err := db1.Pins()
if err != nil {
t.Fatal(err)
}
for _, addr := range addresses {
fmt.Println("pin: ", addr.String())
}
db1.Close()

err = newCommand(t, cmd.WithArgs("db", "export", "pinning", export, "--data-dir", dir1)).Execute()
err := newCommand(t, cmd.WithArgs("db", "export", "pinning", export, "--data-dir", dir1)).Execute()
if err != nil {
t.Fatal(err)
}
@@ -150,7 +141,7 @@
Logger: testutil.NewLogger(t),
ReserveCapacity: node.ReserveCapacity,
}, dir2)
addresses, err = db2.Pins()
addresses, err := db2.Pins()
if err != nil {
t.Fatal(err)
}
15 changes: 2 additions & 13 deletions pkg/api/stewardship.go
@@ -29,7 +29,7 @@ func (s *Service) stewardshipPutHandler(w http.ResponseWriter, r *http.Request)
}

headers := struct {
BatchID []byte `map:"Swarm-Postage-Batch-Id"`
BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"`
}{}
if response := s.mapStructure(r.Header, &headers); response != nil {
response("invalid header params", logger, w)
@@ -41,18 +41,7 @@ func (s *Service) stewardshipPutHandler(w http.ResponseWriter, r *http.Request)
err error
)

if len(headers.BatchID) == 0 {
logger.Debug("missing postage batch id for re-upload")
batchID, err = s.storer.BatchHint(paths.Address)
if err != nil {
logger.Debug("unable to find old batch for reference", "error", err)
logger.Error(nil, "unable to find old batch for reference")
jsonhttp.NotFound(w, "unable to find old batch for reference, provide new batch id")
return
}
} else {
batchID = headers.BatchID
}
batchID = headers.BatchID
stamper, save, err := s.getStamper(batchID)
if err != nil {
switch {
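With the fallback to the stored batch hint removed, callers must supply the postage batch ID themselves. A minimal client-side sketch follows; the endpoint path, reference, and batch ID are illustrative placeholders, not taken from this diff:

package main

import (
	"fmt"
	"net/http"
)

func main() {
	// Placeholder values; a real call would use an existing swarm reference
	// and the hex ID of a usable postage batch.
	ref := "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
	batchID := "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"

	req, err := http.NewRequest(http.MethodPut, "http://localhost:1633/stewardship/"+ref, nil)
	if err != nil {
		panic(err)
	}
	// The handler now rejects requests without this header instead of
	// falling back to the batch hint stored for the reference.
	req.Header.Set("Swarm-Postage-Batch-Id", batchID)

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status)
}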
1 change: 1 addition & 0 deletions pkg/api/stewardship_test.go
@@ -40,6 +40,7 @@ func TestStewardship(t *testing.T) {
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}),
jsonhttptest.WithRequestHeader("Swarm-Postage-Batch-Id", "aa"),
)
if !stewardMock.LastAddress().Equal(addr) {
t.Fatalf("\nhave address: %q\nwant address: %q", stewardMock.LastAddress().String(), addr.String())
72 changes: 69 additions & 3 deletions pkg/file/joiner/joiner_test.go
@@ -1024,7 +1024,7 @@ func (m *mockPutter) store(cnt int) error {
}

// nolint:thelper
func TestJoinerRedundancy(t *testing.T) {
func TestJoinerRedundancy_FLAKY(t *testing.T) {
t.Parallel()
for _, tc := range []struct {
rLevel redundancy.Level
@@ -1229,8 +1229,7 @@ func TestJoinerRedundancyMultilevel(t *testing.T) {
t.Parallel()
test := func(t *testing.T, rLevel redundancy.Level, encrypt bool, size int) {
t.Helper()
store := mockstorer.NewForgettingStore(inmemchunkstore.New())
testutil.CleanupCloser(t, store)
store := mockstorer.NewForgettingStore(newChunkStore())
seed, err := pseudorand.NewSeed()
if err != nil {
t.Fatal(err)
@@ -1367,3 +1366,70 @@
})
}
}

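// chunkStore is a minimal, mutex-guarded in-memory chunk store used by the
// redundancy tests in this file in place of the previous inmemchunkstore.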
type chunkStore struct {
mu sync.Mutex
chunks map[string]swarm.Chunk
}

func newChunkStore() *chunkStore {
return &chunkStore{
chunks: make(map[string]swarm.Chunk),
}
}

func (c *chunkStore) Get(_ context.Context, addr swarm.Address) (swarm.Chunk, error) {
c.mu.Lock()
defer c.mu.Unlock()

chunk, ok := c.chunks[addr.ByteString()]
if !ok {
return nil, storage.ErrNotFound
}
return chunk, nil
}

func (c *chunkStore) Put(_ context.Context, ch swarm.Chunk) error {
c.mu.Lock()
defer c.mu.Unlock()
c.chunks[ch.Address().ByteString()] = swarm.NewChunk(ch.Address(), ch.Data()).WithStamp(ch.Stamp())
return nil
}

func (c *chunkStore) Has(_ context.Context, addr swarm.Address) (bool, error) {
c.mu.Lock()
defer c.mu.Unlock()

_, exists := c.chunks[addr.ByteString()]

return exists, nil
}

func (c *chunkStore) Delete(_ context.Context, addr swarm.Address) error {
c.mu.Lock()
defer c.mu.Unlock()

delete(c.chunks, addr.ByteString())
return nil
}

func (c *chunkStore) Iterate(_ context.Context, fn storage.IterateChunkFn) error {
c.mu.Lock()
defer c.mu.Unlock()

for _, ch := range c.chunks {
stop, err := fn(ch)
if err != nil {
return err
}
if stop {
return nil
}
}

return nil
}

func (c *chunkStore) Close() error {
return nil
}
8 changes: 0 additions & 8 deletions pkg/node/bootstrap.go
@@ -33,7 +33,6 @@ import (
"github.com/ethersphere/bee/v2/pkg/pricing"
"github.com/ethersphere/bee/v2/pkg/retrieval"
"github.com/ethersphere/bee/v2/pkg/settlement/pseudosettle"
"github.com/ethersphere/bee/v2/pkg/settlement/swap/chequebook"
"github.com/ethersphere/bee/v2/pkg/spinlock"
"github.com/ethersphere/bee/v2/pkg/storage"
storer "github.com/ethersphere/bee/v2/pkg/storer"
@@ -42,7 +41,6 @@ import (
"github.com/ethersphere/bee/v2/pkg/topology/kademlia"
"github.com/ethersphere/bee/v2/pkg/topology/lightnode"
"github.com/ethersphere/bee/v2/pkg/tracing"
"github.com/ethersphere/bee/v2/pkg/transaction"
"github.com/hashicorp/go-multierror"
ma "github.com/multiformats/go-multiaddr"
)
@@ -65,15 +63,9 @@ func bootstrapNode(
addr string,
swarmAddress swarm.Address,
nonce []byte,
chainID int64,
overlayEthAddress common.Address,
addressbook addressbook.Interface,
bootnodes []ma.Multiaddr,
lightNodes *lightnode.Container,
chequebookService chequebook.Service,
chequeStore chequebook.ChequeStore,
cashoutService chequebook.CashoutService,
transactionService transaction.Service,
stateStore storage.StateStorer,
signer crypto.Signer,
networkID uint64,