Skip to content

Commit

Permalink
feat(da): add metrics handler
Browse files Browse the repository at this point in the history
  • Loading branch information
tuxcanfly committed Oct 1, 2024
1 parent b24b6cc commit 97654c1
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 9 deletions.
33 changes: 25 additions & 8 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import (
"fmt"
"log"
"net"
"net/http"
"os"
"os/signal"
"syscall"
"time"

"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rollkit/centralized-sequencer/sequencing"
sequencingGRPC "github.com/rollkit/go-sequencing/proxy/grpc"
)
Expand All @@ -24,13 +26,15 @@ const (

func main() {
var (
host string
port string
listenAll bool
batchTime time.Duration
da_address string
da_namespace string
da_auth_token string
host string
port string
listenAll bool
batchTime time.Duration
da_address string
da_namespace string
da_auth_token string
metricsEnabled bool
metricsAddress string
)
flag.StringVar(&host, "host", defaultHost, "centralized sequencer host")
flag.StringVar(&port, "port", defaultPort, "centralized sequencer port")
Expand All @@ -39,6 +43,8 @@ func main() {
flag.StringVar(&da_address, "da_address", defaultDA, "DA address")
flag.StringVar(&da_namespace, "da_namespace", "", "DA namespace where the sequencer submits transactions")
flag.StringVar(&da_auth_token, "da_auth_token", "", "auth token for the DA")
flag.BoolVar(&metricsEnabled, "metrics", false, "Enable Prometheus metrics")
flag.StringVar(&metricsAddress, "metrics-address", ":8080", "Address to expose Prometheus metrics")

flag.Parse()

Expand All @@ -58,7 +64,18 @@ func main() {
log.Fatalf("Error decoding namespace: %v", err)
}

centralizedSeq, err := sequencing.NewSequencer(da_address, da_auth_token, namespace, batchTime)
if metricsEnabled {
go func() {
err := http.ListenAndServe(metricsAddress, nil)
if err != nil {
log.Fatalf("Failed to serve metrics: %v", err)
}
http.Handle("/metrics", promhttp.Handler())
}()
}

metrics := sequencing.DefaultMetricsProvider(metricsEnabled, hex.EncodeToString([]byte(da_namespace)))
centralizedSeq, err := sequencing.NewSequencer(da_address, da_auth_token, namespace, batchTime, metrics)
if err != nil {
log.Fatalf("Failed to create centralized sequencer: %v", err)
}
Expand Down
14 changes: 14 additions & 0 deletions sequencing/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,20 @@ const (
MetricsSubsystem = "sequencer"
)

// MetricsProvider returns sequencing Metrics.
type MetricsProvider func(chainID string) *Metrics

// DefaultMetricsProvider returns Metrics build using Prometheus client library
// if Prometheus is enabled. Otherwise, it returns no-op Metrics.
func DefaultMetricsProvider(enabled bool, namespace string) MetricsProvider {
return func(chainID string) *Metrics {
if enabled {
return PrometheusMetrics(namespace, "chain_id", chainID)
}
return NopMetrics()
}
}

// Metrics contains metrics exposed by this package.
type Metrics struct {
// GasPrice
Expand Down
7 changes: 6 additions & 1 deletion sequencing/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"sync"
Expand Down Expand Up @@ -124,11 +125,12 @@ type Sequencer struct {
seenBatches map[string]struct{}
bq *BatchQueue

metricsProvider MetricsProvider
metrics *Metrics
}

// NewSequencer ...
func NewSequencer(daAddress, daAuthToken string, daNamespace []byte, batchTime time.Duration) (*Sequencer, error) {
func NewSequencer(daAddress, daAuthToken string, daNamespace []byte, batchTime time.Duration, metricsProvider MetricsProvider) (*Sequencer, error) {
ctx := context.Background()
dac, err := proxyda.NewClient(daAddress, daAuthToken)
if err != nil {
Expand All @@ -147,6 +149,8 @@ func NewSequencer(daAddress, daAuthToken string, daNamespace []byte, batchTime t
tq: NewTransactionQueue(),
bq: NewBatchQueue(),
seenBatches: make(map[string]struct{}),
metricsProvider: metricsProvider,
metrics: NopMetrics(), // Initialized from metricsProvider in SubmitRollupTransaction
}
go s.batchSubmissionLoop(s.ctx)
return s, nil
Expand Down Expand Up @@ -297,6 +301,7 @@ func hashSHA256(data []byte) []byte {
func (c *Sequencer) SubmitRollupTransaction(ctx context.Context, rollupId []byte, tx []byte) error {
if c.rollupId == nil {
c.rollupId = rollupId
c.metrics = c.metricsProvider(hex.EncodeToString(rollupId))
} else {
if !bytes.Equal(c.rollupId, rollupId) {
return ErrorRollupIdMismatch
Expand Down

0 comments on commit 97654c1

Please sign in to comment.