Skip to content

Commit

Permalink
Merge branch 'main' into pruner/reset-maxHeadersPerLoop-properly
Browse files Browse the repository at this point in the history
  • Loading branch information
renaynay authored Jul 8, 2024
2 parents e90c4e8 + 1828f78 commit 6531d24
Show file tree
Hide file tree
Showing 35 changed files with 1,094 additions and 869 deletions.
5 changes: 2 additions & 3 deletions .github/workflows/ci_release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,8 @@ jobs:
contents: write
steps:
- uses: actions/checkout@v4

env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- run: |
make openrpc-gen > openrpc.json
gh release upload ${{github.event.release.tag_name}} openrpc.json
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
11 changes: 9 additions & 2 deletions api/docgen/examples.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ var ExampleValues = map[reflect.Type]interface{}{
reflect.TypeOf(42): 42,
reflect.TypeOf(byte(7)): byte(7),
reflect.TypeOf(float64(42)): float64(42),
reflect.TypeOf(blob.GasPrice(0)): blob.GasPrice(0.002),
reflect.TypeOf(true): true,
reflect.TypeOf([]byte{}): []byte("byte array"),
reflect.TypeOf(node.Full): node.Full,
Expand Down Expand Up @@ -177,8 +176,16 @@ func init() {
if err != nil {
panic(err)
}

addToExampleValues(libhead.Hash(hash))

txConfig := state.NewTxConfig(
state.WithGasPrice(0.002),
state.WithGas(142225),
state.WithKeyName("my_celes_key"),
state.WithSignerAddress("celestia1pjcmwj8w6hyr2c4wehakc5g8cfs36aysgucx66"),
state.WithFeeGranterAddress("celestia1hakc56ax66ypjcmwj8w6hyr2c4g8cfs3wesguc"),
)
addToExampleValues(txConfig)
}

func addToExampleValues(v interface{}) {
Expand Down
6 changes: 0 additions & 6 deletions api/gateway/bindings.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,6 @@ func (h *Handler) RegisterEndpoints(rpc *Server) {
http.MethodGet,
)

rpc.RegisterHandlerFunc(
submitTxEndpoint,
h.handleSubmitTx,
http.MethodPost,
)

rpc.RegisterHandlerFunc(
healthEndpoint,
h.handleHealthRequest,
Expand Down
6 changes: 0 additions & 6 deletions api/gateway/bindings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,6 @@ func TestRegisterEndpoints(t *testing.T) {
method: http.MethodGet,
expected: true,
},
{
name: "Submit transaction endpoint",
path: submitTxEndpoint,
method: http.MethodPost,
expected: true,
},
{
name: "Get namespaced shares by height endpoint",
path: fmt.Sprintf("%s/{%s}/height/{%s}", namespacedSharesEndpoint, namespaceKey, heightKey),
Expand Down
39 changes: 1 addition & 38 deletions api/gateway/state.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package gateway

import (
"encoding/hex"
"encoding/json"
"errors"
"net/http"
Expand All @@ -13,8 +12,7 @@ import (
)

const (
balanceEndpoint = "/balance"
submitTxEndpoint = "/submit_tx"
balanceEndpoint = "/balance"
)

const addrKey = "address"
Expand All @@ -24,11 +22,6 @@ var (
ErrMissingAddress = errors.New("address not specified")
)

// submitTxRequest represents a request to submit a raw transaction
type submitTxRequest struct {
Tx string `json:"tx"`
}

func (h *Handler) handleBalanceRequest(w http.ResponseWriter, r *http.Request) {
var (
bal *state.Balance
Expand Down Expand Up @@ -70,33 +63,3 @@ func (h *Handler) handleBalanceRequest(w http.ResponseWriter, r *http.Request) {
log.Errorw("writing response", "endpoint", balanceEndpoint, "err", err)
}
}

func (h *Handler) handleSubmitTx(w http.ResponseWriter, r *http.Request) {
// decode request
var req submitTxRequest
err := json.NewDecoder(r.Body).Decode(&req)
if err != nil {
writeError(w, http.StatusBadRequest, submitTxEndpoint, err)
return
}
rawTx, err := hex.DecodeString(req.Tx)
if err != nil {
writeError(w, http.StatusBadRequest, submitTxEndpoint, err)
return
}
// perform request
txResp, err := h.state.SubmitTx(r.Context(), rawTx)
if err != nil {
writeError(w, http.StatusInternalServerError, submitTxEndpoint, err)
return
}
resp, err := json.Marshal(txResp)
if err != nil {
writeError(w, http.StatusInternalServerError, submitTxEndpoint, err)
return
}
_, err = w.Write(resp)
if err != nil {
log.Errorw("writing response", "endpoint", submitTxEndpoint, "err", err)
}
}
6 changes: 3 additions & 3 deletions api/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,14 +200,14 @@ func TestAuthedRPC(t *testing.T) {
expectedResp := &state.TxResponse{}
if tt.perm > 2 {
server.State.EXPECT().Delegate(gomock.Any(), gomock.Any(),
gomock.Any(), gomock.Any(), gomock.Any()).Return(expectedResp, nil)
gomock.Any(), gomock.Any()).Return(expectedResp, nil)
txResp, err := rpcClient.State.Delegate(ctx,
state.ValAddress{}, state.Int{}, state.Int{}, 0)
state.ValAddress{}, state.Int{}, state.NewTxConfig())
require.NoError(t, err)
require.Equal(t, expectedResp, txResp)
} else {
_, err := rpcClient.State.Delegate(ctx,
state.ValAddress{}, state.Int{}, state.Int{}, 0)
state.ValAddress{}, state.Int{}, state.NewTxConfig())
require.Error(t, err)
require.ErrorContains(t, err, "missing permission")
}
Expand Down
10 changes: 10 additions & 0 deletions blob/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/tendermint/tendermint/types"

"github.com/celestiaorg/celestia-app/pkg/shares"
apptypes "github.com/celestiaorg/celestia-app/x/blob/types"

"github.com/celestiaorg/celestia-node/share"
)
Expand Down Expand Up @@ -36,6 +37,15 @@ func BlobsToShares(blobs ...*Blob) ([]share.Share, error) {
return shares.ToBytes(rawShares), nil
}

// ToAppBlobs converts node's blob type to the blob type from celestia-app.
func ToAppBlobs(blobs ...*Blob) []*apptypes.Blob {
appBlobs := make([]*apptypes.Blob, 0, len(blobs))
for i := range blobs {
appBlobs[i] = &blobs[i].Blob
}
return appBlobs
}

// toAppShares converts node's raw shares to the app shares, skipping padding
func toAppShares(shrs ...share.Share) ([]shares.Share, error) {
appShrs := make([]shares.Share, 0, len(shrs))
Expand Down
115 changes: 42 additions & 73 deletions blob/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,22 @@ import (
"context"
"errors"
"fmt"
"math"
"slices"
"sync"

sdkmath "cosmossdk.io/math"
"github.com/cosmos/cosmos-sdk/types"
auth "github.com/cosmos/cosmos-sdk/x/auth/types"
logging "github.com/ipfs/go-log/v2"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"

"github.com/celestiaorg/celestia-app/pkg/appconsts"
"github.com/celestiaorg/celestia-app/pkg/shares"
blobtypes "github.com/celestiaorg/celestia-app/x/blob/types"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/libs/utils"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/state"
)

var (
Expand All @@ -33,21 +30,15 @@ var (
tracer = otel.Tracer("blob/service")
)

// GasPrice represents the amount to be paid per gas unit. Fee is set by
// multiplying GasPrice by GasLimit, which is determined by the blob sizes.
type GasPrice float64

// DefaultGasPrice returns the default gas price, letting node automatically
// determine the Fee based on the passed blob sizes.
func DefaultGasPrice() GasPrice {
return -1.0
}
// SubmitOptions aliases TxOptions from state package allowing users
// to specify options for SubmitPFB transaction.
type SubmitOptions = state.TxConfig

// Submitter is an interface that allows submitting blobs to the celestia-core. It is used to
// avoid a circular dependency between the blob and the state package, since the state package needs
// the blob.Blob type for this signature.
type Submitter interface {
SubmitPayForBlob(ctx context.Context, fee sdkmath.Int, gasLim uint64, blobs []*Blob) (*types.TxResponse, error)
SubmitPayForBlob(context.Context, []*state.Blob, *state.TxConfig) (*types.TxResponse, error)
}

type Service struct {
Expand All @@ -71,48 +62,32 @@ func NewService(
}
}

// SubmitOptions contains the information about fee and gasLimit price in order to configure the
// Submit request.
type SubmitOptions struct {
Fee int64
GasLimit uint64
}

// DefaultSubmitOptions creates a default fee and gas price values.
func DefaultSubmitOptions() *SubmitOptions {
return &SubmitOptions{
Fee: -1,
GasLimit: 0,
}
}

// Submit sends PFB transaction and reports the height at which it was included.
// Allows sending multiple Blobs atomically synchronously.
// Uses default wallet registered on the Node.
// Handles gas estimation and fee calculation.
func (s *Service) Submit(ctx context.Context, blobs []*Blob, gasPrice GasPrice) (uint64, error) {
func (s *Service) Submit(ctx context.Context, blobs []*Blob, txConfig *SubmitOptions) (uint64, error) {
log.Debugw("submitting blobs", "amount", len(blobs))

options := DefaultSubmitOptions()
if gasPrice >= 0 {
blobSizes := make([]uint32, len(blobs))
for i, blob := range blobs {
blobSizes[i] = uint32(len(blob.Data))
appblobs := make([]*state.Blob, len(blobs))
for i := range blobs {
if err := blobs[i].Namespace().ValidateForBlob(); err != nil {
return 0, err
}
options.GasLimit = blobtypes.EstimateGas(blobSizes, appconsts.DefaultGasPerBlobByte, auth.DefaultTxSizeCostPerByte)
options.Fee = types.NewInt(int64(math.Ceil(float64(gasPrice) * float64(options.GasLimit)))).Int64()
appblobs[i] = &blobs[i].Blob
}

resp, err := s.blobSubmitter.SubmitPayForBlob(ctx, types.NewInt(options.Fee), options.GasLimit, blobs)
resp, err := s.blobSubmitter.SubmitPayForBlob(ctx, appblobs, txConfig)
if err != nil {
return 0, err
}
return uint64(resp.Height), nil
}

// Get retrieves all the blobs for given namespaces at the given height by commitment.
// Get collects all namespaced data from the EDS, constructs blobs
// and compares commitments. `ErrBlobNotFound` can be returned in case blob was not found.
// Get retrieves a blob in a given namespace at the given height by commitment.
// Get collects all namespaced data from the EDS, construct the blob
// and compares the commitment argument.
// `ErrBlobNotFound` can be returned in case blob was not found.
func (s *Service) Get(
ctx context.Context,
height uint64,
Expand All @@ -136,9 +111,9 @@ func (s *Service) Get(
return
}

// GetProof retrieves all blobs in the given namespaces at the given height by commitment
// and returns their Proof. It collects all namespaced data from the EDS, constructs blobs
// and compares commitments.
// GetProof returns an NMT inclusion proof for a specified namespace to the respective row roots
// on which the blob spans on at the given height, using the given commitment.
// It employs the same algorithm as service.Get() internally.
func (s *Service) GetProof(
ctx context.Context,
height uint64,
Expand All @@ -163,7 +138,14 @@ func (s *Service) GetProof(
}

// GetAll returns all blobs under the given namespaces at the given height.
// GetAll can return blobs and an error in case if some requests failed.
// If all blobs were found without any errors, the user will receive a list of blobs.
// If the BlobService couldn't find any blobs under the requested namespaces,
// the user will receive an empty list of blobs along with an empty error.
// If some of the requested namespaces were not found, the user will receive all the found blobs and an empty error.
// If there were internal errors during some of the requests,
// the user will receive all found blobs along with a combined error message.
//
// All blobs will preserve the order of the namespaces that were requested.
func (s *Service) GetAll(ctx context.Context, height uint64, namespaces []share.Namespace) ([]*Blob, error) {
header, err := s.headerGetter(ctx, height)
if err != nil {
Expand All @@ -173,40 +155,30 @@ func (s *Service) GetAll(ctx context.Context, height uint64, namespaces []share.
var (
resultBlobs = make([][]*Blob, len(namespaces))
resultErr = make([]error, len(namespaces))
wg = sync.WaitGroup{}
)

for _, namespace := range namespaces {
log.Debugw("performing GetAll request", "namespace", namespace.String(), "height", height)
}

wg := sync.WaitGroup{}
for i, namespace := range namespaces {
wg.Add(1)
go func(i int, namespace share.Namespace) {
log.Debugw("retrieving all blobs from", "namespace", namespace.String(), "height", height)
defer wg.Done()

blobs, err := s.getBlobs(ctx, namespace, header)
if err != nil {
resultErr[i] = fmt.Errorf("getting blobs for namespace(%s): %w", namespace.String(), err)
return
if err != nil && !errors.Is(err, ErrBlobNotFound) {
log.Errorf("getting blobs for namespaceID(%s): %v", namespace.ID().String(), err)
resultErr[i] = err
}
if len(blobs) > 0 {
log.Infow("retrieved blobs", "height", height, "total", len(blobs))
resultBlobs[i] = blobs
}

log.Debugw("receiving blobs", "height", height, "total", len(blobs))
resultBlobs[i] = blobs
}(i, namespace)
}
wg.Wait()

blobs := make([]*Blob, 0)
for _, resBlobs := range resultBlobs {
if len(resBlobs) > 0 {
blobs = append(blobs, resBlobs...)
}
}

if len(blobs) == 0 {
resultErr = append(resultErr, ErrBlobNotFound)
}
return blobs, errors.Join(resultErr...)
blobs := slices.Concat(resultBlobs...)
err = errors.Join(resultErr...)
return blobs, err
}

// Included verifies that the blob was included in a specific height.
Expand Down Expand Up @@ -413,8 +385,5 @@ func (s *Service) getBlobs(
sharesParser := &parser{verifyFn: verifyFn}

_, _, err = s.retrieve(ctx, header.Height(), namespace, sharesParser)
if len(blobs) == 0 {
return nil, ErrBlobNotFound
}
return blobs, nil
return blobs, err
}
Loading

0 comments on commit 6531d24

Please sign in to comment.