From 735ba7d4d9c09e8b05130e4a6fc0ac5729e95e1a Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Mon, 10 Jun 2024 11:52:28 +0500 Subject: [PATCH 01/13] add ods file --- go.mod | 2 +- share/new_eds/axis_half.go | 47 ++++++ share/new_eds/axis_half_test.go | 32 ++++ share/shwap/row_namespace_data.go | 10 +- share/shwap/sample.go | 22 +++ store/file/codec.go | 38 +++++ store/file/codec_test.go | 83 +++++++++ store/file/header.go | 94 +++++++++++ store/file/ods.go | 269 ++++++++++++++++++++++++++++++ store/file/ods_test.go | 146 ++++++++++++++++ store/file/square.go | 132 +++++++++++++++ 11 files changed, 871 insertions(+), 4 deletions(-) create mode 100644 share/new_eds/axis_half_test.go create mode 100644 store/file/codec.go create mode 100644 store/file/codec_test.go create mode 100644 store/file/header.go create mode 100644 store/file/ods.go create mode 100644 store/file/ods_test.go create mode 100644 store/file/square.go diff --git a/go.mod b/go.mod index d852a48c61..a3b2525e82 100644 --- a/go.mod +++ b/go.mod @@ -41,6 +41,7 @@ require ( github.com/ipfs/go-ipld-format v0.6.0 github.com/ipfs/go-log/v2 v2.5.1 github.com/ipld/go-car v0.6.2 + github.com/klauspost/reedsolomon v1.12.1 github.com/libp2p/go-libp2p v0.33.2 github.com/libp2p/go-libp2p-kad-dht v0.25.2 github.com/libp2p/go-libp2p-pubsub v0.10.1 @@ -228,7 +229,6 @@ require ( github.com/josharian/intern v1.0.0 // indirect github.com/klauspost/compress v1.17.6 // indirect github.com/klauspost/cpuid/v2 v2.2.7 // indirect - github.com/klauspost/reedsolomon v1.12.1 // indirect github.com/koron/go-ssdp v0.0.4 // indirect github.com/lib/pq v1.10.7 // indirect github.com/libp2p/go-buffer-pool v0.1.0 // indirect diff --git a/share/new_eds/axis_half.go b/share/new_eds/axis_half.go index 17b29de591..dede70ebbc 100644 --- a/share/new_eds/axis_half.go +++ b/share/new_eds/axis_half.go @@ -1,6 +1,8 @@ package eds import ( + "fmt" + "github.com/celestiaorg/celestia-node/share" "github.com/celestiaorg/celestia-node/share/shwap" ) @@ -20,3 +22,48 @@ func (a AxisHalf) ToRow() shwap.Row { } return shwap.NewRow(a.Shares, side) } + +// Extended returns full axis shares from half axis shares. +func (a AxisHalf) Extended() ([]share.Share, error) { + if a.IsParity { + return reconstructShares(a.Shares) + } + return extendShares(a.Shares) +} + +// extendShares constructs full axis shares from original half axis shares. +func extendShares(original []share.Share) ([]share.Share, error) { + if len(original) == 0 { + return nil, fmt.Errorf("original shares are empty") + } + + codec := share.DefaultRSMT2DCodec() + parity, err := codec.Encode(original) + if err != nil { + return nil, fmt.Errorf("encoding: %w", err) + } + shares := make([]share.Share, len(original)*2) + copy(shares, original) + copy(shares[len(original):], parity) + return shares, nil +} + +// reconstructShares constructs full axis shares from parity half axis shares. 
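+// It places the parity shares into the second half of a full-length axis slice and lets the
+// codec's Decode recompute the missing original half, so the returned slice contains the
+// original shares followed by the given parity shares.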
+func reconstructShares(parity []share.Share) ([]share.Share, error) { + if len(parity) == 0 { + return nil, fmt.Errorf("parity shares are empty") + } + + sqLen := len(parity) * 2 + shares := make([]share.Share, sqLen) + for i := sqLen / 2; i < sqLen; i++ { + shares[i] = parity[i-sqLen/2] + } + + codec := share.DefaultRSMT2DCodec() + shares, err := codec.Decode(shares) + if err != nil { + return nil, fmt.Errorf("reconstructing: %w", err) + } + return shares, nil +} diff --git a/share/new_eds/axis_half_test.go b/share/new_eds/axis_half_test.go new file mode 100644 index 0000000000..752add5acd --- /dev/null +++ b/share/new_eds/axis_half_test.go @@ -0,0 +1,32 @@ +package eds + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/celestiaorg/celestia-node/share/sharetest" +) + +func TestExtendAxisHalf(t *testing.T) { + shares := sharetest.RandShares(t, 16) + + original := AxisHalf{ + Shares: shares, + IsParity: false, + } + + extended, err := original.Extended() + require.NoError(t, err) + require.Len(t, extended, len(shares)*2) + + parity := AxisHalf{ + Shares: extended[len(shares):], + IsParity: true, + } + + parityExtended, err := parity.Extended() + require.NoError(t, err) + + require.Equal(t, extended, parityExtended) +} diff --git a/share/shwap/row_namespace_data.go b/share/shwap/row_namespace_data.go index fe7bd6da36..5d424ee0f3 100644 --- a/share/shwap/row_namespace_data.go +++ b/share/shwap/row_namespace_data.go @@ -164,10 +164,14 @@ func (rnd RowNamespaceData) Validate(dah *share.Root, namespace share.Namespace, // verifyInclusion checks the inclusion of the row's shares in the provided root using NMT. func (rnd RowNamespaceData) verifyInclusion(rowRoot []byte, namespace share.Namespace) bool { leaves := make([][]byte, 0, len(rnd.Shares)) - for _, shr := range rnd.Shares { - namespaceBytes := share.GetNamespace(shr) - leaves = append(leaves, append(namespaceBytes, shr...)) + for _, sh := range rnd.Shares { + namespaceBytes := share.GetNamespace(sh) + leave := make([]byte, len(sh)+len(namespaceBytes)) + copy(leave, namespaceBytes) + copy(leave[len(namespaceBytes):], sh) + leaves = append(leaves, leave) } + return rnd.Proof.VerifyNamespace( share.NewSHA256Hasher(), namespace.ToNMT(), diff --git a/share/shwap/sample.go b/share/shwap/sample.go index c521410e35..a58a41910e 100644 --- a/share/shwap/sample.go +++ b/share/shwap/sample.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" + "github.com/celestiaorg/celestia-app/pkg/wrapper" "github.com/celestiaorg/nmt" nmt_pb "github.com/celestiaorg/nmt/pb" "github.com/celestiaorg/rsmt2d" @@ -24,6 +25,27 @@ type Sample struct { ProofType rsmt2d.Axis // ProofType indicates whether the proof is against a row or a column. } +func SampleFromShares(shares []share.Share, proofType rsmt2d.Axis, axisIdx, shrIdx int) (Sample, error) { + tree := wrapper.NewErasuredNamespacedMerkleTree(uint64(len(shares)/2), uint(axisIdx)) + for _, shr := range shares { + err := tree.Push(shr) + if err != nil { + return Sample{}, err + } + } + + proof, err := tree.ProveRange(shrIdx, shrIdx+1) + if err != nil { + return Sample{}, err + } + + return Sample{ + Share: shares[shrIdx], + Proof: &proof, + ProofType: proofType, + }, nil +} + // SampleFromProto converts a protobuf Sample back into its domain model equivalent. 
func SampleFromProto(s *pb.Sample) Sample { proof := nmt.NewInclusionProof( diff --git a/store/file/codec.go b/store/file/codec.go new file mode 100644 index 0000000000..a27280be11 --- /dev/null +++ b/store/file/codec.go @@ -0,0 +1,38 @@ +package file + +import ( + "sync" + + "github.com/klauspost/reedsolomon" +) + +var codec Codec + +func init() { + codec = NewCodec() +} + +type Codec interface { + Encoder(len int) (reedsolomon.Encoder, error) +} + +type codecCache struct { + cache sync.Map +} + +func NewCodec() Codec { + return &codecCache{} +} + +func (l *codecCache) Encoder(len int) (reedsolomon.Encoder, error) { + enc, ok := l.cache.Load(len) + if !ok { + var err error + enc, err = reedsolomon.New(len/2, len/2, reedsolomon.WithLeopardGF(true)) + if err != nil { + return nil, err + } + l.cache.Store(len, enc) + } + return enc.(reedsolomon.Encoder), nil +} diff --git a/store/file/codec_test.go b/store/file/codec_test.go new file mode 100644 index 0000000000..d6fdbb3045 --- /dev/null +++ b/store/file/codec_test.go @@ -0,0 +1,83 @@ +package file + +import ( + "fmt" + "testing" + + "github.com/klauspost/reedsolomon" + "github.com/stretchr/testify/require" + + "github.com/celestiaorg/celestia-node/share/sharetest" +) + +func BenchmarkCodec(b *testing.B) { + minSize, maxSize := 32, 128 + + for size := minSize; size <= maxSize; size *= 2 { + // BenchmarkCodec/Leopard/size:32-10 409194 2793 ns/op + // BenchmarkCodec/Leopard/size:64-10 190969 6170 ns/op + // BenchmarkCodec/Leopard/size:128-10 82821 14287 ns/op + b.Run(fmt.Sprintf("Leopard/size:%v", size), func(b *testing.B) { + enc, err := reedsolomon.New(size/2, size/2, reedsolomon.WithLeopardGF(true)) + require.NoError(b, err) + + shards := newShards(b, size, true) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + err = enc.Encode(shards) + require.NoError(b, err) + } + }) + + // BenchmarkCodec/default/size:32-10 222153 5364 ns/op + // BenchmarkCodec/default/size:64-10 58831 20349 ns/op + // BenchmarkCodec/default/size:128-10 14940 80471 ns/op + b.Run(fmt.Sprintf("default/size:%v", size), func(b *testing.B) { + enc, err := reedsolomon.New(size/2, size/2, reedsolomon.WithLeopardGF(false)) + require.NoError(b, err) + + shards := newShards(b, size, true) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + err = enc.Encode(shards) + require.NoError(b, err) + } + }) + + // BenchmarkCodec/default-reconstructSome/size:32-10 1263585 954.4 ns/op + // BenchmarkCodec/default-reconstructSome/size:64-10 762273 1554 ns/op + // BenchmarkCodec/default-reconstructSome/size:128-10 429268 2974 ns/op + b.Run(fmt.Sprintf("default-reconstructSome/size:%v", size), func(b *testing.B) { + enc, err := reedsolomon.New(size/2, size/2, reedsolomon.WithLeopardGF(false)) + require.NoError(b, err) + + shards := newShards(b, size, false) + targets := make([]bool, size) + target := size - 2 + targets[target] = true + + b.ResetTimer() + for i := 0; i < b.N; i++ { + err = enc.ReconstructSome(shards, targets) + require.NoError(b, err) + shards[target] = nil + } + }) + } +} + +func newShards(b require.TestingT, size int, fillParity bool) [][]byte { + shards := make([][]byte, size) + original := sharetest.RandShares(b, size/2) + copy(shards, original) + + if fillParity { + // fill with parity empty Shares + for j := len(original); j < len(shards); j++ { + shards[j] = make([]byte, len(original[0])) + } + } + return shards +} diff --git a/store/file/header.go b/store/file/header.go new file mode 100644 index 0000000000..d7d0a7760a --- /dev/null +++ b/store/file/header.go @@ -0,0 +1,94 
@@ +package file + +import ( + "bytes" + "encoding/binary" + "fmt" + "io" + + "github.com/celestiaorg/celestia-node/share" +) + +const headerSize = 64 + +type header struct { + version fileVersion + fileType fileType + + // Taken directly from EDS + shareSize uint16 + squareSize uint16 + + datahash share.DataHash +} + +type fileVersion uint8 + +const ( + fileV0 fileVersion = iota +) + +type fileType uint8 + +const ( + ods fileType = iota + q1q4 +) + +func (h *header) WriteTo(w io.Writer) (int64, error) { + b := bytes.NewBuffer(make([]byte, 0, headerSize)) + _ = b.WriteByte(byte(h.version)) + _ = b.WriteByte(byte(h.fileType)) + _ = binary.Write(b, binary.LittleEndian, h.shareSize) + _ = binary.Write(b, binary.LittleEndian, h.squareSize) + _, _ = b.Write(h.datahash) + // write padding + _, _ = b.Write(make([]byte, headerSize-b.Len()-1)) + return writeLenEncoded(w, b.Bytes()) +} + +func readHeader(r io.Reader) (*header, error) { + bytesHeader, err := readLenEncoded(r) + if err != nil { + return nil, err + } + if len(bytesHeader) != headerSize-1 { + return nil, fmt.Errorf("readHeader: read %d bytes, expected %d", len(bytesHeader), headerSize) + } + h := &header{ + version: fileVersion(bytesHeader[0]), + fileType: fileType(bytesHeader[1]), + shareSize: binary.LittleEndian.Uint16(bytesHeader[2:4]), + squareSize: binary.LittleEndian.Uint16(bytesHeader[4:6]), + datahash: make([]byte, 32), + } + + copy(h.datahash, bytesHeader[6:6+32]) + return h, err +} + +func writeLenEncoded(w io.Writer, data []byte) (int64, error) { + _, err := w.Write([]byte{byte(len(data))}) + if err != nil { + return 0, err + } + return io.Copy(w, bytes.NewBuffer(data)) +} + +func readLenEncoded(r io.Reader) ([]byte, error) { + lenBuf := make([]byte, 1) + _, err := io.ReadFull(r, lenBuf) + if err != nil { + return nil, err + } + + data := make([]byte, lenBuf[0]) + n, err := io.ReadFull(r, data) + if err != nil { + return nil, err + } + if n != len(data) { + return nil, fmt.Errorf("readLenEncoded: read %d bytes, expected %d", n, len(data)) + } + return data, nil +} diff --git a/store/file/ods.go b/store/file/ods.go new file mode 100644 index 0000000000..8ae71708ba --- /dev/null +++ b/store/file/ods.go @@ -0,0 +1,269 @@ +package file + +import ( + "context" + "fmt" + "io" + "os" + "sync" + + "github.com/celestiaorg/rsmt2d" + + "github.com/celestiaorg/celestia-node/share" + eds "github.com/celestiaorg/celestia-node/share/new_eds" + "github.com/celestiaorg/celestia-node/share/shwap" +) + +var _ eds.AccessorCloser = (*OdsFile)(nil) + +type OdsFile struct { + path string + hdr *header + fl *os.File + + lock sync.RWMutex + ods square +} + +// OpenOdsFile opens an existing file. File has to be closed after usage. +func OpenOdsFile(path string) (*OdsFile, error) { + f, err := os.Open(path) + if err != nil { + return nil, err + } + + h, err := readHeader(f) + if err != nil { + return nil, err + } + + return &OdsFile{ + path: path, + hdr: h, + fl: f, + }, nil +} + +// CreateOdsFile creates a new file. File has to be closed after usage. 
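+// The file layout is a fixed 64-byte, length-prefixed header carrying the version, file type,
+// share size, square size and data hash, followed by the ODS shares written row by row.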
+func CreateOdsFile( + path string, + datahash share.DataHash, + eds *rsmt2d.ExtendedDataSquare, +) (*OdsFile, error) { + f, err := os.Create(path) + if err != nil { + return nil, fmt.Errorf("file create: %w", err) + } + + h := &header{ + version: fileV0, + fileType: ods, + shareSize: share.Size, // TODO: rsmt2d should expose this field + squareSize: uint16(eds.Width()), + datahash: datahash, + } + + err = writeOdsFile(f, h, eds) + if err != nil { + return nil, fmt.Errorf("writing ODS file: %w", err) + } + + // TODO: fill ods field with data from eds + return &OdsFile{ + path: path, + fl: f, + hdr: h, + }, f.Sync() +} + +func writeOdsFile(w io.Writer, h *header, eds *rsmt2d.ExtendedDataSquare) error { + _, err := h.WriteTo(w) + if err != nil { + return err + } + + for _, shr := range eds.FlattenedODS() { + if _, err := w.Write(shr); err != nil { + return err + } + } + return nil +} + +// Size returns square size of the Accessor. +func (f *OdsFile) Size(context.Context) int { + return f.size() +} + +func (f *OdsFile) size() int { + return int(f.hdr.squareSize) +} + +// Close closes the file. +func (f *OdsFile) Close() error { + return f.fl.Close() +} + +// Sample returns share and corresponding proof for row and column indices. Implementation can +// choose which axis to use for proof. Chosen axis for proof should be indicated in the returned +// Sample. +func (f *OdsFile) Sample(ctx context.Context, rowIdx, colIdx int) (shwap.Sample, error) { + // Sample proof axis is selected to optimize read performance. + // - For the first and second quadrants, we read the row axis because it is more efficient to read + // single row than reading full ods to calculate single column + // - For the third quadrants, we read the column axis because it is more efficient to read single + // column than reading full ods to calculate single row + // - For the fourth quadrant, it does not matter which axis we read because we need to read full ods + // to calculate the sample + axisType, axisIdx, shrIdx := rsmt2d.Row, rowIdx, colIdx + if colIdx < f.size()/2 && rowIdx >= f.size()/2 { + axisType, axisIdx, shrIdx = rsmt2d.Col, colIdx, rowIdx + } + + axis, err := f.axis(ctx, axisType, axisIdx) + if err != nil { + return shwap.Sample{}, fmt.Errorf("reading axis: %w", err) + } + + return shwap.SampleFromShares(axis, axisType, axisIdx, shrIdx) +} + +// AxisHalf returns half of shares axis of the given type and index. Side is determined by +// implementation. Implementations should indicate the side in the returned AxisHalf. +func (f *OdsFile) AxisHalf(ctx context.Context, axisType rsmt2d.Axis, axisIdx int) (eds.AxisHalf, error) { + // read axis from file if axisis row and from top half of the square or if axis is column and from + // left half of the square + if axisIdx < f.size()/2 { + shares, err := f.readAxisHalf(axisType, axisIdx) + if err != nil { + return eds.AxisHalf{}, fmt.Errorf("reading axis half: %w", err) + } + return eds.AxisHalf{ + Shares: shares, + IsParity: false, + }, nil + } + + // if axis is from the second half of the square, read full ods and compute the axis half + err := f.readOds() + if err != nil { + return eds.AxisHalf{}, err + } + + shares, err := f.ods.computeAxisHalf(ctx, axisType, axisIdx) + if err != nil { + return eds.AxisHalf{}, fmt.Errorf("computing axis half: %w", err) + } + return eds.AxisHalf{ + Shares: shares, + IsParity: false, + }, nil +} + +// RowNamespaceData returns data for the given namespace and row index. 
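+// The full row is obtained via the row axis (read directly for rows in the top half of the
+// square, reconstructed from the in-memory ODS otherwise) and then reduced to the shares and
+// proof for the requested namespace.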
+func (f *OdsFile) RowNamespaceData( + ctx context.Context, + namespace share.Namespace, + rowIdx int, +) (shwap.RowNamespaceData, error) { + shares, err := f.axis(ctx, rsmt2d.Row, rowIdx) + if err != nil { + return shwap.RowNamespaceData{}, err + } + return shwap.RowNamespaceDataFromShares(shares, namespace, rowIdx) +} + +// Shares returns data shares extracted from the Accessor. +func (f *OdsFile) Shares(context.Context) ([]share.Share, error) { + err := f.readOds() + if err != nil { + return nil, err + } + return f.ods.shares() +} + +func (f *OdsFile) readAxisHalf(axisType rsmt2d.Axis, axisIdx int) ([]share.Share, error) { + f.lock.RLock() + ods := f.ods + f.lock.RUnlock() + if ods != nil { + return f.ods.axisHalf(context.Background(), axisType, axisIdx) + } + + switch axisType { + case rsmt2d.Col: + return f.readCol(axisIdx, 0) + case rsmt2d.Row: + return f.readRow(axisIdx) + } + return nil, fmt.Errorf("unknown axis") +} + +func (f *OdsFile) readOds() error { + f.lock.Lock() + defer f.lock.Unlock() + if f.ods != nil { + return nil + } + + // reset file pointer to the beginning of the file shares data + _, err := f.fl.Seek(headerSize, io.SeekStart) + if err != nil { + return fmt.Errorf("discarding header: %w", err) + } + + square, err := readSquare(f.fl, share.Size, f.size()) + if err != nil { + return fmt.Errorf("reading ods: %w", err) + } + f.ods = square + return nil +} + +func (f *OdsFile) readRow(idx int) ([]share.Share, error) { + shrLn := int(f.hdr.shareSize) + odsLn := int(f.hdr.squareSize) / 2 + + shares := make([]share.Share, odsLn) + + pos := idx * odsLn + offset := pos*shrLn + headerSize + + axsData := make([]byte, odsLn*shrLn) + if _, err := f.fl.ReadAt(axsData, int64(offset)); err != nil { + return nil, err + } + + for i := range shares { + shares[i] = axsData[i*shrLn : (i+1)*shrLn] + } + return shares, nil +} + +func (f *OdsFile) readCol(axisIdx, quadrantIdx int) ([]share.Share, error) { + shrLn := int(f.hdr.shareSize) + odsLn := int(f.hdr.squareSize) / 2 + quadrantOffset := quadrantIdx * odsLn * odsLn * shrLn + + shares := make([]share.Share, odsLn) + for i := range shares { + pos := axisIdx + i*odsLn + offset := pos*shrLn + headerSize + quadrantOffset + + shr := make(share.Share, shrLn) + if _, err := f.fl.ReadAt(shr, int64(offset)); err != nil { + return nil, err + } + shares[i] = shr + } + return shares, nil +} + +func (f *OdsFile) axis(ctx context.Context, axisType rsmt2d.Axis, axisIdx int) ([]share.Share, error) { + half, err := f.AxisHalf(ctx, axisType, axisIdx) + if err != nil { + return nil, err + } + + return half.Extended() +} diff --git a/store/file/ods_test.go b/store/file/ods_test.go new file mode 100644 index 0000000000..eb8731e13a --- /dev/null +++ b/store/file/ods_test.go @@ -0,0 +1,146 @@ +package file + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/tendermint/tendermint/libs/rand" + + "github.com/celestiaorg/rsmt2d" + + "github.com/celestiaorg/celestia-node/share" + "github.com/celestiaorg/celestia-node/share/eds/edstest" + eds "github.com/celestiaorg/celestia-node/share/new_eds" +) + +func TestCreateOdsFile(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + t.Cleanup(cancel) + + path := t.TempDir() + "/testfile" + edsIn := edstest.RandEDS(t, 8) + datahash := share.DataHash(rand.Bytes(32)) + f, err := CreateOdsFile(path, datahash, edsIn) + require.NoError(t, err) + + shares, err := f.Shares(ctx) + require.NoError(t, err) + expected := edsIn.FlattenedODS() + 
require.Equal(t, expected, shares) + require.Equal(t, datahash, f.hdr.datahash) + require.NoError(t, f.Close()) + + f, err = OpenOdsFile(path) + require.NoError(t, err) + shares, err = f.Shares(ctx) + require.NoError(t, err) + require.Equal(t, expected, shares) + require.Equal(t, datahash, f.hdr.datahash) + require.NoError(t, f.Close()) +} + +func TestReadOdsFromFile(t *testing.T) { + eds := edstest.RandEDS(t, 8) + path := t.TempDir() + "/testfile" + f, err := CreateOdsFile(path, []byte{}, eds) + require.NoError(t, err) + + err = f.readOds() + require.NoError(t, err) + for i, row := range f.ods { + original := eds.Row(uint(i))[:eds.Width()/2] + require.True(t, len(original) == len(row)) + require.Equal(t, original, row) + } +} + +func TestOdsFile(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + t.Cleanup(cancel) + + odsSize := 8 + createOdsFile := func(eds *rsmt2d.ExtendedDataSquare) eds.Accessor { + path := t.TempDir() + "/testfile" + fl, err := CreateOdsFile(path, []byte{}, eds) + require.NoError(t, err) + return fl + } + + t.Run("Sample", func(t *testing.T) { + eds.TestAccessorSample(ctx, t, createOdsFile, odsSize) + }) + + t.Run("AxisHalf", func(t *testing.T) { + eds.TestAccessorAxisHalf(ctx, t, createOdsFile, odsSize) + }) + + t.Run("RowNamespaceData", func(t *testing.T) { + eds.TestAccessorRowNamespaceData(ctx, t, createOdsFile, odsSize) + }) + + t.Run("Shares", func(t *testing.T) { + eds.TestAccessorShares(ctx, t, createOdsFile, odsSize) + }) +} + +// ReconstructSome, default codec +// BenchmarkAxisFromOdsFile/Size:32/Axis:row/squareHalf:first(original)-10 455848 2588 ns/op +// BenchmarkAxisFromOdsFile/Size:32/Axis:row/squareHalf:second(extended)-10 9015 203950 ns/op +// BenchmarkAxisFromOdsFile/Size:32/Axis:col/squareHalf:first(original)-10 52734 21178 ns/op +// BenchmarkAxisFromOdsFile/Size:32/Axis:col/squareHalf:second(extended)-10 8830 127452 ns/op +// BenchmarkAxisFromOdsFile/Size:64/Axis:row/squareHalf:first(original)-10 303834 4763 ns/op +// BenchmarkAxisFromOdsFile/Size:64/Axis:row/squareHalf:second(extended)-10 2940 426246 ns/op +// BenchmarkAxisFromOdsFile/Size:64/Axis:col/squareHalf:first(original)-10 27758 42842 ns/op +// BenchmarkAxisFromOdsFile/Size:64/Axis:col/squareHalf:second(extended)-10 3385 353868 ns/op +// BenchmarkAxisFromOdsFile/Size:128/Axis:row/squareHalf:first(original)-10 172086 6455 ns/op +// BenchmarkAxisFromOdsFile/Size:128/Axis:row/squareHalf:second(extended)-10 672 1550386 ns/op +// BenchmarkAxisFromOdsFile/Size:128/Axis:col/squareHalf:first(original)-10 14202 84316 ns/op +// BenchmarkAxisFromOdsFile/Size:128/Axis:col/squareHalf:second(extended)-10 978 1230980 ns/op +func BenchmarkAxisFromOdsFile(b *testing.B) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + b.Cleanup(cancel) + + minSize, maxSize := 32, 128 + dir := b.TempDir() + + newFile := func(size int) eds.Accessor { + eds := edstest.RandEDS(b, size) + path := dir + "/testfile" + f, err := CreateOdsFile(path, []byte{}, eds) + require.NoError(b, err) + return f + } + eds.BenchGetHalfAxisFromAccessor(ctx, b, newFile, minSize, maxSize) +} + +// BenchmarkShareFromOdsFile/Size:32/Axis:row/squareHalf:first(original)-10 10339 111328 ns/op +// BenchmarkShareFromOdsFile/Size:32/Axis:row/squareHalf:second(extended)-10 3392 359180 ns/op +// BenchmarkShareFromOdsFile/Size:32/Axis:col/squareHalf:first(original)-10 8925 131352 ns/op +// BenchmarkShareFromOdsFile/Size:32/Axis:col/squareHalf:second(extended)-10 3447 346218 ns/op +// 
BenchmarkShareFromOdsFile/Size:64/Axis:row/squareHalf:first(original)-10 5503 215833 ns/op +// BenchmarkShareFromOdsFile/Size:64/Axis:row/squareHalf:second(extended)-10 1231 1001053 ns/op +// BenchmarkShareFromOdsFile/Size:64/Axis:col/squareHalf:first(original)-10 4711 250001 ns/op +// BenchmarkShareFromOdsFile/Size:64/Axis:col/squareHalf:second(extended)-10 1315 910079 ns/op +// BenchmarkShareFromOdsFile/Size:128/Axis:row/squareHalf:first(original)-10 2364 435748 ns/op +// BenchmarkShareFromOdsFile/Size:128/Axis:row/squareHalf:second(extended)-10 358 3330620 ns/op +// BenchmarkShareFromOdsFile/Size:128/Axis:col/squareHalf:first(original)-10 2114 514642 ns/op +// BenchmarkShareFromOdsFile/Size:128/Axis:col/squareHalf:second(extended)-10 373 3068104 ns/op +func BenchmarkShareFromOdsFile(b *testing.B) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + b.Cleanup(cancel) + + minSize, maxSize := 32, 128 + dir := b.TempDir() + + newFile := func(size int) eds.Accessor { + eds := edstest.RandEDS(b, size) + path := dir + "/testfile" + f, err := CreateOdsFile(path, []byte{}, eds) + require.NoError(b, err) + return f + } + + eds.BenchGetSampleFromAccessor(ctx, b, newFile, minSize, maxSize) +} diff --git a/store/file/square.go b/store/file/square.go new file mode 100644 index 0000000000..f4bc7e8416 --- /dev/null +++ b/store/file/square.go @@ -0,0 +1,132 @@ +package file + +import ( + "bufio" + "context" + "fmt" + "io" + + "golang.org/x/sync/errgroup" + + "github.com/celestiaorg/rsmt2d" + + "github.com/celestiaorg/celestia-node/share" +) + +type square [][]share.Share + +// readSquare reads Shares from the reader and returns a square. It assumes that the reader is +// positioned at the beginning of the Shares. It knows the size of the Shares and the size of the +// square, so reads from reader are limited to exactly the amount of data required. +func readSquare(r io.Reader, shareSize, edsSize int) (square, error) { + odsLn := edsSize / 2 + + square := make(square, odsLn) + for i := range square { + square[i] = make([]share.Share, odsLn) + for j := range square[i] { + square[i][j] = make(share.Share, shareSize) + } + } + + // TODO(@walldiss): run benchmark to find optimal size for this buffer + br := bufio.NewReaderSize(r, 4096) + var total int + for i := 0; i < odsLn; i++ { + for j := 0; j < odsLn; j++ { + n, err := io.ReadFull(br, square[i][j]) + if err != nil { + return nil, fmt.Errorf("reading share: %w, bytes read: %v", err, total+n) + } + if n != shareSize { + return nil, fmt.Errorf("share size mismatch: expected %v, got %v", shareSize, n) + } + total += n + } + } + return square, nil +} + +func (s square) size() int { + return len(s) +} + +func (s square) axisHalf(_ context.Context, axisType rsmt2d.Axis, axisIdx int) ([]share.Share, error) { + if s == nil { + return nil, fmt.Errorf("square is nil") + } + + if axisIdx >= s.size() { + return nil, fmt.Errorf("index is out of square bounds") + } + + // square stores rows directly in high level slice, so we can return by accessing row by index + if axisType == rsmt2d.Row { + return s[axisIdx], nil + } + + // construct half column from row ordered square + col := make([]share.Share, s.size()) + for i := 0; i < s.size(); i++ { + col[i] = s[i][axisIdx] + } + return col, nil +} + +func (s square) shares() ([]share.Share, error) { + shares := make([]share.Share, 0, s.size()*s.size()) + for _, row := range s { + shares = append(shares, row...) 
+ } + return shares, nil +} + +func (s square) computeAxisHalf( + ctx context.Context, + axisType rsmt2d.Axis, + axisIdx int, +) ([]share.Share, error) { + shares := make([]share.Share, s.size()) + + // extend opposite half of the square while collecting Shares for the first half of required axis + g, ctx := errgroup.WithContext(ctx) + opposite := oppositeAxis(axisType) + for i := 0; i < s.size(); i++ { + i := i + g.Go(func() error { + original, err := s.axisHalf(ctx, opposite, i) + if err != nil { + return err + } + + enc, err := codec.Encoder(s.size() * 2) + if err != nil { + return fmt.Errorf("encoder: %w", err) + } + + shards := make([][]byte, s.size()*2) + copy(shards, original) + + target := make([]bool, s.size()*2) + target[axisIdx] = true + + err = enc.ReconstructSome(shards, target) + if err != nil { + return fmt.Errorf("reconstruct some: %w", err) + } + + shares[i] = shards[axisIdx] + return nil + }) + } + + err := g.Wait() + return shares, err +} + +func oppositeAxis(axis rsmt2d.Axis) rsmt2d.Axis { + if axis == rsmt2d.Col { + return rsmt2d.Row + } + return rsmt2d.Col +} From 47141b8bda053da2a924d358859702ef248f5f5e Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Tue, 11 Jun 2024 21:36:11 +0500 Subject: [PATCH 02/13] various fixes + applied review suggestion --- store/file/header.go | 106 ++++++++++++++++++++++------------------- store/file/ods.go | 30 +++++++----- store/file/ods_test.go | 16 +------ store/file/square.go | 1 - 4 files changed, 74 insertions(+), 79 deletions(-) diff --git a/store/file/header.go b/store/file/header.go index d7d0a7760a..f6a746ca0c 100644 --- a/store/file/header.go +++ b/store/file/header.go @@ -1,7 +1,6 @@ package file import ( - "bytes" "encoding/binary" "fmt" "io" @@ -9,11 +8,16 @@ import ( "github.com/celestiaorg/celestia-node/share" ) -const headerSize = 64 +type headerVersion uint8 -type header struct { - version fileVersion - fileType fileType +const ( + headerVersionV0 headerVersion = 1 + headerVOSize = 40 +) + +type headerV0 struct { + fileVersion fileVersion + fileType fileType // Taken directly from EDS shareSize uint16 @@ -25,7 +29,7 @@ type header struct { type fileVersion uint8 const ( - fileV0 fileVersion = iota + fileV0 fileVersion = iota + 1 ) type fileType uint8 @@ -35,60 +39,62 @@ const ( q1q4 ) -func (h *header) WriteTo(w io.Writer) (int64, error) { - b := bytes.NewBuffer(make([]byte, 0, headerSize)) - _ = b.WriteByte(byte(h.version)) - _ = b.WriteByte(byte(h.fileType)) - _ = binary.Write(b, binary.LittleEndian, h.shareSize) - _ = binary.Write(b, binary.LittleEndian, h.squareSize) - _, _ = b.Write(h.datahash) - // write padding - _, _ = b.Write(make([]byte, headerSize-b.Len()-1)) - return writeLenEncoded(w, b.Bytes()) +func readHeader(r io.Reader) (*headerV0, error) { + // read first byte to determine the fileVersion + var version headerVersion + if err := binary.Read(r, binary.LittleEndian, &version); err != nil { + return nil, fmt.Errorf("readHeader: %w", err) + } + + switch headerVersion(version) { + case headerVersionV0: + h := &headerV0{} + _, err := h.ReadFrom(r) + return h, err + } + return nil, fmt.Errorf("unsupported header fileVersion: %d", version) } -func readHeader(r io.Reader) (*header, error) { - bytesHeader, err := readLenEncoded(r) +func writeHeader(w io.Writer, h *headerV0) error { + n, err := w.Write([]byte{byte(headerVersionV0)}) if err != nil { - return nil, err + return fmt.Errorf("writeHeader: %w", err) } - if len(bytesHeader) != headerSize-1 { - return nil, 
fmt.Errorf("readHeader: read %d bytes, expected %d", len(bytesHeader), headerSize) - } - h := &header{ - version: fileVersion(bytesHeader[0]), - fileType: fileType(bytesHeader[1]), - shareSize: binary.LittleEndian.Uint16(bytesHeader[2:4]), - squareSize: binary.LittleEndian.Uint16(bytesHeader[4:6]), - datahash: make([]byte, 32), + if n != 1 { + return fmt.Errorf("writeHeader: wrote %d bytes, expected 1", n) + } + _, err = h.WriteTo(w) + return err +} - copy(h.datahash, bytesHeader[6:6+32]) - return h, err +func (h *headerV0) Size() int { + return headerVOSize + 1 } -func writeLenEncoded(w io.Writer, data []byte) (int64, error) { - _, err := w.Write([]byte{byte(len(data))}) - if err != nil { - return 0, err - } - return io.Copy(w, bytes.NewBuffer(data)) +func (h *headerV0) WriteTo(w io.Writer) (int64, error) { + buf := make([]byte, headerVOSize) + buf[0] = byte(h.fileVersion) + buf[1] = byte(h.fileType) + binary.LittleEndian.PutUint16(buf[4:6], h.shareSize) + binary.LittleEndian.PutUint16(buf[6:8], h.squareSize) + copy(buf[8:40], h.datahash) + n, err := w.Write(buf) + return int64(n), err } -func readLenEncoded(r io.Reader) ([]byte, error) { - lenBuf := make([]byte, 1) - _, err := io.ReadFull(r, lenBuf) - if err != nil { - return nil, err +func (h *headerV0) ReadFrom(r io.Reader) (int64, error) { + bytesHeader := make([]byte, headerVOSize) + n, err := io.ReadFull(r, bytesHeader) + if n != headerVOSize { + return 0, fmt.Errorf("readHeader: read %d bytes, expected %d", len(bytesHeader), headerVOSize) } - data := make([]byte, lenBuf[0]) - n, err := io.ReadFull(r, data) - if err != nil { - return nil, err - } - if n != len(data) { - return nil, fmt.Errorf("readLenEncoded: read %d bytes, expected %d", n, len(data)) - } - return data, nil + h.fileVersion = fileVersion(bytesHeader[0]) + h.fileType = fileType(bytesHeader[1]) + h.shareSize = binary.LittleEndian.Uint16(bytesHeader[4:6]) + h.squareSize = binary.LittleEndian.Uint16(bytesHeader[6:8]) + h.datahash = bytesHeader[8:40] + fmt.Println(h.fileVersion, h.fileType, h.shareSize, h.squareSize) + return headerVOSize, err } diff --git a/store/file/ods.go b/store/file/ods.go index 8ae71708ba..adb763b742 100644 --- a/store/file/ods.go +++ b/store/file/ods.go @@ -18,7 +18,7 @@ var _ eds.AccessorCloser = (*OdsFile)(nil) type OdsFile struct { path string - hdr *header + hdr *headerV0 fl *os.File lock sync.RWMutex @@ -55,12 +55,12 @@ func CreateOdsFile( return nil, fmt.Errorf("file create: %w", err) } - h := &header{ - version: fileV0, - fileType: ods, - shareSize: share.Size, // TODO: rsmt2d should expose this field - squareSize: uint16(eds.Width()), - datahash: datahash, + h := &headerV0{ + fileVersion: fileV0, + fileType: ods, + shareSize: share.Size, // TODO: rsmt2d should expose this field + squareSize: uint16(eds.Width()), + datahash: datahash, } err = writeOdsFile(f, h, eds) @@ -68,16 +68,20 @@ func CreateOdsFile( return nil, fmt.Errorf("writing ODS file: %w", err) } + err = f.Sync() + if err != nil { + return nil, fmt.Errorf("syncing file: %w", err) + } // TODO: fill ods field with data from eds return &OdsFile{ path: path, fl: f, hdr: h, - }, f.Sync() + }, nil } -func writeOdsFile(w io.Writer, h *header, eds *rsmt2d.ExtendedDataSquare) error { - _, err := h.WriteTo(w) +func writeOdsFile(w io.Writer, h *headerV0, eds *rsmt2d.ExtendedDataSquare) error { + err := writeHeader(w, h) if err != nil { return err } @@ -207,7 +211,7 @@ func (f *OdsFile) readOds() error { } // reset file pointer to the beginning of the file shares data - _, err := 
f.fl.Seek(headerSize, io.SeekStart) + _, err := f.fl.Seek(int64(f.hdr.Size()), io.SeekStart) if err != nil { return fmt.Errorf("discarding header: %w", err) } @@ -227,7 +231,7 @@ func (f *OdsFile) readRow(idx int) ([]share.Share, error) { shares := make([]share.Share, odsLn) pos := idx * odsLn - offset := pos*shrLn + headerSize + offset := f.hdr.Size() + pos*shrLn axsData := make([]byte, odsLn*shrLn) if _, err := f.fl.ReadAt(axsData, int64(offset)); err != nil { @@ -248,7 +252,7 @@ func (f *OdsFile) readCol(axisIdx, quadrantIdx int) ([]share.Share, error) { shares := make([]share.Share, odsLn) for i := range shares { pos := axisIdx + i*odsLn - offset := pos*shrLn + headerSize + quadrantOffset + offset := f.hdr.Size() + quadrantOffset + pos*shrLn shr := make(share.Share, shrLn) if _, err := f.fl.ReadAt(shr, int64(offset)); err != nil { diff --git a/store/file/ods_test.go b/store/file/ods_test.go index eb8731e13a..6e2208ab56 100644 --- a/store/file/ods_test.go +++ b/store/file/ods_test.go @@ -68,21 +68,7 @@ func TestOdsFile(t *testing.T) { return fl } - t.Run("Sample", func(t *testing.T) { - eds.TestAccessorSample(ctx, t, createOdsFile, odsSize) - }) - - t.Run("AxisHalf", func(t *testing.T) { - eds.TestAccessorAxisHalf(ctx, t, createOdsFile, odsSize) - }) - - t.Run("RowNamespaceData", func(t *testing.T) { - eds.TestAccessorRowNamespaceData(ctx, t, createOdsFile, odsSize) - }) - - t.Run("Shares", func(t *testing.T) { - eds.TestAccessorShares(ctx, t, createOdsFile, odsSize) - }) + eds.TestSuiteAccessor(ctx, t, createOdsFile, odsSize) } // ReconstructSome, default codec diff --git a/store/file/square.go b/store/file/square.go index f4bc7e8416..a965ffcf49 100644 --- a/store/file/square.go +++ b/store/file/square.go @@ -92,7 +92,6 @@ func (s square) computeAxisHalf( g, ctx := errgroup.WithContext(ctx) opposite := oppositeAxis(axisType) for i := 0; i < s.size(); i++ { - i := i g.Go(func() error { original, err := s.axisHalf(ctx, opposite, i) if err != nil { From b289c2d47242bed009046085993637a8b71d4f96 Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Tue, 11 Jun 2024 21:41:11 +0500 Subject: [PATCH 03/13] fix lint --- store/file/header.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/store/file/header.go b/store/file/header.go index f6a746ca0c..2a0996e068 100644 --- a/store/file/header.go +++ b/store/file/header.go @@ -12,7 +12,7 @@ type headerVersion uint8 const ( headerVersionV0 headerVersion = 1 - headerVOSize = 40 + headerVOSize int = 40 ) type headerV0 struct { @@ -46,7 +46,7 @@ func readHeader(r io.Reader) (*headerV0, error) { return nil, fmt.Errorf("readHeader: %w", err) } - switch headerVersion(version) { + switch version { //nolint:gocritic // gocritic wants to convert it to if statement. 
case headerVersionV0: h := &headerV0{} _, err := h.ReadFrom(r) @@ -62,7 +62,6 @@ func writeHeader(w io.Writer, h *headerV0) error { } if n != 1 { return fmt.Errorf("writeHeader: wrote %d bytes, expected 1", n) - } _, err = h.WriteTo(w) return err @@ -95,6 +94,5 @@ func (h *headerV0) ReadFrom(r io.Reader) (int64, error) { h.shareSize = binary.LittleEndian.Uint16(bytesHeader[4:6]) h.squareSize = binary.LittleEndian.Uint16(bytesHeader[6:8]) h.datahash = bytesHeader[8:40] - fmt.Println(h.fileVersion, h.fileType, h.shareSize, h.squareSize) - return headerVOSize, err + return int64(headerVOSize), err } From 1868dd8b74d417b787f4c7c1c4a49e08840ea357 Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Tue, 11 Jun 2024 22:12:15 +0500 Subject: [PATCH 04/13] fix old linter --- store/file/ods_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/store/file/ods_test.go b/store/file/ods_test.go index 6e2208ab56..e9dec0474f 100644 --- a/store/file/ods_test.go +++ b/store/file/ods_test.go @@ -1,3 +1,4 @@ +//nolint:goconst package file import ( From 4d7810dd351f99c1c7a9d5716a744bc38e9b181f Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Thu, 13 Jun 2024 21:21:23 +0500 Subject: [PATCH 05/13] apply review suggestions --- store/file/header.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/store/file/header.go b/store/file/header.go index 2a0996e068..eb248403a7 100644 --- a/store/file/header.go +++ b/store/file/header.go @@ -46,13 +46,14 @@ func readHeader(r io.Reader) (*headerV0, error) { return nil, fmt.Errorf("readHeader: %w", err) } - switch version { //nolint:gocritic // gocritic wants to convert it to if statement. + switch version { case headerVersionV0: h := &headerV0{} _, err := h.ReadFrom(r) return h, err + default: + return nil, fmt.Errorf("unsupported header fileVersion: %d", version) } - return nil, fmt.Errorf("unsupported header fileVersion: %d", version) } func writeHeader(w io.Writer, h *headerV0) error { @@ -68,6 +69,7 @@ func writeHeader(w io.Writer, h *headerV0) error { } func (h *headerV0) Size() int { + // header size + 1 byte for header fileVersion return headerVOSize + 1 } @@ -86,7 +88,7 @@ func (h *headerV0) ReadFrom(r io.Reader) (int64, error) { bytesHeader := make([]byte, headerVOSize) n, err := io.ReadFull(r, bytesHeader) if n != headerVOSize { - return 0, fmt.Errorf("readHeader: read %d bytes, expected %d", len(bytesHeader), headerVOSize) + return 0, fmt.Errorf("headerV0 ReadFrom: read %d bytes, expected %d", len(bytesHeader), headerVOSize) } h.fileVersion = fileVersion(bytesHeader[0]) From ce65121d42817a919f770bd99e2b83cdf7708ac2 Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Fri, 14 Jun 2024 10:45:09 +0500 Subject: [PATCH 06/13] small tests refactoring --- share/new_eds/rsmt2d_test.go | 2 +- share/new_eds/testing.go | 12 ++++++------ store/file/ods_test.go | 31 ++++++++++--------------------- 3 files changed, 17 insertions(+), 28 deletions(-) diff --git a/share/new_eds/rsmt2d_test.go b/share/new_eds/rsmt2d_test.go index abb96010ec..eafcb607ae 100644 --- a/share/new_eds/rsmt2d_test.go +++ b/share/new_eds/rsmt2d_test.go @@ -16,7 +16,7 @@ import ( func TestMemFile(t *testing.T) { odsSize := 8 - newAccessor := func(eds *rsmt2d.ExtendedDataSquare) Accessor { + newAccessor := func(tb testing.TB, eds *rsmt2d.ExtendedDataSquare) Accessor { return &Rsmt2D{ExtendedDataSquare: eds} } ctx, cancel := context.WithTimeout(context.Background(), 
time.Second*5) diff --git a/share/new_eds/testing.go b/share/new_eds/testing.go index cf0a2c5e49..afaa88b88c 100644 --- a/share/new_eds/testing.go +++ b/share/new_eds/testing.go @@ -18,7 +18,7 @@ import ( "github.com/celestiaorg/celestia-node/share/shwap" ) -type createAccessor func(eds *rsmt2d.ExtendedDataSquare) Accessor +type createAccessor func(testing.TB, *rsmt2d.ExtendedDataSquare) Accessor // TestSuiteAccessor runs a suite of tests for the given Accessor implementation. func TestSuiteAccessor( @@ -51,7 +51,7 @@ func testAccessorSample( odsSize int, ) { eds := edstest.RandEDS(t, odsSize) - fl := createAccessor(eds) + fl := createAccessor(t, eds) dah, err := share.NewRoot(eds) require.NoError(t, err) @@ -108,7 +108,7 @@ func testAccessorRowNamespaceData( for amount := 1; amount < sharesAmount; amount++ { // select random amount of shares, but not less than 1 eds, dah := edstest.RandEDSWithNamespace(t, namespace, amount, odsSize) - f := createAccessor(eds) + f := createAccessor(t, eds) var actualSharesAmount int // loop over all rows and check that the amount of shares in the namespace is equal to the expected @@ -149,7 +149,7 @@ func testAccessorRowNamespaceData( absentNs, err := share.Namespace(maxNs).AddInt(-1) require.NoError(t, err) - f := createAccessor(eds) + f := createAccessor(t, eds) rowData, err := f.RowNamespaceData(ctx, absentNs, i) require.NoError(t, err) @@ -170,7 +170,7 @@ func testAccessorAxisHalf( odsSize int, ) { eds := edstest.RandEDS(t, odsSize) - fl := createAccessor(eds) + fl := createAccessor(t, eds) t.Run("single thread", func(t *testing.T) { for _, axisType := range []rsmt2d.Axis{rsmt2d.Col, rsmt2d.Row} { @@ -224,7 +224,7 @@ func testAccessorShares( odsSize int, ) { eds := edstest.RandEDS(t, odsSize) - fl := createAccessor(eds) + fl := createAccessor(t, eds) shares, err := fl.Shares(ctx) require.NoError(t, err) diff --git a/store/file/ods_test.go b/store/file/ods_test.go index e9dec0474f..b719f13142 100644 --- a/store/file/ods_test.go +++ b/store/file/ods_test.go @@ -1,8 +1,8 @@ -//nolint:goconst package file import ( "context" + "strconv" "testing" "time" @@ -62,13 +62,6 @@ func TestOdsFile(t *testing.T) { t.Cleanup(cancel) odsSize := 8 - createOdsFile := func(eds *rsmt2d.ExtendedDataSquare) eds.Accessor { - path := t.TempDir() + "/testfile" - fl, err := CreateOdsFile(path, []byte{}, eds) - require.NoError(t, err) - return fl - } - eds.TestSuiteAccessor(ctx, t, createOdsFile, odsSize) } @@ -90,14 +83,9 @@ func BenchmarkAxisFromOdsFile(b *testing.B) { b.Cleanup(cancel) minSize, maxSize := 32, 128 - dir := b.TempDir() - newFile := func(size int) eds.Accessor { eds := edstest.RandEDS(b, size) - path := dir + "/testfile" - f, err := CreateOdsFile(path, []byte{}, eds) - require.NoError(b, err) - return f + return createOdsFile(b, eds) } eds.BenchGetHalfAxisFromAccessor(ctx, b, newFile, minSize, maxSize) } @@ -119,15 +107,16 @@ func BenchmarkShareFromOdsFile(b *testing.B) { b.Cleanup(cancel) minSize, maxSize := 32, 128 - dir := b.TempDir() - newFile := func(size int) eds.Accessor { eds := edstest.RandEDS(b, size) - path := dir + "/testfile" - f, err := CreateOdsFile(path, []byte{}, eds) - require.NoError(b, err) - return f + return createOdsFile(b, eds) } - eds.BenchGetSampleFromAccessor(ctx, b, newFile, minSize, maxSize) } + +func createOdsFile(t testing.TB, eds *rsmt2d.ExtendedDataSquare) eds.Accessor { + path := t.TempDir() + "/testfile" + strconv.Itoa(rand.Intn(1000)) + fl, err := CreateOdsFile(path, []byte{}, eds) + require.NoError(t, err) + return fl +} From 
faffbc24a351e27a7a6d62b7c7092905178d41fb Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Mon, 17 Jun 2024 12:30:16 +0500 Subject: [PATCH 07/13] improve comments --- store/file/codec.go | 1 + store/file/ods.go | 9 ++++++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/store/file/codec.go b/store/file/codec.go index a27280be11..a1ccfbe90a 100644 --- a/store/file/codec.go +++ b/store/file/codec.go @@ -6,6 +6,7 @@ import ( "github.com/klauspost/reedsolomon" ) +// TODO: codec will be removed after support for reconstructSome is added to rsmt2d. var codec Codec func init() { diff --git a/store/file/ods.go b/store/file/ods.go index adb763b742..09bfe7e694 100644 --- a/store/file/ods.go +++ b/store/file/ods.go @@ -22,7 +22,14 @@ type OdsFile struct { fl *os.File lock sync.RWMutex - ods square + // ods stores an in-memory cache of the original data square to enhance read performance. This cache is particularly + // beneficial for operations that require reading the entire square, such as: + // - Serving samples from the fourth quadrant of the square, which necessitates reconstructing data from all rows. + // - Streaming the entire ODS by Reader(), ensuring efficient data delivery without repeated file reads. + // - Serving full ods data by Shares(). + // Storing the square in memory allows for efficient single-read operations, avoiding the need for piecemeal + // reads by rows or columns, and facilitates quick access to data for these operations. + ods square } // OpenOdsFile opens an existing file. File has to be closed after usage. From 2a1ab4cfd1e6756a3a7977d8ac7e07cdc6309cb0 Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Mon, 17 Jun 2024 13:30:51 +0500 Subject: [PATCH 08/13] fix lint --- store/file/ods_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/store/file/ods_test.go b/store/file/ods_test.go index b719f13142..ba8e7c8bd7 100644 --- a/store/file/ods_test.go +++ b/store/file/ods_test.go @@ -20,9 +20,9 @@ func TestCreateOdsFile(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) t.Cleanup(cancel) - path := t.TempDir() + "/testfile" edsIn := edstest.RandEDS(t, 8) datahash := share.DataHash(rand.Bytes(32)) + path := t.TempDir() + "/" + datahash.String() f, err := CreateOdsFile(path, datahash, edsIn) require.NoError(t, err) @@ -115,7 +115,7 @@ func BenchmarkShareFromOdsFile(b *testing.B) { } func createOdsFile(t testing.TB, eds *rsmt2d.ExtendedDataSquare) eds.Accessor { - path := t.TempDir() + "/testfile" + strconv.Itoa(rand.Intn(1000)) + path := t.TempDir() + "/" + strconv.Itoa(rand.Intn(1000)) fl, err := CreateOdsFile(path, []byte{}, eds) require.NoError(t, err) return fl From 4eb12ad9ceb39f23a6a4f3d45cc105f9510ec8be Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Mon, 17 Jun 2024 18:58:44 +0500 Subject: [PATCH 09/13] fix lint --- store/file/header.go | 2 +- store/file/ods.go | 104 ++++++++++++++++++++--------------------- store/file/ods_test.go | 78 +++++++++++++++---------------- store/file/square.go | 13 +++--- 4 files changed, 99 insertions(+), 98 deletions(-) diff --git a/store/file/header.go b/store/file/header.go index eb248403a7..846aa17bac 100644 --- a/store/file/header.go +++ b/store/file/header.go @@ -35,7 +35,7 @@ const ( type fileType uint8 const ( - ods fileType = iota + ODS fileType = iota q1q4 ) diff --git a/store/file/ods.go b/store/file/ods.go index 09bfe7e694..3a5898a89e 100644 
--- a/store/file/ods.go +++ b/store/file/ods.go @@ -14,26 +14,26 @@ import ( "github.com/celestiaorg/celestia-node/share/shwap" ) -var _ eds.AccessorCloser = (*OdsFile)(nil) +var _ eds.AccessorCloser = (*ODSFile)(nil) -type OdsFile struct { +type ODSFile struct { path string hdr *headerV0 fl *os.File lock sync.RWMutex - // ods stores an in-memory cache of the original data square to enhance read performance. This cache is particularly - // beneficial for operations that require reading the entire square, such as: - // - Serving samples from the fourth quadrant of the square, which necessitates reconstructing data from all rows. - // - Streaming the entire ODS by Reader(), ensuring efficient data delivery without repeated file reads. - // - Serving full ods data by Shares(). - // Storing the square in memory allows for efficient single-read operations, avoiding the need for piecemeal - // reads by rows or columns, and facilitates quick access to data for these operations. + // ods stores an in-memory cache of the original data square to enhance read performance. This + // cache is particularly beneficial for operations that require reading the entire square, such as: + // - Serving samples from the fourth quadrant of the square, which necessitates reconstructing data + // from all rows. - Streaming the entire ODS by Reader(), ensuring efficient data delivery without + // repeated file reads. - Serving full ODS data by Shares(). + // Storing the square in memory allows for efficient single-read operations, avoiding the need for + // piecemeal reads by rows or columns, and facilitates quick access to data for these operations. ods square } -// OpenOdsFile opens an existing file. File has to be closed after usage. -func OpenOdsFile(path string) (*OdsFile, error) { +// OpenODSFile opens an existing file. File has to be closed after usage. +func OpenODSFile(path string) (*ODSFile, error) { f, err := os.Open(path) if err != nil { return nil, err @@ -44,19 +44,19 @@ func OpenOdsFile(path string) (*OdsFile, error) { return nil, err } - return &OdsFile{ + return &ODSFile{ path: path, hdr: h, fl: f, }, nil } -// CreateOdsFile creates a new file. File has to be closed after usage. -func CreateOdsFile( +// CreateODSFile creates a new file. File has to be closed after usage. +func CreateODSFile( path string, datahash share.DataHash, eds *rsmt2d.ExtendedDataSquare, -) (*OdsFile, error) { +) (*ODSFile, error) { f, err := os.Create(path) if err != nil { return nil, fmt.Errorf("file create: %w", err) @@ -64,13 +64,13 @@ func CreateOdsFile( h := &headerV0{ fileVersion: fileV0, - fileType: ods, - shareSize: share.Size, // TODO: rsmt2d should expose this field + fileType: ODS, + shareSize: share.Size, squareSize: uint16(eds.Width()), datahash: datahash, } - err = writeOdsFile(f, h, eds) + err = writeODSFile(f, h, eds) if err != nil { return nil, fmt.Errorf("writing ODS file: %w", err) } @@ -80,14 +80,14 @@ func CreateOdsFile( return nil, fmt.Errorf("syncing file: %w", err) } // TODO: fill ods field with data from eds - return &OdsFile{ + return &ODSFile{ path: path, fl: f, hdr: h, }, nil } -func writeOdsFile(w io.Writer, h *headerV0, eds *rsmt2d.ExtendedDataSquare) error { +func writeODSFile(w io.Writer, h *headerV0, eds *rsmt2d.ExtendedDataSquare) error { err := writeHeader(w, h) if err != nil { return err @@ -102,29 +102,29 @@ func writeOdsFile(w io.Writer, h *headerV0, eds *rsmt2d.ExtendedDataSquare) erro } // Size returns square size of the Accessor. 
-func (f *OdsFile) Size(context.Context) int { +func (f *ODSFile) Size(context.Context) int { return f.size() } -func (f *OdsFile) size() int { +func (f *ODSFile) size() int { return int(f.hdr.squareSize) } // Close closes the file. -func (f *OdsFile) Close() error { +func (f *ODSFile) Close() error { return f.fl.Close() } // Sample returns share and corresponding proof for row and column indices. Implementation can // choose which axis to use for proof. Chosen axis for proof should be indicated in the returned // Sample. -func (f *OdsFile) Sample(ctx context.Context, rowIdx, colIdx int) (shwap.Sample, error) { +func (f *ODSFile) Sample(ctx context.Context, rowIdx, colIdx int) (shwap.Sample, error) { // Sample proof axis is selected to optimize read performance. // - For the first and second quadrants, we read the row axis because it is more efficient to read - // single row than reading full ods to calculate single column - // - For the third quadrants, we read the column axis because it is more efficient to read single - // column than reading full ods to calculate single row - // - For the fourth quadrant, it does not matter which axis we read because we need to read full ods + // single row than reading full ODS to calculate single column + // - For the third quadrant, we read the column axis because it is more efficient to read single + // column than reading full ODS to calculate single row + // - For the fourth quadrant, it does not matter which axis we read because we need to read full ODS // to calculate the sample axisType, axisIdx, shrIdx := rsmt2d.Row, rowIdx, colIdx if colIdx < f.size()/2 && rowIdx >= f.size()/2 { @@ -141,9 +141,9 @@ func (f *OdsFile) Sample(ctx context.Context, rowIdx, colIdx int) (shwap.Sample, // AxisHalf returns half of shares axis of the given type and index. Side is determined by // implementation. Implementations should indicate the side in the returned AxisHalf. -func (f *OdsFile) AxisHalf(ctx context.Context, axisType rsmt2d.Axis, axisIdx int) (eds.AxisHalf, error) { - // read axis from file if axisis row and from top half of the square or if axis is column and from - // left half of the square +func (f *ODSFile) AxisHalf(ctx context.Context, axisType rsmt2d.Axis, axisIdx int) (eds.AxisHalf, error) { + // Read the axis from the file if the axis is a row and from the top half of the square, or if the + // axis is a column and from the left half of the square. if axisIdx < f.size()/2 { shares, err := f.readAxisHalf(axisType, axisIdx) if err != nil { @@ -155,8 +155,8 @@ func (f *OdsFile) AxisHalf(ctx context.Context, axisType rsmt2d.Axis, axisIdx in }, nil } - // if axis is from the second half of the square, read full ods and compute the axis half - err := f.readOds() + // if axis is from the second half of the square, read full ODS and compute the axis half + err := f.readODS() if err != nil { return eds.AxisHalf{}, err } @@ -172,7 +172,7 @@ func (f *OdsFile) AxisHalf(ctx context.Context, axisType rsmt2d.Axis, axisIdx in } // RowNamespaceData returns data for the given namespace and row index. -func (f *OdsFile) RowNamespaceData( +func (f *ODSFile) RowNamespaceData( ctx context.Context, namespace share.Namespace, rowIdx int, @@ -185,19 +185,19 @@ func (f *OdsFile) RowNamespaceData( } // Shares returns data shares extracted from the Accessor. 
-func (f *OdsFile) Shares(context.Context) ([]share.Share, error) { - err := f.readOds() +func (f *ODSFile) Shares(context.Context) ([]share.Share, error) { + err := f.readODS() if err != nil { return nil, err } return f.ods.shares() } -func (f *OdsFile) readAxisHalf(axisType rsmt2d.Axis, axisIdx int) ([]share.Share, error) { +func (f *ODSFile) readAxisHalf(axisType rsmt2d.Axis, axisIdx int) ([]share.Share, error) { f.lock.RLock() - ods := f.ods + ODS := f.ods f.lock.RUnlock() - if ods != nil { + if ODS != nil { return f.ods.axisHalf(context.Background(), axisType, axisIdx) } @@ -210,7 +210,7 @@ func (f *OdsFile) readAxisHalf(axisType rsmt2d.Axis, axisIdx int) ([]share.Share return nil, fmt.Errorf("unknown axis") } -func (f *OdsFile) readOds() error { +func (f *ODSFile) readODS() error { f.lock.Lock() defer f.lock.Unlock() if f.ods != nil { @@ -225,22 +225,22 @@ func (f *OdsFile) readOds() error { square, err := readSquare(f.fl, share.Size, f.size()) if err != nil { - return fmt.Errorf("reading ods: %w", err) + return fmt.Errorf("reading ODS: %w", err) } f.ods = square return nil } -func (f *OdsFile) readRow(idx int) ([]share.Share, error) { +func (f *ODSFile) readRow(idx int) ([]share.Share, error) { shrLn := int(f.hdr.shareSize) - odsLn := int(f.hdr.squareSize) / 2 + ODSLn := int(f.size()) / 2 - shares := make([]share.Share, odsLn) + shares := make([]share.Share, ODSLn) - pos := idx * odsLn + pos := idx * ODSLn offset := f.hdr.Size() + pos*shrLn - axsData := make([]byte, odsLn*shrLn) + axsData := make([]byte, ODSLn*shrLn) if _, err := f.fl.ReadAt(axsData, int64(offset)); err != nil { return nil, err } @@ -251,14 +251,14 @@ func (f *OdsFile) readRow(idx int) ([]share.Share, error) { return shares, nil } -func (f *OdsFile) readCol(axisIdx, quadrantIdx int) ([]share.Share, error) { +func (f *ODSFile) readCol(axisIdx, quadrantIdx int) ([]share.Share, error) { shrLn := int(f.hdr.shareSize) - odsLn := int(f.hdr.squareSize) / 2 - quadrantOffset := quadrantIdx * odsLn * odsLn * shrLn + ODSLn := int(f.size()) / 2 + quadrantOffset := quadrantIdx * ODSLn * ODSLn * shrLn - shares := make([]share.Share, odsLn) + shares := make([]share.Share, ODSLn) for i := range shares { - pos := axisIdx + i*odsLn + pos := axisIdx + i*ODSLn offset := f.hdr.Size() + quadrantOffset + pos*shrLn shr := make(share.Share, shrLn) @@ -270,7 +270,7 @@ func (f *OdsFile) readCol(axisIdx, quadrantIdx int) ([]share.Share, error) { return shares, nil } -func (f *OdsFile) axis(ctx context.Context, axisType rsmt2d.Axis, axisIdx int) ([]share.Share, error) { +func (f *ODSFile) axis(ctx context.Context, axisType rsmt2d.Axis, axisIdx int) ([]share.Share, error) { half, err := f.AxisHalf(ctx, axisType, axisIdx) if err != nil { return nil, err diff --git a/store/file/ods_test.go b/store/file/ods_test.go index ba8e7c8bd7..d1fbdd40d8 100644 --- a/store/file/ods_test.go +++ b/store/file/ods_test.go @@ -16,14 +16,14 @@ import ( eds "github.com/celestiaorg/celestia-node/share/new_eds" ) -func TestCreateOdsFile(t *testing.T) { +func TestCreateODSFile(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) t.Cleanup(cancel) edsIn := edstest.RandEDS(t, 8) datahash := share.DataHash(rand.Bytes(32)) path := t.TempDir() + "/" + datahash.String() - f, err := CreateOdsFile(path, datahash, edsIn) + f, err := CreateODSFile(path, datahash, edsIn) require.NoError(t, err) shares, err := f.Shares(ctx) @@ -33,7 +33,7 @@ func TestCreateOdsFile(t *testing.T) { require.Equal(t, datahash, f.hdr.datahash) require.NoError(t, 
f.Close()) - f, err = OpenOdsFile(path) + f, err = OpenODSFile(path) require.NoError(t, err) shares, err = f.Shares(ctx) require.NoError(t, err) @@ -42,13 +42,13 @@ func TestCreateOdsFile(t *testing.T) { require.NoError(t, f.Close()) } -func TestReadOdsFromFile(t *testing.T) { +func TestReadODSFromFile(t *testing.T) { eds := edstest.RandEDS(t, 8) path := t.TempDir() + "/testfile" - f, err := CreateOdsFile(path, []byte{}, eds) + f, err := CreateODSFile(path, []byte{}, eds) require.NoError(t, err) - err = f.readOds() + err = f.readODS() require.NoError(t, err) for i, row := range f.ods { original := eds.Row(uint(i))[:eds.Width()/2] @@ -57,66 +57,66 @@ func TestReadOdsFromFile(t *testing.T) { } } -func TestOdsFile(t *testing.T) { +func TestODSFile(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) t.Cleanup(cancel) - odsSize := 8 - eds.TestSuiteAccessor(ctx, t, createOdsFile, odsSize) + ODSSize := 8 + eds.TestSuiteAccessor(ctx, t, createODSFile, ODSSize) } // ReconstructSome, default codec -// BenchmarkAxisFromOdsFile/Size:32/Axis:row/squareHalf:first(original)-10 455848 2588 ns/op -// BenchmarkAxisFromOdsFile/Size:32/Axis:row/squareHalf:second(extended)-10 9015 203950 ns/op -// BenchmarkAxisFromOdsFile/Size:32/Axis:col/squareHalf:first(original)-10 52734 21178 ns/op -// BenchmarkAxisFromOdsFile/Size:32/Axis:col/squareHalf:second(extended)-10 8830 127452 ns/op -// BenchmarkAxisFromOdsFile/Size:64/Axis:row/squareHalf:first(original)-10 303834 4763 ns/op -// BenchmarkAxisFromOdsFile/Size:64/Axis:row/squareHalf:second(extended)-10 2940 426246 ns/op -// BenchmarkAxisFromOdsFile/Size:64/Axis:col/squareHalf:first(original)-10 27758 42842 ns/op -// BenchmarkAxisFromOdsFile/Size:64/Axis:col/squareHalf:second(extended)-10 3385 353868 ns/op -// BenchmarkAxisFromOdsFile/Size:128/Axis:row/squareHalf:first(original)-10 172086 6455 ns/op -// BenchmarkAxisFromOdsFile/Size:128/Axis:row/squareHalf:second(extended)-10 672 1550386 ns/op -// BenchmarkAxisFromOdsFile/Size:128/Axis:col/squareHalf:first(original)-10 14202 84316 ns/op -// BenchmarkAxisFromOdsFile/Size:128/Axis:col/squareHalf:second(extended)-10 978 1230980 ns/op -func BenchmarkAxisFromOdsFile(b *testing.B) { +// BenchmarkAxisFromODSFile/Size:32/Axis:row/squareHalf:first(original)-10 455848 2588 ns/op +// BenchmarkAxisFromODSFile/Size:32/Axis:row/squareHalf:second(extended)-10 9015 203950 ns/op +// BenchmarkAxisFromODSFile/Size:32/Axis:col/squareHalf:first(original)-10 52734 21178 ns/op +// BenchmarkAxisFromODSFile/Size:32/Axis:col/squareHalf:second(extended)-10 8830 127452 ns/op +// BenchmarkAxisFromODSFile/Size:64/Axis:row/squareHalf:first(original)-10 303834 4763 ns/op +// BenchmarkAxisFromODSFile/Size:64/Axis:row/squareHalf:second(extended)-10 2940 426246 ns/op +// BenchmarkAxisFromODSFile/Size:64/Axis:col/squareHalf:first(original)-10 27758 42842 ns/op +// BenchmarkAxisFromODSFile/Size:64/Axis:col/squareHalf:second(extended)-10 3385 353868 ns/op +// BenchmarkAxisFromODSFile/Size:128/Axis:row/squareHalf:first(original)-10 172086 6455 ns/op +// BenchmarkAxisFromODSFile/Size:128/Axis:row/squareHalf:second(extended)-10 672 1550386 ns/op +// BenchmarkAxisFromODSFile/Size:128/Axis:col/squareHalf:first(original)-10 14202 84316 ns/op +// BenchmarkAxisFromODSFile/Size:128/Axis:col/squareHalf:second(extended)-10 978 1230980 ns/op +func BenchmarkAxisFromODSFile(b *testing.B) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) b.Cleanup(cancel) minSize, maxSize := 32, 128 newFile := func(size int) 
eds.Accessor { eds := edstest.RandEDS(b, size) - return createOdsFile(b, eds) + return createODSFile(b, eds) } eds.BenchGetHalfAxisFromAccessor(ctx, b, newFile, minSize, maxSize) } -// BenchmarkShareFromOdsFile/Size:32/Axis:row/squareHalf:first(original)-10 10339 111328 ns/op -// BenchmarkShareFromOdsFile/Size:32/Axis:row/squareHalf:second(extended)-10 3392 359180 ns/op -// BenchmarkShareFromOdsFile/Size:32/Axis:col/squareHalf:first(original)-10 8925 131352 ns/op -// BenchmarkShareFromOdsFile/Size:32/Axis:col/squareHalf:second(extended)-10 3447 346218 ns/op -// BenchmarkShareFromOdsFile/Size:64/Axis:row/squareHalf:first(original)-10 5503 215833 ns/op -// BenchmarkShareFromOdsFile/Size:64/Axis:row/squareHalf:second(extended)-10 1231 1001053 ns/op -// BenchmarkShareFromOdsFile/Size:64/Axis:col/squareHalf:first(original)-10 4711 250001 ns/op -// BenchmarkShareFromOdsFile/Size:64/Axis:col/squareHalf:second(extended)-10 1315 910079 ns/op -// BenchmarkShareFromOdsFile/Size:128/Axis:row/squareHalf:first(original)-10 2364 435748 ns/op -// BenchmarkShareFromOdsFile/Size:128/Axis:row/squareHalf:second(extended)-10 358 3330620 ns/op -// BenchmarkShareFromOdsFile/Size:128/Axis:col/squareHalf:first(original)-10 2114 514642 ns/op -// BenchmarkShareFromOdsFile/Size:128/Axis:col/squareHalf:second(extended)-10 373 3068104 ns/op -func BenchmarkShareFromOdsFile(b *testing.B) { +// BenchmarkShareFromODSFile/Size:32/Axis:row/squareHalf:first(original)-10 10339 111328 ns/op +// BenchmarkShareFromODSFile/Size:32/Axis:row/squareHalf:second(extended)-10 3392 359180 ns/op +// BenchmarkShareFromODSFile/Size:32/Axis:col/squareHalf:first(original)-10 8925 131352 ns/op +// BenchmarkShareFromODSFile/Size:32/Axis:col/squareHalf:second(extended)-10 3447 346218 ns/op +// BenchmarkShareFromODSFile/Size:64/Axis:row/squareHalf:first(original)-10 5503 215833 ns/op +// BenchmarkShareFromODSFile/Size:64/Axis:row/squareHalf:second(extended)-10 1231 1001053 ns/op +// BenchmarkShareFromODSFile/Size:64/Axis:col/squareHalf:first(original)-10 4711 250001 ns/op +// BenchmarkShareFromODSFile/Size:64/Axis:col/squareHalf:second(extended)-10 1315 910079 ns/op +// BenchmarkShareFromODSFile/Size:128/Axis:row/squareHalf:first(original)-10 2364 435748 ns/op +// BenchmarkShareFromODSFile/Size:128/Axis:row/squareHalf:second(extended)-10 358 3330620 ns/op +// BenchmarkShareFromODSFile/Size:128/Axis:col/squareHalf:first(original)-10 2114 514642 ns/op +// BenchmarkShareFromODSFile/Size:128/Axis:col/squareHalf:second(extended)-10 373 3068104 ns/op +func BenchmarkShareFromODSFile(b *testing.B) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) b.Cleanup(cancel) minSize, maxSize := 32, 128 newFile := func(size int) eds.Accessor { eds := edstest.RandEDS(b, size) - return createOdsFile(b, eds) + return createODSFile(b, eds) } eds.BenchGetSampleFromAccessor(ctx, b, newFile, minSize, maxSize) } -func createOdsFile(t testing.TB, eds *rsmt2d.ExtendedDataSquare) eds.Accessor { +func createODSFile(t testing.TB, eds *rsmt2d.ExtendedDataSquare) eds.Accessor { path := t.TempDir() + "/" + strconv.Itoa(rand.Intn(1000)) - fl, err := CreateOdsFile(path, []byte{}, eds) + fl, err := CreateODSFile(path, []byte{}, eds) require.NoError(t, err) return fl } diff --git a/store/file/square.go b/store/file/square.go index a965ffcf49..2ab615fc6e 100644 --- a/store/file/square.go +++ b/store/file/square.go @@ -19,11 +19,11 @@ type square [][]share.Share // positioned at the beginning of the Shares. 
It knows the size of the Shares and the size of the // square, so reads from reader are limited to exactly the amount of data required. func readSquare(r io.Reader, shareSize, edsSize int) (square, error) { - odsLn := edsSize / 2 + ODSLn := edsSize / 2 - square := make(square, odsLn) + square := make(square, ODSLn) for i := range square { - square[i] = make([]share.Share, odsLn) + square[i] = make([]share.Share, ODSLn) for j := range square[i] { square[i][j] = make(share.Share, shareSize) } @@ -32,8 +32,8 @@ func readSquare(r io.Reader, shareSize, edsSize int) (square, error) { // TODO(@walldiss): run benchmark to find optimal size for this buffer br := bufio.NewReaderSize(r, 4096) var total int - for i := 0; i < odsLn; i++ { - for j := 0; j < odsLn; j++ { + for i := 0; i < ODSLn; i++ { + for j := 0; j < ODSLn; j++ { n, err := io.ReadFull(br, square[i][j]) if err != nil { return nil, fmt.Errorf("reading share: %w, bytes read: %v", err, total+n) @@ -81,6 +81,7 @@ func (s square) shares() ([]share.Share, error) { return shares, nil } +// TODO(@walldiss): make comments with diagram of computed axis. Add more comment on actual algo func (s square) computeAxisHalf( ctx context.Context, axisType rsmt2d.Axis, @@ -100,7 +101,7 @@ func (s square) computeAxisHalf( enc, err := codec.Encoder(s.size() * 2) if err != nil { - return fmt.Errorf("encoder: %w", err) + return fmt.Errorf("getting encoder: %w", err) } shards := make([][]byte, s.size()*2) From d11413cfa0bf08e29fbd9e9ef1abccf3fb7683e2 Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Tue, 18 Jun 2024 16:04:34 +0500 Subject: [PATCH 10/13] improve after review 2 --- share/shwap/sample.go | 2 ++ store/file/ods.go | 38 +++++++++++++++-------------- store/file/square.go | 56 +++++++++++++++++++++++++++---------------- 3 files changed, 57 insertions(+), 39 deletions(-) diff --git a/share/shwap/sample.go b/share/shwap/sample.go index a58a41910e..cb263415ad 100644 --- a/share/shwap/sample.go +++ b/share/shwap/sample.go @@ -25,6 +25,8 @@ type Sample struct { ProofType rsmt2d.Axis // ProofType indicates whether the proof is against a row or a column. } +// SampleFromShares creates a Sample from a list of shares, using the specified proof type and +// the share index to be included in the sample. func SampleFromShares(shares []share.Share, proofType rsmt2d.Axis, axisIdx, shrIdx int) (Sample, error) { tree := wrapper.NewErasuredNamespacedMerkleTree(uint64(len(shares)/2), uint(axisIdx)) for _, shr := range shares { diff --git a/store/file/ods.go b/store/file/ods.go index 3a5898a89e..4a3a0501d8 100644 --- a/store/file/ods.go +++ b/store/file/ods.go @@ -141,18 +141,15 @@ func (f *ODSFile) Sample(ctx context.Context, rowIdx, colIdx int) (shwap.Sample, // AxisHalf returns half of shares axis of the given type and index. Side is determined by // implementation. Implementations should indicate the side in the returned AxisHalf. -func (f *ODSFile) AxisHalf(ctx context.Context, axisType rsmt2d.Axis, axisIdx int) (eds.AxisHalf, error) { +func (f *ODSFile) AxisHalf(_ context.Context, axisType rsmt2d.Axis, axisIdx int) (eds.AxisHalf, error) { // Read the axis from the file if the axis is a row and from the top half of the square, or if the // axis is a column and from the left half of the square. 
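	// In both of these cases the half stored in the file is the original (non-parity) half
	// of the axis: the left half of a top row, or the top half of a left column. That is
	// why readAxisHalf below returns the shares with IsParity set to false.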
if axisIdx < f.size()/2 { - shares, err := f.readAxisHalf(axisType, axisIdx) + half, err := f.readAxisHalf(axisType, axisIdx) if err != nil { return eds.AxisHalf{}, fmt.Errorf("reading axis half: %w", err) } - return eds.AxisHalf{ - Shares: shares, - IsParity: false, - }, nil + return half, nil } // if axis is from the second half of the square, read full ODS and compute the axis half @@ -161,14 +158,11 @@ func (f *ODSFile) AxisHalf(ctx context.Context, axisType rsmt2d.Axis, axisIdx in return eds.AxisHalf{}, err } - shares, err := f.ods.computeAxisHalf(ctx, axisType, axisIdx) + half, err := f.ods.computeAxisHalf(axisType, axisIdx) if err != nil { return eds.AxisHalf{}, fmt.Errorf("computing axis half: %w", err) } - return eds.AxisHalf{ - Shares: shares, - IsParity: false, - }, nil + return half, nil } // RowNamespaceData returns data for the given namespace and row index. @@ -193,21 +187,29 @@ func (f *ODSFile) Shares(context.Context) ([]share.Share, error) { return f.ods.shares() } -func (f *ODSFile) readAxisHalf(axisType rsmt2d.Axis, axisIdx int) ([]share.Share, error) { +func (f *ODSFile) readAxisHalf(axisType rsmt2d.Axis, axisIdx int) (eds.AxisHalf, error) { f.lock.RLock() ODS := f.ods f.lock.RUnlock() if ODS != nil { - return f.ods.axisHalf(context.Background(), axisType, axisIdx) + return f.ods.axisHalf(axisType, axisIdx) } switch axisType { case rsmt2d.Col: - return f.readCol(axisIdx, 0) + col, err := f.readCol(axisIdx, 0) + return eds.AxisHalf{ + Shares: col, + IsParity: false, + }, err case rsmt2d.Row: - return f.readRow(axisIdx) + row, err := f.readRow(axisIdx) + return eds.AxisHalf{ + Shares: row, + IsParity: false, + }, err } - return nil, fmt.Errorf("unknown axis") + return eds.AxisHalf{}, fmt.Errorf("unknown axis") } func (f *ODSFile) readODS() error { @@ -233,7 +235,7 @@ func (f *ODSFile) readODS() error { func (f *ODSFile) readRow(idx int) ([]share.Share, error) { shrLn := int(f.hdr.shareSize) - ODSLn := int(f.size()) / 2 + ODSLn := f.size() / 2 shares := make([]share.Share, ODSLn) @@ -253,7 +255,7 @@ func (f *ODSFile) readRow(idx int) ([]share.Share, error) { func (f *ODSFile) readCol(axisIdx, quadrantIdx int) ([]share.Share, error) { shrLn := int(f.hdr.shareSize) - ODSLn := int(f.size()) / 2 + ODSLn := f.size() / 2 quadrantOffset := quadrantIdx * ODSLn * ODSLn * shrLn shares := make([]share.Share, ODSLn) diff --git a/store/file/square.go b/store/file/square.go index 2ab615fc6e..10c129aa83 100644 --- a/store/file/square.go +++ b/store/file/square.go @@ -2,7 +2,6 @@ package file import ( "bufio" - "context" "fmt" "io" @@ -11,6 +10,7 @@ import ( "github.com/celestiaorg/rsmt2d" "github.com/celestiaorg/celestia-node/share" + eds "github.com/celestiaorg/celestia-node/share/new_eds" ) type square [][]share.Share @@ -51,18 +51,30 @@ func (s square) size() int { return len(s) } -func (s square) axisHalf(_ context.Context, axisType rsmt2d.Axis, axisIdx int) ([]share.Share, error) { +func (s square) shares() ([]share.Share, error) { + shares := make([]share.Share, 0, s.size()*s.size()) + for _, row := range s { + shares = append(shares, row...) 
+ } + return shares, nil +} + +func (s square) axisHalf(axisType rsmt2d.Axis, axisIdx int) (eds.AxisHalf, error) { if s == nil { - return nil, fmt.Errorf("square is nil") + return eds.AxisHalf{}, fmt.Errorf("square is nil") } if axisIdx >= s.size() { - return nil, fmt.Errorf("index is out of square bounds") + return eds.AxisHalf{}, fmt.Errorf("index is out of square bounds") } // square stores rows directly in high level slice, so we can return by accessing row by index if axisType == rsmt2d.Row { - return s[axisIdx], nil + row := s[axisIdx] + return eds.AxisHalf{ + Shares: row, + IsParity: false, + }, nil } // construct half column from row ordered square @@ -70,31 +82,26 @@ func (s square) axisHalf(_ context.Context, axisType rsmt2d.Axis, axisIdx int) ( for i := 0; i < s.size(); i++ { col[i] = s[i][axisIdx] } - return col, nil + return eds.AxisHalf{ + Shares: col, + IsParity: false, + }, nil } -func (s square) shares() ([]share.Share, error) { - shares := make([]share.Share, 0, s.size()*s.size()) - for _, row := range s { - shares = append(shares, row...) - } - return shares, nil -} - -// TODO(@walldiss): make comments with diagram of computed axis. Add more comment on actual algo +// TODO(@walldiss): Add more comment on actual algo and support it with visual diagram of computed +// axis. func (s square) computeAxisHalf( - ctx context.Context, axisType rsmt2d.Axis, axisIdx int, -) ([]share.Share, error) { +) (eds.AxisHalf, error) { shares := make([]share.Share, s.size()) // extend opposite half of the square while collecting Shares for the first half of required axis - g, ctx := errgroup.WithContext(ctx) + g := errgroup.Group{} opposite := oppositeAxis(axisType) for i := 0; i < s.size(); i++ { g.Go(func() error { - original, err := s.axisHalf(ctx, opposite, i) + half, err := s.axisHalf(opposite, i) if err != nil { return err } @@ -105,7 +112,11 @@ func (s square) computeAxisHalf( } shards := make([][]byte, s.size()*2) - copy(shards, original) + if half.IsParity { + copy(shards[s.size():], half.Shares) + } else { + copy(shards, half.Shares) + } target := make([]bool, s.size()*2) target[axisIdx] = true @@ -121,7 +132,10 @@ func (s square) computeAxisHalf( } err := g.Wait() - return shares, err + return eds.AxisHalf{ + Shares: shares, + IsParity: false, + }, err } func oppositeAxis(axis rsmt2d.Axis) rsmt2d.Axis { From 221d8d390298bf8f1885492e71f8c841b92cef84 Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Tue, 18 Jun 2024 16:07:59 +0500 Subject: [PATCH 11/13] remove todo --- store/file/codec.go | 1 - store/file/ods.go | 2 +- store/file/square.go | 3 --- 3 files changed, 1 insertion(+), 5 deletions(-) diff --git a/store/file/codec.go b/store/file/codec.go index a1ccfbe90a..a27280be11 100644 --- a/store/file/codec.go +++ b/store/file/codec.go @@ -6,7 +6,6 @@ import ( "github.com/klauspost/reedsolomon" ) -// TODO: codec will be removed after support for reconstructSome is added to rsmt2d. 
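For context on the reconstructSome mention above: klauspost/reedsolomon can fill in only the shards that are explicitly requested, which is the behaviour computeAxisHalf's target slice is built for. A minimal standalone sketch of that partial reconstruction; the shard count and sizes are illustrative, and this is not the exact call the file codec makes:

package main

import (
	"fmt"

	"github.com/klauspost/reedsolomon"
)

func main() {
	const odsLn = 4 // half of the EDS width, illustrative

	// odsLn data shards plus odsLn parity shards, mirroring the half-and-half layout of
	// an extended axis.
	enc, err := reedsolomon.New(odsLn, odsLn)
	if err != nil {
		panic(err)
	}

	// Build a full extended axis: fill the data half and encode the parity half.
	shards := make([][]byte, odsLn*2)
	for i := 0; i < odsLn; i++ {
		shards[i] = []byte{byte(i), byte(i), byte(i), byte(i)}
	}
	for i := odsLn; i < odsLn*2; i++ {
		shards[i] = make([]byte, 4)
	}
	if err := enc.Encode(shards); err != nil {
		panic(err)
	}

	// Pretend only the parity half survived and ask for a single data shard back.
	for i := 0; i < odsLn; i++ {
		shards[i] = nil
	}
	required := make([]bool, odsLn) // length DataShards: only data shards are requested
	required[1] = true
	if err := enc.ReconstructSome(shards, required); err != nil {
		panic(err)
	}
	fmt.Println("recovered shard 1:", shards[1])
}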
var codec Codec func init() { diff --git a/store/file/ods.go b/store/file/ods.go index 4a3a0501d8..e0fbfe879c 100644 --- a/store/file/ods.go +++ b/store/file/ods.go @@ -79,7 +79,7 @@ func CreateODSFile( if err != nil { return nil, fmt.Errorf("syncing file: %w", err) } - // TODO: fill ods field with data from eds + return &ODSFile{ path: path, fl: f, diff --git a/store/file/square.go b/store/file/square.go index 10c129aa83..21b8e5c07f 100644 --- a/store/file/square.go +++ b/store/file/square.go @@ -29,7 +29,6 @@ func readSquare(r io.Reader, shareSize, edsSize int) (square, error) { } } - // TODO(@walldiss): run benchmark to find optimal size for this buffer br := bufio.NewReaderSize(r, 4096) var total int for i := 0; i < ODSLn; i++ { @@ -88,8 +87,6 @@ func (s square) axisHalf(axisType rsmt2d.Axis, axisIdx int) (eds.AxisHalf, error }, nil } -// TODO(@walldiss): Add more comment on actual algo and support it with visual diagram of computed -// axis. func (s square) computeAxisHalf( axisType rsmt2d.Axis, axisIdx int, From 60670d76fdeb6b3778406ddbfaaa5a4f4e179a13 Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Tue, 18 Jun 2024 17:51:10 +0500 Subject: [PATCH 12/13] fix naming --- store/file/ods.go | 16 ++++++++-------- store/file/square.go | 10 +++++----- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/store/file/ods.go b/store/file/ods.go index e0fbfe879c..be69f1dae1 100644 --- a/store/file/ods.go +++ b/store/file/ods.go @@ -235,14 +235,14 @@ func (f *ODSFile) readODS() error { func (f *ODSFile) readRow(idx int) ([]share.Share, error) { shrLn := int(f.hdr.shareSize) - ODSLn := f.size() / 2 + odsLn := f.size() / 2 - shares := make([]share.Share, ODSLn) + shares := make([]share.Share, odsLn) - pos := idx * ODSLn + pos := idx * odsLn offset := f.hdr.Size() + pos*shrLn - axsData := make([]byte, ODSLn*shrLn) + axsData := make([]byte, odsLn*shrLn) if _, err := f.fl.ReadAt(axsData, int64(offset)); err != nil { return nil, err } @@ -255,12 +255,12 @@ func (f *ODSFile) readRow(idx int) ([]share.Share, error) { func (f *ODSFile) readCol(axisIdx, quadrantIdx int) ([]share.Share, error) { shrLn := int(f.hdr.shareSize) - ODSLn := f.size() / 2 - quadrantOffset := quadrantIdx * ODSLn * ODSLn * shrLn + odsLn := f.size() / 2 + quadrantOffset := quadrantIdx * odsLn * odsLn * shrLn - shares := make([]share.Share, ODSLn) + shares := make([]share.Share, odsLn) for i := range shares { - pos := axisIdx + i*ODSLn + pos := axisIdx + i*odsLn offset := f.hdr.Size() + quadrantOffset + pos*shrLn shr := make(share.Share, shrLn) diff --git a/store/file/square.go b/store/file/square.go index 21b8e5c07f..85a0e5aa9e 100644 --- a/store/file/square.go +++ b/store/file/square.go @@ -19,11 +19,11 @@ type square [][]share.Share // positioned at the beginning of the Shares. It knows the size of the Shares and the size of the // square, so reads from reader are limited to exactly the amount of data required. 
func readSquare(r io.Reader, shareSize, edsSize int) (square, error) { - ODSLn := edsSize / 2 + odsLn := edsSize / 2 - square := make(square, ODSLn) + square := make(square, odsLn) for i := range square { - square[i] = make([]share.Share, ODSLn) + square[i] = make([]share.Share, odsLn) for j := range square[i] { square[i][j] = make(share.Share, shareSize) } @@ -31,8 +31,8 @@ func readSquare(r io.Reader, shareSize, edsSize int) (square, error) { br := bufio.NewReaderSize(r, 4096) var total int - for i := 0; i < ODSLn; i++ { - for j := 0; j < ODSLn; j++ { + for i := 0; i < odsLn; i++ { + for j := 0; j < odsLn; j++ { n, err := io.ReadFull(br, square[i][j]) if err != nil { return nil, fmt.Errorf("reading share: %w, bytes read: %v", err, total+n) From 951440eb8be597841bc68e60831aee30fce134d9 Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Wed, 19 Jun 2024 18:25:55 +0300 Subject: [PATCH 13/13] fix review suggestions --- store/file/header.go | 7 ++----- store/file/ods.go | 2 +- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/store/file/header.go b/store/file/header.go index 846aa17bac..67121a03f3 100644 --- a/store/file/header.go +++ b/store/file/header.go @@ -35,7 +35,7 @@ const ( type fileType uint8 const ( - ODS fileType = iota + ods fileType = iota q1q4 ) @@ -57,13 +57,10 @@ func readHeader(r io.Reader) (*headerV0, error) { } func writeHeader(w io.Writer, h *headerV0) error { - n, err := w.Write([]byte{byte(headerVersionV0)}) + err := binary.Write(w, binary.LittleEndian, headerVersionV0) if err != nil { return fmt.Errorf("writeHeader: %w", err) } - if n != 1 { - return fmt.Errorf("writeHeader: wrote %d bytes, expected 1", n) - } _, err = h.WriteTo(w) return err } diff --git a/store/file/ods.go b/store/file/ods.go index be69f1dae1..64a224dde8 100644 --- a/store/file/ods.go +++ b/store/file/ods.go @@ -64,7 +64,7 @@ func CreateODSFile( h := &headerV0{ fileVersion: fileV0, - fileType: ODS, + fileType: ods, shareSize: share.Size, squareSize: uint16(eds.Width()), datahash: datahash,
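Taken together, the header written by CreateODSFile and the row-major ODS data that follows it determine the offsets computed by readRow and readCol. A small sketch of that arithmetic; the header and share sizes here are assumed for illustration and are not the real values:

package main

import "fmt"

func main() {
	const (
		headerSize = 64  // assumed size of the serialized headerV0, illustration only
		shareSize  = 512 // assumed share size, illustration only
		odsLn      = 4   // ODS width, i.e. half of the EDS width
	)

	// readRow: one contiguous read of a whole half-row.
	rowIdx := 2
	rowOffset := headerSize + rowIdx*odsLn*shareSize
	fmt.Printf("row %d: read %d bytes at offset %d\n", rowIdx, odsLn*shareSize, rowOffset)

	// readCol: one small read per share, stepping a full row length between shares
	// (quadrantIdx is 0 for a plain ODS file, so no extra quadrant offset applies).
	colIdx := 1
	for i := 0; i < odsLn; i++ {
		pos := colIdx + i*odsLn
		fmt.Printf("col %d, share %d: read %d bytes at offset %d\n",
			colIdx, i, shareSize, headerSize+pos*shareSize)
	}
}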