Skip to content

Commit

Permalink
blockstore impl and various cleanups and improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
Wondertan committed Sep 22, 2023
1 parent 199ca73 commit e7ebbb1
Show file tree
Hide file tree
Showing 16 changed files with 519 additions and 165 deletions.
33 changes: 16 additions & 17 deletions share/eds/file.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package eds

import (
"fmt"
"io"
"os"

Expand All @@ -21,7 +22,7 @@ import (
// - Avoid storing constant shares, like padding
type File struct {
path string
hdr Header
hdr *Header
fl fileBackend
}

Expand Down Expand Up @@ -55,9 +56,9 @@ func CreateFile(path string, eds *rsmt2d.ExtendedDataSquare) (*File, error) {
return nil, err
}

h := Header{
ShareSize: uint16(len(eds.GetCell(0, 0))), // TODO: rsmt2d should expose this field
SquareSize: uint32(eds.Width()),
h := &Header{
shareSize: uint16(len(eds.GetCell(0, 0))), // TODO: rsmt2d should expose this field
squareSize: uint32(eds.Width()),
}

if _, err = h.WriteTo(f); err != nil {
Expand All @@ -82,22 +83,17 @@ func (f *File) Close() error {
return f.fl.Close()
}

func (f *File) Header() Header {
func (f *File) Header() *Header {
return f.hdr
}

func (f *File) Axis(idx int, axis rsmt2d.Axis) ([]share.Share, error) {
shrLn := int(f.hdr.ShareSize)
sqrLn := int(f.hdr.SquareSize)
shrLn := int(f.hdr.shareSize)
sqrLn := int(f.hdr.squareSize)

shrs := make([]share.Share, sqrLn)
switch axis {
case rsmt2d.Col:
// [] [] [] []
// [] [] [] []
// [] [] [] []
// [] [] [] []

for i := 0; i < sqrLn; i++ {
pos := idx + i*sqrLn
offset := pos*shrLn + HeaderSize
Expand All @@ -111,6 +107,7 @@ func (f *File) Axis(idx int, axis rsmt2d.Axis) ([]share.Share, error) {
case rsmt2d.Row:
pos := idx * sqrLn
offset := pos*shrLn + HeaderSize

axsData := make([]byte, sqrLn*shrLn)
if _, err := f.fl.ReadAt(axsData, int64(offset)); err != nil {
return nil, err
Expand All @@ -119,14 +116,16 @@ func (f *File) Axis(idx int, axis rsmt2d.Axis) ([]share.Share, error) {
for i := range shrs {
shrs[i] = axsData[i*shrLn : (i+1)*shrLn]
}
default:
return nil, fmt.Errorf("unknown axis")
}

return shrs, nil
}

func (f *File) Share(idx int) (share.Share, error) {
// TODO: Check the cache first
shrLn := int64(f.hdr.ShareSize)
shrLn := int64(f.hdr.shareSize)

offset := int64(idx)*shrLn + HeaderSize
shr := make(share.Share, shrLn)
Expand All @@ -138,7 +137,7 @@ func (f *File) Share(idx int) (share.Share, error) {

func (f *File) ShareWithProof(idx int, axis rsmt2d.Axis) (share.Share, nmt.Proof, error) {
// TODO: Cache the axis as well as computed tree
sqrLn := int(f.hdr.SquareSize)
sqrLn := int(f.hdr.squareSize)
axsIdx, shrIdx := idx/sqrLn, idx%sqrLn
if axis == rsmt2d.Col {
axsIdx, shrIdx = shrIdx, axsIdx
Expand Down Expand Up @@ -166,8 +165,8 @@ func (f *File) ShareWithProof(idx int, axis rsmt2d.Axis) (share.Share, nmt.Proof
}

func (f *File) EDS() (*rsmt2d.ExtendedDataSquare, error) {
shrLn := int(f.hdr.ShareSize)
sqrLn := int(f.hdr.SquareSize)
shrLn := int(f.hdr.shareSize)
sqrLn := int(f.hdr.squareSize)

buf := make([]byte, sqrLn*sqrLn*shrLn)
if _, err := f.fl.ReadAt(buf, HeaderSize); err != nil {
Expand All @@ -183,7 +182,7 @@ func (f *File) EDS() (*rsmt2d.ExtendedDataSquare, error) {
}

codec := share.DefaultRSMT2DCodec()
treeFn := wrapper.NewConstructor(uint64(f.hdr.SquareSize / 2))
treeFn := wrapper.NewConstructor(uint64(f.hdr.squareSize / 2))
eds, err := rsmt2d.ImportExtendedDataSquare(shrs, codec, treeFn)
if err != nil {
return nil, err
Expand Down
55 changes: 36 additions & 19 deletions share/eds/file_header.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,37 @@ type Header struct {
// User set features
// TODO: Add codec
// TDOD: Add ODS support
Version uint8
Compression uint8
Extensions map[string]string
version uint8
compression uint8
// extensions map[string]string

// Taken directly from EDS
ShareSize uint16
SquareSize uint32
shareSize uint16
squareSize uint32
}

func (h *Header) ShareSize() int {
return int(h.shareSize)
}

func (h *Header) SquareSize() int {
return int(h.squareSize)
}

// TODO(@Wondertan) Should return special types
func (h *Header) Version() uint8 {
return h.version
}
func (h *Header) Compression() uint8 {
return h.compression
}

func (h *Header) WriteTo(w io.Writer) (int64, error) {
buf := make([]byte, HeaderSize)
buf[0] = h.Version
buf[1] = h.Compression
binary.LittleEndian.PutUint16(buf[2:4], h.ShareSize)
binary.LittleEndian.PutUint32(buf[4:12], h.SquareSize)
buf[0] = h.version
buf[1] = h.compression
binary.LittleEndian.PutUint16(buf[2:4], h.shareSize)
binary.LittleEndian.PutUint32(buf[4:12], h.squareSize)
// TODO: Extensions
n, err := w.Write(buf)
return int64(n), err
Expand All @@ -37,26 +54,26 @@ func (h *Header) ReadFrom(r io.Reader) (int64, error) {
return int64(n), err
}

h.Version = buf[0]
h.Compression = buf[1]
h.ShareSize = binary.LittleEndian.Uint16(buf[2:4])
h.SquareSize = binary.LittleEndian.Uint32(buf[4:12])
h.version = buf[0]
h.compression = buf[1]
h.shareSize = binary.LittleEndian.Uint16(buf[2:4])
h.squareSize = binary.LittleEndian.Uint32(buf[4:12])

// TODO: Extensions
return int64(n), err
}

func ReadHeaderAt(r io.ReaderAt, offset int64) (Header, error) {
h := Header{}
func ReadHeaderAt(r io.ReaderAt, offset int64) (*Header, error) {
h := &Header{}
buf := make([]byte, HeaderSize)
_, err := r.ReadAt(buf, offset)
if err != nil {
return h, err
}

h.Version = buf[0]
h.Compression = buf[1]
h.ShareSize = binary.LittleEndian.Uint16(buf[2:4])
h.SquareSize = binary.LittleEndian.Uint32(buf[4:12])
h.version = buf[0]
h.compression = buf[1]
h.shareSize = binary.LittleEndian.Uint16(buf[2:4])
h.squareSize = binary.LittleEndian.Uint32(buf[4:12])
return h, nil
}
8 changes: 8 additions & 0 deletions share/eds/file_store.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
package eds

import "github.com/celestiaorg/celestia-node/share"

type FileStore struct {
baspath string
}

func (fs *FileStore) File(hash share.DataHash) (*File, error) {
// TODO(@Wondertan): Caching
return OpenFile(fs.baspath + "/" + hash.String())
}
18 changes: 14 additions & 4 deletions share/eds/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@ import (
"github.com/celestiaorg/celestia-node/share/eds/edstest"
)

func TestCreateFile(t *testing.T) {
path := t.TempDir() + "/testfile"
edsIn := edstest.RandEDS(t, 8)
f, err := CreateFile(path, edsIn)
require.NoError(t, err)
edsOut, err := f.EDS()
require.NoError(t, err)
assert.True(t, edsIn.Equals(edsOut))
}

func TestFile(t *testing.T) {
path := t.TempDir() + "/testfile"
eds := edstest.RandEDS(t, 8)
Expand Down Expand Up @@ -44,7 +54,7 @@ func TestFile(t *testing.T) {
require.NoError(t, err)
assert.EqualValues(t, eds.GetCell(row, col), shr)

shr, proof, err := fl.ShareWithProof(i, axis)
shr, prf, err := fl.ShareWithProof(i, axis)
require.NoError(t, err)
assert.EqualValues(t, eds.GetCell(row, col), shr)

Expand All @@ -53,12 +63,12 @@ func TestFile(t *testing.T) {
namespace = share.GetNamespace(shr)
}

dahroot := root.RowRoots[row]
axishash := root.RowRoots[row]
if axis == rsmt2d.Col {
dahroot = root.ColumnRoots[col]
axishash = root.ColumnRoots[col]
}

ok := proof.VerifyInclusion(sha256.New(), namespace.ToNMT(), [][]byte{shr}, dahroot)
ok := prf.VerifyInclusion(sha256.New(), namespace.ToNMT(), [][]byte{shr}, axishash)
assert.True(t, ok)
}
}
Expand Down
2 changes: 1 addition & 1 deletion share/ipld/blockserv.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@ type allowlist struct{}

func (a allowlist) IsAllowed(code uint64) bool {
// we allow all codes except home-baked sha256NamespaceFlagged
return code == sha256NamespaceFlagged || code == 0x7801
return code == sha256NamespaceFlagged
}
133 changes: 133 additions & 0 deletions share/ipldv2/blockstore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package ipldv2

import (
"context"
"fmt"
"io"

"github.com/ipfs/boxo/blockstore"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"

"github.com/celestiaorg/nmt"
"github.com/celestiaorg/rsmt2d"

"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds"
)

// edsFile is a mocking friendly local interface over eds.File.
// TODO(@Wondertan): Consider making an actual interface of eds pkg
type edsFile interface {
io.Closer
Header() *eds.Header
ShareWithProof(idx int, axis rsmt2d.Axis) (share.Share, nmt.Proof, error)
}

// fileStore is a mocking friendly local interface over eds.FileStore
// TODO(@Wondertan): Consider making an actual interface of eds pkg
type fileStore[F edsFile] interface {
File(share.DataHash) (F, error)
}

type Blockstore[F edsFile] struct {
fs fileStore[F]
}

func NewBlockstore[F edsFile](fs fileStore[F]) blockstore.Blockstore {
return &Blockstore[F]{fs}
}

func (b Blockstore[F]) Get(_ context.Context, cid cid.Cid) (blocks.Block, error) {
id, err := SampleIDFromCID(cid)
if err != nil {
err = fmt.Errorf("while converting CID to SampleID: %w", err)
log.Error(err)
return nil, err
}

f, err := b.fs.File(id.DataHash)
if err != nil {
err = fmt.Errorf("while getting EDS file from FS: %w", err)
log.Error(err)
return nil, err
}

shr, prf, err := f.ShareWithProof(id.Index, id.Axis)
if err != nil {
err = fmt.Errorf("while getting share with proof: %w", err)
log.Error(err)
return nil, err
}

s := NewSample(id, shr, prf, f.Header().SquareSize())
blk, err := s.IPLDBlock()
if err != nil {
err = fmt.Errorf("while getting share with proof: %w", err)
log.Error(err)
return nil, err
}

err = f.Close()
if err != nil {
err = fmt.Errorf("while closing EDS file: %w", err)
log.Error(err)
return nil, err
}

return blk, nil
}

func (b Blockstore[F]) GetSize(ctx context.Context, cid cid.Cid) (int, error) {
// TODO(@Wondertan): There must be a way to derive size without reading, proving, serializing and
// allocating Sample's block.Block.
blk, err := b.Get(ctx, cid)
if err != nil {
return 0, err
}

return len(blk.RawData()), nil
}

func (b Blockstore[F]) Has(_ context.Context, cid cid.Cid) (bool, error) {
id, err := SampleIDFromCID(cid)
if err != nil {
err = fmt.Errorf("while converting CID to SampleID: %w", err)
log.Error(err)
return false, err
}

f, err := b.fs.File(id.DataHash)
if err != nil {
err = fmt.Errorf("while getting EDS file from FS: %w", err)
log.Error(err)
return false, err
}

err = f.Close()
if err != nil {
err = fmt.Errorf("while closing EDS file: %w", err)
log.Error(err)
return false, err
}
// existence of the file confirms existence of the share
return true, nil
}

func (b Blockstore[F]) AllKeysChan(context.Context) (<-chan cid.Cid, error) {
return nil, fmt.Errorf("AllKeysChan is unsupported")
}

func (b Blockstore[F]) DeleteBlock(context.Context, cid.Cid) error {
return fmt.Errorf("writes are not supported")
}

func (b Blockstore[F]) Put(context.Context, blocks.Block) error {
return fmt.Errorf("writes are not supported")
}

func (b Blockstore[F]) PutMany(context.Context, []blocks.Block) error {
return fmt.Errorf("writes are not supported")
}

func (b Blockstore[F]) HashOnRead(bool) {}
Loading

0 comments on commit e7ebbb1

Please sign in to comment.