feat(shwap): Add ODS file #3482

Merged · 13 commits · Jun 19, 2024
Changes from 4 commits
2 changes: 1 addition & 1 deletion go.mod
@@ -41,6 +41,7 @@ require (
github.com/ipfs/go-ipld-format v0.6.0
github.com/ipfs/go-log/v2 v2.5.1
github.com/ipld/go-car v0.6.2
github.com/klauspost/reedsolomon v1.12.1
github.com/libp2p/go-libp2p v0.33.2
github.com/libp2p/go-libp2p-kad-dht v0.25.2
github.com/libp2p/go-libp2p-pubsub v0.10.1
@@ -228,7 +229,6 @@ require (
github.com/josharian/intern v1.0.0 // indirect
github.com/klauspost/compress v1.17.6 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/klauspost/reedsolomon v1.12.1 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
github.com/lib/pq v1.10.7 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
47 changes: 47 additions & 0 deletions share/new_eds/axis_half.go
@@ -1,6 +1,8 @@
package eds

import (
"fmt"

"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/shwap"
)
@@ -20,3 +22,48 @@ func (a AxisHalf) ToRow() shwap.Row {
}
return shwap.NewRow(a.Shares, side)
}

// Extended returns full axis shares from half axis shares.
func (a AxisHalf) Extended() ([]share.Share, error) {
if a.IsParity {
return reconstructShares(a.Shares)
}
return extendShares(a.Shares)
}

// extendShares constructs full axis shares from original half axis shares.
func extendShares(original []share.Share) ([]share.Share, error) {
if len(original) == 0 {
return nil, fmt.Errorf("original shares are empty")
}

codec := share.DefaultRSMT2DCodec()
parity, err := codec.Encode(original)
if err != nil {
return nil, fmt.Errorf("encoding: %w", err)
}
shares := make([]share.Share, len(original)*2)
copy(shares, original)
copy(shares[len(original):], parity)
return shares, nil
Comment on lines +45 to +48

Member: Feels like there is an optimization opportunity to avoid copying that we can return to later.

Member Author: Need to prealloc shares anyway, so copying is necessary. Can do it implicitly with append, though.

}
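
Following up on the thread above, a minimal sketch of the append-based variant the author mentions (hypothetical, not part of this PR; it assumes the same package and imports as axis_half.go):

func extendSharesAppend(original []share.Share) ([]share.Share, error) {
	if len(original) == 0 {
		return nil, fmt.Errorf("original shares are empty")
	}
	codec := share.DefaultRSMT2DCodec()
	parity, err := codec.Encode(original)
	if err != nil {
		return nil, fmt.Errorf("encoding: %w", err)
	}
	// Preallocate capacity for both halves and let append fill them;
	// the slice never reallocates, so this matches the copy-based version.
	shares := make([]share.Share, 0, len(original)*2)
	shares = append(shares, original...)
	shares = append(shares, parity...)
	return shares, nil
}
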

// reconstructShares constructs full axis shares from parity half axis shares.
func reconstructShares(parity []share.Share) ([]share.Share, error) {
if len(parity) == 0 {
return nil, fmt.Errorf("parity shares are empty")
}

sqLen := len(parity) * 2
shares := make([]share.Share, sqLen)
for i := sqLen / 2; i < sqLen; i++ {
shares[i] = parity[i-sqLen/2]
}

codec := share.DefaultRSMT2DCodec()
shares, err := codec.Decode(shares)
if err != nil {
return nil, fmt.Errorf("reconstructing: %w", err)
}
return shares, nil
}
32 changes: 32 additions & 0 deletions share/new_eds/axis_half_test.go
@@ -0,0 +1,32 @@
package eds

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/celestiaorg/celestia-node/share/sharetest"
)

func TestExtendAxisHalf(t *testing.T) {
shares := sharetest.RandShares(t, 16)

original := AxisHalf{
Shares: shares,
IsParity: false,
}

extended, err := original.Extended()
require.NoError(t, err)
require.Len(t, extended, len(shares)*2)

parity := AxisHalf{
Shares: extended[len(shares):],
IsParity: true,
}

parityExtended, err := parity.Extended()
require.NoError(t, err)

require.Equal(t, extended, parityExtended)
}
10 changes: 7 additions & 3 deletions share/shwap/row_namespace_data.go
@@ -164,10 +164,14 @@ func (rnd RowNamespaceData) Validate(dah *share.Root, namespace share.Namespace,
// verifyInclusion checks the inclusion of the row's shares in the provided root using NMT.
func (rnd RowNamespaceData) verifyInclusion(rowRoot []byte, namespace share.Namespace) bool {
leaves := make([][]byte, 0, len(rnd.Shares))
for _, shr := range rnd.Shares {
namespaceBytes := share.GetNamespace(shr)
leaves = append(leaves, append(namespaceBytes, shr...))
for _, sh := range rnd.Shares {
namespaceBytes := share.GetNamespace(sh)
leave := make([]byte, len(sh)+len(namespaceBytes))
copy(leave, namespaceBytes)
copy(leave[len(namespaceBytes):], sh)
leaves = append(leaves, leave)
Comment on lines +168 to +172

Member: Is this a refactoring from appends to copies?

Member Author: I like that you noticed. There was a very nasty mutation bug here that took me a few hours to fix.

}

return rnd.Proof.VerifyNamespace(
share.NewSHA256Hasher(),
namespace.ToNMT(),
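
For the mutation bug mentioned in the thread above: a hypothetical, self-contained illustration of the Go aliasing pitfall the old append-based construction is prone to, assuming the namespace is returned as a prefix sub-slice of a share and shares are windows into one contiguous row buffer (names and values are made up, not taken from the PR):

package main

import "fmt"

func main() {
	// Two adjacent "shares" backed by one contiguous row buffer.
	row := []byte{1, 2, 3, 4, 5, 6, 7, 8}
	shr := row[:4]  // first share
	next := row[4:] // second share

	// Prefix sub-slice standing in for the namespace: len 2, cap 8,
	// sharing row's backing array.
	ns := shr[:2]

	// 2+4 = 6 <= cap(ns), so append writes in place and overwrites
	// row[2:6], i.e. the tail of shr and the head of next.
	leaf := append(ns, shr...)

	fmt.Println(leaf) // [1 2 1 2 3 4]
	fmt.Println(next) // [3 4 7 8] -- the neighbouring share was silently corrupted
}

Building each leaf with an explicit make and two copies, as the new code does, never writes through a shared backing array.
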
22 changes: 22 additions & 0 deletions share/shwap/sample.go
@@ -4,6 +4,7 @@ import (
"errors"
"fmt"

"github.com/celestiaorg/celestia-app/pkg/wrapper"
"github.com/celestiaorg/nmt"
nmt_pb "github.com/celestiaorg/nmt/pb"
"github.com/celestiaorg/rsmt2d"
@@ -24,6 +25,27 @@ type Sample struct {
ProofType rsmt2d.Axis // ProofType indicates whether the proof is against a row or a column.
}

func SampleFromShares(shares []share.Share, proofType rsmt2d.Axis, axisIdx, shrIdx int) (Sample, error) {
tree := wrapper.NewErasuredNamespacedMerkleTree(uint64(len(shares)/2), uint(axisIdx))
for _, shr := range shares {
err := tree.Push(shr)
if err != nil {
return Sample{}, err
}
}

proof, err := tree.ProveRange(shrIdx, shrIdx+1)
if err != nil {
return Sample{}, err
}

return Sample{
Share: shares[shrIdx],
Proof: &proof,
ProofType: proofType,
}, nil
}

// SampleFromProto converts a protobuf Sample back into its domain model equivalent.
func SampleFromProto(s *pb.Sample) Sample {
proof := nmt.NewInclusionProof(
38 changes: 38 additions & 0 deletions store/file/codec.go
@@ -0,0 +1,38 @@
package file

import (
"sync"

"github.com/klauspost/reedsolomon"
)

var codec Codec

func init() {
codec = NewCodec()
}

type Codec interface {
Encoder(len int) (reedsolomon.Encoder, error)
}

type codecCache struct {
cache sync.Map
}

func NewCodec() Codec {
return &codecCache{}
}

func (l *codecCache) Encoder(len int) (reedsolomon.Encoder, error) {
enc, ok := l.cache.Load(len)
if !ok {
var err error
enc, err = reedsolomon.New(len/2, len/2, reedsolomon.WithLeopardGF(true))
if err != nil {
return nil, err
}
l.cache.Store(len, enc)
}
return enc.(reedsolomon.Encoder), nil
}
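
A brief usage sketch of the cached encoder above (hypothetical call site, not part of this diff; it assumes the package-level codec variable shown here, and uses 64-byte shards since the Leopard backend expects shard sizes in multiples of 64 bytes):

func exampleEncode() error {
	// Ask the cache for an encoder sized to a full axis of 4 shards
	// (2 original + 2 parity).
	enc, err := codec.Encoder(4)
	if err != nil {
		return err
	}

	const shardSize = 64
	shards := make([][]byte, 4)
	for i := range shards {
		shards[i] = make([]byte, shardSize)
	}
	shards[0][0], shards[1][0] = 1, 2 // stand-in data in the original half

	// Encode fills the parity half (shards[2], shards[3]) in place.
	return enc.Encode(shards)
}
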
83 changes: 83 additions & 0 deletions store/file/codec_test.go
@@ -0,0 +1,83 @@
package file

import (
"fmt"
"testing"

"github.com/klauspost/reedsolomon"
"github.com/stretchr/testify/require"

"github.com/celestiaorg/celestia-node/share/sharetest"
)

func BenchmarkCodec(b *testing.B) {
minSize, maxSize := 32, 128

for size := minSize; size <= maxSize; size *= 2 {
// BenchmarkCodec/Leopard/size:32-10 409194 2793 ns/op
// BenchmarkCodec/Leopard/size:64-10 190969 6170 ns/op
// BenchmarkCodec/Leopard/size:128-10 82821 14287 ns/op
b.Run(fmt.Sprintf("Leopard/size:%v", size), func(b *testing.B) {
enc, err := reedsolomon.New(size/2, size/2, reedsolomon.WithLeopardGF(true))
require.NoError(b, err)

shards := newShards(b, size, true)

b.ResetTimer()
for i := 0; i < b.N; i++ {
err = enc.Encode(shards)
require.NoError(b, err)
}
})

// BenchmarkCodec/default/size:32-10 222153 5364 ns/op
// BenchmarkCodec/default/size:64-10 58831 20349 ns/op
// BenchmarkCodec/default/size:128-10 14940 80471 ns/op
b.Run(fmt.Sprintf("default/size:%v", size), func(b *testing.B) {
enc, err := reedsolomon.New(size/2, size/2, reedsolomon.WithLeopardGF(false))
require.NoError(b, err)

shards := newShards(b, size, true)

b.ResetTimer()
for i := 0; i < b.N; i++ {
err = enc.Encode(shards)
require.NoError(b, err)
}
})

// BenchmarkCodec/default-reconstructSome/size:32-10 1263585 954.4 ns/op
// BenchmarkCodec/default-reconstructSome/size:64-10 762273 1554 ns/op
// BenchmarkCodec/default-reconstructSome/size:128-10 429268 2974 ns/op
b.Run(fmt.Sprintf("default-reconstructSome/size:%v", size), func(b *testing.B) {
enc, err := reedsolomon.New(size/2, size/2, reedsolomon.WithLeopardGF(false))
require.NoError(b, err)

shards := newShards(b, size, false)
targets := make([]bool, size)
target := size - 2
targets[target] = true

b.ResetTimer()
for i := 0; i < b.N; i++ {
err = enc.ReconstructSome(shards, targets)
require.NoError(b, err)
shards[target] = nil
}
})
}
}

func newShards(b require.TestingT, size int, fillParity bool) [][]byte {
shards := make([][]byte, size)
original := sharetest.RandShares(b, size/2)
copy(shards, original)

if fillParity {
// fill the parity half with empty shares
for j := len(original); j < len(shards); j++ {
shards[j] = make([]byte, len(original[0]))
}
}
return shards
}
98 changes: 98 additions & 0 deletions store/file/header.go
@@ -0,0 +1,98 @@
package file

import (
"encoding/binary"
"fmt"
"io"

"github.com/celestiaorg/celestia-node/share"
)

type headerVersion uint8

const (
headerVersionV0 headerVersion = 1
headerVOSize int = 40
)

type headerV0 struct {
fileVersion fileVersion
fileType fileType

// Taken directly from EDS
shareSize uint16
squareSize uint16

datahash share.DataHash
}

type fileVersion uint8

const (
fileV0 fileVersion = iota + 1
)

type fileType uint8

const (
ods fileType = iota
q1q4
)

func readHeader(r io.Reader) (*headerV0, error) {
// read first byte to determine the fileVersion
var version headerVersion
if err := binary.Read(r, binary.LittleEndian, &version); err != nil {
return nil, fmt.Errorf("readHeader: %w", err)
}

switch version { //nolint:gocritic // gocritic wants to convert it to if statement.
case headerVersionV0:
h := &headerV0{}
_, err := h.ReadFrom(r)
return h, err
}
return nil, fmt.Errorf("unsupported header fileVersion: %d", version)
}

func writeHeader(w io.Writer, h *headerV0) error {
n, err := w.Write([]byte{byte(headerVersionV0)})
if err != nil {
return fmt.Errorf("writeHeader: %w", err)
}
if n != 1 {
return fmt.Errorf("writeHeader: wrote %d bytes, expected 1", n)
}
_, err = h.WriteTo(w)
return err
}

func (h *headerV0) Size() int {
return headerVOSize + 1
}

func (h *headerV0) WriteTo(w io.Writer) (int64, error) {
buf := make([]byte, headerVOSize)
buf[0] = byte(h.fileVersion)
buf[1] = byte(h.fileType)
binary.LittleEndian.PutUint16(buf[4:6], h.shareSize)
binary.LittleEndian.PutUint16(buf[6:8], h.squareSize)
copy(buf[8:40], h.datahash)
n, err := w.Write(buf)
return int64(n), err
}

func (h *headerV0) ReadFrom(r io.Reader) (int64, error) {
bytesHeader := make([]byte, headerVOSize)
n, err := io.ReadFull(r, bytesHeader)
if n != headerVOSize {
return 0, fmt.Errorf("readHeader: read %d bytes, expected %d", len(bytesHeader), headerVOSize)
}

h.fileVersion = fileVersion(bytesHeader[0])
h.fileType = fileType(bytesHeader[1])
h.shareSize = binary.LittleEndian.Uint16(bytesHeader[4:6])
h.squareSize = binary.LittleEndian.Uint16(bytesHeader[6:8])
h.datahash = bytesHeader[8:40]
return int64(headerVOSize), err
}
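
A small round-trip sketch for the header serialization above (hypothetical, not part of this diff; it assumes the same package as header.go plus the bytes import, and the field values are made up):

func exampleHeaderRoundTrip() error {
	h := &headerV0{
		fileVersion: fileV0,
		fileType:    ods,
		shareSize:   512,
		squareSize:  16,
		datahash:    make(share.DataHash, 32),
	}

	buf := &bytes.Buffer{}
	if err := writeHeader(buf, h); err != nil {
		return err
	}

	// readHeader consumes the leading version byte, then the 40-byte body.
	got, err := readHeader(buf)
	if err != nil {
		return err
	}
	if got.squareSize != h.squareSize || got.shareSize != h.shareSize {
		return fmt.Errorf("header round trip mismatch: %+v != %+v", got, h)
	}
	return nil
}
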