Skip to content

Commit

Permalink
feat(blob/trace): cover blob service with traces
Browse files Browse the repository at this point in the history
  • Loading branch information
vgonkivs committed Jan 18, 2024
1 parent 36205cc commit f7b22ee
Showing 1 changed file with 31 additions and 6 deletions.
37 changes: 31 additions & 6 deletions blob/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,24 @@ import (
"sync"

"cosmossdk.io/math"
"github.com/celestiaorg/celestia-app/pkg/shares"
"github.com/cosmos/cosmos-sdk/types"
logging "github.com/ipfs/go-log/v2"

"github.com/celestiaorg/celestia-app/pkg/shares"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/libs/utils"
"github.com/celestiaorg/celestia-node/share"
)

var (
ErrBlobNotFound = errors.New("blob: not found")
ErrInvalidProof = errors.New("blob: invalid proof")

log = logging.Logger("blob")
log = logging.Logger("blob")
tracer = otel.Tracer("blob/service")
)

// Submitter is an interface that allows submitting blobs to the celestia-core. It is used to
Expand Down Expand Up @@ -165,7 +169,11 @@ func (s *Service) Included(
namespace share.Namespace,
proof *Proof,
com Commitment,
) (bool, error) {
) (_ bool, err error) {
ctx, span := tracer.Start(ctx, "included")
defer func() {
utils.SetStatusAndEnd(span, err)
}()
// In the current implementation, LNs will have to download all shares to recompute the commitment.
// To achieve 1. we need to modify Proof structure and to store all subtree roots, that were
// involved in commitment creation and then call `merkle.HashFromByteSlices`(tendermint package).
Expand Down Expand Up @@ -193,16 +201,28 @@ func (s *Service) getByCommitment(
height uint64,
namespace share.Namespace,
commitment Commitment,
) (*Blob, *Proof, error) {
) (_ *Blob, _ *Proof, err error) {
log.Infow("requesting blob",
"height", height,
"namespace", namespace.String())

ctx, span := tracer.Start(ctx, "get-by-commitment")
defer func() {
utils.SetStatusAndEnd(span, err)
}()
span.SetAttributes(
attribute.Int64("height", int64(height)),
attribute.String("commitment", string(commitment)),
)

header, err := s.headerGetter(ctx, height)
if err != nil {
return nil, nil, err
}

span.AddEvent("received eds", trace.WithAttributes(
attribute.Int64("eds-size", int64(len(header.DAH.RowRoots)))))

namespacedShares, err := s.shareGetter.GetSharesByNamespace(ctx, header, namespace)
if err != nil {
if errors.Is(err, share.ErrNotFound) {
Expand Down Expand Up @@ -240,6 +260,7 @@ func (s *Service) getByCommitment(
}
for _, b := range blobs {
if b.Commitment.Equal(commitment) {
span.AddEvent("blob reconstructed")
return b, &proofs, nil
}
// Falling under this flag means that the data from the last row
Expand Down Expand Up @@ -276,7 +297,11 @@ func (s *Service) getBlobs(
ctx context.Context,
namespace share.Namespace,
header *header.ExtendedHeader,
) ([]*Blob, error) {
) (_ []*Blob, err error) {
ctx, span := tracer.Start(ctx, "get-blobs")
defer func() {
utils.SetStatusAndEnd(span, err)
}()
namespacedShares, err := s.shareGetter.GetSharesByNamespace(ctx, header, namespace)
if err != nil {
return nil, err
Expand Down

0 comments on commit f7b22ee

Please sign in to comment.