Skip to content

Commit

Permalink
Merge branch 'main' into latest-fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
gupadhyaya committed Nov 4, 2024
2 parents f918436 + 5f2a130 commit 1ba073d
Show file tree
Hide file tree
Showing 7 changed files with 169 additions and 15 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ Run centralized-sequencer by specifying DA network details:
| `host` | centralized sequencer host | localhost |
| `port` | centralized sequencer port | 50051 |
| `listen-all` |listen on all network interfaces (0.0.0.0) instead of just localhost|disabled|
| `metrics` |enable prometheus metrics|disabled|
| `metrics-address` |address to expose prometheus metrics|`":8080"`|
<!-- markdownlint-enable MD013 -->

See `./build/centralized-sequencer --help` for details.
Expand Down
54 changes: 44 additions & 10 deletions cmd/centralized-sequencer/main.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
package main

import (
"context"
"encoding/hex"
"flag"
"fmt"
"log"
"net"
"net/http"
"os"
"os/signal"
"syscall"
"time"

"github.com/prometheus/client_golang/prometheus/promhttp"

"github.com/rollkit/centralized-sequencer/sequencing"
sequencingGRPC "github.com/rollkit/go-sequencing/proxy/grpc"
)
Expand All @@ -24,15 +28,17 @@ const (

func main() {
var (
host string
port string
listenAll bool
rollupId string
batchTime time.Duration
da_address string
da_namespace string
da_auth_token string
db_path string
host string
port string
listenAll bool
rollupId string
batchTime time.Duration
da_address string
da_namespace string
da_auth_token string
db_path string
metricsEnabled bool
metricsAddress string
)
flag.StringVar(&host, "host", defaultHost, "centralized sequencer host")
flag.StringVar(&port, "port", defaultPort, "centralized sequencer port")
Expand All @@ -43,6 +49,8 @@ func main() {
flag.StringVar(&da_namespace, "da_namespace", "", "DA namespace where the sequencer submits transactions")
flag.StringVar(&da_auth_token, "da_auth_token", "", "auth token for the DA")
flag.StringVar(&db_path, "db_path", "", "path to the database")
flag.BoolVar(&metricsEnabled, "metrics", false, "Enable Prometheus metrics")
flag.StringVar(&metricsAddress, "metrics-address", ":8080", "Address to expose Prometheus metrics")

flag.Parse()

Expand All @@ -62,7 +70,28 @@ func main() {
log.Fatalf("Error decoding namespace: %v", err)
}

centralizedSeq, err := sequencing.NewSequencer(da_address, da_auth_token, namespace, []byte(rollupId), batchTime, db_path)
var metricsServer *http.Server
if metricsEnabled {
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())
metricsServer = &http.Server{
Addr: metricsAddress,
Handler: mux,
ReadHeaderTimeout: 5 * time.Second,
}
go func() {
log.Printf("Starting metrics server on %v...\n", metricsAddress)
if err := metricsServer.ListenAndServe(); err != http.ErrServerClosed {
log.Fatalf("Failed to serve metrics: %v", err)
}
}()
}

metrics, err := sequencing.DefaultMetricsProvider(metricsEnabled)(da_namespace)
if err != nil {
log.Fatalf("Failed to create metrics: %v", err)
}
centralizedSeq, err := sequencing.NewSequencer(da_address, da_auth_token, namespace, []byte(rollupId), batchTime, metrics, db_path)
if err != nil {
log.Fatalf("Failed to create centralized sequencer: %v", err)
}
Expand All @@ -76,6 +105,11 @@ func main() {
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt, syscall.SIGINT)
<-interrupt
if metricsServer != nil {
if err := metricsServer.Shutdown(context.Background()); err != nil {
log.Printf("Error shutting down metrics server: %v", err)
}
}
fmt.Println("\nCtrl+C pressed. Exiting...")
os.Exit(0)
}
3 changes: 3 additions & 0 deletions da/da.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ type BaseResult struct {
Message string
// DAHeight informs about a height on Data Availability Layer for given result.
DAHeight uint64
// BlobSize is the size of the blob submitted.
BlobSize uint64
// SubmittedCount is the number of successfully submitted blocks.
SubmittedCount uint64
}
Expand Down Expand Up @@ -192,6 +194,7 @@ func (dac *DAClient) SubmitBatch(ctx context.Context, data []*sequencing.Batch,
BaseResult: BaseResult{
Code: StatusSuccess,
DAHeight: binary.LittleEndian.Uint64(ids[0]),
BlobSize: blobSize,
SubmittedCount: uint64(len(ids)),
},
}
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ toolchain go1.22.3

require (
github.com/dgraph-io/badger/v3 v3.2103.5
github.com/go-kit/kit v0.13.0
github.com/gogo/protobuf v1.3.2
github.com/ipfs/go-log/v2 v2.5.1
github.com/prometheus/client_golang v1.19.1
github.com/rollkit/go-da v0.8.0
github.com/rollkit/go-sequencing v0.3.0
github.com/rollkit/rollkit v0.13.7
Expand All @@ -30,7 +32,6 @@ require (
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/filecoin-project/go-jsonrpc v0.6.0 // indirect
github.com/go-kit/kit v0.13.0 // indirect
github.com/go-kit/log v0.2.1 // indirect
github.com/go-logfmt/logfmt v0.6.0 // indirect
github.com/golang/glog v1.2.2 // indirect
Expand Down Expand Up @@ -68,7 +69,6 @@ require (
github.com/petermattis/goid v0.0.0-20230317030725-371a4b8eda08 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.19.1 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.48.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
Expand Down
99 changes: 99 additions & 0 deletions sequencing/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package sequencing

import (
"errors"

"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/discard"
"github.com/go-kit/kit/metrics/prometheus"
stdprometheus "github.com/prometheus/client_golang/prometheus"
)

const (
// MetricsSubsystem is a subsystem shared by all metrics exposed by this
// package.
MetricsSubsystem = "sequencer"
)

// MetricsProvider returns sequencing Metrics.
type MetricsProvider func(chainID string) (*Metrics, error)

// DefaultMetricsProvider returns Metrics build using Prometheus client library
// if Prometheus is enabled. Otherwise, it returns no-op Metrics.
func DefaultMetricsProvider(enabled bool) MetricsProvider {
return func(chainID string) (*Metrics, error) {
if enabled {
return PrometheusMetrics("chain_id", chainID)
}
return NopMetrics()
}
}

// Metrics contains metrics exposed by this package.
type Metrics struct {
// GasPrice
GasPrice metrics.Gauge
// Last submitted blob size
LastBlobSize metrics.Gauge
// cost / byte
// CostPerByte metrics.Gauge
// Wallet Balance
// WalletBalance metrics.Gauge
// Transaction Status
TransactionStatus metrics.Counter
// Number of pending blocks.
NumPendingBlocks metrics.Gauge
// Last included block height
IncludedBlockHeight metrics.Gauge
}

// PrometheusMetrics returns Metrics build using Prometheus client library.
// Optionally, labels can be provided along with their values ("foo",
// "fooValue").
func PrometheusMetrics(labelsAndValues ...string) (*Metrics, error) {
if len(labelsAndValues)%2 != 0 {
return nil, errors.New("uneven number of labels and values; labels and values should be provided in pairs")
}
labels := []string{}
for i := 0; i < len(labelsAndValues); i += 2 {
labels = append(labels, labelsAndValues[i])
}
return &Metrics{
GasPrice: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Subsystem: MetricsSubsystem,
Name: "gas_price",
Help: "The gas price of DA.",
}, labels).With(labelsAndValues...),
LastBlobSize: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Subsystem: MetricsSubsystem,
Name: "last_blob_size",
Help: "The size in bytes of the last DA blob.",
}, labels).With(labelsAndValues...),
TransactionStatus: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Subsystem: MetricsSubsystem,
Name: "transaction_status",
Help: "Count of transaction statuses for DA submissions",
}, labels).With(labelsAndValues...),
NumPendingBlocks: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Subsystem: MetricsSubsystem,
Name: "num_pending_blocks",
Help: "The number of pending blocks for DA submission.",
}, labels).With(labelsAndValues...),
IncludedBlockHeight: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Subsystem: MetricsSubsystem,
Name: "included_block_height",
Help: "The last DA included block height.",
}, labels).With(labelsAndValues...),
}, nil
}

// NopMetrics returns no-op Metrics.
func NopMetrics() (*Metrics, error) {
return &Metrics{
GasPrice: discard.NewGauge(),
LastBlobSize: discard.NewGauge(),
TransactionStatus: discard.NewCounter(),
NumPendingBlocks: discard.NewGauge(),
IncludedBlockHeight: discard.NewGauge(),
}, nil
}
16 changes: 15 additions & 1 deletion sequencing/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,10 +281,12 @@ type Sequencer struct {

db *badger.DB // BadgerDB instance for persistence
dbMux sync.Mutex // Mutex for safe concurrent DB access

metrics *Metrics
}

// NewSequencer ...
func NewSequencer(daAddress, daAuthToken string, daNamespace []byte, rollupId []byte, batchTime time.Duration, dbPath string) (*Sequencer, error) {
func NewSequencer(daAddress, daAuthToken string, daNamespace []byte, rollupId []byte, batchTime time.Duration, metrics *Metrics, dbPath string) (*Sequencer, error) {
ctx := context.Background()
dac, err := proxyda.NewClient(daAddress, daAuthToken)
if err != nil {
Expand Down Expand Up @@ -318,6 +320,7 @@ func NewSequencer(daAddress, daAuthToken string, daNamespace []byte, rollupId []
bq: NewBatchQueue(),
seenBatches: make(map[string]struct{}),
db: db,
metrics: metrics,
}

// Load last batch hash from DB to recover from crash
Expand Down Expand Up @@ -481,6 +484,16 @@ func (c *Sequencer) publishBatch() error {
return nil
}

func (c *Sequencer) recordMetrics(gasPrice float64, blobSize uint64, statusCode da.StatusCode, numPendingBlocks int, includedBlockHeight uint64) {
if c.metrics != nil {
c.metrics.GasPrice.Set(float64(gasPrice))
c.metrics.LastBlobSize.Set(float64(blobSize))
c.metrics.TransactionStatus.With("status", fmt.Sprintf("%d", statusCode)).Add(1)
c.metrics.NumPendingBlocks.Set(float64(numPendingBlocks))
c.metrics.IncludedBlockHeight.Set(float64(includedBlockHeight))
}
}

func (c *Sequencer) submitBatchToDA(batch sequencing.Batch) error {
batchesToSubmit := []*sequencing.Batch{&batch}
submittedAllBlocks := false
Expand Down Expand Up @@ -542,6 +555,7 @@ daSubmitRetryLoop:
backoff = c.exponentialBackoff(backoff)
}

c.recordMetrics(gasPrice, res.BlobSize, res.Code, len(batchesToSubmit), res.DAHeight)
attempt += 1
}

Expand Down
6 changes: 4 additions & 2 deletions sequencing/sequencer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ func startMockDAServJSONRPC(ctx context.Context, da_address string) (*proxy.Serv

func TestNewSequencer(t *testing.T) {
// Create a new sequencer with mock DA client
seq, err := NewSequencer(MockDAAddressHTTP, "authToken", []byte("namespace"), []byte("rollup1"), 1*time.Second, "")
metrics, _ := NopMetrics()
seq, err := NewSequencer(MockDAAddressHTTP, "authToken", []byte("namespace"), []byte("rollup1"), 10*time.Second, metrics, "")
require.NoError(t, err)
defer func() {
err := seq.Close()
Expand All @@ -67,7 +68,8 @@ func TestNewSequencer(t *testing.T) {

func TestSequencer_SubmitRollupTransaction(t *testing.T) {
// Initialize a new sequencer
seq, err := NewSequencer(MockDAAddressHTTP, "authToken", []byte("namespace"), []byte("rollup1"), 1*time.Second, "")
metrics, _ := NopMetrics()
seq, err := NewSequencer(MockDAAddressHTTP, "authToken", []byte("namespace"), []byte("rollup1"), 10*time.Second, metrics, "")
require.NoError(t, err)
defer func() {
err := seq.Close()
Expand Down

0 comments on commit 1ba073d

Please sign in to comment.