Skip to content

Commit

Permalink
fix: override same chunk (#4749)
Browse files Browse the repository at this point in the history
  • Loading branch information
acha-bill authored Aug 26, 2024
1 parent 17782b8 commit cd86a9f
Show file tree
Hide file tree
Showing 10 changed files with 342 additions and 3 deletions.
10 changes: 9 additions & 1 deletion pkg/file/joiner/joiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/file/splitter"
filetest "github.com/ethersphere/bee/v2/pkg/file/testing"
"github.com/ethersphere/bee/v2/pkg/log"
storage "github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storage/inmemchunkstore"
testingc "github.com/ethersphere/bee/v2/pkg/storage/testing"
mockstorer "github.com/ethersphere/bee/v2/pkg/storer/mock"
Expand Down Expand Up @@ -1411,6 +1411,14 @@ func (c *chunkStore) Put(_ context.Context, ch swarm.Chunk) error {
return nil
}

func (c *chunkStore) Replace(_ context.Context, ch swarm.Chunk) error {
c.mu.Lock()
defer c.mu.Unlock()
c.chunks[ch.Address().ByteString()] = swarm.NewChunk(ch.Address(), ch.Data()).WithStamp(ch.Stamp())
return nil

}

func (c *chunkStore) Has(_ context.Context, addr swarm.Address) (bool, error) {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down
33 changes: 33 additions & 0 deletions pkg/soc/testing/soc.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,39 @@ func (ms MockSOC) Chunk() swarm.Chunk {
return swarm.NewChunk(ms.Address(), append(ms.ID, append(ms.Signature, ms.WrappedChunk.Data()...)...))
}

// GenerateMockSocWithSigner generates a valid mocked SOC from given data and signer.
func GenerateMockSocWithSigner(t *testing.T, data []byte, signer crypto.Signer) *MockSOC {
t.Helper()

owner, err := signer.EthereumAddress()
if err != nil {
t.Fatal(err)
}
ch, err := cac.New(data)
if err != nil {
t.Fatal(err)
}

id := make([]byte, swarm.HashSize)
hasher := swarm.NewHasher()
_, err = hasher.Write(append(id, ch.Address().Bytes()...))
if err != nil {
t.Fatal(err)
}

signature, err := signer.Sign(hasher.Sum(nil))
if err != nil {
t.Fatal(err)
}

return &MockSOC{
ID: id,
Owner: owner.Bytes(),
Signature: signature,
WrappedChunk: ch,
}
}

// GenerateMockSOC generates a valid mocked SOC from given data.
func GenerateMockSOC(t *testing.T, data []byte) *MockSOC {
t.Helper()
Expand Down
7 changes: 7 additions & 0 deletions pkg/storage/chunkstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ type Hasser interface {
Has(context.Context, swarm.Address) (bool, error)
}

// Replacer is the interface that wraps the basic Replace method.
type Replacer interface {
// Replace a chunk in the store.
Replace(context.Context, swarm.Chunk) error
}

// PutterFunc type is an adapter to allow the use of
// ChunkStore as Putter interface. If f is a function
// with the appropriate signature, PutterFunc(f) is a
Expand Down Expand Up @@ -70,6 +76,7 @@ type ChunkStore interface {
Putter
Deleter
Hasser
Replacer

// Iterate over chunks in no particular order.
Iterate(context.Context, IterateChunkFn) error
Expand Down
12 changes: 11 additions & 1 deletion pkg/storage/inmemchunkstore/inmemchunkstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"context"
"sync"

storage "github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/swarm"
)

Expand Down Expand Up @@ -77,6 +77,16 @@ func (c *ChunkStore) Delete(_ context.Context, addr swarm.Address) error {
return nil
}

func (c *ChunkStore) Replace(_ context.Context, ch swarm.Chunk) error {
c.mu.Lock()
defer c.mu.Unlock()

chunkCount := c.chunks[ch.Address().ByteString()]
chunkCount.chunk = ch
c.chunks[ch.Address().ByteString()] = chunkCount
return nil
}

func (c *ChunkStore) Iterate(_ context.Context, fn storage.IterateChunkFn) error {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion pkg/storer/internal/chunkstamp/chunkstamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"fmt"

"github.com/ethersphere/bee/v2/pkg/postage"
storage "github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storage/storageutil"
"github.com/ethersphere/bee/v2/pkg/swarm"
)
Expand Down
21 changes: 21 additions & 0 deletions pkg/storer/internal/chunkstore/chunkstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,27 @@ func Put(ctx context.Context, s storage.IndexStore, sh storage.Sharky, ch swarm.
return s.Put(rIdx)
}

func Replace(ctx context.Context, s storage.IndexStore, sh storage.Sharky, ch swarm.Chunk) error {
rIdx := &RetrievalIndexItem{Address: ch.Address()}
err := s.Get(rIdx)
if err != nil {
return fmt.Errorf("chunk store: failed to read retrievalIndex for address %s: %w", ch.Address(), err)
}

err = sh.Release(ctx, rIdx.Location)
if err != nil {
return fmt.Errorf("chunkstore: failed to release sharky location: %w", err)
}

loc, err := sh.Write(ctx, ch.Data())
if err != nil {
return fmt.Errorf("chunk store: write to sharky failed: %w", err)
}
rIdx.Location = loc
rIdx.Timestamp = uint64(time.Now().Unix())
return s.Put(rIdx)
}

func Delete(ctx context.Context, s storage.IndexStore, sh storage.Sharky, addr swarm.Address) error {
rIdx := &RetrievalIndexItem{Address: addr}
err := s.Get(rIdx)
Expand Down
109 changes: 109 additions & 0 deletions pkg/storer/internal/reserve/reserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,106 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error {
if err != nil {
return fmt.Errorf("load or store stamp index for chunk %v has fail: %w", chunk, err)
}

sameAddressOldChunkStamp, err := chunkstamp.Load(s.IndexStore(), reserveNamespace, chunk.Address())
if err != nil && !errors.Is(err, storage.ErrNotFound) {
return err
}

// same address
if sameAddressOldChunkStamp != nil {
sameAddressOldStampIndex, err := stampindex.LoadWithStamp(s.IndexStore(), reserveNamespace, sameAddressOldChunkStamp)
if err != nil {
return err
}
prev := binary.BigEndian.Uint64(sameAddressOldStampIndex.StampTimestamp)
curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp())
if prev >= curr {
return fmt.Errorf("overwrite same chunk. prev %d cur %d batch %s: %w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk)
}
if loadedStamp {
prev := binary.BigEndian.Uint64(oldItem.StampTimestamp)
if prev >= curr {
return fmt.Errorf("overwrite same chunk. prev %d cur %d batch %s: %w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk)
}

if !chunk.Address().Equal(oldItem.ChunkAddress) {
r.logger.Warning(
"replacing chunk stamp index",
"old_chunk", oldItem.ChunkAddress,
"new_chunk", chunk.Address(),
"batch_id", hex.EncodeToString(chunk.Stamp().BatchID()),
)
err = r.removeChunk(ctx, s, oldItem.ChunkAddress, oldItem.BatchID, oldItem.StampHash)
if err != nil {
return fmt.Errorf("failed removing older chunk %s: %w", oldItem.ChunkAddress, err)
}
}
}

oldBatchRadiusItem := &BatchRadiusItem{
Bin: bin,
Address: chunk.Address(),
BatchID: sameAddressOldStampIndex.BatchID,
StampHash: sameAddressOldStampIndex.StampHash,
}
// load item to get the binID
err = s.IndexStore().Get(oldBatchRadiusItem)
if err != nil {
return err
}

err = r.deleteWithStamp(s, oldBatchRadiusItem, sameAddressOldChunkStamp)
if err != nil {
return err
}

err = stampindex.Store(s.IndexStore(), reserveNamespace, chunk)
if err != nil {
return err
}

err = chunkstamp.Store(s.IndexStore(), reserveNamespace, chunk)
if err != nil {
return err
}

binID, err := r.IncBinID(s.IndexStore(), bin)
if err != nil {
return err
}

err = s.IndexStore().Put(&BatchRadiusItem{
Bin: bin,
BinID: binID,
Address: chunk.Address(),
BatchID: chunk.Stamp().BatchID(),
StampHash: stampHash,
})
if err != nil {
return err
}

err = s.IndexStore().Put(&ChunkBinItem{
Bin: bin,
BinID: binID,
Address: chunk.Address(),
BatchID: chunk.Stamp().BatchID(),
ChunkType: storage.ChunkType(chunk),
StampHash: stampHash,
})
if err != nil {
return err
}

if storage.ChunkType(chunk) != swarm.ChunkTypeSingleOwner {
return nil
}
r.logger.Warning("replacing soc in chunkstore", "address", chunk.Address())
return s.ChunkStore().Replace(ctx, chunk)
}

// different address
if loadedStamp {
prev := binary.BigEndian.Uint64(oldItem.StampTimestamp)
curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp())
Expand Down Expand Up @@ -199,6 +299,15 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error {
})
}

func (r *Reserve) deleteWithStamp(s transaction.Store, oldBatchRadiusItem *BatchRadiusItem, sameAddressOldChunkStamp swarm.Stamp) error {
return errors.Join(
s.IndexStore().Delete(oldBatchRadiusItem),
s.IndexStore().Delete(&ChunkBinItem{Bin: oldBatchRadiusItem.Bin, BinID: oldBatchRadiusItem.BinID}),
stampindex.Delete(s.IndexStore(), reserveNamespace, swarm.NewChunk(oldBatchRadiusItem.Address, nil).WithStamp(sameAddressOldChunkStamp)),
chunkstamp.DeleteWithStamp(s.IndexStore(), reserveNamespace, oldBatchRadiusItem.Address, sameAddressOldChunkStamp),
)
}

func (r *Reserve) Has(addr swarm.Address, batchID []byte, stampHash []byte) (bool, error) {
item := &BatchRadiusItem{Bin: swarm.Proximity(r.baseAddr.Bytes(), addr.Bytes()), BatchID: batchID, Address: addr, StampHash: stampHash}
return r.st.IndexStore().Has(item)
Expand Down
Loading

0 comments on commit cd86a9f

Please sign in to comment.