Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(shwap): Optimize ods file allocations #3512

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,14 @@ PB_CORE=$(shell go list -f {{.Dir}} -m github.com/tendermint/tendermint)
PB_GOGO=$(shell go list -f {{.Dir}} -m github.com/gogo/protobuf)
PB_CELESTIA_APP=$(shell go list -f {{.Dir}} -m github.com/celestiaorg/celestia-app)
PB_NMT=$(shell go list -f {{.Dir}} -m github.com/celestiaorg/nmt)
PB_NODE=$(shell pwd)

## pb-gen: Generate protobuf code for all /pb/*.proto files in the project.
pb-gen:
@echo '--> Generating protobuf'
@for dir in $(PB_PKGS); \
do for file in `find $$dir -type f -name "*.proto"`; \
do protoc -I=. -I=${PB_CORE}/proto/ -I=${PB_GOGO} -I=${PB_CELESTIA_APP}/proto -I=${PB_NMT} --gogofaster_out=paths=source_relative:. $$file; \
do protoc -I=. -I=${PB_CORE}/proto/ -I=${PB_NODE} -I=${PB_GOGO} -I=${PB_CELESTIA_APP}/proto -I=${PB_NMT} --gogofaster_out=paths=source_relative:. $$file; \
echo '-->' $$file; \
done; \
done;
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ require (
github.com/ipfs/go-ipld-format v0.6.0
github.com/ipfs/go-log/v2 v2.5.1
github.com/ipld/go-car v0.6.2
github.com/klauspost/reedsolomon v1.12.1
github.com/libp2p/go-libp2p v0.33.2
github.com/libp2p/go-libp2p-kad-dht v0.25.2
github.com/libp2p/go-libp2p-pubsub v0.10.1
Expand Down Expand Up @@ -228,7 +229,6 @@ require (
github.com/josharian/intern v1.0.0 // indirect
github.com/klauspost/compress v1.17.6 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/klauspost/reedsolomon v1.12.1 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
github.com/lib/pq v1.10.7 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
Expand Down
17 changes: 0 additions & 17 deletions share/availability.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,12 @@ import (
"context"
"errors"

"github.com/celestiaorg/celestia-app/pkg/da"
"github.com/celestiaorg/rsmt2d"

"github.com/celestiaorg/celestia-node/header"
)

// ErrNotAvailable is returned whenever DA sampling fails.
var ErrNotAvailable = errors.New("share: data not available")

// Root represents root commitment to multiple Shares.
// In practice, it is a commitment to all the Data in a square.
type Root = da.DataAvailabilityHeader

// NewRoot generates Root(DataAvailabilityHeader) using the
// provided extended data square.
func NewRoot(eds *rsmt2d.ExtendedDataSquare) (*Root, error) {
dah, err := da.NewDataAvailabilityHeader(eds)
if err != nil {
return nil, err
}
return &dah, nil
}

// Availability defines interface for validation of Shares' availability.
//
//go:generate mockgen -destination=availability/mocks/availability.go -package=mocks . Availability
Expand Down
4 changes: 2 additions & 2 deletions share/eds/byzantine/share_proof.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ func (s *ShareWithProof) ShareWithProofToProto() *pb.Share {
}
}

// GetShareWithProof attempts to get a share with proof for the given share. It first tries to get a row proof
// and if that fails or proof is invalid, it tries to get a column proof.
// GetShareWithProof attempts to get a share with proof for the given share. It first tries to get
// a row proof and if that fails or proof is invalid, it tries to get a column proof.
func GetShareWithProof(
ctx context.Context,
bGetter blockservice.BlockGetter,
Expand Down
6 changes: 4 additions & 2 deletions share/eds/edstest/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@ func RandEDS(t require.TestingT, size int) *rsmt2d.ExtendedDataSquare {
return eds
}

// RandEDSWithNamespace generates EDS with given square size. Returned EDS will have
// namespacedAmount of shares with the given namespace.
func RandEDSWithNamespace(
t require.TestingT,
namespace share.Namespace,
size int,
namespacedAmount, size int,
) (*rsmt2d.ExtendedDataSquare, *share.Root) {
shares := sharetest.RandSharesWithNamespace(t, namespace, size*size)
shares := sharetest.RandSharesWithNamespace(t, namespace, namespacedAmount, size*size)
eds, err := rsmt2d.ComputeExtendedDataSquare(shares, share.DefaultRSMT2DCodec(), wrapper.NewConstructor(uint64(size)))
require.NoError(t, err, "failure to recompute the extended data square")
dah, err := share.NewRoot(eds)
Expand Down
14 changes: 7 additions & 7 deletions share/eds/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,24 +121,24 @@ func CollectSharesByNamespace(
utils.SetStatusAndEnd(span, err)
}()

rootCIDs := ipld.FilterRootByNamespace(root, namespace)
if len(rootCIDs) == 0 {
rowIdxs := share.RowsWithNamespace(root, namespace)
if len(rowIdxs) == 0 {
return []share.NamespacedRow{}, nil
}

errGroup, ctx := errgroup.WithContext(ctx)
shares = make([]share.NamespacedRow, len(rootCIDs))
for i, rootCID := range rootCIDs {
shares = make([]share.NamespacedRow, len(rowIdxs))
for i, rowIdx := range rowIdxs {
// shadow loop variables, to ensure correct values are captured
i, rootCID := i, rootCID
rowIdx, rowRoot := rowIdx, root.RowRoots[rowIdx]
errGroup.Go(func() error {
row, proof, err := ipld.GetSharesByNamespace(ctx, bg, rootCID, namespace, len(root.RowRoots))
row, proof, err := ipld.GetSharesByNamespace(ctx, bg, rowRoot, namespace, len(root.RowRoots))
shares[i] = share.NamespacedRow{
Shares: row,
Proof: proof,
}
if err != nil {
return fmt.Errorf("retrieving shares by namespace %s for row %x: %w", namespace.String(), rootCID, err)
return fmt.Errorf("retrieving shares by namespace %s for row %d: %w", namespace.String(), rowIdx, err)
}
return nil
})
Expand Down
9 changes: 6 additions & 3 deletions share/getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,19 @@ func (ns NamespacedShares) Verify(root *Root, namespace Namespace) error {
}

for i, row := range ns {
if row.Proof == nil && row.Shares == nil {
return fmt.Errorf("row verification failed: no proofs and shares")
}
// verify row data against row hash from original root
if !row.verify(originalRoots[i], namespace) {
if !row.Verify(originalRoots[i], namespace) {
return fmt.Errorf("row verification failed: row %d doesn't match original root: %s", i, root.String())
}
}
return nil
}

// verify validates the row using nmt inclusion proof.
func (row *NamespacedRow) verify(rowRoot []byte, namespace Namespace) bool {
// Verify validates the row using nmt inclusion proof.
func (row *NamespacedRow) Verify(rowRoot []byte, namespace Namespace) bool {
// construct nmt leaves from shares by prepending namespace
leaves := make([][]byte, 0, len(row.Shares))
for _, shr := range row.Shares {
Expand Down
3 changes: 2 additions & 1 deletion share/getters/shrex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ func TestShrexGetter(t *testing.T) {
t.Cleanup(cancel)

// generate test data
size := 64
namespace := sharetest.RandV0Namespace()
randEDS, dah := edstest.RandEDSWithNamespace(t, namespace, 64)
randEDS, dah := edstest.RandEDSWithNamespace(t, namespace, size*size, size)
eh := headertest.RandExtendedHeaderWithRoot(t, dah)
require.NoError(t, edsStore.Put(ctx, dah.Hash(), randEDS))
peerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{
Expand Down
5 changes: 3 additions & 2 deletions share/ipld/get_shares.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,13 @@ func GetShares(ctx context.Context, bg blockservice.BlockGetter, root cid.Cid, s
func GetSharesByNamespace(
ctx context.Context,
bGetter blockservice.BlockGetter,
root cid.Cid,
root []byte,
namespace share.Namespace,
maxShares int,
) ([]share.Share, *nmt.Proof, error) {
rootCid := MustCidFromNamespacedSha256(root)
data := NewNamespaceData(maxShares, namespace, WithLeaves(), WithProofs())
err := data.CollectLeavesByNamespace(ctx, bGetter, root)
err := data.CollectLeavesByNamespace(ctx, bGetter, rootCid)
if err != nil {
return nil, nil, err
}
Expand Down
10 changes: 4 additions & 6 deletions share/ipld/get_shares_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,7 @@ func TestGetSharesByNamespace(t *testing.T) {
rowRoots, err := eds.RowRoots()
require.NoError(t, err)
for _, row := range rowRoots {
rcid := MustCidFromNamespacedSha256(row)
rowShares, _, err := GetSharesByNamespace(ctx, bServ, rcid, namespace, len(rowRoots))
rowShares, _, err := GetSharesByNamespace(ctx, bServ, row, namespace, len(rowRoots))
if errors.Is(err, ErrNamespaceOutsideRange) {
continue
}
Expand Down Expand Up @@ -363,8 +362,7 @@ func TestGetSharesWithProofsByNamespace(t *testing.T) {
rowRoots, err := eds.RowRoots()
require.NoError(t, err)
for _, row := range rowRoots {
rcid := MustCidFromNamespacedSha256(row)
rowShares, proof, err := GetSharesByNamespace(ctx, bServ, rcid, namespace, len(rowRoots))
rowShares, proof, err := GetSharesByNamespace(ctx, bServ, row, namespace, len(rowRoots))
if namespace.IsOutsideRange(row, row) {
require.ErrorIs(t, err, ErrNamespaceOutsideRange)
continue
Expand All @@ -386,15 +384,15 @@ func TestGetSharesWithProofsByNamespace(t *testing.T) {
share.NewSHA256Hasher(),
namespace.ToNMT(),
leaves,
NamespacedSha256FromCID(rcid))
row)
require.True(t, verified)

// verify inclusion
verified = proof.VerifyInclusion(
share.NewSHA256Hasher(),
namespace.ToNMT(),
rowShares,
NamespacedSha256FromCID(rcid))
row)
require.True(t, verified)
}
}
Expand Down
48 changes: 48 additions & 0 deletions share/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package share

import (
"bytes"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"

appns "github.com/celestiaorg/celestia-app/pkg/namespace"
Expand Down Expand Up @@ -182,3 +184,49 @@ func (n Namespace) IsGreater(target Namespace) bool {
func (n Namespace) IsGreaterOrEqualThan(target Namespace) bool {
return bytes.Compare(n, target) > -1
}

// AddInt adds arbitrary int value to namespace, treating namespace as big-endian
// implementation of int
func (n Namespace) AddInt(val int) (Namespace, error) {
if val == 0 {
return n, nil
}
// Convert the input integer to a byte slice and add it to result slice
result := make([]byte, len(n))
if val > 0 {
binary.BigEndian.PutUint64(result[len(n)-8:], uint64(val))
} else {
binary.BigEndian.PutUint64(result[len(n)-8:], uint64(-val))
}

// Perform addition byte by byte
var carry int
for i := len(n) - 1; i >= 0; i-- {
var sum int
if val > 0 {
sum = int(n[i]) + int(result[i]) + carry
} else {
sum = int(n[i]) - int(result[i]) + carry
}

switch {
case sum > 255:
carry = 1
sum -= 256
case sum < 0:
carry = -1
sum += 256
default:
carry = 0
}

result[i] = uint8(sum)
}

// Handle any remaining carry
if carry != 0 {
return nil, errors.New("namespace overflow")
}

return result, nil
}
34 changes: 34 additions & 0 deletions share/new_eds/accessor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package eds

import (
"context"
"io"

"github.com/celestiaorg/rsmt2d"

"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/shwap"
)

// Accessor is an interface for accessing extended data square data.
type Accessor interface {
// Size returns square size of the Accessor.
Size(ctx context.Context) int
// Sample returns share and corresponding proof for row and column indices. Implementation can
// choose which axis to use for proof. Chosen axis for proof should be indicated in the returned
// Sample.
Sample(ctx context.Context, rowIdx, colIdx int) (shwap.Sample, error)
// AxisHalf returns half of shares axis of the given type and index. Side is determined by
// implementation. Implementations should indicate the side in the returned AxisHalf.
AxisHalf(ctx context.Context, axisType rsmt2d.Axis, axisIdx int) (AxisHalf, error)
// RowNamespaceData returns data for the given namespace and row index.
RowNamespaceData(ctx context.Context, namespace share.Namespace, rowIdx int) (shwap.RowNamespaceData, error)
// Shares returns data (ODS) shares extracted from the Accessor.
Shares(ctx context.Context) ([]share.Share, error)
}

// AccessorCloser is an interface that groups Accessor and io.Closer interfaces.
type AccessorCloser interface {
Accessor
io.Closer
}
69 changes: 69 additions & 0 deletions share/new_eds/axis_half.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package eds

import (
"fmt"

"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/shwap"
)

// AxisHalf represents a half of data for a row or column in the EDS.
type AxisHalf struct {
Shares []share.Share
// IsParity indicates whether the half is parity or data.
IsParity bool
}

// ToRow converts the AxisHalf to a shwap.Row.
func (a AxisHalf) ToRow() shwap.Row {
side := shwap.Left
if a.IsParity {
side = shwap.Right
}
return shwap.NewRow(a.Shares, side)
}

// Extended returns full axis shares from half axis shares.
func (a AxisHalf) Extended() ([]share.Share, error) {
if a.IsParity {
return reconstructShares(a.Shares)
}
return extendShares(a.Shares)
}

// extendShares constructs full axis shares from original half axis shares.
func extendShares(original []share.Share) ([]share.Share, error) {
if len(original) == 0 {
return nil, fmt.Errorf("original shares are empty")
}

codec := share.DefaultRSMT2DCodec()
parity, err := codec.Encode(original)
if err != nil {
return nil, fmt.Errorf("encoding: %w", err)
}
shares := make([]share.Share, len(original)*2)
copy(shares, original)
copy(shares[len(original):], parity)
return shares, nil
}

// reconstructShares constructs full axis shares from parity half axis shares.
func reconstructShares(parity []share.Share) ([]share.Share, error) {
if len(parity) == 0 {
return nil, fmt.Errorf("parity shares are empty")
}

sqLen := len(parity) * 2
shares := make([]share.Share, sqLen)
for i := sqLen / 2; i < sqLen; i++ {
shares[i] = parity[i-sqLen/2]
}

codec := share.DefaultRSMT2DCodec()
shares, err := codec.Decode(shares)
if err != nil {
return nil, fmt.Errorf("reconstructing: %w", err)
}
return shares, nil
}
Loading
Loading