Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(da): metrics #11

Merged
merged 10 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ Run centralized-sequencer by specifying DA network details:
| `host` | centralized sequencer host | localhost |
| `port` | centralized sequencer port | 50051 |
| `listen-all` |listen on all network interfaces (0.0.0.0) instead of just localhost|disabled|
| `metrics` |enable prometheus metrics|disabled|
| `metrics-address` |address to expose prometheus metrics|`":8080"`|
<!-- markdownlint-enable MD013 -->

See `./build/centralized-sequencer --help` for details.
Expand Down
3 changes: 3 additions & 0 deletions da/da.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ type BaseResult struct {
Message string
// DAHeight informs about a height on Data Availability Layer for given result.
DAHeight uint64
// BlobSize is the size of the blob submitted.
BlobSize uint64
// SubmittedCount is the number of successfully submitted blocks.
SubmittedCount uint64
}
Expand Down Expand Up @@ -192,6 +194,7 @@ func (dac *DAClient) SubmitBatch(ctx context.Context, data []*sequencing.Batch,
BaseResult: BaseResult{
Code: StatusSuccess,
DAHeight: binary.LittleEndian.Uint64(ids[0]),
BlobSize: blobSize,
tuxcanfly marked this conversation as resolved.
Show resolved Hide resolved
SubmittedCount: uint64(len(ids)),
},
}
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ toolchain go1.22.3

require (
github.com/dgraph-io/badger/v3 v3.2103.5
github.com/go-kit/kit v0.13.0
github.com/gogo/protobuf v1.3.2
github.com/ipfs/go-log/v2 v2.5.1
github.com/prometheus/client_golang v1.19.1
github.com/rollkit/go-da v0.8.0
github.com/rollkit/go-sequencing v0.3.0
github.com/rollkit/rollkit v0.13.7
Expand All @@ -30,7 +32,6 @@ require (
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/filecoin-project/go-jsonrpc v0.6.0 // indirect
github.com/go-kit/kit v0.13.0 // indirect
github.com/go-kit/log v0.2.1 // indirect
github.com/go-logfmt/logfmt v0.6.0 // indirect
github.com/golang/glog v1.2.2 // indirect
Expand Down Expand Up @@ -68,7 +69,6 @@ require (
github.com/petermattis/goid v0.0.0-20230317030725-371a4b8eda08 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.19.1 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.48.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
Expand Down
52 changes: 43 additions & 9 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
package main

import (
"context"
"encoding/hex"
"flag"
"fmt"
"log"
"net"
"net/http"
"os"
"os/signal"
"syscall"
"time"

"github.com/prometheus/client_golang/prometheus/promhttp"

"github.com/rollkit/centralized-sequencer/sequencing"
sequencingGRPC "github.com/rollkit/go-sequencing/proxy/grpc"
)
Expand All @@ -24,14 +28,16 @@ const (

func main() {
var (
host string
port string
listenAll bool
batchTime time.Duration
da_address string
da_namespace string
da_auth_token string
db_path string
host string
port string
listenAll bool
batchTime time.Duration
da_address string
da_namespace string
da_auth_token string
db_path string
metricsEnabled bool
metricsAddress string
)
flag.StringVar(&host, "host", defaultHost, "centralized sequencer host")
flag.StringVar(&port, "port", defaultPort, "centralized sequencer port")
Expand All @@ -41,6 +47,8 @@ func main() {
flag.StringVar(&da_namespace, "da_namespace", "", "DA namespace where the sequencer submits transactions")
flag.StringVar(&da_auth_token, "da_auth_token", "", "auth token for the DA")
flag.StringVar(&db_path, "db_path", "", "path to the database")
flag.BoolVar(&metricsEnabled, "metrics", false, "Enable Prometheus metrics")
flag.StringVar(&metricsAddress, "metrics-address", ":8080", "Address to expose Prometheus metrics")

flag.Parse()

Expand All @@ -60,7 +68,28 @@ func main() {
log.Fatalf("Error decoding namespace: %v", err)
}

centralizedSeq, err := sequencing.NewSequencer(da_address, da_auth_token, namespace, batchTime, db_path)
var metricsServer *http.Server
if metricsEnabled {
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())
metricsServer = &http.Server{
Addr: metricsAddress,
Handler: mux,
ReadHeaderTimeout: 5 * time.Second,
}
go func() {
log.Printf("Starting metrics server on %v...\n", metricsAddress)
if err := metricsServer.ListenAndServe(); err != http.ErrServerClosed {
log.Fatalf("Failed to serve metrics: %v", err)
}
}()
}

metrics, err := sequencing.DefaultMetricsProvider(metricsEnabled)(da_namespace)
if err != nil {
log.Fatalf("Failed to create metrics: %v", err)
}
centralizedSeq, err := sequencing.NewSequencer(da_address, da_auth_token, namespace, batchTime, metrics, db_path)
tuxcanfly marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
log.Fatalf("Failed to create centralized sequencer: %v", err)
}
Expand All @@ -74,6 +103,11 @@ func main() {
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt, syscall.SIGINT)
<-interrupt
if metricsServer != nil {
if err := metricsServer.Shutdown(context.Background()); err != nil {
log.Printf("Error shutting down metrics server: %v", err)
}
}
fmt.Println("\nCtrl+C pressed. Exiting...")
os.Exit(0)
}
99 changes: 99 additions & 0 deletions sequencing/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package sequencing

import (
"errors"

"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/discard"
"github.com/go-kit/kit/metrics/prometheus"
stdprometheus "github.com/prometheus/client_golang/prometheus"
)

const (
// MetricsSubsystem is a subsystem shared by all metrics exposed by this
// package.
MetricsSubsystem = "sequencer"
)

// MetricsProvider returns sequencing Metrics.
type MetricsProvider func(chainID string) (*Metrics, error)

// DefaultMetricsProvider returns Metrics build using Prometheus client library
// if Prometheus is enabled. Otherwise, it returns no-op Metrics.
func DefaultMetricsProvider(enabled bool) MetricsProvider {
return func(chainID string) (*Metrics, error) {
if enabled {
return PrometheusMetrics("chain_id", chainID)
}
return NopMetrics()
}
}

// Metrics contains metrics exposed by this package.
type Metrics struct {
// GasPrice
GasPrice metrics.Gauge
// Last submitted blob size
LastBlobSize metrics.Gauge
// cost / byte
// CostPerByte metrics.Gauge
// Wallet Balance
// WalletBalance metrics.Gauge
tuxcanfly marked this conversation as resolved.
Show resolved Hide resolved
// Transaction Status
TransactionStatus metrics.Counter
// Number of pending blocks.
NumPendingBlocks metrics.Gauge
// Last included block height
IncludedBlockHeight metrics.Gauge
}

// PrometheusMetrics returns Metrics build using Prometheus client library.
// Optionally, labels can be provided along with their values ("foo",
// "fooValue").
func PrometheusMetrics(labelsAndValues ...string) (*Metrics, error) {
if len(labelsAndValues)%2 != 0 {
return nil, errors.New("uneven number of labels and values; labels and values should be provided in pairs")
}
labels := []string{}
for i := 0; i < len(labelsAndValues); i += 2 {
labels = append(labels, labelsAndValues[i])
}
tuxcanfly marked this conversation as resolved.
Show resolved Hide resolved
return &Metrics{
GasPrice: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Subsystem: MetricsSubsystem,
Name: "gas_price",
Help: "The gas price of DA.",
}, labels).With(labelsAndValues...),
LastBlobSize: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Subsystem: MetricsSubsystem,
Name: "last_blob_size",
Help: "The size in bytes of the last DA blob.",
}, labels).With(labelsAndValues...),
TransactionStatus: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Subsystem: MetricsSubsystem,
Name: "transaction_status",
Help: "Count of transaction statuses for DA submissions",
}, labels).With(labelsAndValues...),
NumPendingBlocks: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Subsystem: MetricsSubsystem,
Name: "num_pending_blocks",
Help: "The number of pending blocks for DA submission.",
}, labels).With(labelsAndValues...),
IncludedBlockHeight: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Subsystem: MetricsSubsystem,
Name: "included_block_height",
Help: "The last DA included block height.",
}, labels).With(labelsAndValues...),
}, nil
}

// NopMetrics returns no-op Metrics.
func NopMetrics() (*Metrics, error) {
return &Metrics{
GasPrice: discard.NewGauge(),
LastBlobSize: discard.NewGauge(),
TransactionStatus: discard.NewCounter(),
NumPendingBlocks: discard.NewGauge(),
IncludedBlockHeight: discard.NewGauge(),
}, nil
}
16 changes: 15 additions & 1 deletion sequencing/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,10 +281,12 @@ type Sequencer struct {

db *badger.DB // BadgerDB instance for persistence
dbMux sync.Mutex // Mutex for safe concurrent DB access

metrics *Metrics
tuxcanfly marked this conversation as resolved.
Show resolved Hide resolved
}

// NewSequencer ...
func NewSequencer(daAddress, daAuthToken string, daNamespace []byte, batchTime time.Duration, dbPath string) (*Sequencer, error) {
func NewSequencer(daAddress, daAuthToken string, daNamespace []byte, batchTime time.Duration, metrics *Metrics, dbPath string) (*Sequencer, error) {
ctx := context.Background()
dac, err := proxyda.NewClient(daAddress, daAuthToken)
if err != nil {
Expand Down Expand Up @@ -318,6 +320,7 @@ func NewSequencer(daAddress, daAuthToken string, daNamespace []byte, batchTime t
bq: NewBatchQueue(),
seenBatches: make(map[string]struct{}),
db: db,
metrics: metrics,
}

// Load last batch hash from DB to recover from crash
Expand Down Expand Up @@ -481,6 +484,16 @@ func (c *Sequencer) publishBatch() error {
return nil
}

func (c *Sequencer) recordMetrics(gasPrice float64, blobSize uint64, statusCode da.StatusCode, numPendingBlocks int, includedBlockHeight uint64) {
if c.metrics != nil {
c.metrics.GasPrice.Set(float64(gasPrice))
c.metrics.LastBlobSize.Set(float64(blobSize))
c.metrics.TransactionStatus.With("status", fmt.Sprintf("%d", statusCode)).Add(1)
c.metrics.NumPendingBlocks.Set(float64(numPendingBlocks))
c.metrics.IncludedBlockHeight.Set(float64(includedBlockHeight))
}
}

func (c *Sequencer) submitBatchToDA(batch sequencing.Batch) error {
batchesToSubmit := []*sequencing.Batch{&batch}
submittedAllBlocks := false
Expand Down Expand Up @@ -542,6 +555,7 @@ daSubmitRetryLoop:
backoff = c.exponentialBackoff(backoff)
}

c.recordMetrics(gasPrice, res.BlobSize, res.Code, len(batchesToSubmit), res.DAHeight)
attempt += 1
}

Expand Down
6 changes: 4 additions & 2 deletions sequencing/sequencer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ func startMockDAServJSONRPC(ctx context.Context, da_address string) (*proxy.Serv

func TestNewSequencer(t *testing.T) {
// Create a new sequencer with mock DA client
seq, err := NewSequencer(MockDAAddressHTTP, "authToken", []byte("namespace"), 1*time.Second, "")
metrics, _ := NopMetrics()
seq, err := NewSequencer(MockDAAddressHTTP, "authToken", []byte("namespace"), 10*time.Second, metrics, "")
require.NoError(t, err)
defer func() {
err := seq.Close()
Expand All @@ -67,7 +68,8 @@ func TestNewSequencer(t *testing.T) {

func TestSequencer_SubmitRollupTransaction(t *testing.T) {
// Initialize a new sequencer
seq, err := NewSequencer(MockDAAddressHTTP, "authToken", []byte("rollup1"), 1*time.Second, "")
metrics, _ := NopMetrics()
seq, err := NewSequencer(MockDAAddressHTTP, "authToken", []byte("rollup1"), 10*time.Second, metrics, "")
require.NoError(t, err)
defer func() {
err := seq.Close()
Expand Down
Loading