diff --git a/README.md b/README.md index 2c10ade..44a5ff3 100644 --- a/README.md +++ b/README.md @@ -54,6 +54,8 @@ Run centralized-sequencer by specifying DA network details: | `host` | centralized sequencer host | localhost | | `port` | centralized sequencer port | 50051 | | `listen-all` |listen on all network interfaces (0.0.0.0) instead of just localhost|disabled| +| `metrics` |enable prometheus metrics|disabled| +| `metrics-address` |address to expose prometheus metrics|`":8080"`| See `./build/centralized-sequencer --help` for details. diff --git a/da/da.go b/da/da.go index abf5ed1..10f2a42 100644 --- a/da/da.go +++ b/da/da.go @@ -80,6 +80,8 @@ type BaseResult struct { Message string // DAHeight informs about a height on Data Availability Layer for given result. DAHeight uint64 + // BlobSize is the size of the blob submitted. + BlobSize uint64 // SubmittedCount is the number of successfully submitted blocks. SubmittedCount uint64 } @@ -192,6 +194,7 @@ func (dac *DAClient) SubmitBatch(ctx context.Context, data []*sequencing.Batch, BaseResult: BaseResult{ Code: StatusSuccess, DAHeight: binary.LittleEndian.Uint64(ids[0]), + BlobSize: blobSize, SubmittedCount: uint64(len(ids)), }, } diff --git a/go.mod b/go.mod index 3e483aa..d4842d8 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,10 @@ toolchain go1.22.3 require ( github.com/dgraph-io/badger/v3 v3.2103.5 + github.com/go-kit/kit v0.13.0 github.com/gogo/protobuf v1.3.2 github.com/ipfs/go-log/v2 v2.5.1 + github.com/prometheus/client_golang v1.19.1 github.com/rollkit/go-da v0.8.0 github.com/rollkit/go-sequencing v0.3.0 github.com/rollkit/rollkit v0.13.7 @@ -30,7 +32,6 @@ require ( github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/filecoin-project/go-jsonrpc v0.6.0 // indirect - github.com/go-kit/kit v0.13.0 // indirect github.com/go-kit/log v0.2.1 // indirect github.com/go-logfmt/logfmt v0.6.0 // indirect github.com/golang/glog v1.2.2 // indirect @@ -68,7 +69,6 @@ require ( github.com/petermattis/goid v0.0.0-20230317030725-371a4b8eda08 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect - github.com/prometheus/client_golang v1.19.1 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.48.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect diff --git a/main.go b/main.go index 852c730..470c637 100644 --- a/main.go +++ b/main.go @@ -1,16 +1,20 @@ package main import ( + "context" "encoding/hex" "flag" "fmt" "log" "net" + "net/http" "os" "os/signal" "syscall" "time" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/rollkit/centralized-sequencer/sequencing" sequencingGRPC "github.com/rollkit/go-sequencing/proxy/grpc" ) @@ -24,14 +28,16 @@ const ( func main() { var ( - host string - port string - listenAll bool - batchTime time.Duration - da_address string - da_namespace string - da_auth_token string - db_path string + host string + port string + listenAll bool + batchTime time.Duration + da_address string + da_namespace string + da_auth_token string + db_path string + metricsEnabled bool + metricsAddress string ) flag.StringVar(&host, "host", defaultHost, "centralized sequencer host") flag.StringVar(&port, "port", defaultPort, "centralized sequencer port") @@ -41,6 +47,8 @@ func main() { flag.StringVar(&da_namespace, "da_namespace", "", "DA namespace where the sequencer submits transactions") flag.StringVar(&da_auth_token, "da_auth_token", "", "auth token for the DA") flag.StringVar(&db_path, "db_path", "", "path to the database") + flag.BoolVar(&metricsEnabled, "metrics", false, "Enable Prometheus metrics") + flag.StringVar(&metricsAddress, "metrics-address", ":8080", "Address to expose Prometheus metrics") flag.Parse() @@ -60,7 +68,28 @@ func main() { log.Fatalf("Error decoding namespace: %v", err) } - centralizedSeq, err := sequencing.NewSequencer(da_address, da_auth_token, namespace, batchTime, db_path) + var metricsServer *http.Server + if metricsEnabled { + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.Handler()) + metricsServer = &http.Server{ + Addr: metricsAddress, + Handler: mux, + ReadHeaderTimeout: 5 * time.Second, + } + go func() { + log.Printf("Starting metrics server on %v...\n", metricsAddress) + if err := metricsServer.ListenAndServe(); err != http.ErrServerClosed { + log.Fatalf("Failed to serve metrics: %v", err) + } + }() + } + + metrics, err := sequencing.DefaultMetricsProvider(metricsEnabled)(da_namespace) + if err != nil { + log.Fatalf("Failed to create metrics: %v", err) + } + centralizedSeq, err := sequencing.NewSequencer(da_address, da_auth_token, namespace, batchTime, metrics, db_path) if err != nil { log.Fatalf("Failed to create centralized sequencer: %v", err) } @@ -74,6 +103,11 @@ func main() { interrupt := make(chan os.Signal, 1) signal.Notify(interrupt, os.Interrupt, syscall.SIGINT) <-interrupt + if metricsServer != nil { + if err := metricsServer.Shutdown(context.Background()); err != nil { + log.Printf("Error shutting down metrics server: %v", err) + } + } fmt.Println("\nCtrl+C pressed. Exiting...") os.Exit(0) } diff --git a/sequencing/metrics.go b/sequencing/metrics.go new file mode 100644 index 0000000..e94198b --- /dev/null +++ b/sequencing/metrics.go @@ -0,0 +1,99 @@ +package sequencing + +import ( + "errors" + + "github.com/go-kit/kit/metrics" + "github.com/go-kit/kit/metrics/discard" + "github.com/go-kit/kit/metrics/prometheus" + stdprometheus "github.com/prometheus/client_golang/prometheus" +) + +const ( + // MetricsSubsystem is a subsystem shared by all metrics exposed by this + // package. + MetricsSubsystem = "sequencer" +) + +// MetricsProvider returns sequencing Metrics. +type MetricsProvider func(chainID string) (*Metrics, error) + +// DefaultMetricsProvider returns Metrics build using Prometheus client library +// if Prometheus is enabled. Otherwise, it returns no-op Metrics. +func DefaultMetricsProvider(enabled bool) MetricsProvider { + return func(chainID string) (*Metrics, error) { + if enabled { + return PrometheusMetrics("chain_id", chainID) + } + return NopMetrics() + } +} + +// Metrics contains metrics exposed by this package. +type Metrics struct { + // GasPrice + GasPrice metrics.Gauge + // Last submitted blob size + LastBlobSize metrics.Gauge + // cost / byte + // CostPerByte metrics.Gauge + // Wallet Balance + // WalletBalance metrics.Gauge + // Transaction Status + TransactionStatus metrics.Counter + // Number of pending blocks. + NumPendingBlocks metrics.Gauge + // Last included block height + IncludedBlockHeight metrics.Gauge +} + +// PrometheusMetrics returns Metrics build using Prometheus client library. +// Optionally, labels can be provided along with their values ("foo", +// "fooValue"). +func PrometheusMetrics(labelsAndValues ...string) (*Metrics, error) { + if len(labelsAndValues)%2 != 0 { + return nil, errors.New("uneven number of labels and values; labels and values should be provided in pairs") + } + labels := []string{} + for i := 0; i < len(labelsAndValues); i += 2 { + labels = append(labels, labelsAndValues[i]) + } + return &Metrics{ + GasPrice: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Subsystem: MetricsSubsystem, + Name: "gas_price", + Help: "The gas price of DA.", + }, labels).With(labelsAndValues...), + LastBlobSize: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Subsystem: MetricsSubsystem, + Name: "last_blob_size", + Help: "The size in bytes of the last DA blob.", + }, labels).With(labelsAndValues...), + TransactionStatus: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Subsystem: MetricsSubsystem, + Name: "transaction_status", + Help: "Count of transaction statuses for DA submissions", + }, labels).With(labelsAndValues...), + NumPendingBlocks: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Subsystem: MetricsSubsystem, + Name: "num_pending_blocks", + Help: "The number of pending blocks for DA submission.", + }, labels).With(labelsAndValues...), + IncludedBlockHeight: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Subsystem: MetricsSubsystem, + Name: "included_block_height", + Help: "The last DA included block height.", + }, labels).With(labelsAndValues...), + }, nil +} + +// NopMetrics returns no-op Metrics. +func NopMetrics() (*Metrics, error) { + return &Metrics{ + GasPrice: discard.NewGauge(), + LastBlobSize: discard.NewGauge(), + TransactionStatus: discard.NewCounter(), + NumPendingBlocks: discard.NewGauge(), + IncludedBlockHeight: discard.NewGauge(), + }, nil +} diff --git a/sequencing/sequencer.go b/sequencing/sequencer.go index 8325a0d..5796e6b 100644 --- a/sequencing/sequencer.go +++ b/sequencing/sequencer.go @@ -281,10 +281,12 @@ type Sequencer struct { db *badger.DB // BadgerDB instance for persistence dbMux sync.Mutex // Mutex for safe concurrent DB access + + metrics *Metrics } // NewSequencer ... -func NewSequencer(daAddress, daAuthToken string, daNamespace []byte, batchTime time.Duration, dbPath string) (*Sequencer, error) { +func NewSequencer(daAddress, daAuthToken string, daNamespace []byte, batchTime time.Duration, metrics *Metrics, dbPath string) (*Sequencer, error) { ctx := context.Background() dac, err := proxyda.NewClient(daAddress, daAuthToken) if err != nil { @@ -318,6 +320,7 @@ func NewSequencer(daAddress, daAuthToken string, daNamespace []byte, batchTime t bq: NewBatchQueue(), seenBatches: make(map[string]struct{}), db: db, + metrics: metrics, } // Load last batch hash from DB to recover from crash @@ -481,6 +484,16 @@ func (c *Sequencer) publishBatch() error { return nil } +func (c *Sequencer) recordMetrics(gasPrice float64, blobSize uint64, statusCode da.StatusCode, numPendingBlocks int, includedBlockHeight uint64) { + if c.metrics != nil { + c.metrics.GasPrice.Set(float64(gasPrice)) + c.metrics.LastBlobSize.Set(float64(blobSize)) + c.metrics.TransactionStatus.With("status", fmt.Sprintf("%d", statusCode)).Add(1) + c.metrics.NumPendingBlocks.Set(float64(numPendingBlocks)) + c.metrics.IncludedBlockHeight.Set(float64(includedBlockHeight)) + } +} + func (c *Sequencer) submitBatchToDA(batch sequencing.Batch) error { batchesToSubmit := []*sequencing.Batch{&batch} submittedAllBlocks := false @@ -542,6 +555,7 @@ daSubmitRetryLoop: backoff = c.exponentialBackoff(backoff) } + c.recordMetrics(gasPrice, res.BlobSize, res.Code, len(batchesToSubmit), res.DAHeight) attempt += 1 } diff --git a/sequencing/sequencer_test.go b/sequencing/sequencer_test.go index 4581c58..ee42c57 100644 --- a/sequencing/sequencer_test.go +++ b/sequencing/sequencer_test.go @@ -51,7 +51,8 @@ func startMockDAServJSONRPC(ctx context.Context, da_address string) (*proxy.Serv func TestNewSequencer(t *testing.T) { // Create a new sequencer with mock DA client - seq, err := NewSequencer(MockDAAddressHTTP, "authToken", []byte("namespace"), 1*time.Second, "") + metrics, _ := NopMetrics() + seq, err := NewSequencer(MockDAAddressHTTP, "authToken", []byte("namespace"), 10*time.Second, metrics, "") require.NoError(t, err) defer func() { err := seq.Close() @@ -67,7 +68,8 @@ func TestNewSequencer(t *testing.T) { func TestSequencer_SubmitRollupTransaction(t *testing.T) { // Initialize a new sequencer - seq, err := NewSequencer(MockDAAddressHTTP, "authToken", []byte("rollup1"), 1*time.Second, "") + metrics, _ := NopMetrics() + seq, err := NewSequencer(MockDAAddressHTTP, "authToken", []byte("rollup1"), 10*time.Second, metrics, "") require.NoError(t, err) defer func() { err := seq.Close()