Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(shwap): Add eds store #3545

Merged
merged 20 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ require (
go.opentelemetry.io/otel/sdk/metric v1.26.0
go.opentelemetry.io/otel/trace v1.26.0
go.opentelemetry.io/proto/otlp v1.2.0
go.uber.org/atomic v1.11.0
go.uber.org/fx v1.22.0
go.uber.org/zap v1.27.0
golang.org/x/crypto v0.24.0
Expand Down Expand Up @@ -324,7 +325,6 @@ require (
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.51.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.26.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/dig v1.17.1 // indirect
go.uber.org/mock v0.4.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions share/new_eds/accessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
type Accessor interface {
// Size returns square size of the Accessor.
Size(ctx context.Context) int
// DataHash returns data hash of the Accessor.
DataHash(ctx context.Context) (share.DataHash, error)
// Sample returns share and corresponding proof for row and column indices. Implementation can
// choose which axis to use for proof. Chosen axis for proof should be indicated in the returned
// Sample.
Expand Down
8 changes: 8 additions & 0 deletions share/new_eds/close_once.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ func (c *closeOnce) Size(ctx context.Context) int {
return c.f.Size(ctx)
}

// DataHash returns data hash of the Accessor.
func (c *closeOnce) DataHash(ctx context.Context) (share.DataHash, error) {
if c.closed.Load() {
return nil, errAccessorClosed
}
return c.f.DataHash(ctx)
}

func (c *closeOnce) Sample(ctx context.Context, rowIdx, colIdx int) (shwap.Sample, error) {
if c.closed.Load() {
return shwap.Sample{}, errAccessorClosed
Expand Down
4 changes: 4 additions & 0 deletions share/new_eds/close_once_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ func (s *stubEdsAccessorCloser) Size(context.Context) int {
return 0
}

// DataHash implements Accessor for the stub; it always returns a nil hash.
func (s *stubEdsAccessorCloser) DataHash(context.Context) (share.DataHash, error) {
	return nil, nil
}

func (s *stubEdsAccessorCloser) Sample(context.Context, int, int) (shwap.Sample, error) {
return shwap.Sample{}, nil
}
Expand Down
5 changes: 5 additions & 0 deletions share/new_eds/proofs_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ func (c *proofsCache) Size(ctx context.Context) int {
return int(size)
}

// DataHash returns data hash of the Accessor.
func (c *proofsCache) DataHash(ctx context.Context) (share.DataHash, error) {
return c.inner.DataHash(ctx)
}

func (c *proofsCache) Sample(ctx context.Context, rowIdx, colIdx int) (shwap.Sample, error) {
axisType, axisIdx, shrIdx := rsmt2d.Row, rowIdx, colIdx
ax, err := c.axisWithProofs(ctx, axisType, axisIdx)
Expand Down
7 changes: 7 additions & 0 deletions share/new_eds/rsmt2d.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"

"github.com/celestiaorg/celestia-app/pkg/da"
"github.com/celestiaorg/celestia-app/pkg/wrapper"
"github.com/celestiaorg/rsmt2d"

Expand All @@ -24,6 +25,12 @@ func (eds *Rsmt2D) Size(context.Context) int {
return int(eds.Width())
}

// DataHash returns data hash of the Accessor.
func (eds *Rsmt2D) DataHash(context.Context) (share.DataHash, error) {
dah, _ := da.NewDataAvailabilityHeader(eds.ExtendedDataSquare)
return dah.Hash(), nil
}

// Sample returns share and corresponding proof for row and column indices.
func (eds *Rsmt2D) Sample(
_ context.Context,
Expand Down
4 changes: 4 additions & 0 deletions store/cache/accessor_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,10 @@ func (m *mockAccessor) Size(context.Context) int {
panic("implement me")
}

// DataHash is an unimplemented stub; calling it panics.
func (m *mockAccessor) DataHash(context.Context) (share.DataHash, error) {
	panic("implement me")
}

func (m *mockAccessor) Sample(context.Context, int, int) (shwap.Sample, error) {
panic("implement me")
}
Expand Down
5 changes: 5 additions & 0 deletions store/cache/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ func (n NoopFile) Size(context.Context) int {
return 0
}

// DataHash returns data hash of the Accessor.
func (n NoopFile) DataHash(context.Context) (share.DataHash, error) {
return nil, nil
}

func (n NoopFile) Sample(context.Context, int, int) (shwap.Sample, error) {
return shwap.Sample{}, nil
}
Expand Down
2 changes: 1 addition & 1 deletion store/file/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func BenchmarkCodec(b *testing.B) {
}
}

func newShards(b require.TestingT, size int, fillParity bool) [][]byte {
func newShards(b testing.TB, size int, fillParity bool) [][]byte {
shards := make([][]byte, size)
original := sharetest.RandShares(b, size/2)
copy(shards, original)
Expand Down
5 changes: 5 additions & 0 deletions store/file/ods.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ func (f *ODSFile) size() int {
return int(f.hdr.squareSize)
}

// DataHash returns data hash of the Accessor.
func (f *ODSFile) DataHash(context.Context) (share.DataHash, error) {
return f.hdr.datahash, nil
}

// Close closes the file.
func (f *ODSFile) Close() error {
return f.fl.Close()
Expand Down
5 changes: 5 additions & 0 deletions store/file/q1q4_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ func (f *Q1Q4File) Size(ctx context.Context) int {
return f.ods.Size(ctx)
}

// DataHash returns data hash of the Accessor.
func (f *Q1Q4File) DataHash(ctx context.Context) (share.DataHash, error) {
return f.ods.DataHash(ctx)
}

func (f *Q1Q4File) Sample(ctx context.Context, rowIdx, colIdx int) (shwap.Sample, error) {
	// use native AxisHalf implementation, to read axis from Q4 quadrant when possible
half, err := f.AxisHalf(ctx, rsmt2d.Row, rowIdx)
Expand Down
130 changes: 130 additions & 0 deletions store/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package store

import (
"context"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

const (
failedKey = "failed"
sizeKey = "eds_size"
)

var meter = otel.Meter("store")

// metrics holds the OpenTelemetry instruments recorded by Store
// operations. A nil *metrics disables recording (see observe* methods).
type metrics struct {
	put       metric.Float64Histogram // put duration, seconds
	putExists metric.Int64Counter     // puts that found an already-existing file
	get       metric.Float64Histogram // get duration, seconds
	has       metric.Float64Histogram // has duration, seconds
	remove    metric.Float64Histogram // remove duration, seconds
}

// WithMetrics enables metrics collection on the Store. It registers
// duration histograms for put/get/has/remove, a counter for puts of
// already-existing files, and enables metrics on the accessor cache.
func (s *Store) WithMetrics() error {
	// All duration histograms are created the same way; fold the
	// repetition into a small helper.
	histogram := func(name, desc string) (metric.Float64Histogram, error) {
		return meter.Float64Histogram(name, metric.WithDescription(desc))
	}

	put, err := histogram("eds_store_put_time_histogram", "eds store put time histogram(s)")
	if err != nil {
		return err
	}

	putExists, err := meter.Int64Counter("eds_store_put_exists_counter",
		metric.WithDescription("eds store put file exists"))
	if err != nil {
		return err
	}

	get, err := histogram("eds_store_get_time_histogram", "eds store get time histogram(s)")
	if err != nil {
		return err
	}

	has, err := histogram("eds_store_has_time_histogram", "eds store has time histogram(s)")
	if err != nil {
		return err
	}

	remove, err := histogram("eds_store_remove_time_histogram", "eds store remove time histogram(s)")
	if err != nil {
		return err
	}

	if err = s.cache.EnableMetrics(); err != nil {
		return err
	}

	s.metrics = &metrics{
		put:       put,
		putExists: putExists,
		get:       get,
		has:       has,
		remove:    remove,
	}
	return nil
}

// observePut records the duration and square size of a put operation.
// Safe to call on a nil receiver (no-op when metrics are disabled).
func (m *metrics) observePut(ctx context.Context, dur time.Duration, size uint, failed bool) {
	if m == nil {
		return
	}
	if ctx.Err() != nil {
		// Record against a detached context so a canceled caller
		// context does not drop the measurement.
		ctx = context.Background()
	}

	attrs := metric.WithAttributes(
		attribute.Bool(failedKey, failed),
		attribute.Int(sizeKey, int(size)),
	)
	m.put.Record(ctx, dur.Seconds(), attrs)
}

// observePutExist counts a put that found the file already present.
// Safe to call on a nil receiver (no-op when metrics are disabled).
func (m *metrics) observePutExist(ctx context.Context) {
	if m == nil {
		return
	}
	if ctx.Err() != nil {
		// Detach from the (possibly canceled) caller context so the
		// count is still recorded.
		ctx = context.Background()
	}
	m.putExists.Add(ctx, 1)
}

// observeGet records the duration of a get operation.
// Safe to call on a nil receiver (no-op when metrics are disabled).
func (m *metrics) observeGet(ctx context.Context, dur time.Duration, failed bool) {
	if m == nil {
		return
	}
	if ctx.Err() != nil {
		ctx = context.Background()
	}

	attrs := metric.WithAttributes(attribute.Bool(failedKey, failed))
	m.get.Record(ctx, dur.Seconds(), attrs)
}

// observeHas records the duration of a has operation.
// Safe to call on a nil receiver (no-op when metrics are disabled).
func (m *metrics) observeHas(ctx context.Context, dur time.Duration, failed bool) {
	if m == nil {
		return
	}
	if ctx.Err() != nil {
		ctx = context.Background()
	}

	attrs := metric.WithAttributes(attribute.Bool(failedKey, failed))
	m.has.Record(ctx, dur.Seconds(), attrs)
}

// observeRemove records the duration of a remove operation.
// Safe to call on a nil receiver (no-op when metrics are disabled).
func (m *metrics) observeRemove(ctx context.Context, dur time.Duration, failed bool) {
	if m == nil {
		return
	}
	if ctx.Err() != nil {
		ctx = context.Background()
	}

	attrs := metric.WithAttributes(attribute.Bool(failedKey, failed))
	m.remove.Record(ctx, dur.Seconds(), attrs)
}
Loading