Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: override same chunk #4749

Merged
merged 14 commits into from
Aug 26, 2024
10 changes: 9 additions & 1 deletion pkg/file/joiner/joiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/file/splitter"
filetest "github.com/ethersphere/bee/v2/pkg/file/testing"
"github.com/ethersphere/bee/v2/pkg/log"
storage "github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storage/inmemchunkstore"
testingc "github.com/ethersphere/bee/v2/pkg/storage/testing"
mockstorer "github.com/ethersphere/bee/v2/pkg/storer/mock"
Expand Down Expand Up @@ -1411,6 +1411,14 @@ func (c *chunkStore) Put(_ context.Context, ch swarm.Chunk) error {
return nil
}

func (c *chunkStore) Replace(_ context.Context, ch swarm.Chunk) error {
c.mu.Lock()
defer c.mu.Unlock()
c.chunks[ch.Address().ByteString()] = swarm.NewChunk(ch.Address(), ch.Data()).WithStamp(ch.Stamp())
return nil

}

func (c *chunkStore) Has(_ context.Context, addr swarm.Address) (bool, error) {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down
33 changes: 33 additions & 0 deletions pkg/soc/testing/soc.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,39 @@ func (ms MockSOC) Chunk() swarm.Chunk {
return swarm.NewChunk(ms.Address(), append(ms.ID, append(ms.Signature, ms.WrappedChunk.Data()...)...))
}

// GenerateMockSocWithSigner generates a valid mocked SOC from given data and signer.
func GenerateMockSocWithSigner(t *testing.T, data []byte, signer crypto.Signer) *MockSOC {
t.Helper()

owner, err := signer.EthereumAddress()
if err != nil {
t.Fatal(err)
}
ch, err := cac.New(data)
if err != nil {
t.Fatal(err)
}

id := make([]byte, swarm.HashSize)
hasher := swarm.NewHasher()
_, err = hasher.Write(append(id, ch.Address().Bytes()...))
if err != nil {
t.Fatal(err)
}

signature, err := signer.Sign(hasher.Sum(nil))
if err != nil {
t.Fatal(err)
}

return &MockSOC{
ID: id,
Owner: owner.Bytes(),
Signature: signature,
WrappedChunk: ch,
}
}

// GenerateMockSOC generates a valid mocked SOC from given data.
func GenerateMockSOC(t *testing.T, data []byte) *MockSOC {
t.Helper()
Expand Down
7 changes: 7 additions & 0 deletions pkg/storage/chunkstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ type Hasser interface {
Has(context.Context, swarm.Address) (bool, error)
}

// Replacer is the interface that wraps the basic Replace method.
type Replacer interface {
// Replace a chunk in the store.
Replace(context.Context, swarm.Chunk) error
}

// PutterFunc type is an adapter to allow the use of
// ChunkStore as Putter interface. If f is a function
// with the appropriate signature, PutterFunc(f) is a
Expand Down Expand Up @@ -70,6 +76,7 @@ type ChunkStore interface {
Putter
Deleter
Hasser
Replacer

// Iterate over chunks in no particular order.
Iterate(context.Context, IterateChunkFn) error
Expand Down
12 changes: 11 additions & 1 deletion pkg/storage/inmemchunkstore/inmemchunkstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"context"
"sync"

storage "github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/swarm"
)

Expand Down Expand Up @@ -77,6 +77,16 @@ func (c *ChunkStore) Delete(_ context.Context, addr swarm.Address) error {
return nil
}

func (c *ChunkStore) Replace(_ context.Context, ch swarm.Chunk) error {
c.mu.Lock()
defer c.mu.Unlock()

chunkCount := c.chunks[ch.Address().ByteString()]
chunkCount.chunk = ch
c.chunks[ch.Address().ByteString()] = chunkCount
return nil
}

func (c *ChunkStore) Iterate(_ context.Context, fn storage.IterateChunkFn) error {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion pkg/storer/internal/chunkstamp/chunkstamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"fmt"

"github.com/ethersphere/bee/v2/pkg/postage"
storage "github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storage/storageutil"
"github.com/ethersphere/bee/v2/pkg/swarm"
)
Expand Down
21 changes: 21 additions & 0 deletions pkg/storer/internal/chunkstore/chunkstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,27 @@ func Put(ctx context.Context, s storage.IndexStore, sh storage.Sharky, ch swarm.
return s.Put(rIdx)
}

func Replace(ctx context.Context, s storage.IndexStore, sh storage.Sharky, ch swarm.Chunk) error {
rIdx := &RetrievalIndexItem{Address: ch.Address()}
err := s.Get(rIdx)
if err != nil {
return fmt.Errorf("chunk store: failed to read retrievalIndex for address %s: %w", ch.Address(), err)
}

err = sh.Release(ctx, rIdx.Location)
if err != nil {
return fmt.Errorf("chunkstore: failed to release sharky location: %w", err)
}

loc, err := sh.Write(ctx, ch.Data())
if err != nil {
return fmt.Errorf("chunk store: write to sharky failed: %w", err)
}
rIdx.Location = loc
rIdx.Timestamp = uint64(time.Now().Unix())
return s.Put(rIdx)
}

func Delete(ctx context.Context, s storage.IndexStore, sh storage.Sharky, addr swarm.Address) error {
rIdx := &RetrievalIndexItem{Address: addr}
err := s.Get(rIdx)
Expand Down
103 changes: 103 additions & 0 deletions pkg/storer/internal/reserve/reserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,100 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error {
if err != nil {
return fmt.Errorf("load or store stamp index for chunk %v has fail: %w", chunk, err)
}

sameAddressOldChunkStamp, err := chunkstamp.Load(s.IndexStore(), reserveNamespace, chunk.Address())
if err != nil && !errors.Is(err, storage.ErrNotFound) {
return err
}

// same address
if sameAddressOldChunkStamp != nil {
sameAddressOldStampIndex, err := stampindex.LoadWithStamp(s.IndexStore(), reserveNamespace, sameAddressOldChunkStamp)
if err != nil {
return err
}
prev := binary.BigEndian.Uint64(sameAddressOldStampIndex.StampTimestamp)
curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp())
if prev >= curr {
return fmt.Errorf("overwrite same chunk. prev %d cur %d batch %s: %w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk)
}
if loadedStamp {
prev := binary.BigEndian.Uint64(oldItem.StampTimestamp)
if prev >= curr {
return fmt.Errorf("overwrite same chunk. prev %d cur %d batch %s: %w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk)
}

if !chunk.Address().Equal(oldItem.ChunkAddress) {
r.logger.Warning(
"replacing chunk stamp index",
"old_chunk", oldItem.ChunkAddress,
"new_chunk", chunk.Address(),
"batch_id", hex.EncodeToString(chunk.Stamp().BatchID()),
)
err = r.removeChunk(ctx, s, oldItem.ChunkAddress, oldItem.BatchID, oldItem.StampHash)
if err != nil {
return fmt.Errorf("failed removing older chunk %s: %w", oldItem.ChunkAddress, err)
}
}
}

oldBatchRadiusItem := &BatchRadiusItem{
Bin: bin,
Address: chunk.Address(),
BatchID: sameAddressOldStampIndex.BatchID,
StampHash: sameAddressOldStampIndex.StampHash,
}
err = r.deleteWithStamp(s, oldBatchRadiusItem, sameAddressOldChunkStamp)
if err != nil {
return err
}

err = stampindex.Store(s.IndexStore(), reserveNamespace, chunk)
if err != nil {
return err
}

err = chunkstamp.Store(s.IndexStore(), reserveNamespace, chunk)
if err != nil {
return err
}

binID, err := r.IncBinID(s.IndexStore(), bin)
if err != nil {
return err
}

err = s.IndexStore().Put(&BatchRadiusItem{
Bin: bin,
BinID: binID,
Address: chunk.Address(),
BatchID: chunk.Stamp().BatchID(),
StampHash: stampHash,
})
if err != nil {
return err
}

err = s.IndexStore().Put(&ChunkBinItem{
Bin: bin,
BinID: binID,
Address: chunk.Address(),
BatchID: chunk.Stamp().BatchID(),
ChunkType: storage.ChunkType(chunk),
StampHash: stampHash,
})
if err != nil {
return err
}

if storage.ChunkType(chunk) != swarm.ChunkTypeSingleOwner {
return nil
}
r.logger.Warning("replacing soc in chunkstore", "address", chunk.Address())
return s.ChunkStore().Replace(ctx, chunk)
}

// different address
if loadedStamp {
prev := binary.BigEndian.Uint64(oldItem.StampTimestamp)
curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp())
Expand Down Expand Up @@ -199,6 +293,15 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error {
})
}

func (r *Reserve) deleteWithStamp(s transaction.Store, oldBatchRadiusItem *BatchRadiusItem, sameAddressOldChunkStamp swarm.Stamp) error {
return errors.Join(
s.IndexStore().Delete(oldBatchRadiusItem),
s.IndexStore().Delete(&ChunkBinItem{Bin: oldBatchRadiusItem.Bin, BinID: oldBatchRadiusItem.BinID}),
stampindex.Delete(s.IndexStore(), reserveNamespace, swarm.NewChunk(oldBatchRadiusItem.Address, nil).WithStamp(sameAddressOldChunkStamp)),
chunkstamp.DeleteWithStamp(s.IndexStore(), reserveNamespace, oldBatchRadiusItem.Address, sameAddressOldChunkStamp),
)
}

func (r *Reserve) Has(addr swarm.Address, batchID []byte, stampHash []byte) (bool, error) {
item := &BatchRadiusItem{Bin: swarm.Proximity(r.baseAddr.Bytes(), addr.Bytes()), BatchID: batchID, Address: addr, StampHash: stampHash}
return r.st.IndexStore().Has(item)
Expand Down
Loading
Loading