diff --git a/share/new_eds/accessor.go b/share/new_eds/accessor.go index 8ced738fa4..30b671e302 100644 --- a/share/new_eds/accessor.go +++ b/share/new_eds/accessor.go @@ -14,6 +14,8 @@ import ( type Accessor interface { // Size returns square size of the Accessor. Size(ctx context.Context) int + // DataRoot returns data hash of the Accessor. + DataRoot(ctx context.Context) (share.DataHash, error) // Sample returns share and corresponding proof for row and column indices. Implementation can // choose which axis to use for proof. Chosen axis for proof should be indicated in the returned // Sample. diff --git a/share/new_eds/close_once.go b/share/new_eds/close_once.go index c05851db7c..3a6906bfac 100644 --- a/share/new_eds/close_once.go +++ b/share/new_eds/close_once.go @@ -42,6 +42,14 @@ func (c *closeOnce) Size(ctx context.Context) int { return c.f.Size(ctx) } +// DataRoot returns root hash of Accessor's underlying EDS. +func (c *closeOnce) DataRoot(ctx context.Context) (share.DataHash, error) { + if c.closed.Load() { + return nil, errAccessorClosed + } + return c.f.DataRoot(ctx) +} + func (c *closeOnce) Sample(ctx context.Context, rowIdx, colIdx int) (shwap.Sample, error) { if c.closed.Load() { return shwap.Sample{}, errAccessorClosed diff --git a/share/new_eds/close_once_test.go b/share/new_eds/close_once_test.go index a063423a1c..df5ef1731c 100644 --- a/share/new_eds/close_once_test.go +++ b/share/new_eds/close_once_test.go @@ -50,6 +50,10 @@ func (s *stubEdsAccessorCloser) Size(context.Context) int { return 0 } +func (s *stubEdsAccessorCloser) DataRoot(context.Context) (share.DataHash, error) { + return nil, nil +} + func (s *stubEdsAccessorCloser) Sample(context.Context, int, int) (shwap.Sample, error) { return shwap.Sample{}, nil } diff --git a/share/new_eds/proofs_cache.go b/share/new_eds/proofs_cache.go index faf3e580ef..7494b31025 100644 --- a/share/new_eds/proofs_cache.go +++ b/share/new_eds/proofs_cache.go @@ -77,6 +77,10 @@ func (c *proofsCache) Size(ctx context.Context) int { return int(size) } +func (c *proofsCache) DataRoot(ctx context.Context) (share.DataHash, error) { + return c.inner.DataRoot(ctx) +} + func (c *proofsCache) Sample(ctx context.Context, rowIdx, colIdx int) (shwap.Sample, error) { axisType, axisIdx, shrIdx := rsmt2d.Row, rowIdx, colIdx ax, err := c.axisWithProofs(ctx, axisType, axisIdx) diff --git a/share/new_eds/rsmt2d.go b/share/new_eds/rsmt2d.go index 9bec30db66..91e93ca135 100644 --- a/share/new_eds/rsmt2d.go +++ b/share/new_eds/rsmt2d.go @@ -24,6 +24,15 @@ func (eds *Rsmt2D) Size(context.Context) int { return int(eds.Width()) } +// DataRoot returns data hash of the Accessor. +func (eds *Rsmt2D) DataRoot(context.Context) (share.DataHash, error) { + dah, err := share.NewRoot(eds.ExtendedDataSquare) + if err != nil { + return share.DataHash{}, fmt.Errorf("while creating data root: %w", err) + } + return dah.Hash(), nil +} + // Sample returns share and corresponding proof for row and column indices. func (eds *Rsmt2D) Sample( _ context.Context, diff --git a/store/cache/accessor_cache.go b/store/cache/accessor_cache.go index 90bda41db6..7537c07b33 100644 --- a/store/cache/accessor_cache.go +++ b/store/cache/accessor_cache.go @@ -151,10 +151,9 @@ func (bc *AccessorCache) Remove(height uint64) error { } // EnableMetrics enables metrics for the cache. -func (bc *AccessorCache) EnableMetrics() error { - var err error +func (bc *AccessorCache) EnableMetrics() (unreg func() error, err error) { bc.metrics, err = newMetrics(bc) - return err + return bc.metrics.reg.Unregister, err } func (s *accessor) addRef() error { diff --git a/store/cache/accessor_cache_test.go b/store/cache/accessor_cache_test.go index c3777f548b..7b8261c72c 100644 --- a/store/cache/accessor_cache_test.go +++ b/store/cache/accessor_cache_test.go @@ -300,6 +300,10 @@ func (m *mockAccessor) Size(context.Context) int { panic("implement me") } +func (m *mockAccessor) DataRoot(context.Context) (share.DataHash, error) { + panic("implement me") +} + func (m *mockAccessor) Sample(context.Context, int, int) (shwap.Sample, error) { panic("implement me") } diff --git a/store/cache/cache.go b/store/cache/cache.go index 7bdf247612..ca9309bd8b 100644 --- a/store/cache/cache.go +++ b/store/cache/cache.go @@ -32,5 +32,5 @@ type Cache interface { Remove(height uint64) error // EnableMetrics enables metrics in Cache - EnableMetrics() error + EnableMetrics() (unreg func() error, err error) } diff --git a/store/cache/doublecache.go b/store/cache/doublecache.go index 1e1230f703..6c48cc4cb6 100644 --- a/store/cache/doublecache.go +++ b/store/cache/doublecache.go @@ -2,6 +2,7 @@ package cache import ( "errors" + "fmt" eds "github.com/celestiaorg/celestia-node/share/new_eds" ) @@ -43,9 +44,18 @@ func (mc *DoubleCache) Second() Cache { return mc.second } -func (mc *DoubleCache) EnableMetrics() error { - if err := mc.first.EnableMetrics(); err != nil { - return err +func (mc *DoubleCache) EnableMetrics() (unreg func() error, err error) { + unreg1, err := mc.first.EnableMetrics() + if err != nil { + return nil, fmt.Errorf("while enabling metrics for first cache: %w", err) } - return mc.second.EnableMetrics() + unreg2, err := mc.second.EnableMetrics() + if err != nil { + return unreg1, fmt.Errorf("while enabling metrics for second cache: %w", err) + } + + unregFn := func() error { + return errors.Join(unreg1(), unreg2()) + } + return unregFn, nil } diff --git a/store/cache/noop.go b/store/cache/noop.go index f1a2936cdb..045369a635 100644 --- a/store/cache/noop.go +++ b/store/cache/noop.go @@ -28,8 +28,9 @@ func (n NoopCache) Remove(uint64) error { return nil } -func (n NoopCache) EnableMetrics() error { - return nil +func (n NoopCache) EnableMetrics() (unreg func() error, err error) { + noop := func() error { return nil } + return noop, nil } var _ eds.AccessorStreamer = NoopFile{} @@ -45,6 +46,11 @@ func (n NoopFile) Size(context.Context) int { return 0 } +// DataRoot returns root hash of Accessor's underlying EDS. +func (n NoopFile) DataRoot(context.Context) (share.DataHash, error) { + return nil, nil +} + func (n NoopFile) Sample(context.Context, int, int) (shwap.Sample, error) { return shwap.Sample{}, nil } diff --git a/store/file/codec_test.go b/store/file/codec_test.go index d6fdbb3045..857c16aff0 100644 --- a/store/file/codec_test.go +++ b/store/file/codec_test.go @@ -68,7 +68,7 @@ func BenchmarkCodec(b *testing.B) { } } -func newShards(b require.TestingT, size int, fillParity bool) [][]byte { +func newShards(b testing.TB, size int, fillParity bool) [][]byte { shards := make([][]byte, size) original := sharetest.RandShares(b, size/2) copy(shards, original) diff --git a/store/file/header.go b/store/file/header.go index 67121a03f3..9946041f46 100644 --- a/store/file/header.go +++ b/store/file/header.go @@ -2,6 +2,7 @@ package file import ( "encoding/binary" + "errors" "fmt" "io" @@ -42,7 +43,11 @@ const ( func readHeader(r io.Reader) (*headerV0, error) { // read first byte to determine the fileVersion var version headerVersion - if err := binary.Read(r, binary.LittleEndian, &version); err != nil { + err := binary.Read(r, binary.LittleEndian, &version) + if err != nil { + if errors.Is(err, io.EOF) { + return nil, ErrEmptyFile + } return nil, fmt.Errorf("readHeader: %w", err) } diff --git a/store/file/ods.go b/store/file/ods.go index 20e966e9d4..2128942190 100644 --- a/store/file/ods.go +++ b/store/file/ods.go @@ -2,6 +2,7 @@ package file import ( "context" + "errors" "fmt" "io" "os" @@ -16,6 +17,10 @@ import ( var _ eds.AccessorStreamer = (*ODSFile)(nil) +// ErrEmptyFile signals that the ODS file is empty. +// This helps avoid storing empty block EDSes. +var ErrEmptyFile = errors.New("file is empty") + type ODSFile struct { path string hdr *headerV0 @@ -61,7 +66,8 @@ func CreateODSFile( datahash share.DataHash, eds *rsmt2d.ExtendedDataSquare, ) (*ODSFile, error) { - f, err := os.Create(path) + mod := os.O_RDWR | os.O_CREATE | os.O_EXCL // ensure we fail if already exist + f, err := os.OpenFile(path, mod, 0o666) if err != nil { return nil, fmt.Errorf("file create: %w", err) } @@ -114,6 +120,11 @@ func (f *ODSFile) size() int { return int(f.hdr.squareSize) } +// DataRoot returns root hash of Accessor's underlying EDS. +func (f *ODSFile) DataRoot(context.Context) (share.DataHash, error) { + return f.hdr.datahash, nil +} + // Close closes the file. func (f *ODSFile) Close() error { return f.fl.Close() diff --git a/store/file/q1q4_file.go b/store/file/q1q4_file.go index 8b0bed86a9..1eb242f749 100644 --- a/store/file/q1q4_file.go +++ b/store/file/q1q4_file.go @@ -33,8 +33,7 @@ func OpenQ1Q4File(path string) (*Q1Q4File, error) { }, nil } -func CreateQ1Q4File(path string, datahash share.DataHash, eds *rsmt2d.ExtendedDataSquare, -) (*Q1Q4File, error) { +func CreateQ1Q4File(path string, datahash share.DataHash, eds *rsmt2d.ExtendedDataSquare) (*Q1Q4File, error) { ods, err := CreateODSFile(path, datahash, eds) if err != nil { return nil, err @@ -54,6 +53,11 @@ func (f *Q1Q4File) Size(ctx context.Context) int { return f.ods.Size(ctx) } +// DataRoot returns root hash of Accessor's underlying EDS. +func (f *Q1Q4File) DataRoot(ctx context.Context) (share.DataHash, error) { + return f.ods.DataRoot(ctx) +} + func (f *Q1Q4File) Sample(ctx context.Context, rowIdx, colIdx int) (shwap.Sample, error) { // use native AxisHalf implementation, to read axis from Q4 quandrant when possible half, err := f.AxisHalf(ctx, rsmt2d.Row, rowIdx) diff --git a/store/metrics.go b/store/metrics.go new file mode 100644 index 0000000000..cde136a9a4 --- /dev/null +++ b/store/metrics.go @@ -0,0 +1,141 @@ +package store + +import ( + "context" + "fmt" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +const ( + failedKey = "failed" + sizeKey = "eds_size" +) + +var meter = otel.Meter("store") + +type metrics struct { + put metric.Float64Histogram + putExists metric.Int64Counter + get metric.Float64Histogram + has metric.Float64Histogram + remove metric.Float64Histogram + unreg func() error +} + +func (s *Store) WithMetrics() error { + put, err := meter.Float64Histogram("eds_store_put_time_histogram", + metric.WithDescription("eds store put time histogram(s)")) + if err != nil { + return err + } + + putExists, err := meter.Int64Counter("eds_store_put_exists_counter", + metric.WithDescription("eds store put file exists")) + if err != nil { + return err + } + + get, err := meter.Float64Histogram("eds_store_get_time_histogram", + metric.WithDescription("eds store get time histogram(s)")) + if err != nil { + return err + } + + has, err := meter.Float64Histogram("eds_store_has_time_histogram", + metric.WithDescription("eds store has time histogram(s)")) + if err != nil { + return err + } + + remove, err := meter.Float64Histogram("eds_store_remove_time_histogram", + metric.WithDescription("eds store remove time histogram(s)")) + if err != nil { + return err + } + + unreg, err := s.cache.EnableMetrics() + if err != nil { + return fmt.Errorf("while enabling metrics for cache: %w", err) + } + + s.metrics = &metrics{ + put: put, + putExists: putExists, + get: get, + has: has, + remove: remove, + unreg: unreg, + } + return nil +} + +func (m *metrics) observePut(ctx context.Context, dur time.Duration, size uint, failed bool) { + if m == nil { + return + } + if ctx.Err() != nil { + ctx = context.Background() + } + + m.put.Record(ctx, dur.Seconds(), metric.WithAttributes( + attribute.Bool(failedKey, failed), + attribute.Int(sizeKey, int(size)))) +} + +func (m *metrics) observePutExist(ctx context.Context) { + if m == nil { + return + } + if ctx.Err() != nil { + ctx = context.Background() + } + + m.putExists.Add(ctx, 1) +} + +func (m *metrics) observeGet(ctx context.Context, dur time.Duration, failed bool) { + if m == nil { + return + } + if ctx.Err() != nil { + ctx = context.Background() + } + + m.get.Record(ctx, dur.Seconds(), metric.WithAttributes( + attribute.Bool(failedKey, failed))) +} + +func (m *metrics) observeHas(ctx context.Context, dur time.Duration, failed bool) { + if m == nil { + return + } + if ctx.Err() != nil { + ctx = context.Background() + } + + m.has.Record(ctx, dur.Seconds(), metric.WithAttributes( + attribute.Bool(failedKey, failed))) +} + +func (m *metrics) observeRemove(ctx context.Context, dur time.Duration, failed bool) { + if m == nil { + return + } + if ctx.Err() != nil { + ctx = context.Background() + } + + m.remove.Record(ctx, dur.Seconds(), metric.WithAttributes( + attribute.Bool(failedKey, failed))) +} + +func (m *metrics) close() error { + if m == nil { + return nil + } + return m.unreg() +} diff --git a/store/store.go b/store/store.go new file mode 100644 index 0000000000..8c40bfa87b --- /dev/null +++ b/store/store.go @@ -0,0 +1,380 @@ +package store + +import ( + "context" + "errors" + "fmt" + "os" + "strconv" + "time" + + logging "github.com/ipfs/go-log/v2" + + "github.com/celestiaorg/rsmt2d" + + "github.com/celestiaorg/celestia-node/share" + eds "github.com/celestiaorg/celestia-node/share/new_eds" + "github.com/celestiaorg/celestia-node/store/cache" + "github.com/celestiaorg/celestia-node/store/file" +) + +var ( + log = logging.Logger("share/eds") + + emptyAccessor = &eds.Rsmt2D{ExtendedDataSquare: share.EmptyExtendedDataSquare()} +) + +const ( + blocksPath = "/blocks/" + heightsPath = blocksPath + "heights/" + + defaultDirPerm = 0o755 +) + +var ErrNotFound = errors.New("eds not found in store") + +// Store is a storage for EDS files. It persists EDS files on disk in form of Q1Q4 files or ODS +// files. It provides methods to put, get and remove EDS files. It has two caches: recent eds cache +// and availability cache. Recent eds cache is used to cache recent blocks. Availability cache is +// used to cache blocks that are accessed by sample requests. Store is thread-safe. +type Store struct { + // basepath is the root directory of the store + basepath string + // cache is used to cache recent blocks and blocks that are accessed frequently + cache *cache.DoubleCache + // stripedLocks is used to synchronize parallel operations + stripLock *striplock + metrics *metrics +} + +// NewStore creates a new EDS Store under the given basepath and datastore. +func NewStore(params *Parameters, basePath string) (*Store, error) { + if err := params.Validate(); err != nil { + return nil, err + } + + // Ensure the blocks folder exists or is created. + blocksFolderPath := basePath + blocksPath + if err := ensureFolder(blocksFolderPath); err != nil { + log.Errorf("Failed to ensure the existence of the blocks folder at '%s': %s", blocksFolderPath, err) + return nil, fmt.Errorf("ensure blocks folder '%s': %w", blocksFolderPath, err) + } + + // Ensure the heights folder exists or is created. + heightsFolderPath := basePath + heightsPath + if err := ensureFolder(heightsFolderPath); err != nil { + log.Errorf("Failed to ensure the existence of the heights folder at '%s': %s", heightsFolderPath, err) + return nil, fmt.Errorf("ensure heights folder '%s': %w", heightsFolderPath, err) + } + + err := ensureEmptyFile(basePath) + if err != nil { + return nil, fmt.Errorf("creating empty file: %w", err) + } + + recentEDSCache, err := cache.NewAccessorCache("recent", params.RecentBlocksCacheSize) + if err != nil { + return nil, fmt.Errorf("failed to create recent eds cache: %w", err) + } + + availabilityCache, err := cache.NewAccessorCache("availability", params.AvailabilityCacheSize) + if err != nil { + return nil, fmt.Errorf("failed to create availability cache: %w", err) + } + + store := &Store{ + basepath: basePath, + cache: cache.NewDoubleCache(recentEDSCache, availabilityCache), + stripLock: newStripLock(1024), + } + return store, nil +} + +func (s *Store) Close() error { + return s.metrics.close() +} + +func (s *Store) Put( + ctx context.Context, + datahash share.DataHash, + height uint64, + square *rsmt2d.ExtendedDataSquare, +) error { + tNow := time.Now() + lock := s.stripLock.byDatahashAndHeight(datahash, height) + lock.lock() + defer lock.unlock() + + path := s.basepath + blocksPath + datahash.String() + if datahash.IsEmptyRoot() { + err := s.ensureHeightLink(path, height) + return err + } + + exists, err := s.createFile(path, datahash, height, square) + if exists { + s.metrics.observePutExist(ctx) + return nil + } + if err != nil { + s.metrics.observePut(ctx, time.Since(tNow), square.Width(), true) + return fmt.Errorf("creating file: %w", err) + } + s.metrics.observePut(ctx, time.Since(tNow), square.Width(), false) + + // put file in recent cache + eds := &eds.Rsmt2D{ExtendedDataSquare: square} + _, err = s.cache.First().GetOrLoad(ctx, height, accessorLoader(eds)) + if err != nil { + log.Warnf("failed to put file in recent cache: %s", err) + } + + return nil +} + +func (s *Store) createFile( + path string, + datahash share.DataHash, + height uint64, + square *rsmt2d.ExtendedDataSquare, +) (exists bool, err error) { + f, err := file.CreateQ1Q4File(path, datahash, square) + if errors.Is(err, os.ErrExist) { + return true, nil + } + + if err != nil { + return false, fmt.Errorf("creating Q1Q4 file: %w", err) + } + + err = f.Close() + if err != nil { + return false, fmt.Errorf("closing created Q1Q4 file: %w", err) + } + + // create hard link with height as name + err = s.ensureHeightLink(path, height) + if err != nil { + // remove the file if we failed to create a hard link + removeErr := s.removeFile(datahash) + return false, fmt.Errorf("creating hard link: %w", errors.Join(err, removeErr)) + } + return false, nil +} + +func (s *Store) ensureHeightLink(path string, height uint64) error { + // create hard link with height as name + linkPath := s.basepath + heightsPath + strconv.Itoa(int(height)) + err := os.Link(path, linkPath) + if err != nil && !errors.Is(err, os.ErrExist) { + return fmt.Errorf("creating hard link: %w", err) + } + return nil +} + +func (s *Store) GetByDataRoot(ctx context.Context, datahash share.DataHash) (eds.AccessorStreamer, error) { + if datahash.IsEmptyRoot() { + return emptyAccessor, nil + } + lock := s.stripLock.byDatahash(datahash) + lock.RLock() + defer lock.RUnlock() + + tNow := time.Now() + f, err := s.getByDataRoot(datahash) + s.metrics.observeGet(ctx, time.Since(tNow), err != nil) + return f, err +} + +func (s *Store) getByDataRoot(datahash share.DataHash) (eds.AccessorStreamer, error) { + path := s.basepath + blocksPath + datahash.String() + return s.openFile(path) +} + +func (s *Store) GetByHeight(ctx context.Context, height uint64) (eds.AccessorStreamer, error) { + lock := s.stripLock.byHeight(height) + lock.RLock() + defer lock.RUnlock() + + tNow := time.Now() + f, err := s.getByHeight(height) + s.metrics.observeGet(ctx, time.Since(tNow), err != nil) + return f, err +} + +func (s *Store) getByHeight(height uint64) (eds.AccessorStreamer, error) { + f, err := s.cache.Get(height) + if err == nil { + return f, nil + } + path := s.basepath + heightsPath + strconv.Itoa(int(height)) + return s.openFile(path) +} + +func (s *Store) openFile(path string) (eds.AccessorStreamer, error) { + f, err := file.OpenQ1Q4File(path) + if err == nil { + return wrapAccessor(f), nil + } + if os.IsNotExist(err) { + return nil, ErrNotFound + } + if errors.Is(err, file.ErrEmptyFile) { + return emptyAccessor, nil + } + return nil, fmt.Errorf("opening file: %w", err) +} + +func (s *Store) HasByHash(ctx context.Context, datahash share.DataHash) (bool, error) { + if datahash.IsEmptyRoot() { + return true, nil + } + lock := s.stripLock.byDatahash(datahash) + lock.RLock() + defer lock.RUnlock() + + tNow := time.Now() + exist, err := s.hasByHash(datahash) + s.metrics.observeHas(ctx, time.Since(tNow), err != nil) + return exist, err +} + +func (s *Store) hasByHash(datahash share.DataHash) (bool, error) { + path := s.basepath + blocksPath + datahash.String() + return pathExists(path) +} + +func (s *Store) HasByHeight(ctx context.Context, height uint64) (bool, error) { + lock := s.stripLock.byHeight(height) + lock.RLock() + defer lock.RUnlock() + + tNow := time.Now() + exist, err := s.hasByHeight(height) + s.metrics.observeHas(ctx, time.Since(tNow), err != nil) + return exist, err +} + +func (s *Store) hasByHeight(height uint64) (bool, error) { + _, err := s.cache.Get(height) + if err == nil { + return true, nil + } + + path := s.basepath + heightsPath + strconv.Itoa(int(height)) + return pathExists(path) +} + +func (s *Store) Remove(ctx context.Context, height uint64, dataRoot share.DataHash) error { + tNow := time.Now() + err := s.remove(height, dataRoot) + s.metrics.observeRemove(ctx, time.Since(tNow), err != nil) + return err +} + +func (s *Store) remove(height uint64, dataRoot share.DataHash) error { + lock := s.stripLock.byHeight(height) + lock.Lock() + if err := s.removeLink(height); err != nil { + return fmt.Errorf("removing link: %w", err) + } + lock.Unlock() + + dlock := s.stripLock.byDatahash(dataRoot) + dlock.Lock() + defer dlock.Unlock() + if err := s.removeFile(dataRoot); err != nil { + return fmt.Errorf("removing file: %w", err) + } + return nil +} + +func (s *Store) removeLink(height uint64) error { + if err := s.cache.Remove(height); err != nil { + return fmt.Errorf("removing from cache: %w", err) + } + + // remove hard link by height + heightPath := s.basepath + heightsPath + strconv.Itoa(int(height)) + err := os.Remove(heightPath) + if err != nil && !errors.Is(err, os.ErrNotExist) { + return err + } + return nil +} + +func (s *Store) removeFile(hash share.DataHash) error { + // we don't need to remove the empty file, it should always be there + if hash.IsEmptyRoot() { + return nil + } + + hashPath := s.basepath + blocksPath + hash.String() + err := os.Remove(hashPath) + if err != nil && !errors.Is(err, os.ErrNotExist) { + return err + } + return nil +} + +func accessorLoader(accessor eds.AccessorStreamer) cache.OpenAccessorFn { + return func(context.Context) (eds.AccessorStreamer, error) { + return wrapAccessor(accessor), nil + } +} + +func wrapAccessor(accessor eds.AccessorStreamer) eds.AccessorStreamer { + withCache := eds.WithProofsCache(accessor) + closedOnce := eds.WithClosedOnce(withCache) + sanityChecked := eds.WithValidation(closedOnce) + accessorStreamer := eds.AccessorAndStreamer(sanityChecked, closedOnce) + return accessorStreamer +} + +func ensureFolder(path string) error { + info, err := os.Stat(path) + if errors.Is(err, os.ErrNotExist) { + err = os.Mkdir(path, defaultDirPerm) + if err != nil { + return fmt.Errorf("creating blocks dir: %w", err) + } + return nil + } + if err != nil { + return fmt.Errorf("checking dir: %w", err) + } + if !info.IsDir() { + return errors.New("expected dir, got a file") + } + return nil +} + +func pathExists(path string) (bool, error) { + _, err := os.Stat(path) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return false, nil + } + return false, err + } + return true, nil +} + +func ensureEmptyFile(basepath string) error { + path := basepath + blocksPath + share.DataHash(share.EmptyRoot().Hash()).String() + ok, err := pathExists(path) + if err != nil { + return fmt.Errorf("checking empty root: %w", err) + } + if ok { + return nil + } + f, err := os.Create(path) + if err != nil { + return fmt.Errorf("creating empty root file: %w", err) + } + if err = f.Close(); err != nil { + return fmt.Errorf("closing empty root file: %w", err) + } + return nil +} diff --git a/store/store_options.go b/store/store_options.go new file mode 100644 index 0000000000..1e27fd00a0 --- /dev/null +++ b/store/store_options.go @@ -0,0 +1,33 @@ +package store + +import ( + "errors" +) + +type Parameters struct { + // RecentBlocksCacheSize is the size of the cache for recent blocks. + RecentBlocksCacheSize int + + // AvailabilityCacheSize is the size of the cache for accessors requested for serving availability + // samples. + AvailabilityCacheSize int +} + +// DefaultParameters returns the default configuration values for the EDS store parameters. +func DefaultParameters() *Parameters { + return &Parameters{ + RecentBlocksCacheSize: 10, + AvailabilityCacheSize: 128, + } +} + +func (p *Parameters) Validate() error { + if p.RecentBlocksCacheSize < 1 { + return errors.New("recent eds cache size must be positive") + } + + if p.AvailabilityCacheSize < 1 { + return errors.New("availability cache size must be positive") + } + return nil +} diff --git a/store/store_test.go b/store/store_test.go new file mode 100644 index 0000000000..448e804d4c --- /dev/null +++ b/store/store_test.go @@ -0,0 +1,323 @@ +package store + +import ( + "context" + "crypto/rand" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/celestiaorg/rsmt2d" + + "github.com/celestiaorg/celestia-node/share" + "github.com/celestiaorg/celestia-node/share/eds/edstest" + "github.com/celestiaorg/celestia-node/store/cache" +) + +func TestEDSStore(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + edsStore, err := NewStore(DefaultParameters(), t.TempDir()) + require.NoError(t, err) + + // disable cache + edsStore.cache = cache.NewDoubleCache(cache.NoopCache{}, cache.NoopCache{}) + height := atomic.Uint64{} + height.Store(100) + + // PutRegistersShard tests if Put registers the shard on the underlying DAGStore + t.Run("Put", func(t *testing.T) { + eds, dah := randomEDS(t) + height := height.Add(1) + + err := edsStore.Put(ctx, dah.Hash(), height, eds) + require.NoError(t, err) + + // file should become available by hash + has, err := edsStore.HasByHash(ctx, dah.Hash()) + require.NoError(t, err) + require.True(t, has) + + // file should become available by height + has, err = edsStore.HasByHeight(ctx, height) + require.NoError(t, err) + require.True(t, has) + }) + + t.Run("Cached after Put", func(t *testing.T) { + edsStore, err := NewStore(DefaultParameters(), t.TempDir()) + require.NoError(t, err) + + eds, dah := randomEDS(t) + height := height.Add(1) + + err = edsStore.Put(ctx, dah.Hash(), height, eds) + require.NoError(t, err) + + // file should be cached after put + f, err := edsStore.cache.Get(height) + require.NoError(t, err) + require.NoError(t, f.Close()) + + // check that cached file is the same eds + fromFile, err := f.Shares(ctx) + require.NoError(t, err) + require.NoError(t, f.Close()) + expected := eds.FlattenedODS() + require.Equal(t, expected, fromFile) + }) + + t.Run("Second Put should be noop", func(t *testing.T) { + eds, dah := randomEDS(t) + height := height.Add(1) + + err := edsStore.Put(ctx, dah.Hash(), height, eds) + require.NoError(t, err) + + err = edsStore.Put(ctx, dah.Hash(), height, eds) + require.NoError(t, err) + // TODO: check amount of files in the store after the second Put + // after store supports listing + }) + + t.Run("GetByHeight", func(t *testing.T) { + eds, dah := randomEDS(t) + height := height.Add(1) + + err = edsStore.Put(ctx, dah.Hash(), height, eds) + require.NoError(t, err) + + f, err := edsStore.GetByHeight(ctx, height) + require.NoError(t, err) + + // check that cached file is the same eds + fromFile, err := f.Shares(ctx) + require.NoError(t, err) + require.NoError(t, f.Close()) + expected := eds.FlattenedODS() + require.Equal(t, expected, fromFile) + }) + + t.Run("GetByDataRoot", func(t *testing.T) { + eds, dah := randomEDS(t) + height := height.Add(1) + + err := edsStore.Put(ctx, dah.Hash(), height, eds) + require.NoError(t, err) + + f, err := edsStore.GetByDataRoot(ctx, dah.Hash()) + require.NoError(t, err) + + // check that cached file is the same eds + fromFile, err := f.Shares(ctx) + require.NoError(t, err) + require.NoError(t, f.Close()) + expected := eds.FlattenedODS() + require.Equal(t, expected, fromFile) + }) + + t.Run("Does not exist", func(t *testing.T) { + _, dah := randomEDS(t) + height := height.Add(1) + + has, err := edsStore.HasByHash(ctx, dah.Hash()) + require.NoError(t, err) + require.False(t, has) + + has, err = edsStore.HasByHeight(ctx, height) + require.NoError(t, err) + require.False(t, has) + + _, err = edsStore.GetByHeight(ctx, height) + require.ErrorIs(t, err, ErrNotFound) + + _, err = edsStore.GetByDataRoot(ctx, dah.Hash()) + require.ErrorIs(t, err, ErrNotFound) + }) + + t.Run("Remove", func(t *testing.T) { + // removing file that does not exist should be noop + missingHeight := height.Add(1) + err := edsStore.Remove(ctx, missingHeight, share.DataHash{0x01, 0x02}) + require.NoError(t, err) + + eds, dah := randomEDS(t) + height := height.Add(1) + err = edsStore.Put(ctx, dah.Hash(), height, eds) + require.NoError(t, err) + + err = edsStore.Remove(ctx, height, dah.Hash()) + require.NoError(t, err) + + // file should be removed from cache + _, err = edsStore.cache.Get(height) + require.ErrorIs(t, err, cache.ErrCacheMiss) + + // file should not be accessible by hash + has, err := edsStore.HasByHash(ctx, dah.Hash()) + require.NoError(t, err) + require.False(t, has) + + // subsequent remove should be noop + err = edsStore.Remove(ctx, height, dah.Hash()) + require.NoError(t, err) + + // file should not be accessible by height + has, err = edsStore.HasByHeight(ctx, height) + require.NoError(t, err) + require.False(t, has) + }) + + t.Run("empty EDS returned by hash", func(t *testing.T) { + eds := share.EmptyExtendedDataSquare() + dah, err := share.NewRoot(eds) + require.NoError(t, err) + + // assert that the empty file exists + has, err := edsStore.HasByHash(ctx, dah.Hash()) + require.NoError(t, err) + require.True(t, has) + + // assert that the empty file is, in fact, empty + f, err := edsStore.GetByDataRoot(ctx, dah.Hash()) + require.NoError(t, err) + hash, err := f.DataRoot(ctx) + require.NoError(t, err) + require.True(t, hash.IsEmptyRoot()) + }) + + t.Run("empty EDS returned by height", func(t *testing.T) { + eds := share.EmptyExtendedDataSquare() + dah, err := share.NewRoot(eds) + require.NoError(t, err) + height := height.Add(1) + + // assert that the empty file exists + has, err := edsStore.HasByHeight(ctx, height) + require.NoError(t, err) + require.False(t, has) + + err = edsStore.Put(ctx, dah.Hash(), height, eds) + require.NoError(t, err) + + // assert that the empty file can be accessed by height + f, err := edsStore.GetByHeight(ctx, height) + require.NoError(t, err) + hash, err := f.DataRoot(ctx) + require.NoError(t, err) + require.True(t, hash.IsEmptyRoot()) + require.NoError(t, f.Close()) + }) + + t.Run("empty EDS are persisted", func(t *testing.T) { + dir := t.TempDir() + edsStore, err := NewStore(DefaultParameters(), dir) + require.NoError(t, err) + + eds := share.EmptyExtendedDataSquare() + dah, err := share.NewRoot(eds) + require.NoError(t, err) + from, to := 10, 20 + + // store empty EDSs + for i := from; i <= to; i++ { + err := edsStore.Put(ctx, dah.Hash(), uint64(i), eds) + require.NoError(t, err) + } + + // close and reopen the store to ensure that the empty files are persisted + require.NoError(t, edsStore.Close()) + edsStore, err = NewStore(DefaultParameters(), dir) + require.NoError(t, err) + + // assert that the empty files restored from disk + for i := from; i <= to; i++ { + f, err := edsStore.GetByHeight(ctx, uint64(i)) + require.NoError(t, err) + hash, err := f.DataRoot(ctx) + require.NoError(t, err) + require.True(t, hash.IsEmptyRoot()) + require.NoError(t, f.Close()) + } + }) +} + +func BenchmarkStore(b *testing.B) { + ctx, cancel := context.WithCancel(context.Background()) + b.Cleanup(cancel) + + edsStore, err := NewStore(DefaultParameters(), b.TempDir()) + require.NoError(b, err) + + eds := edstest.RandEDS(b, 128) + require.NoError(b, err) + + // BenchmarkStore/bench_put_128-10 27 43968818 ns/op (~43ms) + b.Run("put 128", func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + bytes := make([]byte, 5) + rand.Read(bytes) //nolint:errcheck + h := share.DataHash(bytes) + _ = edsStore.Put(ctx, h, uint64(i), eds) + } + }) + + // read 128 EDSs does not read full EDS, but only the header + // BenchmarkStore/bench_read_128-10 82766 14678 ns/op (~14ms) + b.Run("open by height, 128", func(b *testing.B) { + edsStore, err := NewStore(DefaultParameters(), b.TempDir()) + require.NoError(b, err) + + // disable cache + edsStore.cache = cache.NewDoubleCache(cache.NoopCache{}, cache.NoopCache{}) + + dah, err := share.NewRoot(eds) + require.NoError(b, err) + + height := uint64(1984) + err = edsStore.Put(ctx, dah.Hash(), height, eds) + require.NoError(b, err) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + f, err := edsStore.GetByHeight(ctx, height) + require.NoError(b, err) + require.NoError(b, f.Close()) + } + }) + + // BenchmarkStore/open_by_hash,_128-10 72921 16799 ns/op (~16ms) + b.Run("open by hash, 128", func(b *testing.B) { + edsStore, err := NewStore(DefaultParameters(), b.TempDir()) + require.NoError(b, err) + + // disable cache + edsStore.cache = cache.NewDoubleCache(cache.NoopCache{}, cache.NoopCache{}) + + dah, err := share.NewRoot(eds) + require.NoError(b, err) + + height := uint64(1984) + err = edsStore.Put(ctx, dah.Hash(), height, eds) + require.NoError(b, err) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + f, err := edsStore.GetByDataRoot(ctx, dah.Hash()) + require.NoError(b, err) + require.NoError(b, f.Close()) + } + }) +} + +func randomEDS(t *testing.T) (*rsmt2d.ExtendedDataSquare, *share.Root) { + eds := edstest.RandEDS(t, 4) + dah, err := share.NewRoot(eds) + require.NoError(t, err) + + return eds, dah +} diff --git a/store/striplock.go b/store/striplock.go new file mode 100644 index 0000000000..69cee69f2d --- /dev/null +++ b/store/striplock.go @@ -0,0 +1,55 @@ +package store + +import ( + "sync" + + "github.com/celestiaorg/celestia-node/share" +) + +// TODO: move to utils +type striplock struct { + heights []*sync.RWMutex + datahashes []*sync.RWMutex +} + +type multiLock struct { + mu []*sync.RWMutex +} + +func newStripLock(size int) *striplock { + heights := make([]*sync.RWMutex, size) + datahashes := make([]*sync.RWMutex, size) + for i := 0; i < size; i++ { + heights[i] = &sync.RWMutex{} + datahashes[i] = &sync.RWMutex{} + } + return &striplock{heights, datahashes} +} + +func (l *striplock) byHeight(height uint64) *sync.RWMutex { + lkIdx := height % uint64(len(l.heights)) + return l.heights[lkIdx] +} + +func (l *striplock) byDatahash(datahash share.DataHash) *sync.RWMutex { + // Use the last 2 bytes of the datahash as hash to distribute the locks + last := uint16(datahash[len(datahash)-1]) | uint16(datahash[len(datahash)-2])<<8 + lkIdx := last % uint16(len(l.datahashes)) + return l.datahashes[lkIdx] +} + +func (l *striplock) byDatahashAndHeight(datahash share.DataHash, height uint64) *multiLock { + return &multiLock{[]*sync.RWMutex{l.byDatahash(datahash), l.byHeight(height)}} +} + +func (m *multiLock) lock() { + for _, lk := range m.mu { + lk.Lock() + } +} + +func (m *multiLock) unlock() { + for _, lk := range m.mu { + lk.Unlock() + } +}