Skip to content

Commit

Permalink
feat(shwap): Add ODS file (#3482)
Browse files Browse the repository at this point in the history
This PR adds ods file, that implements eds.Accessor interface. The file stores ods part of eds on disk and lazily reads data upon request. If requested data is from Q4, it reads full ods in single read and stores it in-memory for later re-use.
  • Loading branch information
walldiss authored Jun 19, 2024
1 parent 958ed80 commit 114e0f5
Show file tree
Hide file tree
Showing 13 changed files with 883 additions and 11 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ require (
github.com/ipfs/go-ipld-format v0.6.0
github.com/ipfs/go-log/v2 v2.5.1
github.com/ipld/go-car v0.6.2
github.com/klauspost/reedsolomon v1.12.1
github.com/libp2p/go-libp2p v0.33.2
github.com/libp2p/go-libp2p-kad-dht v0.25.2
github.com/libp2p/go-libp2p-pubsub v0.10.1
Expand Down Expand Up @@ -228,7 +229,6 @@ require (
github.com/josharian/intern v1.0.0 // indirect
github.com/klauspost/compress v1.17.6 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/klauspost/reedsolomon v1.12.1 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
github.com/lib/pq v1.10.7 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
Expand Down
47 changes: 47 additions & 0 deletions share/new_eds/axis_half.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package eds

import (
"fmt"

"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/shwap"
)
Expand All @@ -20,3 +22,48 @@ func (a AxisHalf) ToRow() shwap.Row {
}
return shwap.NewRow(a.Shares, side)
}

// Extended returns full axis shares from half axis shares.
func (a AxisHalf) Extended() ([]share.Share, error) {
if a.IsParity {
return reconstructShares(a.Shares)
}
return extendShares(a.Shares)
}

// extendShares constructs full axis shares from original half axis shares.
func extendShares(original []share.Share) ([]share.Share, error) {
if len(original) == 0 {
return nil, fmt.Errorf("original shares are empty")
}

codec := share.DefaultRSMT2DCodec()
parity, err := codec.Encode(original)
if err != nil {
return nil, fmt.Errorf("encoding: %w", err)
}
shares := make([]share.Share, len(original)*2)
copy(shares, original)
copy(shares[len(original):], parity)
return shares, nil
}

// reconstructShares constructs full axis shares from parity half axis shares.
func reconstructShares(parity []share.Share) ([]share.Share, error) {
if len(parity) == 0 {
return nil, fmt.Errorf("parity shares are empty")
}

sqLen := len(parity) * 2
shares := make([]share.Share, sqLen)
for i := sqLen / 2; i < sqLen; i++ {
shares[i] = parity[i-sqLen/2]
}

codec := share.DefaultRSMT2DCodec()
shares, err := codec.Decode(shares)
if err != nil {
return nil, fmt.Errorf("reconstructing: %w", err)
}
return shares, nil
}
32 changes: 32 additions & 0 deletions share/new_eds/axis_half_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package eds

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/celestiaorg/celestia-node/share/sharetest"
)

func TestExtendAxisHalf(t *testing.T) {
shares := sharetest.RandShares(t, 16)

original := AxisHalf{
Shares: shares,
IsParity: false,
}

extended, err := original.Extended()
require.NoError(t, err)
require.Len(t, extended, len(shares)*2)

parity := AxisHalf{
Shares: extended[len(shares):],
IsParity: true,
}

parityExtended, err := parity.Extended()
require.NoError(t, err)

require.Equal(t, extended, parityExtended)
}
2 changes: 1 addition & 1 deletion share/new_eds/rsmt2d_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

func TestMemFile(t *testing.T) {
odsSize := 8
newAccessor := func(eds *rsmt2d.ExtendedDataSquare) Accessor {
newAccessor := func(tb testing.TB, eds *rsmt2d.ExtendedDataSquare) Accessor {
return &Rsmt2D{ExtendedDataSquare: eds}
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
Expand Down
12 changes: 6 additions & 6 deletions share/new_eds/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/celestiaorg/celestia-node/share/shwap"
)

type createAccessor func(eds *rsmt2d.ExtendedDataSquare) Accessor
type createAccessor func(testing.TB, *rsmt2d.ExtendedDataSquare) Accessor

// TestSuiteAccessor runs a suite of tests for the given Accessor implementation.
func TestSuiteAccessor(
Expand Down Expand Up @@ -51,7 +51,7 @@ func testAccessorSample(
odsSize int,
) {
eds := edstest.RandEDS(t, odsSize)
fl := createAccessor(eds)
fl := createAccessor(t, eds)

dah, err := share.NewRoot(eds)
require.NoError(t, err)
Expand Down Expand Up @@ -108,7 +108,7 @@ func testAccessorRowNamespaceData(
for amount := 1; amount < sharesAmount; amount++ {
// select random amount of shares, but not less than 1
eds, dah := edstest.RandEDSWithNamespace(t, namespace, amount, odsSize)
f := createAccessor(eds)
f := createAccessor(t, eds)

var actualSharesAmount int
// loop over all rows and check that the amount of shares in the namespace is equal to the expected
Expand Down Expand Up @@ -149,7 +149,7 @@ func testAccessorRowNamespaceData(
absentNs, err := share.Namespace(maxNs).AddInt(-1)
require.NoError(t, err)

f := createAccessor(eds)
f := createAccessor(t, eds)
rowData, err := f.RowNamespaceData(ctx, absentNs, i)
require.NoError(t, err)

Expand All @@ -170,7 +170,7 @@ func testAccessorAxisHalf(
odsSize int,
) {
eds := edstest.RandEDS(t, odsSize)
fl := createAccessor(eds)
fl := createAccessor(t, eds)

t.Run("single thread", func(t *testing.T) {
for _, axisType := range []rsmt2d.Axis{rsmt2d.Col, rsmt2d.Row} {
Expand Down Expand Up @@ -224,7 +224,7 @@ func testAccessorShares(
odsSize int,
) {
eds := edstest.RandEDS(t, odsSize)
fl := createAccessor(eds)
fl := createAccessor(t, eds)

shares, err := fl.Shares(ctx)
require.NoError(t, err)
Expand Down
10 changes: 7 additions & 3 deletions share/shwap/row_namespace_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,14 @@ func (rnd RowNamespaceData) Validate(dah *share.Root, namespace share.Namespace,
// verifyInclusion checks the inclusion of the row's shares in the provided root using NMT.
func (rnd RowNamespaceData) verifyInclusion(rowRoot []byte, namespace share.Namespace) bool {
leaves := make([][]byte, 0, len(rnd.Shares))
for _, shr := range rnd.Shares {
namespaceBytes := share.GetNamespace(shr)
leaves = append(leaves, append(namespaceBytes, shr...))
for _, sh := range rnd.Shares {
namespaceBytes := share.GetNamespace(sh)
leave := make([]byte, len(sh)+len(namespaceBytes))
copy(leave, namespaceBytes)
copy(leave[len(namespaceBytes):], sh)
leaves = append(leaves, leave)
}

return rnd.Proof.VerifyNamespace(
share.NewSHA256Hasher(),
namespace.ToNMT(),
Expand Down
24 changes: 24 additions & 0 deletions share/shwap/sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"

"github.com/celestiaorg/celestia-app/pkg/wrapper"
"github.com/celestiaorg/nmt"
nmt_pb "github.com/celestiaorg/nmt/pb"
"github.com/celestiaorg/rsmt2d"
Expand All @@ -24,6 +25,29 @@ type Sample struct {
ProofType rsmt2d.Axis // ProofType indicates whether the proof is against a row or a column.
}

// SampleFromShares creates a Sample from a list of shares, using the specified proof type and
// the share index to be included in the sample.
func SampleFromShares(shares []share.Share, proofType rsmt2d.Axis, axisIdx, shrIdx int) (Sample, error) {
tree := wrapper.NewErasuredNamespacedMerkleTree(uint64(len(shares)/2), uint(axisIdx))
for _, shr := range shares {
err := tree.Push(shr)
if err != nil {
return Sample{}, err
}
}

proof, err := tree.ProveRange(shrIdx, shrIdx+1)
if err != nil {
return Sample{}, err
}

return Sample{
Share: shares[shrIdx],
Proof: &proof,
ProofType: proofType,
}, nil
}

// SampleFromProto converts a protobuf Sample back into its domain model equivalent.
func SampleFromProto(s *pb.Sample) Sample {
proof := nmt.NewInclusionProof(
Expand Down
38 changes: 38 additions & 0 deletions store/file/codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package file

import (
"sync"

"github.com/klauspost/reedsolomon"
)

var codec Codec

func init() {
codec = NewCodec()
}

type Codec interface {
Encoder(len int) (reedsolomon.Encoder, error)
}

type codecCache struct {
cache sync.Map
}

func NewCodec() Codec {
return &codecCache{}
}

func (l *codecCache) Encoder(len int) (reedsolomon.Encoder, error) {
enc, ok := l.cache.Load(len)
if !ok {
var err error
enc, err = reedsolomon.New(len/2, len/2, reedsolomon.WithLeopardGF(true))
if err != nil {
return nil, err
}
l.cache.Store(len, enc)
}
return enc.(reedsolomon.Encoder), nil
}
83 changes: 83 additions & 0 deletions store/file/codec_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package file

import (
"fmt"
"testing"

"github.com/klauspost/reedsolomon"
"github.com/stretchr/testify/require"

"github.com/celestiaorg/celestia-node/share/sharetest"
)

func BenchmarkCodec(b *testing.B) {
minSize, maxSize := 32, 128

for size := minSize; size <= maxSize; size *= 2 {
// BenchmarkCodec/Leopard/size:32-10 409194 2793 ns/op
// BenchmarkCodec/Leopard/size:64-10 190969 6170 ns/op
// BenchmarkCodec/Leopard/size:128-10 82821 14287 ns/op
b.Run(fmt.Sprintf("Leopard/size:%v", size), func(b *testing.B) {
enc, err := reedsolomon.New(size/2, size/2, reedsolomon.WithLeopardGF(true))
require.NoError(b, err)

shards := newShards(b, size, true)

b.ResetTimer()
for i := 0; i < b.N; i++ {
err = enc.Encode(shards)
require.NoError(b, err)
}
})

// BenchmarkCodec/default/size:32-10 222153 5364 ns/op
// BenchmarkCodec/default/size:64-10 58831 20349 ns/op
// BenchmarkCodec/default/size:128-10 14940 80471 ns/op
b.Run(fmt.Sprintf("default/size:%v", size), func(b *testing.B) {
enc, err := reedsolomon.New(size/2, size/2, reedsolomon.WithLeopardGF(false))
require.NoError(b, err)

shards := newShards(b, size, true)

b.ResetTimer()
for i := 0; i < b.N; i++ {
err = enc.Encode(shards)
require.NoError(b, err)
}
})

// BenchmarkCodec/default-reconstructSome/size:32-10 1263585 954.4 ns/op
// BenchmarkCodec/default-reconstructSome/size:64-10 762273 1554 ns/op
// BenchmarkCodec/default-reconstructSome/size:128-10 429268 2974 ns/op
b.Run(fmt.Sprintf("default-reconstructSome/size:%v", size), func(b *testing.B) {
enc, err := reedsolomon.New(size/2, size/2, reedsolomon.WithLeopardGF(false))
require.NoError(b, err)

shards := newShards(b, size, false)
targets := make([]bool, size)
target := size - 2
targets[target] = true

b.ResetTimer()
for i := 0; i < b.N; i++ {
err = enc.ReconstructSome(shards, targets)
require.NoError(b, err)
shards[target] = nil
}
})
}
}

func newShards(b require.TestingT, size int, fillParity bool) [][]byte {
shards := make([][]byte, size)
original := sharetest.RandShares(b, size/2)
copy(shards, original)

if fillParity {
// fill with parity empty Shares
for j := len(original); j < len(shards); j++ {
shards[j] = make([]byte, len(original[0]))
}
}
return shards
}
Loading

0 comments on commit 114e0f5

Please sign in to comment.