From be87f6535b4fae1c680f477694ef9fffa67fb3c1 Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Wed, 26 Jun 2024 23:18:28 +0500 Subject: [PATCH 1/8] add file streaming --- share/new_eds/accessor.go | 3 + share/new_eds/close_once.go | 8 +++ share/new_eds/close_once_test.go | 6 ++ share/new_eds/nd_test.go | 2 +- share/new_eds/reader.go | 42 ++++++++++++ share/new_eds/reader_test.go | 85 ++++++++++++++++++++++++ share/new_eds/rsmt2d.go | 90 +++++++++++++++++++++++--- share/new_eds/rsmt2d_test.go | 2 +- share/new_eds/testing.go | 42 ++++++++++++ share/shwap/row_namespace_data_test.go | 4 +- store/file/ods.go | 10 +++ store/file/square.go | 40 ++++++++++++ 12 files changed, 322 insertions(+), 12 deletions(-) create mode 100644 share/new_eds/reader.go create mode 100644 share/new_eds/reader_test.go diff --git a/share/new_eds/accessor.go b/share/new_eds/accessor.go index 8fe740b29e..acd272cc73 100644 --- a/share/new_eds/accessor.go +++ b/share/new_eds/accessor.go @@ -25,6 +25,9 @@ type Accessor interface { RowNamespaceData(ctx context.Context, namespace share.Namespace, rowIdx int) (shwap.RowNamespaceData, error) // Shares returns data (ODS) shares extracted from the Accessor. Shares(ctx context.Context) ([]share.Share, error) + // Reader returns binary reader for the file (ODS) shares. It should read the shares from the + // ODS part of the square row by row. + Reader() (io.Reader, error) } // AccessorCloser is an interface that groups Accessor and io.Closer interfaces. diff --git a/share/new_eds/close_once.go b/share/new_eds/close_once.go index f6d90a6332..e8b533c118 100644 --- a/share/new_eds/close_once.go +++ b/share/new_eds/close_once.go @@ -3,6 +3,7 @@ package eds import ( "context" "errors" + "io" "sync/atomic" "github.com/celestiaorg/rsmt2d" @@ -76,3 +77,10 @@ func (c *closeOnce) Shares(ctx context.Context) ([]share.Share, error) { } return c.f.Shares(ctx) } + +func (c *closeOnce) Reader() (io.Reader, error) { + if c.closed.Load() { + return nil, errAccessorClosed + } + return c.f.Reader() +} diff --git a/share/new_eds/close_once_test.go b/share/new_eds/close_once_test.go index 7ba9ada94b..a063423a1c 100644 --- a/share/new_eds/close_once_test.go +++ b/share/new_eds/close_once_test.go @@ -2,7 +2,9 @@ package eds import ( "context" + "io" "testing" + "testing/iotest" "github.com/stretchr/testify/require" @@ -68,6 +70,10 @@ func (s *stubEdsAccessorCloser) Shares(context.Context) ([]share.Share, error) { return nil, nil } +func (s *stubEdsAccessorCloser) Reader() (io.Reader, error) { + return iotest.ErrReader(nil), nil +} + func (s *stubEdsAccessorCloser) Close() error { s.closed = true return nil diff --git a/share/new_eds/nd_test.go b/share/new_eds/nd_test.go index a0780292ef..60fd9888c3 100644 --- a/share/new_eds/nd_test.go +++ b/share/new_eds/nd_test.go @@ -20,7 +20,7 @@ func TestNamespacedData(t *testing.T) { namespace := sharetest.RandV0Namespace() for amount := 1; amount < sharesAmount; amount++ { eds, root := edstest.RandEDSWithNamespace(t, namespace, amount, odsSize) - rsmt2d := Rsmt2D{ExtendedDataSquare: eds} + rsmt2d := &Rsmt2D{ExtendedDataSquare: eds} nd, err := NamespacedData(ctx, root, rsmt2d, namespace) require.NoError(t, err) require.True(t, len(nd) > 0) diff --git a/share/new_eds/reader.go b/share/new_eds/reader.go new file mode 100644 index 0000000000..9d122afcf1 --- /dev/null +++ b/share/new_eds/reader.go @@ -0,0 +1,42 @@ +package eds + +import ( + "bytes" + "errors" + "io" +) + +func NewBufferedReader(w minWriterTo) *BufferedReader { + return &BufferedReader{ + w: w, + buf: bytes.NewBuffer(nil), + } +} + +// BufferedReader will read Shares from inMemOds into the buffer. +// It exposes the buffer to be read by io.Reader interface implementation +type BufferedReader struct { + w minWriterTo + buf *bytes.Buffer +} + +func (r *BufferedReader) Read(p []byte) (int, error) { + // if provided array is smaller than data in buf, read from buf + if len(p) <= r.buf.Len() { + return r.buf.Read(p) + } + + // fill the buffer with data from writer + min := len(p) - r.buf.Len() + n, err := r.w.WriteTo(r.buf, min) + if err != nil && !errors.Is(err, io.EOF) { + return n, err + } + // save remaining buffer for next read + return r.buf.Read(p) +} + +// minWriterTo writes to provided writer at least min amount of bytes +type minWriterTo interface { + WriteTo(writer io.Writer, minAmount int) (int, error) +} diff --git a/share/new_eds/reader_test.go b/share/new_eds/reader_test.go new file mode 100644 index 0000000000..13e9ebd593 --- /dev/null +++ b/share/new_eds/reader_test.go @@ -0,0 +1,85 @@ +package eds + +import ( + "bytes" + crand "crypto/rand" + "errors" + "io" + "math/rand" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestNewBufferedReaderMany(t *testing.T) { + // create io.Writer that write random data + for i := 0; i < 10000; i++ { + TestNewBufferedReader(t) + } +} + +func TestNewBufferedReader(t *testing.T) { + // create io.Writer that write random data + size := 200 + randAmount := size + rand.Intn(size) + randBytes := make([]byte, randAmount) + _, err := crand.Read(randBytes) + require.NoError(t, err) + + // randBytes := bytes.Repeat([]byte("1234567890"), 10) + + reader := NewBufferedReader(randMinWriter{bytes.NewReader(randBytes)}) + readBytes, err := readWithRandomBuffer(reader, size/10) + require.NoError(t, err) + require.Equal(t, randBytes, readBytes) +} + +// testRandReader reads from reader with buffers of random sizes. +func readWithRandomBuffer(reader io.Reader, maxBufSize int) ([]byte, error) { + // create buffer of random size + data := make([]byte, 0, maxBufSize) + for { + bufSize := rand.Intn(maxBufSize-1) + 1 + buf := make([]byte, bufSize) + n, err := reader.Read(buf) + if err != nil && !errors.Is(err, io.EOF) { + return nil, err + } + if n < bufSize { + buf = buf[:n] + } + data = append(data, buf...) + if errors.Is(err, io.EOF) { + break + } + } + return data, nil +} + +type randMinWriter struct { + *bytes.Reader +} + +func (lwt randMinWriter) WriteTo(writer io.Writer, limit int) (int, error) { + var amount int + for amount < limit { + bufLn := limit + if bufLn > 1 { + bufLn = rand.Intn(limit-1) + 1 + } + buf := make([]byte, bufLn) + n, err := lwt.Read(buf) + if err != nil { + return amount, err + } + n, err = writer.Write(buf[:n]) + amount += n + if err != nil { + return amount, err + } + if n < bufLn { + return amount, io.EOF + } + } + return amount, nil +} diff --git a/share/new_eds/rsmt2d.go b/share/new_eds/rsmt2d.go index e5ffe6704b..a1d254e45d 100644 --- a/share/new_eds/rsmt2d.go +++ b/share/new_eds/rsmt2d.go @@ -3,6 +3,7 @@ package eds import ( "context" "fmt" + "io" "github.com/celestiaorg/celestia-app/pkg/wrapper" "github.com/celestiaorg/rsmt2d" @@ -11,7 +12,7 @@ import ( "github.com/celestiaorg/celestia-node/share/shwap" ) -var _ Accessor = Rsmt2D{} +// var _ Accessor = Rsmt2D{} // Rsmt2D is a rsmt2d based in-memory implementation of Accessor. type Rsmt2D struct { @@ -19,12 +20,12 @@ type Rsmt2D struct { } // Size returns the size of the Extended Data Square. -func (eds Rsmt2D) Size(context.Context) int { +func (eds *Rsmt2D) Size(context.Context) int { return int(eds.Width()) } // Sample returns share and corresponding proof for row and column indices. -func (eds Rsmt2D) Sample( +func (eds *Rsmt2D) Sample( _ context.Context, rowIdx, colIdx int, ) (shwap.Sample, error) { @@ -33,7 +34,7 @@ func (eds Rsmt2D) Sample( // SampleForProofAxis samples a share from an Extended Data Square based on the provided // row and column indices and proof axis. It returns a sample with the share and proof. -func (eds Rsmt2D) SampleForProofAxis( +func (eds *Rsmt2D) SampleForProofAxis( rowIdx, colIdx int, proofType rsmt2d.Axis, ) (shwap.Sample, error) { @@ -61,7 +62,7 @@ func (eds Rsmt2D) SampleForProofAxis( } // AxisHalf returns Shares for the first half of the axis of the given type and index. -func (eds Rsmt2D) AxisHalf(_ context.Context, axisType rsmt2d.Axis, axisIdx int) (AxisHalf, error) { +func (eds *Rsmt2D) AxisHalf(_ context.Context, axisType rsmt2d.Axis, axisIdx int) (AxisHalf, error) { shares := getAxis(eds.ExtendedDataSquare, axisType, axisIdx) halfShares := shares[:eds.Width()/2] return AxisHalf{ @@ -72,13 +73,13 @@ func (eds Rsmt2D) AxisHalf(_ context.Context, axisType rsmt2d.Axis, axisIdx int) // HalfRow constructs a new shwap.Row from an Extended Data Square based on the specified index and // side. -func (eds Rsmt2D) HalfRow(idx int, side shwap.RowSide) shwap.Row { +func (eds *Rsmt2D) HalfRow(idx int, side shwap.RowSide) shwap.Row { shares := eds.ExtendedDataSquare.Row(uint(idx)) return shwap.RowFromShares(shares, side) } // RowNamespaceData returns data for the given namespace and row index. -func (eds Rsmt2D) RowNamespaceData( +func (eds *Rsmt2D) RowNamespaceData( _ context.Context, namespace share.Namespace, rowIdx int, @@ -89,10 +90,38 @@ func (eds Rsmt2D) RowNamespaceData( // Shares returns data (ODS) shares extracted from the EDS. It returns new copy of the shares each // time. -func (eds Rsmt2D) Shares(_ context.Context) ([]share.Share, error) { +func (eds *Rsmt2D) Shares(_ context.Context) ([]share.Share, error) { return eds.ExtendedDataSquare.FlattenedODS(), nil } +// Reader returns binary reader for the file. +func (eds *Rsmt2D) Reader() (io.Reader, error) { + reader := NewBufferedReader(&edsWriter{ + eds: eds.ExtendedDataSquare, + current: 0, + total: eds.Width() * eds.Width() / 4, + }) + return reader, nil +} + +// ReadFrom reads shares from the provided reader and constructs an Extended Data Square. Provided +// reader should contain shares in row-major order. +func (eds *Rsmt2D) ReadFrom(_ context.Context, r io.Reader, shareSize, edsSize int) error { + odsSize := edsSize / 2 + shares, err := readShares(r, shareSize, odsSize) + if err != nil { + return fmt.Errorf("reading shares: %w", err) + } + treeFn := wrapper.NewConstructor(uint64(odsSize)) + square, err := rsmt2d.ComputeExtendedDataSquare(shares, share.DefaultRSMT2DCodec(), treeFn) + if err != nil { + return fmt.Errorf("computing extended data square: %w", err) + } + + eds.ExtendedDataSquare = square + return nil +} + func getAxis(eds *rsmt2d.ExtendedDataSquare, axisType rsmt2d.Axis, axisIdx int) []share.Share { switch axisType { case rsmt2d.Row: @@ -114,3 +143,48 @@ func relativeIndexes(rowIdx, colIdx int, axisType rsmt2d.Axis) (axisIdx, shrIdx panic(fmt.Sprintf("invalid proof type: %d", axisType)) } } + +type edsWriter struct { + eds *rsmt2d.ExtendedDataSquare + // current is the amount of Shares stored in square that have been read from reader. When current + // reaches total, bufferedODSReader will prevent further reads by returning io.EOF + current, total uint +} + +func (w *edsWriter) WriteTo(writer io.Writer, limit int) (int, error) { + // read Shares to the buffer until it has sufficient data to fill provided container or full square + // is read + if w.current >= w.total { + return 0, io.EOF + } + odsLn := w.eds.Width() / 2 + var written int + for w.current < w.total && written < limit { + rowIdx, colIdx := w.current/(odsLn), w.current%(odsLn) + n, err := writer.Write(w.eds.GetCell(rowIdx, colIdx)) + w.current++ + written += n + if err != nil { + return written, err + } + } + return written, nil +} + +func readShares(r io.Reader, shareSize, odsSize int) ([]share.Share, error) { + shares := make([]share.Share, odsSize*odsSize) + var total int + for i := range shares { + share := make(share.Share, shareSize) + n, err := io.ReadFull(r, share) + if err != nil { + return nil, fmt.Errorf("reading share: %w, bytes read: %v", err, total+n) + } + if n != shareSize { + return nil, fmt.Errorf("share size mismatch: expected %v, got %v", shareSize, n) + } + shares[i] = share + total += n + } + return shares, nil +} diff --git a/share/new_eds/rsmt2d_test.go b/share/new_eds/rsmt2d_test.go index eafcb607ae..0dc9be845a 100644 --- a/share/new_eds/rsmt2d_test.go +++ b/share/new_eds/rsmt2d_test.go @@ -14,7 +14,7 @@ import ( "github.com/celestiaorg/celestia-node/share/shwap" ) -func TestMemFile(t *testing.T) { +func TestRsmt2dFile(t *testing.T) { odsSize := 8 newAccessor := func(tb testing.TB, eds *rsmt2d.ExtendedDataSquare) Accessor { return &Rsmt2D{ExtendedDataSquare: eds} diff --git a/share/new_eds/testing.go b/share/new_eds/testing.go index afaa88b88c..e4e5a5f0a8 100644 --- a/share/new_eds/testing.go +++ b/share/new_eds/testing.go @@ -42,6 +42,10 @@ func TestSuiteAccessor( t.Run("Shares", func(t *testing.T) { testAccessorShares(ctx, t, createAccessor, odsSize) }) + + t.Run("Reader", func(t *testing.T) { + testAccessorReader(ctx, t, createAccessor, odsSize) + }) } func testAccessorSample( @@ -232,6 +236,44 @@ func testAccessorShares( require.Equal(t, expected, shares) } +func testAccessorReader( + ctx context.Context, + t *testing.T, + createAccessor createAccessor, + odsSize int, +) { + eds := edstest.RandEDS(t, odsSize) + f := createAccessor(t, eds) + + // verify that the reader represented by file can be read from + // multiple times, without exhausting the underlying reader. + wg := sync.WaitGroup{} + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + testReader(ctx, t, f) + }() + } + wg.Wait() +} + +func testReader(ctx context.Context, t *testing.T, ac Accessor) { + reader, err := ac.Reader() + require.NoError(t, err) + + newAccessor := &Rsmt2D{} + err = newAccessor.ReadFrom(ctx, reader, share.Size, ac.Size(ctx)) + require.NoError(t, err) + + require.Equal(t, ac.Size(ctx), newAccessor.Size(ctx)) + expected, err := ac.Shares(ctx) + require.NoError(t, err) + actual, err := newAccessor.Shares(ctx) + require.NoError(t, err) + require.Equal(t, expected, actual) +} + func BenchGetHalfAxisFromAccessor( ctx context.Context, b *testing.B, diff --git a/share/shwap/row_namespace_data_test.go b/share/shwap/row_namespace_data_test.go index 07f434b87e..19f15ef7a6 100644 --- a/share/shwap/row_namespace_data_test.go +++ b/share/shwap/row_namespace_data_test.go @@ -65,7 +65,7 @@ func TestValidateNamespacedRow(t *testing.T) { namespace := sharetest.RandV0Namespace() for amount := 1; amount < sharesAmount; amount++ { randEDS, root := edstest.RandEDSWithNamespace(t, namespace, amount, odsSize) - rsmt2d := eds.Rsmt2D{ExtendedDataSquare: randEDS} + rsmt2d := &eds.Rsmt2D{ExtendedDataSquare: randEDS} nd, err := eds.NamespacedData(ctx, root, rsmt2d, namespace) require.NoError(t, err) require.True(t, len(nd) > 0) @@ -87,7 +87,7 @@ func TestNamespacedRowProtoEncoding(t *testing.T) { const odsSize = 8 namespace := sharetest.RandV0Namespace() randEDS, root := edstest.RandEDSWithNamespace(t, namespace, odsSize, odsSize) - rsmt2d := eds.Rsmt2D{ExtendedDataSquare: randEDS} + rsmt2d := &eds.Rsmt2D{ExtendedDataSquare: randEDS} nd, err := eds.NamespacedData(ctx, root, rsmt2d, namespace) require.NoError(t, err) require.True(t, len(nd) > 0) diff --git a/store/file/ods.go b/store/file/ods.go index 64a224dde8..0d88426ef9 100644 --- a/store/file/ods.go +++ b/store/file/ods.go @@ -187,6 +187,16 @@ func (f *ODSFile) Shares(context.Context) ([]share.Share, error) { return f.ods.shares() } +// Reader returns binary reader for the file. It reads the shares from the ODS part of the square +// row by row. +func (f *ODSFile) Reader() (io.Reader, error) { + err := f.readODS() + if err != nil { + return nil, err + } + return f.ods.reader() +} + func (f *ODSFile) readAxisHalf(axisType rsmt2d.Axis, axisIdx int) (eds.AxisHalf, error) { f.lock.RLock() ODS := f.ods diff --git a/store/file/square.go b/store/file/square.go index 85a0e5aa9e..279afa54b0 100644 --- a/store/file/square.go +++ b/store/file/square.go @@ -46,6 +46,15 @@ func readSquare(r io.Reader, shareSize, edsSize int) (square, error) { return square, nil } +func (s square) reader() (io.Reader, error) { + if s == nil { + return nil, fmt.Errorf("ods file not cached") + } + odsW := newODSWriter(s, s.size()) + odsR := eds.NewBufferedReader(odsW) + return odsR, nil +} + func (s square) size() int { return len(s) } @@ -141,3 +150,34 @@ func oppositeAxis(axis rsmt2d.Axis) rsmt2d.Axis { } return rsmt2d.Col } + +func newODSWriter(s square, size int) *odsWriter { + return &odsWriter{ + ods: s, + total: size * size, + } +} + +type odsWriter struct { + ods square + // current is the amount of Shares stored in square that have been written by odsWriter. When + // current reaches total, odsWriter will prevent further reads by returning io.EOF + current, total int +} + +func (w *odsWriter) WriteTo(writer io.Writer, limit int) (int, error) { + if w.current >= w.total { + return 0, io.EOF + } + var written int + for w.current < w.total && written < limit { + rowIdx, colIdx := w.current/(w.ods.size()), w.current%(w.ods.size()) + n, err := writer.Write(w.ods[rowIdx][colIdx]) + w.current++ + written += n + if err != nil { + return written, err + } + } + return written, nil +} From 71dc4e342f3389c429084838cf0c943d693594b5 Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Wed, 26 Jun 2024 23:39:06 +0500 Subject: [PATCH 2/8] fix lint --- share/shwap/p2p/bitswap/block_fetch.go | 6 ++++-- share/shwap/p2p/bitswap/block_fetch_test.go | 2 +- share/shwap/p2p/bitswap/block_store.go | 3 ++- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/share/shwap/p2p/bitswap/block_fetch.go b/share/shwap/p2p/bitswap/block_fetch.go index 4a66eb72ef..7b4543fb26 100644 --- a/share/shwap/p2p/bitswap/block_fetch.go +++ b/share/shwap/p2p/bitswap/block_fetch.go @@ -52,8 +52,10 @@ func Fetch(ctx context.Context, exchg exchange.Interface, root *share.Root, blks } // maxPerFetch sets the limit for maximum items in a single fetch. -// This limit comes from server side default limit size on max possible simultaneous CID WANTs from a peer. -// https://github.com/ipfs/boxo/blob/dfd4a53ba828a368cec8d61c3fe12969ac6aa94c/bitswap/internal/defaults/defaults.go#L29-L30 +// This limit comes from server side default limit size on max possible simultaneous CID WANTs from +// a peer. +// +//https:github.com/ipfs/boxo/blob/dfd4a53ba828a368cec8d61c3fe12969ac6aa94c/bitswap/internal/defaults/defaults.go#L29-L30 const maxPerFetch = 1024 // fetch fetches given Blocks. diff --git a/share/shwap/p2p/bitswap/block_fetch_test.go b/share/shwap/p2p/bitswap/block_fetch_test.go index 1c8aa367e1..59e9384e65 100644 --- a/share/shwap/p2p/bitswap/block_fetch_test.go +++ b/share/shwap/p2p/bitswap/block_fetch_test.go @@ -117,7 +117,7 @@ func TestFetch_Duplicates(t *testing.T) { func newExchangeOverEDS(ctx context.Context, t *testing.T, rsmt2d *rsmt2d.ExtendedDataSquare) exchange.SessionExchange { bstore := &Blockstore{ Getter: testAccessorGetter{ - Accessor: eds.Rsmt2D{ExtendedDataSquare: rsmt2d}, + Accessor: &eds.Rsmt2D{ExtendedDataSquare: rsmt2d}, }, } return newExchange(ctx, t, bstore) diff --git a/share/shwap/p2p/bitswap/block_store.go b/share/shwap/p2p/bitswap/block_store.go index ffc6b0ede7..d42b40704e 100644 --- a/share/shwap/p2p/bitswap/block_store.go +++ b/share/shwap/p2p/bitswap/block_store.go @@ -10,7 +10,8 @@ import ( eds "github.com/celestiaorg/celestia-node/share/new_eds" ) -// AccessorGetter abstracts storage system that indexes and manages multiple eds.AccessorGetter by network height. +// AccessorGetter abstracts storage system that indexes and manages multiple eds.AccessorGetter by +// network height. type AccessorGetter interface { // GetByHeight returns an Accessor by its height. GetByHeight(ctx context.Context, height uint64) (eds.Accessor, error) From b11b360f0b3ae38cd77063be6d48ed41f66b20dd Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Sat, 29 Jun 2024 14:20:24 +0500 Subject: [PATCH 3/8] rework streaming and caching --- share/new_eds/accessor.go | 22 ++++---- share/new_eds/close_once.go | 6 +-- share/new_eds/proofs_cache.go | 52 ++++++++++++++---- share/new_eds/proofs_cache_test.go | 11 ++-- share/new_eds/reader.go | 85 ++++++++++++++++++++++++------ share/new_eds/reader_test.go | 64 ++++++++-------------- share/new_eds/rsmt2d.go | 77 ++++++--------------------- share/new_eds/rsmt2d_test.go | 7 ++- share/new_eds/testing.go | 36 +++++++------ share/new_eds/validation_test.go | 6 +-- store/file/ods.go | 47 +++++++++++++---- store/file/ods_test.go | 21 +++++++- store/file/q1q4_file.go | 6 ++- store/file/square.go | 64 ++++------------------ 14 files changed, 274 insertions(+), 230 deletions(-) diff --git a/share/new_eds/accessor.go b/share/new_eds/accessor.go index acd272cc73..7e3ab5436d 100644 --- a/share/new_eds/accessor.go +++ b/share/new_eds/accessor.go @@ -25,22 +25,26 @@ type Accessor interface { RowNamespaceData(ctx context.Context, namespace share.Namespace, rowIdx int) (shwap.RowNamespaceData, error) // Shares returns data (ODS) shares extracted from the Accessor. Shares(ctx context.Context) ([]share.Share, error) - // Reader returns binary reader for the file (ODS) shares. It should read the shares from the - // ODS part of the square row by row. - Reader() (io.Reader, error) } -// AccessorCloser is an interface that groups Accessor and io.Closer interfaces. -type AccessorCloser interface { +// AccessorStreamer is an interface that groups Accessor and Streamer interfaces. +type AccessorStreamer interface { Accessor + Streamer +} + +type Streamer interface { + // Reader returns binary reader for the file (ODS) shares. It should read the shares from the + // ODS part of the square row by row. + Reader() (io.Reader, error) io.Closer } -type accessorCloser struct { +type accessorStreamer struct { Accessor - io.Closer + Streamer } -func WithCloser(a Accessor, c io.Closer) AccessorCloser { - return &accessorCloser{a, c} +func WithStreamer(a Accessor, s Streamer) AccessorStreamer { + return &accessorStreamer{a, s} } diff --git a/share/new_eds/close_once.go b/share/new_eds/close_once.go index e8b533c118..c05851db7c 100644 --- a/share/new_eds/close_once.go +++ b/share/new_eds/close_once.go @@ -12,16 +12,16 @@ import ( "github.com/celestiaorg/celestia-node/share/shwap" ) -var _ AccessorCloser = (*closeOnce)(nil) +var _ AccessorStreamer = (*closeOnce)(nil) var errAccessorClosed = errors.New("accessor is closed") type closeOnce struct { - f AccessorCloser + f AccessorStreamer closed atomic.Bool } -func WithClosedOnce(f AccessorCloser) AccessorCloser { +func WithClosedOnce(f AccessorStreamer) AccessorStreamer { return &closeOnce{f: f} } diff --git a/share/new_eds/proofs_cache.go b/share/new_eds/proofs_cache.go index 068b4c4d0b..34fab7bbb6 100644 --- a/share/new_eds/proofs_cache.go +++ b/share/new_eds/proofs_cache.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "io" "sync" "sync/atomic" @@ -20,13 +21,13 @@ import ( "github.com/celestiaorg/celestia-node/share/shwap" ) -var _ Accessor = (*proofsCache)(nil) +var _ AccessorStreamer = (*proofsCache)(nil) // proofsCache is eds accessor that caches proofs for rows and columns. It also caches extended // axis Shares. It is used to speed up the process of building proofs for rows and columns, // reducing the number of reads from the underlying accessor. type proofsCache struct { - inner Accessor + inner AccessorStreamer // lock protects axisCache lock sync.RWMutex @@ -57,7 +58,7 @@ type axisWithProofs struct { // WithProofsCache creates a new eds accessor with caching of proofs for rows and columns. It is // used to speed up the process of building proofs for rows and columns, reducing the number of // reads from the underlying accessor. -func WithProofsCache(ac Accessor) Accessor { +func WithProofsCache(ac AccessorStreamer) AccessorStreamer { rows := make(map[int]axisWithProofs) cols := make(map[int]axisWithProofs) axisCache := []map[int]axisWithProofs{rows, cols} @@ -211,21 +212,28 @@ func (c *proofsCache) Shares(ctx context.Context) ([]share.Share, error) { return shares, nil } +func (c *proofsCache) Reader() (io.Reader, error) { + odsSize := c.Size(context.TODO()) / 2 + reader := NewSharesReader(odsSize, c.getShare) + return reader, nil +} + +func (c *proofsCache) Close() error { + return c.inner.Close() +} + func (c *proofsCache) axisShares(ctx context.Context, axisType rsmt2d.Axis, axisIdx int) ([]share.Share, error) { ax, ok := c.getAxisFromCache(axisType, axisIdx) if ok && ax.shares != nil { return ax.shares, nil } - if len(ax.half.Shares) == 0 { - half, err := c.AxisHalf(ctx, axisType, axisIdx) - if err != nil { - return nil, err - } - ax.half = half + half, err := c.AxisHalf(ctx, axisType, axisIdx) + if err != nil { + return nil, err } - shares, err := ax.half.Extended() + shares, err := half.Extended() if err != nil { return nil, fmt.Errorf("extending shares: %w", err) } @@ -250,6 +258,30 @@ func (c *proofsCache) getAxisFromCache(axisType rsmt2d.Axis, axisIdx int) (axisW return ax, ok } +func (c *proofsCache) getShare(rowIdx, colIdx int) ([]byte, error) { + ctx := context.TODO() + odsSize := c.Size(ctx) / 2 + half, err := c.AxisHalf(ctx, rsmt2d.Row, rowIdx) + if err != nil { + return nil, fmt.Errorf("reading axis half: %w", err) + } + + // if share is from the same side of axis return share right away + if colIdx > odsSize == half.IsParity { + if half.IsParity { + colIdx = colIdx - odsSize + } + return half.Shares[colIdx], nil + } + + // if share index is from opposite part of axis, obtain full axis shares + shares, err := c.axisShares(ctx, rsmt2d.Row, rowIdx) + if err != nil { + return nil, fmt.Errorf("reading axis shares: %w", err) + } + return shares[colIdx], nil +} + // rowProofsGetter implements blockservice.BlockGetter interface type rowProofsGetter struct { proofs map[cid.Cid]blocks.Block diff --git a/share/new_eds/proofs_cache_test.go b/share/new_eds/proofs_cache_test.go index 8b22af6e4f..76f285de4d 100644 --- a/share/new_eds/proofs_cache_test.go +++ b/share/new_eds/proofs_cache_test.go @@ -9,14 +9,19 @@ import ( ) func TestCache(t *testing.T) { - size := 8 + ODSSize := 8 ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) t.Cleanup(cancel) - withProofsCache := func(tb testing.TB, inner *rsmt2d.ExtendedDataSquare) Accessor { + newAccessor := func(tb testing.TB, inner *rsmt2d.ExtendedDataSquare) Accessor { accessor := &Rsmt2D{ExtendedDataSquare: inner} return WithProofsCache(accessor) } + TestSuiteAccessor(ctx, t, newAccessor, ODSSize) - TestSuiteAccessor(ctx, t, withProofsCache, size) + newAccessorStreamer := func(tb testing.TB, inner *rsmt2d.ExtendedDataSquare) AccessorStreamer { + accessor := &Rsmt2D{ExtendedDataSquare: inner} + return WithProofsCache(accessor) + } + TestStreamer(ctx, t, newAccessorStreamer, ODSSize) } diff --git a/share/new_eds/reader.go b/share/new_eds/reader.go index 9d122afcf1..ca999c0286 100644 --- a/share/new_eds/reader.go +++ b/share/new_eds/reader.go @@ -3,40 +3,95 @@ package eds import ( "bytes" "errors" + "fmt" "io" + + "github.com/celestiaorg/celestia-node/share" ) -func NewBufferedReader(w minWriterTo) *BufferedReader { +func NewSharesReader(odsSize int, getShare func(rowIdx, colIdx int) ([]byte, error)) *BufferedReader { return &BufferedReader{ - w: w, - buf: bytes.NewBuffer(nil), + getShare: getShare, + buf: bytes.NewBuffer(nil), + odsSize: odsSize, + total: odsSize * odsSize, } } // BufferedReader will read Shares from inMemOds into the buffer. // It exposes the buffer to be read by io.Reader interface implementation type BufferedReader struct { - w minWriterTo - buf *bytes.Buffer + buf *bytes.Buffer + getShare func(rowIdx, colIdx int) ([]byte, error) + // current is the amount of Shares stored in square that have been written by squareCopy. When + // current reaches total, squareCopy will prevent further reads by returning io.EOF + current, odsSize, total int } func (r *BufferedReader) Read(p []byte) (int, error) { + if r.current >= r.total && r.buf.Len() == 0 { + return 0, io.EOF + } // if provided array is smaller than data in buf, read from buf if len(p) <= r.buf.Len() { return r.buf.Read(p) } + n, err := io.ReadFull(r.buf, p) + if err == nil { + return n, nil + } + if !errors.Is(err, io.ErrUnexpectedEOF) && !errors.Is(err, io.EOF) { + return n, fmt.Errorf("unexpected error reading from buf: %w", err) + } - // fill the buffer with data from writer - min := len(p) - r.buf.Len() - n, err := r.w.WriteTo(r.buf, min) - if err != nil && !errors.Is(err, io.EOF) { - return n, err + written := n + for r.current < r.total { + rowIdx, colIdx := r.current/r.odsSize, r.current%r.odsSize + share, err := r.getShare(rowIdx, colIdx) + if err != nil { + return 0, fmt.Errorf("get share; %w", err) + } + + // copy share to provided buffer + emptySpace := len(p) - written + r.current++ + if len(share) < emptySpace { + n := copy(p[written:], share) + written += n + continue + } + + // if share didn't fit into buffer fully, store remaining bytes into inner buf + n := copy(p[written:], share[:emptySpace]) + written += n + n, err = r.buf.Write(share[emptySpace:]) + if err != nil { + return 0, fmt.Errorf("write share to inner buffer: %w", err) + } + if n != len(share)-emptySpace { + return 0, fmt.Errorf("share was not written fully: %w", io.ErrShortWrite) + } + return written, nil } - // save remaining buffer for next read - return r.buf.Read(p) + return written, nil } -// minWriterTo writes to provided writer at least min amount of bytes -type minWriterTo interface { - WriteTo(writer io.Writer, minAmount int) (int, error) +// ReadShares reads shares from the provided reader and constructs an Extended Data Square. Provided +// reader should contain shares in row-major order. +func ReadShares(r io.Reader, shareSize, odsSize int) ([]share.Share, error) { + shares := make([]share.Share, odsSize*odsSize) + var total int + for i := range shares { + share := make(share.Share, shareSize) + n, err := io.ReadFull(r, share) + if err != nil { + return nil, fmt.Errorf("reading share: %w, bytes read: %v", err, total+n) + } + if n != shareSize { + return nil, fmt.Errorf("share size mismatch: expected %v, got %v", shareSize, n) + } + shares[i] = share + total += n + } + return shares, nil } diff --git a/share/new_eds/reader_test.go b/share/new_eds/reader_test.go index 13e9ebd593..7f90fc815a 100644 --- a/share/new_eds/reader_test.go +++ b/share/new_eds/reader_test.go @@ -1,9 +1,10 @@ package eds import ( - "bytes" - crand "crypto/rand" "errors" + "fmt" + "github.com/celestiaorg/celestia-node/share" + "github.com/celestiaorg/celestia-node/share/eds/edstest" "io" "math/rand" "testing" @@ -11,27 +12,31 @@ import ( "github.com/stretchr/testify/require" ) -func TestNewBufferedReaderMany(t *testing.T) { +func TestSharesReaderMany(t *testing.T) { // create io.Writer that write random data for i := 0; i < 10000; i++ { - TestNewBufferedReader(t) + TestSharesReader(t) } } -func TestNewBufferedReader(t *testing.T) { +func TestSharesReader(t *testing.T) { // create io.Writer that write random data - size := 200 - randAmount := size + rand.Intn(size) - randBytes := make([]byte, randAmount) - _, err := crand.Read(randBytes) - require.NoError(t, err) - - // randBytes := bytes.Repeat([]byte("1234567890"), 10) + odsSize := 16 + eds := edstest.RandEDS(t, odsSize) + getShare := func(rowIdx, colIdx int) ([]byte, error) { + fmt.Println("get", rowIdx, colIdx) + return eds.GetCell(uint(rowIdx), uint(colIdx)), nil + } - reader := NewBufferedReader(randMinWriter{bytes.NewReader(randBytes)}) - readBytes, err := readWithRandomBuffer(reader, size/10) + reader := NewSharesReader(odsSize, getShare) + readBytes, err := readWithRandomBuffer(reader, 1024) require.NoError(t, err) - require.Equal(t, randBytes, readBytes) + expected := make([]byte, 0, odsSize*odsSize*share.Size) + for _, share := range eds.FlattenedODS() { + expected = append(expected, share...) + } + require.Len(t, readBytes, len(expected)) + require.Equal(t, expected, readBytes) } // testRandReader reads from reader with buffers of random sizes. @@ -50,36 +55,9 @@ func readWithRandomBuffer(reader io.Reader, maxBufSize int) ([]byte, error) { } data = append(data, buf...) if errors.Is(err, io.EOF) { + fmt.Println("eof?") break } } return data, nil } - -type randMinWriter struct { - *bytes.Reader -} - -func (lwt randMinWriter) WriteTo(writer io.Writer, limit int) (int, error) { - var amount int - for amount < limit { - bufLn := limit - if bufLn > 1 { - bufLn = rand.Intn(limit-1) + 1 - } - buf := make([]byte, bufLn) - n, err := lwt.Read(buf) - if err != nil { - return amount, err - } - n, err = writer.Write(buf[:n]) - amount += n - if err != nil { - return amount, err - } - if n < bufLn { - return amount, io.EOF - } - } - return amount, nil -} diff --git a/share/new_eds/rsmt2d.go b/share/new_eds/rsmt2d.go index a1d254e45d..6bf33fd061 100644 --- a/share/new_eds/rsmt2d.go +++ b/share/new_eds/rsmt2d.go @@ -12,7 +12,7 @@ import ( "github.com/celestiaorg/celestia-node/share/shwap" ) -// var _ Accessor = Rsmt2D{} +var _ AccessorStreamer = (*Rsmt2D)(nil) // Rsmt2D is a rsmt2d based in-memory implementation of Accessor. type Rsmt2D struct { @@ -94,32 +94,30 @@ func (eds *Rsmt2D) Shares(_ context.Context) ([]share.Share, error) { return eds.ExtendedDataSquare.FlattenedODS(), nil } +func (eds *Rsmt2D) Close() error { + return nil +} + // Reader returns binary reader for the file. func (eds *Rsmt2D) Reader() (io.Reader, error) { - reader := NewBufferedReader(&edsWriter{ - eds: eds.ExtendedDataSquare, - current: 0, - total: eds.Width() * eds.Width() / 4, - }) + getShare := func(rowIdx, colIdx int) ([]byte, error) { + return eds.GetCell(uint(rowIdx), uint(colIdx)), nil + } + odsSize := int(eds.Width() / 2) + reader := NewSharesReader(odsSize, getShare) return reader, nil } -// ReadFrom reads shares from the provided reader and constructs an Extended Data Square. Provided +// Rsmt2DFromShares reads shares from the provided reader and constructs an Extended Data Square. Provided // reader should contain shares in row-major order. -func (eds *Rsmt2D) ReadFrom(_ context.Context, r io.Reader, shareSize, edsSize int) error { - odsSize := edsSize / 2 - shares, err := readShares(r, shareSize, odsSize) - if err != nil { - return fmt.Errorf("reading shares: %w", err) - } +func Rsmt2DFromShares(shares []share.Share, odsSize int) (Rsmt2D, error) { treeFn := wrapper.NewConstructor(uint64(odsSize)) - square, err := rsmt2d.ComputeExtendedDataSquare(shares, share.DefaultRSMT2DCodec(), treeFn) + eds, err := rsmt2d.ComputeExtendedDataSquare(shares, share.DefaultRSMT2DCodec(), treeFn) if err != nil { - return fmt.Errorf("computing extended data square: %w", err) + return Rsmt2D{}, fmt.Errorf("computing extended data square: %w", err) } - eds.ExtendedDataSquare = square - return nil + return Rsmt2D{eds}, nil } func getAxis(eds *rsmt2d.ExtendedDataSquare, axisType rsmt2d.Axis, axisIdx int) []share.Share { @@ -143,48 +141,3 @@ func relativeIndexes(rowIdx, colIdx int, axisType rsmt2d.Axis) (axisIdx, shrIdx panic(fmt.Sprintf("invalid proof type: %d", axisType)) } } - -type edsWriter struct { - eds *rsmt2d.ExtendedDataSquare - // current is the amount of Shares stored in square that have been read from reader. When current - // reaches total, bufferedODSReader will prevent further reads by returning io.EOF - current, total uint -} - -func (w *edsWriter) WriteTo(writer io.Writer, limit int) (int, error) { - // read Shares to the buffer until it has sufficient data to fill provided container or full square - // is read - if w.current >= w.total { - return 0, io.EOF - } - odsLn := w.eds.Width() / 2 - var written int - for w.current < w.total && written < limit { - rowIdx, colIdx := w.current/(odsLn), w.current%(odsLn) - n, err := writer.Write(w.eds.GetCell(rowIdx, colIdx)) - w.current++ - written += n - if err != nil { - return written, err - } - } - return written, nil -} - -func readShares(r io.Reader, shareSize, odsSize int) ([]share.Share, error) { - shares := make([]share.Share, odsSize*odsSize) - var total int - for i := range shares { - share := make(share.Share, shareSize) - n, err := io.ReadFull(r, share) - if err != nil { - return nil, fmt.Errorf("reading share: %w, bytes read: %v", err, total+n) - } - if n != shareSize { - return nil, fmt.Errorf("share size mismatch: expected %v, got %v", shareSize, n) - } - shares[i] = share - total += n - } - return shares, nil -} diff --git a/share/new_eds/rsmt2d_test.go b/share/new_eds/rsmt2d_test.go index 0dc9be845a..764b8a1ea8 100644 --- a/share/new_eds/rsmt2d_test.go +++ b/share/new_eds/rsmt2d_test.go @@ -14,7 +14,7 @@ import ( "github.com/celestiaorg/celestia-node/share/shwap" ) -func TestRsmt2dFile(t *testing.T) { +func TestRsmt2dAccessor(t *testing.T) { odsSize := 8 newAccessor := func(tb testing.TB, eds *rsmt2d.ExtendedDataSquare) Accessor { return &Rsmt2D{ExtendedDataSquare: eds} @@ -23,6 +23,11 @@ func TestRsmt2dFile(t *testing.T) { t.Cleanup(cancel) TestSuiteAccessor(ctx, t, newAccessor, odsSize) + + newStreamer := func(tb testing.TB, eds *rsmt2d.ExtendedDataSquare) AccessorStreamer { + return &Rsmt2D{ExtendedDataSquare: eds} + } + TestStreamer(ctx, t, newStreamer, odsSize) } func TestRsmt2dHalfRow(t *testing.T) { diff --git a/share/new_eds/testing.go b/share/new_eds/testing.go index 624cd5bbb6..c195a8cb98 100644 --- a/share/new_eds/testing.go +++ b/share/new_eds/testing.go @@ -18,7 +18,10 @@ import ( "github.com/celestiaorg/celestia-node/share/shwap" ) -type createAccessor func(testing.TB, *rsmt2d.ExtendedDataSquare) Accessor +type ( + createAccessor func(testing.TB, *rsmt2d.ExtendedDataSquare) Accessor + createAccessorStreamer func(testing.TB, *rsmt2d.ExtendedDataSquare) AccessorStreamer +) // TestSuiteAccessor runs a suite of tests for the given Accessor implementation. func TestSuiteAccessor( @@ -42,9 +45,16 @@ func TestSuiteAccessor( t.Run("Shares", func(t *testing.T) { testAccessorShares(ctx, t, createAccessor, odsSize) }) +} +func TestStreamer( + ctx context.Context, + t *testing.T, + create createAccessorStreamer, + odsSize int, +) { t.Run("Reader", func(t *testing.T) { - testAccessorReader(ctx, t, createAccessor, odsSize) + testAccessorReader(ctx, t, create, odsSize) }) } @@ -239,11 +249,11 @@ func testAccessorShares( func testAccessorReader( ctx context.Context, t *testing.T, - createAccessor createAccessor, + create createAccessorStreamer, odsSize int, ) { eds := edstest.RandEDS(t, odsSize) - f := createAccessor(t, eds) + f := create(t, eds) // verify that the reader represented by file can be read from // multiple times, without exhausting the underlying reader. @@ -252,26 +262,22 @@ func testAccessorReader( wg.Add(1) go func() { defer wg.Done() - testReader(ctx, t, f) + testReader(ctx, t, eds, f) }() } wg.Wait() } -func testReader(ctx context.Context, t *testing.T, ac Accessor) { - reader, err := ac.Reader() - require.NoError(t, err) - - newAccessor := &Rsmt2D{} - err = newAccessor.ReadFrom(ctx, reader, share.Size, ac.Size(ctx)) +func testReader(ctx context.Context, t *testing.T, eds *rsmt2d.ExtendedDataSquare, as AccessorStreamer) { + reader, err := as.Reader() require.NoError(t, err) - require.Equal(t, ac.Size(ctx), newAccessor.Size(ctx)) - expected, err := ac.Shares(ctx) + odsSize := as.Size(ctx) / 2 + shares, err := ReadShares(reader, share.Size, odsSize) require.NoError(t, err) - actual, err := newAccessor.Shares(ctx) + actual, err := Rsmt2DFromShares(shares, odsSize) require.NoError(t, err) - require.Equal(t, expected, actual) + require.True(t, eds.Equals(actual.ExtendedDataSquare)) } func BenchGetHalfAxisFromAccessor( diff --git a/share/new_eds/validation_test.go b/share/new_eds/validation_test.go index ad4e6f2efc..aa75116259 100644 --- a/share/new_eds/validation_test.go +++ b/share/new_eds/validation_test.go @@ -32,7 +32,7 @@ func TestValidation_Sample(t *testing.T) { t.Run(tt.name, func(t *testing.T) { randEDS := edstest.RandEDS(t, tt.odsSize) accessor := &Rsmt2D{ExtendedDataSquare: randEDS} - validation := WithValidation(WithCloser(accessor, nil)) + validation := WithValidation(WithStreamer(accessor, nil)) _, err := validation.Sample(context.Background(), tt.rowIdx, tt.colIdx) if tt.expectFail { @@ -61,7 +61,7 @@ func TestValidation_AxisHalf(t *testing.T) { t.Run(tt.name, func(t *testing.T) { randEDS := edstest.RandEDS(t, tt.odsSize) accessor := &Rsmt2D{ExtendedDataSquare: randEDS} - validation := WithValidation(WithCloser(accessor, nil)) + validation := WithValidation(WithStreamer(accessor, nil)) _, err := validation.AxisHalf(context.Background(), tt.axisType, tt.axisIdx) if tt.expectFail { @@ -89,7 +89,7 @@ func TestValidation_RowNamespaceData(t *testing.T) { t.Run(tt.name, func(t *testing.T) { randEDS := edstest.RandEDS(t, tt.odsSize) accessor := &Rsmt2D{ExtendedDataSquare: randEDS} - validation := WithValidation(WithCloser(accessor, nil)) + validation := WithValidation(WithStreamer(accessor, nil)) ns := sharetest.RandV0Namespace() _, err := validation.RowNamespaceData(context.Background(), ns, tt.rowIdx) diff --git a/store/file/ods.go b/store/file/ods.go index a8334883aa..f21e3350f9 100644 --- a/store/file/ods.go +++ b/store/file/ods.go @@ -14,7 +14,7 @@ import ( "github.com/celestiaorg/celestia-node/share/shwap" ) -var _ eds.AccessorCloser = (*ODSFile)(nil) +var _ eds.AccessorStreamer = (*ODSFile)(nil) type ODSFile struct { path string @@ -194,18 +194,23 @@ func (f *ODSFile) Shares(context.Context) ([]share.Share, error) { // Reader returns binary reader for the file. It reads the shares from the ODS part of the square // row by row. func (f *ODSFile) Reader() (io.Reader, error) { - err := f.readODS() - if err != nil { - return nil, err + f.lock.RLock() + ods := f.ods + f.lock.RUnlock() + if ods != nil { + return ods.reader() } - return f.ods.reader() + + offset := f.hdr.Size() + reader := newFileReader(f.fl, offset, int(f.hdr.shareSize), f.size()) + return reader, nil } func (f *ODSFile) readAxisHalf(axisType rsmt2d.Axis, axisIdx int) (eds.AxisHalf, error) { f.lock.RLock() - ODS := f.ods + ods := f.ods f.lock.RUnlock() - if ODS != nil { + if ods != nil { return f.ods.axisHalf(axisType, axisIdx) } @@ -227,11 +232,12 @@ func (f *ODSFile) readAxisHalf(axisType rsmt2d.Axis, axisIdx int) (eds.AxisHalf, } func (f *ODSFile) readODS() (square, error) { - f.lock.Lock() - defer f.lock.Unlock() + f.lock.RLock() if f.ods != nil { + f.lock.RUnlock() return f.ods, nil } + f.lock.RUnlock() // reset file pointer to the beginning of the file shares data _, err := f.fl.Seek(int64(f.hdr.Size()), io.SeekStart) @@ -245,7 +251,9 @@ func (f *ODSFile) readODS() (square, error) { } if !f.disableCache { + f.lock.Lock() f.ods = square + f.lock.Unlock() } return square, nil } @@ -298,3 +306,24 @@ func (f *ODSFile) axis(ctx context.Context, axisType rsmt2d.Axis, axisIdx int) ( return half.Extended() } + +type fileReader struct { + r io.ReaderAt + current, total int64 +} + +func newFileReader(r io.ReaderAt, offset, shareSize, odsSize int) *fileReader { + return &fileReader{ + r: r, + current: int64(offset), + total: int64(odsSize*odsSize*shareSize + offset), + } +} + +func (w *fileReader) Read(p []byte) (int, error) { + if w.current >= w.total { + return 0, io.EOF + } + + return w.r.ReadAt(p, w.current) +} diff --git a/store/file/ods_test.go b/store/file/ods_test.go index ee61c84281..155c885eaf 100644 --- a/store/file/ods_test.go +++ b/store/file/ods_test.go @@ -62,7 +62,9 @@ func TestODSFile(t *testing.T) { t.Cleanup(cancel) ODSSize := 8 - eds.TestSuiteAccessor(ctx, t, createODSFile, ODSSize) + eds.TestSuiteAccessor(ctx, t, createAccessor, ODSSize) + eds.TestStreamer(ctx, t, createCachedStreamer, ODSSize) + eds.TestStreamer(ctx, t, createStreamer, ODSSize) } // BenchmarkAxisFromODSFile/Size:32/ProofType:row/squareHalf:0-10 460231 2555 ns/op @@ -161,7 +163,22 @@ func BenchmarkSampleFromODSFileDisabledCache(b *testing.B) { eds.BenchGetSampleFromAccessor(ctx, b, newFile, minSize, maxSize) } -func createODSFile(t testing.TB, eds *rsmt2d.ExtendedDataSquare) eds.Accessor { +func createAccessor(t testing.TB, eds *rsmt2d.ExtendedDataSquare) eds.Accessor { + return createODSFile(t, eds) +} + +func createStreamer(t testing.TB, eds *rsmt2d.ExtendedDataSquare) eds.AccessorStreamer { + return createODSFile(t, eds) +} + +func createCachedStreamer(t testing.TB, eds *rsmt2d.ExtendedDataSquare) eds.AccessorStreamer { + f := createODSFile(t, eds) + _, err := f.readODS() + require.NoError(t, err) + return f +} + +func createODSFile(t testing.TB, eds *rsmt2d.ExtendedDataSquare) *ODSFile { path := t.TempDir() + "/" + strconv.Itoa(rand.Intn(1000)) fl, err := CreateODSFile(path, []byte{}, eds) require.NoError(t, err) diff --git a/store/file/q1q4_file.go b/store/file/q1q4_file.go index b23bd5f9b4..8b0bed86a9 100644 --- a/store/file/q1q4_file.go +++ b/store/file/q1q4_file.go @@ -12,7 +12,7 @@ import ( "github.com/celestiaorg/celestia-node/share/shwap" ) -var _ eds.AccessorCloser = (*Q1Q4File)(nil) +var _ eds.AccessorStreamer = (*Q1Q4File)(nil) // Q1Q4File represents a file that contains the first and fourth quadrants of an extended data // square. It extends the ODSFile with the ability to read the fourth quadrant of the square. @@ -98,6 +98,10 @@ func (f *Q1Q4File) Shares(ctx context.Context) ([]share.Share, error) { return f.ods.Shares(ctx) } +func (f *Q1Q4File) Reader() (io.Reader, error) { + return f.ods.Reader() +} + func (f *Q1Q4File) Close() error { return f.ods.Close() } diff --git a/store/file/square.go b/store/file/square.go index 279afa54b0..cd0b8ea5aa 100644 --- a/store/file/square.go +++ b/store/file/square.go @@ -1,7 +1,6 @@ package file import ( - "bufio" "fmt" "io" @@ -21,27 +20,13 @@ type square [][]share.Share func readSquare(r io.Reader, shareSize, edsSize int) (square, error) { odsLn := edsSize / 2 + shares, err := eds.ReadShares(r, shareSize, odsLn) + if err != nil { + return nil, fmt.Errorf("reading shares: %w", err) + } square := make(square, odsLn) for i := range square { - square[i] = make([]share.Share, odsLn) - for j := range square[i] { - square[i][j] = make(share.Share, shareSize) - } - } - - br := bufio.NewReaderSize(r, 4096) - var total int - for i := 0; i < odsLn; i++ { - for j := 0; j < odsLn; j++ { - n, err := io.ReadFull(br, square[i][j]) - if err != nil { - return nil, fmt.Errorf("reading share: %w, bytes read: %v", err, total+n) - } - if n != shareSize { - return nil, fmt.Errorf("share size mismatch: expected %v, got %v", shareSize, n) - } - total += n - } + square[i] = shares[i*odsLn : (i+1)*odsLn] } return square, nil } @@ -50,9 +35,11 @@ func (s square) reader() (io.Reader, error) { if s == nil { return nil, fmt.Errorf("ods file not cached") } - odsW := newODSWriter(s, s.size()) - odsR := eds.NewBufferedReader(odsW) - return odsR, nil + getShare := func(rowIdx, colIdx int) ([]byte, error) { + return s[rowIdx][colIdx], nil + } + reader := eds.NewSharesReader(s.size(), getShare) + return reader, nil } func (s square) size() int { @@ -150,34 +137,3 @@ func oppositeAxis(axis rsmt2d.Axis) rsmt2d.Axis { } return rsmt2d.Col } - -func newODSWriter(s square, size int) *odsWriter { - return &odsWriter{ - ods: s, - total: size * size, - } -} - -type odsWriter struct { - ods square - // current is the amount of Shares stored in square that have been written by odsWriter. When - // current reaches total, odsWriter will prevent further reads by returning io.EOF - current, total int -} - -func (w *odsWriter) WriteTo(writer io.Writer, limit int) (int, error) { - if w.current >= w.total { - return 0, io.EOF - } - var written int - for w.current < w.total && written < limit { - rowIdx, colIdx := w.current/(w.ods.size()), w.current%(w.ods.size()) - n, err := writer.Write(w.ods[rowIdx][colIdx]) - w.current++ - written += n - if err != nil { - return written, err - } - } - return written, nil -} From dd917de8e08c65f036ee012135f398bb0d46efbc Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Mon, 1 Jul 2024 17:39:08 +0200 Subject: [PATCH 4/8] fix linter --- share/new_eds/reader_test.go | 5 +++-- share/new_eds/rsmt2d.go | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/share/new_eds/reader_test.go b/share/new_eds/reader_test.go index 7f90fc815a..7c2ab57e82 100644 --- a/share/new_eds/reader_test.go +++ b/share/new_eds/reader_test.go @@ -3,13 +3,14 @@ package eds import ( "errors" "fmt" - "github.com/celestiaorg/celestia-node/share" - "github.com/celestiaorg/celestia-node/share/eds/edstest" "io" "math/rand" "testing" "github.com/stretchr/testify/require" + + "github.com/celestiaorg/celestia-node/share" + "github.com/celestiaorg/celestia-node/share/eds/edstest" ) func TestSharesReaderMany(t *testing.T) { diff --git a/share/new_eds/rsmt2d.go b/share/new_eds/rsmt2d.go index 6bf33fd061..7af740acc3 100644 --- a/share/new_eds/rsmt2d.go +++ b/share/new_eds/rsmt2d.go @@ -108,8 +108,8 @@ func (eds *Rsmt2D) Reader() (io.Reader, error) { return reader, nil } -// Rsmt2DFromShares reads shares from the provided reader and constructs an Extended Data Square. Provided -// reader should contain shares in row-major order. +// Rsmt2DFromShares reads shares from the provided reader and constructs an Extended Data Square. +// Provided reader should contain shares in row-major order. func Rsmt2DFromShares(shares []share.Share, odsSize int) (Rsmt2D, error) { treeFn := wrapper.NewConstructor(uint64(odsSize)) eds, err := rsmt2d.ComputeExtendedDataSquare(shares, share.DefaultRSMT2DCodec(), treeFn) From ac08071aa9005c7cadd3c45fb658dac86be6e995 Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Tue, 2 Jul 2024 15:54:51 +0200 Subject: [PATCH 5/8] fix linter --- share/new_eds/accessor.go | 4 +- share/new_eds/proofs_cache.go | 2 +- share/new_eds/proofs_cache_test.go | 2 +- share/new_eds/reader.go | 22 +++++----- share/new_eds/rsmt2d.go | 3 +- share/new_eds/rsmt2d_test.go | 2 +- share/new_eds/testing.go | 46 ++++++++++++++------- share/new_eds/validation_test.go | 6 +-- share/shwap/p2p/bitswap/block_fetch_test.go | 4 +- store/file/ods.go | 34 ++++----------- store/file/ods_test.go | 2 +- store/file/q1q4_file_test.go | 2 +- 12 files changed, 61 insertions(+), 68 deletions(-) diff --git a/share/new_eds/accessor.go b/share/new_eds/accessor.go index 7e3ab5436d..8ced738fa4 100644 --- a/share/new_eds/accessor.go +++ b/share/new_eds/accessor.go @@ -34,7 +34,7 @@ type AccessorStreamer interface { } type Streamer interface { - // Reader returns binary reader for the file (ODS) shares. It should read the shares from the + // Reader returns binary reader for the shares. It should read the shares from the // ODS part of the square row by row. Reader() (io.Reader, error) io.Closer @@ -45,6 +45,6 @@ type accessorStreamer struct { Streamer } -func WithStreamer(a Accessor, s Streamer) AccessorStreamer { +func AccessorAndStreamer(a Accessor, s Streamer) AccessorStreamer { return &accessorStreamer{a, s} } diff --git a/share/new_eds/proofs_cache.go b/share/new_eds/proofs_cache.go index 34fab7bbb6..faf3e580ef 100644 --- a/share/new_eds/proofs_cache.go +++ b/share/new_eds/proofs_cache.go @@ -269,7 +269,7 @@ func (c *proofsCache) getShare(rowIdx, colIdx int) ([]byte, error) { // if share is from the same side of axis return share right away if colIdx > odsSize == half.IsParity { if half.IsParity { - colIdx = colIdx - odsSize + colIdx -= odsSize } return half.Shares[colIdx], nil } diff --git a/share/new_eds/proofs_cache_test.go b/share/new_eds/proofs_cache_test.go index 76f285de4d..4552e4b862 100644 --- a/share/new_eds/proofs_cache_test.go +++ b/share/new_eds/proofs_cache_test.go @@ -9,7 +9,7 @@ import ( ) func TestCache(t *testing.T) { - ODSSize := 8 + ODSSize := 64 ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) t.Cleanup(cancel) diff --git a/share/new_eds/reader.go b/share/new_eds/reader.go index ca999c0286..bef11527e2 100644 --- a/share/new_eds/reader.go +++ b/share/new_eds/reader.go @@ -9,16 +9,7 @@ import ( "github.com/celestiaorg/celestia-node/share" ) -func NewSharesReader(odsSize int, getShare func(rowIdx, colIdx int) ([]byte, error)) *BufferedReader { - return &BufferedReader{ - getShare: getShare, - buf: bytes.NewBuffer(nil), - odsSize: odsSize, - total: odsSize * odsSize, - } -} - -// BufferedReader will read Shares from inMemOds into the buffer. +// BufferedReader will read Shares from getShare function into the buffer. // It exposes the buffer to be read by io.Reader interface implementation type BufferedReader struct { buf *bytes.Buffer @@ -28,6 +19,15 @@ type BufferedReader struct { current, odsSize, total int } +func NewSharesReader(odsSize int, getShare func(rowIdx, colIdx int) ([]byte, error)) *BufferedReader { + return &BufferedReader{ + getShare: getShare, + buf: bytes.NewBuffer(nil), + odsSize: odsSize, + total: odsSize * odsSize, + } +} + func (r *BufferedReader) Read(p []byte) (int, error) { if r.current >= r.total && r.buf.Len() == 0 { return 0, io.EOF @@ -49,7 +49,7 @@ func (r *BufferedReader) Read(p []byte) (int, error) { rowIdx, colIdx := r.current/r.odsSize, r.current%r.odsSize share, err := r.getShare(rowIdx, colIdx) if err != nil { - return 0, fmt.Errorf("get share; %w", err) + return 0, fmt.Errorf("get share: %w", err) } // copy share to provided buffer diff --git a/share/new_eds/rsmt2d.go b/share/new_eds/rsmt2d.go index 7af740acc3..9bec30db66 100644 --- a/share/new_eds/rsmt2d.go +++ b/share/new_eds/rsmt2d.go @@ -108,8 +108,7 @@ func (eds *Rsmt2D) Reader() (io.Reader, error) { return reader, nil } -// Rsmt2DFromShares reads shares from the provided reader and constructs an Extended Data Square. -// Provided reader should contain shares in row-major order. +// Rsmt2DFromShares constructs an Extended Data Square from shares. func Rsmt2DFromShares(shares []share.Share, odsSize int) (Rsmt2D, error) { treeFn := wrapper.NewConstructor(uint64(odsSize)) eds, err := rsmt2d.ComputeExtendedDataSquare(shares, share.DefaultRSMT2DCodec(), treeFn) diff --git a/share/new_eds/rsmt2d_test.go b/share/new_eds/rsmt2d_test.go index 764b8a1ea8..3a571027f6 100644 --- a/share/new_eds/rsmt2d_test.go +++ b/share/new_eds/rsmt2d_test.go @@ -15,7 +15,7 @@ import ( ) func TestRsmt2dAccessor(t *testing.T) { - odsSize := 8 + odsSize := 64 newAccessor := func(tb testing.TB, eds *rsmt2d.ExtendedDataSquare) Accessor { return &Rsmt2D{ExtendedDataSquare: eds} } diff --git a/share/new_eds/testing.go b/share/new_eds/testing.go index c195a8cb98..cbfa81f219 100644 --- a/share/new_eds/testing.go +++ b/share/new_eds/testing.go @@ -28,23 +28,29 @@ func TestSuiteAccessor( ctx context.Context, t *testing.T, createAccessor createAccessor, - odsSize int, + maxSize int, ) { - t.Run("Sample", func(t *testing.T) { - testAccessorSample(ctx, t, createAccessor, odsSize) - }) - - t.Run("AxisHalf", func(t *testing.T) { - testAccessorAxisHalf(ctx, t, createAccessor, odsSize) - }) - - t.Run("RowNamespaceData", func(t *testing.T) { - testAccessorRowNamespaceData(ctx, t, createAccessor, odsSize) - }) - - t.Run("Shares", func(t *testing.T) { - testAccessorShares(ctx, t, createAccessor, odsSize) - }) + minSize := 2 + if !checkPowerOfTwo(maxSize) { + t.Errorf("minSize must be power of 2: %v", maxSize) + } + for size := minSize; size <= maxSize; size *= 2 { + t.Run(fmt.Sprintf("Sample:%d", size), func(t *testing.T) { + testAccessorSample(ctx, t, createAccessor, size) + }) + + t.Run(fmt.Sprintf("AxisHalf:%d", size), func(t *testing.T) { + testAccessorAxisHalf(ctx, t, createAccessor, size) + }) + + t.Run(fmt.Sprintf("RowNamespaceData:%d", size), func(t *testing.T) { + testAccessorRowNamespaceData(ctx, t, createAccessor, size) + }) + + t.Run(fmt.Sprintf("Shares:%d", size), func(t *testing.T) { + testAccessorShares(ctx, t, createAccessor, size) + }) + } } func TestStreamer( @@ -350,3 +356,11 @@ func (q quadrant) coordinates(edsSize int) (rowIdx, colIdx int) { rowIdx = edsSize/2*(int(q-1)/2) + 1 return rowIdx, colIdx } + +func checkPowerOfTwo(n int) bool { + // added one corner case if n is zero it will also consider as power 2 + if n == 0 { + return true + } + return n&(n-1) == 0 +} diff --git a/share/new_eds/validation_test.go b/share/new_eds/validation_test.go index aa75116259..98445c94ac 100644 --- a/share/new_eds/validation_test.go +++ b/share/new_eds/validation_test.go @@ -32,7 +32,7 @@ func TestValidation_Sample(t *testing.T) { t.Run(tt.name, func(t *testing.T) { randEDS := edstest.RandEDS(t, tt.odsSize) accessor := &Rsmt2D{ExtendedDataSquare: randEDS} - validation := WithValidation(WithStreamer(accessor, nil)) + validation := WithValidation(AccessorAndStreamer(accessor, nil)) _, err := validation.Sample(context.Background(), tt.rowIdx, tt.colIdx) if tt.expectFail { @@ -61,7 +61,7 @@ func TestValidation_AxisHalf(t *testing.T) { t.Run(tt.name, func(t *testing.T) { randEDS := edstest.RandEDS(t, tt.odsSize) accessor := &Rsmt2D{ExtendedDataSquare: randEDS} - validation := WithValidation(WithStreamer(accessor, nil)) + validation := WithValidation(AccessorAndStreamer(accessor, nil)) _, err := validation.AxisHalf(context.Background(), tt.axisType, tt.axisIdx) if tt.expectFail { @@ -89,7 +89,7 @@ func TestValidation_RowNamespaceData(t *testing.T) { t.Run(tt.name, func(t *testing.T) { randEDS := edstest.RandEDS(t, tt.odsSize) accessor := &Rsmt2D{ExtendedDataSquare: randEDS} - validation := WithValidation(WithStreamer(accessor, nil)) + validation := WithValidation(AccessorAndStreamer(accessor, nil)) ns := sharetest.RandV0Namespace() _, err := validation.RowNamespaceData(context.Background(), ns, tt.rowIdx) diff --git a/share/shwap/p2p/bitswap/block_fetch_test.go b/share/shwap/p2p/bitswap/block_fetch_test.go index 59e9384e65..04c90e8a5a 100644 --- a/share/shwap/p2p/bitswap/block_fetch_test.go +++ b/share/shwap/p2p/bitswap/block_fetch_test.go @@ -2,7 +2,7 @@ package bitswap import ( "context" - "math/rand/v2" + "math/rand" "sync" "testing" "time" @@ -94,7 +94,7 @@ func TestFetch_Duplicates(t *testing.T) { wg.Add(1) go func(i int) { - rint := rand.IntN(10) + rint := rand.Intn(10) // this sleep ensures fetches aren't started simultaneously, allowing to check for edge-cases time.Sleep(time.Millisecond * time.Duration(rint)) diff --git a/store/file/ods.go b/store/file/ods.go index f21e3350f9..20e966e9d4 100644 --- a/store/file/ods.go +++ b/store/file/ods.go @@ -201,8 +201,9 @@ func (f *ODSFile) Reader() (io.Reader, error) { return ods.reader() } - offset := f.hdr.Size() - reader := newFileReader(f.fl, offset, int(f.hdr.shareSize), f.size()) + offset := int64(f.hdr.Size()) + total := int64(f.hdr.shareSize) * int64(f.size()*f.size()/4) + reader := io.NewSectionReader(f.fl, offset, total) return reader, nil } @@ -233,11 +234,11 @@ func (f *ODSFile) readAxisHalf(axisType rsmt2d.Axis, axisIdx int) (eds.AxisHalf, func (f *ODSFile) readODS() (square, error) { f.lock.RLock() - if f.ods != nil { - f.lock.RUnlock() - return f.ods, nil - } + ods := f.ods f.lock.RUnlock() + if ods != nil { + return ods, nil + } // reset file pointer to the beginning of the file shares data _, err := f.fl.Seek(int64(f.hdr.Size()), io.SeekStart) @@ -306,24 +307,3 @@ func (f *ODSFile) axis(ctx context.Context, axisType rsmt2d.Axis, axisIdx int) ( return half.Extended() } - -type fileReader struct { - r io.ReaderAt - current, total int64 -} - -func newFileReader(r io.ReaderAt, offset, shareSize, odsSize int) *fileReader { - return &fileReader{ - r: r, - current: int64(offset), - total: int64(odsSize*odsSize*shareSize + offset), - } -} - -func (w *fileReader) Read(p []byte) (int, error) { - if w.current >= w.total { - return 0, io.EOF - } - - return w.r.ReadAt(p, w.current) -} diff --git a/store/file/ods_test.go b/store/file/ods_test.go index 155c885eaf..575b265fc0 100644 --- a/store/file/ods_test.go +++ b/store/file/ods_test.go @@ -61,7 +61,7 @@ func TestODSFile(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) t.Cleanup(cancel) - ODSSize := 8 + ODSSize := 64 eds.TestSuiteAccessor(ctx, t, createAccessor, ODSSize) eds.TestStreamer(ctx, t, createCachedStreamer, ODSSize) eds.TestStreamer(ctx, t, createStreamer, ODSSize) diff --git a/store/file/q1q4_file_test.go b/store/file/q1q4_file_test.go index daa0c4cb87..2c5c9b4065 100644 --- a/store/file/q1q4_file_test.go +++ b/store/file/q1q4_file_test.go @@ -44,7 +44,7 @@ func TestQ1Q4File(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) t.Cleanup(cancel) - ODSSize := 8 + ODSSize := 64 eds.TestSuiteAccessor(ctx, t, createQ1Q4File, ODSSize) } From e6913fce721728eba575eb803bd40ddc5c6ebf1f Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Tue, 2 Jul 2024 15:55:44 +0200 Subject: [PATCH 6/8] return rand/v2 --- share/shwap/p2p/bitswap/block_fetch_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/share/shwap/p2p/bitswap/block_fetch_test.go b/share/shwap/p2p/bitswap/block_fetch_test.go index 04c90e8a5a..bcd1b72eb3 100644 --- a/share/shwap/p2p/bitswap/block_fetch_test.go +++ b/share/shwap/p2p/bitswap/block_fetch_test.go @@ -2,7 +2,6 @@ package bitswap import ( "context" - "math/rand" "sync" "testing" "time" @@ -20,6 +19,7 @@ import ( mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "math/rand/v2" "github.com/celestiaorg/rsmt2d" @@ -94,7 +94,7 @@ func TestFetch_Duplicates(t *testing.T) { wg.Add(1) go func(i int) { - rint := rand.Intn(10) + rint := rand.IntN(10) // this sleep ensures fetches aren't started simultaneously, allowing to check for edge-cases time.Sleep(time.Millisecond * time.Duration(rint)) From 60427d92949a81031120498d5c9285cd529f9fd8 Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Tue, 2 Jul 2024 15:56:40 +0200 Subject: [PATCH 7/8] fix lint --- share/shwap/p2p/bitswap/block_fetch_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/share/shwap/p2p/bitswap/block_fetch_test.go b/share/shwap/p2p/bitswap/block_fetch_test.go index bcd1b72eb3..59e9384e65 100644 --- a/share/shwap/p2p/bitswap/block_fetch_test.go +++ b/share/shwap/p2p/bitswap/block_fetch_test.go @@ -2,6 +2,7 @@ package bitswap import ( "context" + "math/rand/v2" "sync" "testing" "time" @@ -19,7 +20,6 @@ import ( mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "math/rand/v2" "github.com/celestiaorg/rsmt2d" From 19d8d477faba754500ad9649d64448abdfc4c45d Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Tue, 2 Jul 2024 22:51:02 +0200 Subject: [PATCH 8/8] lower max size for tests --- share/new_eds/proofs_cache_test.go | 4 ++-- share/new_eds/reader_test.go | 10 ---------- share/new_eds/rsmt2d_test.go | 4 ++-- store/file/ods_test.go | 4 ++-- store/file/q1q4_file_test.go | 4 ++-- 5 files changed, 8 insertions(+), 18 deletions(-) diff --git a/share/new_eds/proofs_cache_test.go b/share/new_eds/proofs_cache_test.go index 4552e4b862..b570b15c1e 100644 --- a/share/new_eds/proofs_cache_test.go +++ b/share/new_eds/proofs_cache_test.go @@ -9,8 +9,8 @@ import ( ) func TestCache(t *testing.T) { - ODSSize := 64 - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + ODSSize := 16 + ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) t.Cleanup(cancel) newAccessor := func(tb testing.TB, inner *rsmt2d.ExtendedDataSquare) Accessor { diff --git a/share/new_eds/reader_test.go b/share/new_eds/reader_test.go index 7c2ab57e82..7b05b17b69 100644 --- a/share/new_eds/reader_test.go +++ b/share/new_eds/reader_test.go @@ -2,7 +2,6 @@ package eds import ( "errors" - "fmt" "io" "math/rand" "testing" @@ -13,19 +12,11 @@ import ( "github.com/celestiaorg/celestia-node/share/eds/edstest" ) -func TestSharesReaderMany(t *testing.T) { - // create io.Writer that write random data - for i := 0; i < 10000; i++ { - TestSharesReader(t) - } -} - func TestSharesReader(t *testing.T) { // create io.Writer that write random data odsSize := 16 eds := edstest.RandEDS(t, odsSize) getShare := func(rowIdx, colIdx int) ([]byte, error) { - fmt.Println("get", rowIdx, colIdx) return eds.GetCell(uint(rowIdx), uint(colIdx)), nil } @@ -56,7 +47,6 @@ func readWithRandomBuffer(reader io.Reader, maxBufSize int) ([]byte, error) { } data = append(data, buf...) if errors.Is(err, io.EOF) { - fmt.Println("eof?") break } } diff --git a/share/new_eds/rsmt2d_test.go b/share/new_eds/rsmt2d_test.go index 3a571027f6..ef4fdae7bc 100644 --- a/share/new_eds/rsmt2d_test.go +++ b/share/new_eds/rsmt2d_test.go @@ -15,11 +15,11 @@ import ( ) func TestRsmt2dAccessor(t *testing.T) { - odsSize := 64 + odsSize := 16 newAccessor := func(tb testing.TB, eds *rsmt2d.ExtendedDataSquare) Accessor { return &Rsmt2D{ExtendedDataSquare: eds} } - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) t.Cleanup(cancel) TestSuiteAccessor(ctx, t, newAccessor, odsSize) diff --git a/store/file/ods_test.go b/store/file/ods_test.go index 575b265fc0..307689092c 100644 --- a/store/file/ods_test.go +++ b/store/file/ods_test.go @@ -58,10 +58,10 @@ func TestReadODSFromFile(t *testing.T) { } func TestODSFile(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) t.Cleanup(cancel) - ODSSize := 64 + ODSSize := 16 eds.TestSuiteAccessor(ctx, t, createAccessor, ODSSize) eds.TestStreamer(ctx, t, createCachedStreamer, ODSSize) eds.TestStreamer(ctx, t, createStreamer, ODSSize) diff --git a/store/file/q1q4_file_test.go b/store/file/q1q4_file_test.go index 2c5c9b4065..d739ad40ba 100644 --- a/store/file/q1q4_file_test.go +++ b/store/file/q1q4_file_test.go @@ -41,10 +41,10 @@ func TestCreateQ1Q4File(t *testing.T) { } func TestQ1Q4File(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) t.Cleanup(cancel) - ODSSize := 64 + ODSSize := 16 eds.TestSuiteAccessor(ctx, t, createQ1Q4File, ODSSize) }