Skip to content

Commit

Permalink
chore: refactor and added tests
Browse files Browse the repository at this point in the history
  • Loading branch information
acha-bill committed Aug 8, 2024
1 parent 8be8f9b commit 84a1a15
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 1 deletion.
26 changes: 25 additions & 1 deletion pkg/storer/internal/reserve/reserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,17 +117,19 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error {
defer r.multx.Unlock(strconv.Itoa(int(bin)))

return r.st.Run(ctx, func(s transaction.Store) error {

// index collision (same or different chunk)
oldItem, loadedStamp, err := stampindex.LoadOrStore(s.IndexStore(), reserveNamespace, chunk)
if err != nil {
return fmt.Errorf("load or store stamp index for chunk %v has fail: %w", chunk, err)
}

if loadedStamp {
prev := binary.BigEndian.Uint64(oldItem.StampTimestamp)
curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp())
if prev >= curr {
return fmt.Errorf("overwrite prev %d cur %d batch %s: %w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk)
}

// An older (same or different) chunk with the same batchID and stamp index has been previously
// saved to the reserve. We must do the below before saving the new chunk:
// 1. Delete the old chunk from the chunkstore.
Expand All @@ -152,6 +154,28 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error {
if err != nil {
return fmt.Errorf("failed updating stamp index: %w", err)
}
} else {
// same chunk with different index
oldChunkStamp, err := chunkstamp.LoadWithBatchID(s.IndexStore(), reserveNamespace, chunk.Address(), chunk.Stamp().BatchID())
if err != nil && !errors.Is(err, storage.ErrNotFound) {
return err
}
if oldChunkStamp != nil {
prev := binary.BigEndian.Uint64(oldChunkStamp.Timestamp())
curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp())
if prev >= curr {
return fmt.Errorf("overwrite prev %d cur %d batch %s: %w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk)
}
oldChunkStampHash, err := oldChunkStamp.Hash()
if err != nil {
return err
}
err = r.removeChunk(ctx, s, chunk.Address(), oldChunkStamp.BatchID(), oldChunkStampHash)
if err != nil {
return fmt.Errorf("failed removing older chunk %s: %w", oldItem.ChunkAddress, err)
}
// stampindex is already saved in LoadOrStore
}
}

err = chunkstamp.Store(s.IndexStore(), reserveNamespace, chunk)
Expand Down
98 changes: 98 additions & 0 deletions pkg/storer/internal/reserve/reserve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,104 @@ func TestReserveChunkType(t *testing.T) {
}
}

func TestSameChunkSameIndex(t *testing.T) {
t.Parallel()

ctx := context.Background()
baseAddr := swarm.RandAddress(t)

ts := internal.NewInmemStorage()

r, err := reserve.New(
baseAddr,
ts,
0, kademlia.NewTopologyDriver(),
log.Noop,
)
if err != nil {
t.Fatal(err)
}

t.Run("same stamp index older timestamp", func(t *testing.T) {
batch := postagetesting.MustNewBatch()
ch1 := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0))
ch2 := swarm.NewChunk(ch1.Address(), []byte("update")).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0))
err = r.Put(ctx, ch1)
if err != nil {
t.Fatal(err)
}
err = r.Put(ctx, ch2)
if !errors.Is(err, storage.ErrOverwriteNewerChunk) {
t.Fatal("expected error")
}
})

t.Run("different stamp index older timestamp", func(t *testing.T) {
batch := postagetesting.MustNewBatch()
ch1 := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0))
ch2 := swarm.NewChunk(ch1.Address(), []byte("update")).WithStamp(postagetesting.MustNewFields(batch.ID, 1, 0))

err = r.Put(ctx, ch1)
if err != nil {
t.Fatal(err)
}
err = r.Put(ctx, ch2)
if !errors.Is(err, storage.ErrOverwriteNewerChunk) {
t.Fatal("expected error")
}
})

replace := func(t *testing.T, ch1, ch2 swarm.Chunk) {
t.Helper()

err := r.Put(ctx, ch1)
if err != nil {
t.Fatal(err)
}

err = r.Put(ctx, ch2)
if err != nil {
t.Fatal(err)
}

ch1StampHash, err := ch1.Stamp().Hash()
if err != nil {
t.Fatal(err)
}

ch2StampHash, err := ch2.Stamp().Hash()
if err != nil {
t.Fatal(err)
}
checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: 0, BatchID: ch1.Stamp().BatchID(), Address: ch1.Address(), StampHash: ch1StampHash}, true)
checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: 0, BatchID: ch2.Stamp().BatchID(), Address: ch2.Address(), StampHash: ch2StampHash}, false)
checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: 0, BinID: 3, StampHash: ch1StampHash}, true)
checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: 0, BinID: 4, StampHash: ch2StampHash}, false)

ch, err := ts.ChunkStore().Get(ctx, ch2.Address())
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(ch.Data(), ch2.Data()) {
t.Fatalf("expected chunk data to be updated")
}
}

t.Run("same stamp index newer timestamp", func(t *testing.T) {
batch := postagetesting.MustNewBatch()
ch1 := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0))
ch2 := swarm.NewChunk(ch1.Address(), []byte("update")).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 1))
replace(t, ch1, ch2)
})

t.Run("different stamp index newer timestamp", func(t *testing.T) {
batch := postagetesting.MustNewBatch()
ch1 := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0))
ch2 := swarm.NewChunk(ch1.Address(), []byte("update")).WithStamp(postagetesting.MustNewFields(batch.ID, 1, 1))
replace(t, ch1, ch2)
})
}

func TestReplaceOldIndex(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit 84a1a15

Please sign in to comment.