-
Notifications
You must be signed in to change notification settings - Fork 972
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(shwap):Add eds streaming (#3531)
- Loading branch information
Showing
21 changed files
with
433 additions
and
92 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
package eds | ||
|
||
import ( | ||
"bytes" | ||
"errors" | ||
"fmt" | ||
"io" | ||
|
||
"github.com/celestiaorg/celestia-node/share" | ||
) | ||
|
||
// BufferedReader will read Shares from getShare function into the buffer. | ||
// It exposes the buffer to be read by io.Reader interface implementation | ||
type BufferedReader struct { | ||
buf *bytes.Buffer | ||
getShare func(rowIdx, colIdx int) ([]byte, error) | ||
// current is the amount of Shares stored in square that have been written by squareCopy. When | ||
// current reaches total, squareCopy will prevent further reads by returning io.EOF | ||
current, odsSize, total int | ||
} | ||
|
||
func NewSharesReader(odsSize int, getShare func(rowIdx, colIdx int) ([]byte, error)) *BufferedReader { | ||
return &BufferedReader{ | ||
getShare: getShare, | ||
buf: bytes.NewBuffer(nil), | ||
odsSize: odsSize, | ||
total: odsSize * odsSize, | ||
} | ||
} | ||
|
||
func (r *BufferedReader) Read(p []byte) (int, error) { | ||
if r.current >= r.total && r.buf.Len() == 0 { | ||
return 0, io.EOF | ||
} | ||
// if provided array is smaller than data in buf, read from buf | ||
if len(p) <= r.buf.Len() { | ||
return r.buf.Read(p) | ||
} | ||
n, err := io.ReadFull(r.buf, p) | ||
if err == nil { | ||
return n, nil | ||
} | ||
if !errors.Is(err, io.ErrUnexpectedEOF) && !errors.Is(err, io.EOF) { | ||
return n, fmt.Errorf("unexpected error reading from buf: %w", err) | ||
} | ||
|
||
written := n | ||
for r.current < r.total { | ||
rowIdx, colIdx := r.current/r.odsSize, r.current%r.odsSize | ||
share, err := r.getShare(rowIdx, colIdx) | ||
if err != nil { | ||
return 0, fmt.Errorf("get share: %w", err) | ||
} | ||
|
||
// copy share to provided buffer | ||
emptySpace := len(p) - written | ||
r.current++ | ||
if len(share) < emptySpace { | ||
n := copy(p[written:], share) | ||
written += n | ||
continue | ||
} | ||
|
||
// if share didn't fit into buffer fully, store remaining bytes into inner buf | ||
n := copy(p[written:], share[:emptySpace]) | ||
written += n | ||
n, err = r.buf.Write(share[emptySpace:]) | ||
if err != nil { | ||
return 0, fmt.Errorf("write share to inner buffer: %w", err) | ||
} | ||
if n != len(share)-emptySpace { | ||
return 0, fmt.Errorf("share was not written fully: %w", io.ErrShortWrite) | ||
} | ||
return written, nil | ||
} | ||
return written, nil | ||
} | ||
|
||
// ReadShares reads shares from the provided reader and constructs an Extended Data Square. Provided | ||
// reader should contain shares in row-major order. | ||
func ReadShares(r io.Reader, shareSize, odsSize int) ([]share.Share, error) { | ||
shares := make([]share.Share, odsSize*odsSize) | ||
var total int | ||
for i := range shares { | ||
share := make(share.Share, shareSize) | ||
n, err := io.ReadFull(r, share) | ||
if err != nil { | ||
return nil, fmt.Errorf("reading share: %w, bytes read: %v", err, total+n) | ||
} | ||
if n != shareSize { | ||
return nil, fmt.Errorf("share size mismatch: expected %v, got %v", shareSize, n) | ||
} | ||
shares[i] = share | ||
total += n | ||
} | ||
return shares, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
package eds | ||
|
||
import ( | ||
"errors" | ||
"io" | ||
"math/rand" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/celestiaorg/celestia-node/share" | ||
"github.com/celestiaorg/celestia-node/share/eds/edstest" | ||
) | ||
|
||
func TestSharesReader(t *testing.T) { | ||
// create io.Writer that write random data | ||
odsSize := 16 | ||
eds := edstest.RandEDS(t, odsSize) | ||
getShare := func(rowIdx, colIdx int) ([]byte, error) { | ||
return eds.GetCell(uint(rowIdx), uint(colIdx)), nil | ||
} | ||
|
||
reader := NewSharesReader(odsSize, getShare) | ||
readBytes, err := readWithRandomBuffer(reader, 1024) | ||
require.NoError(t, err) | ||
expected := make([]byte, 0, odsSize*odsSize*share.Size) | ||
for _, share := range eds.FlattenedODS() { | ||
expected = append(expected, share...) | ||
} | ||
require.Len(t, readBytes, len(expected)) | ||
require.Equal(t, expected, readBytes) | ||
} | ||
|
||
// testRandReader reads from reader with buffers of random sizes. | ||
func readWithRandomBuffer(reader io.Reader, maxBufSize int) ([]byte, error) { | ||
// create buffer of random size | ||
data := make([]byte, 0, maxBufSize) | ||
for { | ||
bufSize := rand.Intn(maxBufSize-1) + 1 | ||
buf := make([]byte, bufSize) | ||
n, err := reader.Read(buf) | ||
if err != nil && !errors.Is(err, io.EOF) { | ||
return nil, err | ||
} | ||
if n < bufSize { | ||
buf = buf[:n] | ||
} | ||
data = append(data, buf...) | ||
if errors.Is(err, io.EOF) { | ||
break | ||
} | ||
} | ||
return data, nil | ||
} |
Oops, something went wrong.