From 36edd4bbc5a2be35d9af739001e77c55b497d588 Mon Sep 17 00:00:00 2001 From: Eugene Zagidullin Date: Thu, 25 Jul 2024 20:45:09 +0300 Subject: [PATCH] refactored --- bootstrap.go | 105 --------------------- client.go | 16 ---- config.go | 20 ++-- config.yaml | 4 +- main.go | 80 ++++++++++------ mempool.go | 119 ++++++++++++++++++++++++ monitor.go | 106 ++++++++++++++------- monitor_test.go | 242 ------------------------------------------------ poller.go | 226 ++++++++++++++++++++++++++++++++++++++++++++ 9 files changed, 477 insertions(+), 441 deletions(-) delete mode 100644 bootstrap.go delete mode 100644 client.go create mode 100644 mempool.go delete mode 100644 monitor_test.go create mode 100644 poller.go diff --git a/bootstrap.go b/bootstrap.go deleted file mode 100644 index 14bb75a..0000000 --- a/bootstrap.go +++ /dev/null @@ -1,105 +0,0 @@ -package main - -import ( - "context" - "errors" - "sync" - "time" - - tz "github.com/ecadlabs/gotez/v2" - "github.com/ecadlabs/gotez/v2/client" - "github.com/prometheus/client_golang/prometheus" - log "github.com/sirupsen/logrus" -) - -type BootstrapPollerConfig struct { - Client Client - ChainID *tz.ChainID - Timeout time.Duration - Interval time.Duration - Reg prometheus.Registerer -} - -type BootstrapPoller struct { - cfg BootstrapPollerConfig - - mtx sync.RWMutex - status client.BootstrappedResponse - - cancel context.CancelFunc - done chan struct{} - metric prometheus.Gauge -} - -func (c *BootstrapPollerConfig) New() *BootstrapPoller { - b := &BootstrapPoller{ - cfg: *c, - metric: prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: "tezos", - Subsystem: "node", - Name: "bootstrapped", - Help: "Returns 1 if the node has synchronized its chain with a few peers.", - }), - } - if c.Reg != nil { - c.Reg.MustRegister(b.metric) - } - return b -} - -func (b *BootstrapPoller) Start() { - ctx, cancel := context.WithCancel(context.Background()) - b.cancel = cancel - b.done = make(chan struct{}) - go b.loop(ctx) -} - -func (b *BootstrapPoller) Stop(ctx context.Context) error { - b.cancel() - select { - case <-b.done: - return nil - case <-ctx.Done(): - return ctx.Err() - } -} - -func (b *BootstrapPoller) Status() client.BootstrappedResponse { - b.mtx.RLock() - defer b.mtx.RUnlock() - return b.status -} - -func (b *BootstrapPoller) loop(ctx context.Context) { - t := time.NewTicker(b.cfg.Interval) - defer func() { - t.Stop() - close(b.done) - }() - for { - c, cancel := context.WithTimeout(ctx, b.cfg.Timeout) - resp, err := b.cfg.Client.IsBootstrapped(c, b.cfg.ChainID) - cancel() - if err != nil { - if errors.Is(err, context.Canceled) { - return - } - log.WithField("chain_id", b.cfg.ChainID).Warn(err) - } else { - b.mtx.Lock() - b.status = *resp - b.mtx.Unlock() - v := 0.0 - if resp.SyncState == client.SyncStateSynced && resp.Bootstrapped { - v = 1 - } - b.metric.Set(v) - } - - select { - case <-t.C: - case <-ctx.Done(): - return - } - } -} diff --git a/client.go b/client.go deleted file mode 100644 index 7affec5..0000000 --- a/client.go +++ /dev/null @@ -1,16 +0,0 @@ -package main - -import ( - "context" - - "github.com/ecadlabs/gotez/v2/client" -) - -type Client interface { - Heads(ctx context.Context, r *client.HeadsRequest) (<-chan *client.Head, <-chan error, error) - Constants(ctx context.Context, r *client.ContextRequest) (client.Constants, error) - BlockShellHeader(ctx context.Context, r *client.SimpleRequest) (*client.BlockShellHeader, error) - BlockProtocols(ctx context.Context, r *client.SimpleRequest) (*client.BlockProtocols, error) - BasicBlockInfo(ctx context.Context, chain string, block string) (*client.BasicBlockInfo, error) - IsBootstrapped(ctx context.Context, r *client.ChainID) (*client.BootstrappedResponse, error) -} diff --git a/config.go b/config.go index a18bc68..52000ef 100644 --- a/config.go +++ b/config.go @@ -7,14 +7,14 @@ import ( ) type Config struct { - Listen string `yaml:"listen"` - URL string `yaml:"url"` - ChainID *tz.ChainID `yaml:"chain_id"` - Timeout time.Duration `yaml:"timeout"` - Tolerance time.Duration `yaml:"tolerance"` - ReconnectDelay time.Duration `yaml:"reconnect_delay"` - UseTimestamps bool `yaml:"use_timestamps"` - BootstrappedPollInterval time.Duration `yaml:"bootstrapped_poll_interval"` - HealthUseBootstrapped bool `yaml:"health_use_bootstrapped"` - HealthUseBlockDelay bool `yaml:"health_use_block_delay"` + Listen string `yaml:"listen"` + URL string `yaml:"url"` + ChainID *tz.ChainID `yaml:"chain_id"` + Timeout time.Duration `yaml:"timeout"` + Tolerance time.Duration `yaml:"tolerance"` + ReconnectDelay time.Duration `yaml:"reconnect_delay"` + UseTimestamps bool `yaml:"use_timestamps"` + PollInterval time.Duration `yaml:"poll_interval"` + HealthUseBootstrapped bool `yaml:"health_use_bootstrapped"` + HealthUseBlockDelay bool `yaml:"health_use_block_delay"` } diff --git a/config.yaml b/config.yaml index cf7fc07..df47cb5 100644 --- a/config.yaml +++ b/config.yaml @@ -1,3 +1,3 @@ listen: :8080 -url: http://10.100.20.44 -chain_id: NetXXWAHLEvre9b +url: http://parisnet.i.ecadinfra.com:8732 +chain_id: NetXR64bNAYkP4S diff --git a/main.go b/main.go index 2d1721a..5f80fce 100644 --- a/main.go +++ b/main.go @@ -10,7 +10,9 @@ import ( "flag" - "github.com/ecadlabs/gotez/v2/client" + "github.com/ecadlabs/gotez/v2" + client "github.com/ecadlabs/gotez/v2/clientv2" + "github.com/ecadlabs/gotez/v2/clientv2/utils" "github.com/gorilla/mux" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -20,11 +22,11 @@ import ( ) const ( - defaultListen = ":8080" - defaultTimeout = 30 * time.Second - defaultTolerance = 1 * time.Second - defaultReconnectDelay = 10 * time.Second - defaultBootstrappedPollInterval = 10 * time.Second + defaultListen = ":8080" + defaultTimeout = 30 * time.Second + defaultTolerance = 1 * time.Second + defaultReconnectDelay = 10 * time.Second + defaultPollInterval = 15 * time.Second ) type debugLogger log.Logger @@ -45,13 +47,13 @@ func main() { log.SetLevel(ll) conf := Config{ - Listen: defaultListen, - Timeout: defaultTimeout, - Tolerance: defaultTolerance, - ReconnectDelay: defaultReconnectDelay, - HealthUseBlockDelay: true, - HealthUseBootstrapped: true, - BootstrappedPollInterval: defaultBootstrappedPollInterval, + Listen: defaultListen, + Timeout: defaultTimeout, + Tolerance: defaultTolerance, + ReconnectDelay: defaultReconnectDelay, + HealthUseBlockDelay: true, + HealthUseBootstrapped: true, + PollInterval: defaultPollInterval, } buf, err := os.ReadFile(*confPath) @@ -71,7 +73,7 @@ func main() { reg := prometheus.NewRegistry() - mon := (&HeadMonitorConfig{ + hmon, err := (&HeadMonitorConfig{ Client: &cl, ChainID: conf.ChainID, Timeout: conf.Timeout, @@ -79,31 +81,49 @@ func main() { ReconnectDelay: conf.ReconnectDelay, UseTimestamps: conf.UseTimestamps, Reg: reg, + }).New(context.Background()) + if err != nil { + log.Fatal(err) + } + + nextProto := func() *gotez.ProtocolHash { _, p := hmon.Protocols(); return p } + + mmon := (&MempoolMonitorConfig{ + Client: &cl, + ChainID: conf.ChainID, + Timeout: conf.Timeout, + ReconnectDelay: conf.ReconnectDelay, + Reg: reg, + NextProtocolFunc: nextProto, }).New() - bs := (&BootstrapPollerConfig{ - Client: &cl, - ChainID: conf.ChainID, - Timeout: conf.Timeout, - Interval: conf.BootstrappedPollInterval, - Reg: reg, + poller := (&PollerConfig{ + Client: &cl, + ChainID: conf.ChainID, + Timeout: conf.Timeout, + Interval: conf.PollInterval, + Reg: reg, + NextProtocolFunc: nextProto, }).New() - mon.Start() - defer mon.Stop(context.Background()) + hmon.Start() + defer hmon.Stop(context.Background()) + + poller.Start() + defer poller.Stop(context.Background()) - bs.Start() - defer bs.Stop(context.Background()) + mmon.Start() + defer mmon.Stop(context.Background()) r := mux.NewRouter() r.Methods("GET").Path("/health").HandlerFunc(func(w http.ResponseWriter, r *http.Request) { status := true if conf.HealthUseBootstrapped { - s := bs.Status() - status = status && s.Bootstrapped && s.SyncState == client.SyncStateSynced + s := poller.Status() + status = status && s.Bootstrapped && s.SyncState == utils.SyncStateSynced } if conf.HealthUseBlockDelay { - status = status && mon.Status() + status = status && hmon.Status() } var code int @@ -117,9 +137,9 @@ func main() { json.NewEncoder(w).Encode(status) }) r.Methods("GET").Path("/sync_status").HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - status := bs.Status() + status := poller.Status() var code int - if status.Bootstrapped && status.SyncState == client.SyncStateSynced { + if status.Bootstrapped && status.SyncState == utils.SyncStateSynced { code = http.StatusOK } else { code = http.StatusInternalServerError @@ -129,7 +149,7 @@ func main() { json.NewEncoder(w).Encode(status) }) r.Methods("GET").Path("/block_delay").HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - status := mon.Status() + status := hmon.Status() var code int if status { code = http.StatusOK diff --git a/mempool.go b/mempool.go new file mode 100644 index 0000000..bc20871 --- /dev/null +++ b/mempool.go @@ -0,0 +1,119 @@ +package main + +import ( + "context" + "encoding/json" + "errors" + "time" + + tz "github.com/ecadlabs/gotez/v2" + client "github.com/ecadlabs/gotez/v2/clientv2" + "github.com/ecadlabs/gotez/v2/clientv2/mempool" + "github.com/prometheus/client_golang/prometheus" + log "github.com/sirupsen/logrus" +) + +type MempoolMonitorConfig struct { + Client *client.Client + ChainID *tz.ChainID + Timeout time.Duration + ReconnectDelay time.Duration + Reg prometheus.Registerer + NextProtocolFunc func() *tz.ProtocolHash +} + +func (c *MempoolMonitorConfig) New() *MempoolMonitor { + m := &MempoolMonitor{ + cfg: *c, + metric: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "tezos", + Subsystem: "node", + Name: "mempool_operations_total", + Help: "The total number of mempool operations. Resets on reconnection.", + }, []string{"kind", "proto"}), + } + if c.Reg != nil { + c.Reg.MustRegister(m.metric) + } + return m +} + +type MempoolMonitor struct { + cfg MempoolMonitorConfig + cancel context.CancelFunc + done chan struct{} + metric *prometheus.CounterVec +} + +func (h *MempoolMonitor) Start() { + ctx, cancel := context.WithCancel(context.Background()) + h.cancel = cancel + h.done = make(chan struct{}) + go h.serve(ctx) +} + +func (h *MempoolMonitor) Stop(ctx context.Context) error { + h.cancel() + select { + case <-h.done: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +func (h *MempoolMonitor) serve(ctx context.Context) { + defer close(h.done) + var err error + for { + if err != nil { + log.Error(err) + t := time.After(h.cfg.ReconnectDelay) + select { + case <-t: + case <-ctx.Done(): + return + } + } + + h.metric.Reset() + + var ( + stream <-chan *mempool.MonitorResponse + errCh <-chan error + ) + stream, errCh, err = mempool.Monitor(ctx, h.cfg.Client, h.cfg.ChainID) + if err != nil { + if errors.Is(err, context.Canceled) { + return + } + continue + } + + Recv: + for { + select { + case err = <-errCh: + if errors.Is(err, context.Canceled) { + return + } + break Recv + + case resp := <-stream: + counter := h.metric.MustCurryWith(prometheus.Labels{"proto": h.cfg.NextProtocolFunc().String()}) + if log.GetLevel() >= log.DebugLevel { + buf, _ := json.MarshalIndent(resp.Contents, "", " ") + log.Debug(string(buf)) + } + + for _, list := range resp.Contents { + for _, grp := range list.Contents { + for _, op := range grp.Operations() { + counter.With(prometheus.Labels{"kind": op.OperationKind()}).Inc() + } + } + } + } + } + } +} diff --git a/monitor.go b/monitor.go index 684b64d..4097285 100644 --- a/monitor.go +++ b/monitor.go @@ -3,18 +3,20 @@ package main import ( "context" "errors" - "sync/atomic" + "sync" "time" tz "github.com/ecadlabs/gotez/v2" - "github.com/ecadlabs/gotez/v2/client" + client "github.com/ecadlabs/gotez/v2/clientv2" + "github.com/ecadlabs/gotez/v2/clientv2/block" + "github.com/ecadlabs/gotez/v2/clientv2/monitor" "github.com/ecadlabs/gotez/v2/protocol/core" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" ) type HeadMonitorConfig struct { - Client Client + Client *client.Client ChainID *tz.ChainID Timeout time.Duration Tolerance time.Duration @@ -23,7 +25,7 @@ type HeadMonitorConfig struct { Reg prometheus.Registerer } -func (c *HeadMonitorConfig) New() *HeadMonitor { +func (c *HeadMonitorConfig) New(ctx context.Context) (*HeadMonitor, error) { m := &HeadMonitor{ cfg: *c, metric: prometheus.NewGauge(prometheus.GaugeOpts{ @@ -36,54 +38,73 @@ func (c *HeadMonitorConfig) New() *HeadMonitor { if c.Reg != nil { c.Reg.MustRegister(m.metric) } - return m + + bi, err := m.getBlockInfo(ctx, "head") + if err != nil { + return nil, err + } + m.protocol = bi.Protocol + m.nextProtocol = bi.NextProtocol + + return m, nil } type HeadMonitor struct { - cfg HeadMonitorConfig - status atomic.Bool - cancel context.CancelFunc - done chan struct{} - metric prometheus.Gauge + cfg HeadMonitorConfig + mtx sync.RWMutex + status bool + protocol *tz.ProtocolHash + nextProtocol *tz.ProtocolHash + cancel context.CancelFunc + done chan struct{} + metric prometheus.Gauge } func (h *HeadMonitor) Status() bool { - return h.status.Load() + h.mtx.RLock() + defer h.mtx.RUnlock() + return h.status +} + +func (h *HeadMonitor) Protocols() (proto, next *tz.ProtocolHash) { + h.mtx.RLock() + defer h.mtx.RUnlock() + return h.protocol, h.nextProtocol } func (h *HeadMonitor) context(ctx context.Context) (context.Context, context.CancelFunc) { return context.WithTimeout(ctx, h.cfg.Timeout) } -func (h *HeadMonitor) getMinBlockDelay(c context.Context, block string, protocol *tz.ProtocolHash) (time.Duration, error) { +func (h *HeadMonitor) getMinBlockDelay(c context.Context, b string, protocol *tz.ProtocolHash) (time.Duration, error) { ctx, cancel := h.context(c) defer cancel() - consts, err := h.cfg.Client.Constants(ctx, &client.ContextRequest{ + consts, err := block.Constants(ctx, h.cfg.Client, &block.ContextRequest{ Chain: h.cfg.ChainID.String(), - Block: block, + Block: b, Protocol: protocol, }) if err != nil { return 0, err } delay := time.Duration(consts.GetMinimalBlockDelay()) * time.Second - log.Debugf("%s delay = %v", block, delay) + log.Debugf("%s delay = %v", b, delay) return delay, nil } -func (h *HeadMonitor) getShellHeader(c context.Context, block *tz.BlockHash) (*core.ShellHeader, error) { +func (h *HeadMonitor) getShellHeader(c context.Context, b *tz.BlockHash) (*core.ShellHeader, error) { ctx, cancel := h.context(c) defer cancel() - return h.cfg.Client.BlockShellHeader(ctx, &client.SimpleRequest{ + return block.ShellHeader(ctx, h.cfg.Client, &block.SimpleRequest{ Chain: h.cfg.ChainID.String(), - Block: block.String(), + Block: b.String(), }) } -func (h *HeadMonitor) getBlockInfo(c context.Context, block string) (*client.BasicBlockInfo, error) { +func (h *HeadMonitor) getBlockInfo(c context.Context, b string) (*block.BasicBlockInfo, error) { ctx, cancel := h.context(c) defer cancel() - return h.cfg.Client.BasicBlockInfo(ctx, h.cfg.ChainID.String(), block) + return block.BasicInfo(ctx, h.cfg.Client, h.cfg.ChainID.String(), b) } func (h *HeadMonitor) Start() { @@ -107,7 +128,9 @@ func (h *HeadMonitor) serve(ctx context.Context) { defer close(h.done) var err error for { - h.status.Store(false) + h.mtx.Lock() + h.status = false + h.mtx.Unlock() h.metric.Set(0) if err != nil { log.Error(err) @@ -119,7 +142,7 @@ func (h *HeadMonitor) serve(ctx context.Context) { } } - var bi *client.BasicBlockInfo + var bi *block.BasicBlockInfo bi, err = h.getBlockInfo(ctx, "head") if err != nil { if errors.Is(err, context.Canceled) { @@ -127,6 +150,10 @@ func (h *HeadMonitor) serve(ctx context.Context) { } continue } + h.mtx.Lock() + h.protocol = bi.Protocol + h.nextProtocol = bi.NextProtocol + h.mtx.Unlock() var sh *core.ShellHeader sh, err = h.getShellHeader(ctx, bi.Hash) if err != nil { @@ -152,10 +179,10 @@ func (h *HeadMonitor) serve(ctx context.Context) { continue } var ( - stream <-chan *client.Head + stream <-chan *monitor.Head errCh <-chan error ) - stream, errCh, err = h.cfg.Client.Heads(ctx, &client.HeadsRequest{Chain: h.cfg.ChainID.String()}) + stream, errCh, err = monitor.Heads(ctx, h.cfg.Client, &monitor.HeadsRequest{Chain: h.cfg.ChainID.String()}) if err != nil { if errors.Is(err, context.Canceled) { return @@ -181,20 +208,9 @@ func (h *HeadMonitor) serve(ctx context.Context) { } status := t.Before(timestamp.Add(minBlockDelay + h.cfg.Tolerance)) log.Debugf("%v: %t", t, status) - h.status.Store(status) - v := 0.0 - if status { - v = 1 - } - h.metric.Set(v) - timestamp = t - if head.Proto == protoNum { - break - } - // update constant var proto *core.BlockProtocols - proto, err = h.cfg.Client.BlockProtocols(ctx, &client.SimpleRequest{ + proto, err = block.Protocols(ctx, h.cfg.Client, &block.SimpleRequest{ Chain: h.cfg.ChainID.String(), Block: head.Hash.String(), }) @@ -204,6 +220,24 @@ func (h *HeadMonitor) serve(ctx context.Context) { } break Recv } + + h.mtx.Lock() + h.status = status + h.protocol = proto.Protocol + h.nextProtocol = proto.NextProtocol + h.mtx.Unlock() + + v := 0.0 + if status { + v = 1 + } + h.metric.Set(v) + timestamp = t + if head.Proto == protoNum { + break + } + + // update constant log.WithFields(log.Fields{"block": head.Hash, "proto": proto.Protocol}).Info("protocol upgrade") minBlockDelay, err = h.getMinBlockDelay(ctx, head.Hash.String(), proto.Protocol) if err != nil { diff --git a/monitor_test.go b/monitor_test.go deleted file mode 100644 index 5eeb8bb..0000000 --- a/monitor_test.go +++ /dev/null @@ -1,242 +0,0 @@ -package main - -import ( - "context" - "errors" - "runtime" - "sync" - "sync/atomic" - "testing" - - "github.com/ecadlabs/gotez/v2" - "github.com/ecadlabs/gotez/v2/client" - "github.com/ecadlabs/gotez/v2/protocol/core" - "github.com/ecadlabs/gotez/v2/protocol/proto_019_PtParisB" - "github.com/stretchr/testify/require" -) - -type Call struct { - Func string - Args []any - Ret []any -} - -type blockContext struct { - head client.Head - proto gotez.ProtocolHash - constants proto_019_PtParisB.Constants -} - -type clientMockup struct { - mtx sync.Mutex - calls []*Call - head int64 - contexts []*blockContext - done chan struct{} -} - -func (m *clientMockup) call(args, ret []any) { - pc, _, _, ok := runtime.Caller(1) - if !ok { - return - } - m.callFunc(runtime.FuncForPC(pc).Name(), args, ret) -} - -func (m *clientMockup) callFunc(fn string, args, ret []any) { - c := &Call{ - Args: args, - Ret: ret, - Func: fn, - } - m.mtx.Lock() - defer m.mtx.Unlock() - m.calls = append(m.calls, c) -} - -func (c *clientMockup) ctx(block string) (*blockContext, error) { - if block == "head" { - return c.contexts[atomic.LoadInt64(&c.head)], nil - } - for _, bc := range c.contexts { - if bc.head.Hash.String() == block { - return bc, nil - } - } - return nil, errors.New("block not found") -} - -func (c *clientMockup) Constants(ctx context.Context, r *client.ContextRequest) (client.Constants, error) { - bc, err := c.ctx(r.Block) - if err != nil { - c.call([]any{r}, []any{client.Constants(nil), err}) - return nil, err - } - c.call([]any{r}, []any{&bc.constants, error(nil)}) - return &bc.constants, nil -} - -func (c *clientMockup) BlockShellHeader(ctx context.Context, r *client.SimpleRequest) (*client.BlockShellHeader, error) { - bc, err := c.ctx(r.Block) - if err != nil { - c.call([]any{r}, []any{(*client.BlockShellHeader)(nil), err}) - return nil, err - } - c.call([]any{r}, []any{&bc.head.ShellHeader, error(nil)}) - return &bc.head.ShellHeader, nil -} - -func (c *clientMockup) BlockProtocols(ctx context.Context, r *client.SimpleRequest) (*client.BlockProtocols, error) { - bc, err := c.ctx(r.Block) - if err != nil { - c.call([]any{r}, []any{(*client.BlockProtocols)(nil), err}) - return nil, err - } - proto := &client.BlockProtocols{Protocol: &bc.proto} - c.call([]any{r}, []any{proto, error(nil)}) - return proto, nil -} - -func (c *clientMockup) BasicBlockInfo(ctx context.Context, chain string, block string) (*client.BasicBlockInfo, error) { - bc, err := c.ctx(block) - if err != nil { - c.call([]any{chain, block}, []any{(*client.BasicBlockInfo)(nil), err}) - return nil, err - } - bi := &client.BasicBlockInfo{ - Hash: bc.head.Hash, - Protocol: &bc.proto, - } - c.call([]any{chain, block}, []any{bi, error(nil)}) - return bi, nil -} - -func (c *clientMockup) IsBootstrapped(ctx context.Context, r *client.ChainID) (*client.BootstrappedResponse, error) { - status := &client.BootstrappedResponse{Bootstrapped: true, SyncState: client.SyncStateSynced} - c.call([]any{r}, []any{status, error(nil)}) - return status, nil -} - -func (c *clientMockup) Heads(ctx context.Context, r *client.HeadsRequest) (<-chan *client.Head, <-chan error, error) { - heads := make(chan *client.Head) - err := make(chan error) - go func() { - for { - head := atomic.AddInt64(&c.head, 1) - if int(head) == len(c.contexts) { - break - } - bc := c.contexts[head] - select { - case heads <- &bc.head: - c.callFunc("", nil, []any{&bc.head}) - case <-ctx.Done(): - err <- ctx.Err() - return - } - } - close(c.done) - // just wait for a context to close - <-ctx.Done() - err <- ctx.Err() - }() - c.call([]any{r}, []any{error(nil)}) - return heads, err, nil -} - -func TestProtocolUpgrade1(t *testing.T) { - cl := clientMockup{ - done: make(chan struct{}), - contexts: []*blockContext{ - { - head: client.Head{ - Hash: &gotez.BlockHash{1}, - ShellHeader: core.ShellHeader{ - Proto: 0, - Timestamp: 0, - }, - }, - proto: gotez.ProtocolHash{1}, - constants: proto_019_PtParisB.Constants{MinimalBlockDelay: 1}, - }, - { - head: client.Head{ - Hash: &gotez.BlockHash{2}, - ShellHeader: core.ShellHeader{ - Proto: 1, - Timestamp: 1, - }, - }, - proto: gotez.ProtocolHash{2}, - constants: proto_019_PtParisB.Constants{MinimalBlockDelay: 1}, - }, - }, - } - - expect := []*Call{ - { - Func: "github.com/ecadlabs/octez-ecad-sc.(*clientMockup).BasicBlockInfo", - Args: []any{"NetXH12Aer3be93", "head"}, - Ret: []any{&client.BasicBlockInfo{ - Hash: cl.contexts[0].head.Hash, - Protocol: &cl.contexts[0].proto, - }, error(nil)}, - }, - { - Func: "github.com/ecadlabs/octez-ecad-sc.(*clientMockup).BlockShellHeader", - Args: []any{&client.SimpleRequest{ - Chain: "NetXH12Aer3be93", - Block: "BKiisx71SeX91a4DF6vd4ykBkDTdSVpkH44SvxUc9U8ytodDvfn", - }}, - Ret: []any{&cl.contexts[0].head.ShellHeader, error(nil)}, - }, - { - Func: "github.com/ecadlabs/octez-ecad-sc.(*clientMockup).Constants", - Args: []any{&client.ContextRequest{ - Chain: "NetXH12Aer3be93", - Block: "BKiisx71SeX91a4DF6vd4ykBkDTdSVpkH44SvxUc9U8ytodDvfn", - Protocol: &cl.contexts[0].proto, - }}, - Ret: []any{&cl.contexts[0].constants, error(nil)}, - }, - { - Func: "github.com/ecadlabs/octez-ecad-sc.(*clientMockup).Heads", - Args: []any{&client.HeadsRequest{ - Chain: "NetXH12Aer3be93", - }}, - Ret: []any{error(nil)}, - }, - { - Ret: []any{&cl.contexts[1].head}, - }, - { - Func: "github.com/ecadlabs/octez-ecad-sc.(*clientMockup).BlockProtocols", - Args: []any{&client.SimpleRequest{ - Chain: "NetXH12Aer3be93", - Block: "BKjARUyBRFjXVU8CGfgVNBprSJF5f76oCFJjin6787DWo5AnU9J", - }}, - Ret: []any{&client.BlockProtocols{Protocol: &cl.contexts[1].proto}, error(nil)}, - }, - { - Func: "github.com/ecadlabs/octez-ecad-sc.(*clientMockup).Constants", - Args: []any{&client.ContextRequest{ - Chain: "NetXH12Aer3be93", - Block: "BKjARUyBRFjXVU8CGfgVNBprSJF5f76oCFJjin6787DWo5AnU9J", - Protocol: &cl.contexts[1].proto, - }}, - Ret: []any{&cl.contexts[1].constants, error(nil)}, - }, - } - - mon := (&HeadMonitorConfig{ - Client: &cl, - ChainID: &gotez.ChainID{0}, - UseTimestamps: true, - }).New() - - mon.Start() - <-cl.done - mon.Stop(context.Background()) - - require.Equal(t, expect, cl.calls) -} diff --git a/poller.go b/poller.go new file mode 100644 index 0000000..46fc906 --- /dev/null +++ b/poller.go @@ -0,0 +1,226 @@ +package main + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + tz "github.com/ecadlabs/gotez/v2" + client "github.com/ecadlabs/gotez/v2/clientv2" + "github.com/ecadlabs/gotez/v2/clientv2/mempool" + "github.com/ecadlabs/gotez/v2/clientv2/network" + "github.com/ecadlabs/gotez/v2/clientv2/utils" + "github.com/ecadlabs/gotez/v2/protocol/latest" + "github.com/ecadlabs/gotez/v2/protocol/proto_016_PtMumbai" + "github.com/prometheus/client_golang/prometheus" + log "github.com/sirupsen/logrus" +) + +type PollerConfig struct { + Client *client.Client + ChainID *tz.ChainID + Timeout time.Duration + Interval time.Duration + Reg prometheus.Registerer + NextProtocolFunc func() *tz.ProtocolHash +} + +type Poller struct { + cfg PollerConfig + + mtx sync.RWMutex + status utils.BootstrappedResponse + + cancel context.CancelFunc + done chan struct{} + + bsGauge prometheus.Gauge + connGauge *prometheus.GaugeVec + opsGauge *prometheus.GaugeVec +} + +func (c *PollerConfig) New() *Poller { + b := &Poller{ + cfg: *c, + bsGauge: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "tezos", + Subsystem: "node", + Name: "bootstrapped", + Help: "Returns 1 if the node has synchronized its chain with a few peers.", + }), + connGauge: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "tezos", + Subsystem: "node", + Name: "connections", + Help: "Current number of connections to/from this node.", + }, []string{"direction", "private"}), + opsGauge: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "tezos", + Subsystem: "node", + Name: "mempool_operations", + Help: "The current number of mempool operations.", + }, []string{"kind", "pool", "proto"}), + } + if c.Reg != nil { + c.Reg.MustRegister(b.bsGauge) + c.Reg.MustRegister(b.connGauge) + c.Reg.MustRegister(b.opsGauge) + } + return b +} + +func (p *Poller) Start() { + ctx, cancel := context.WithCancel(context.Background()) + p.cancel = cancel + p.done = make(chan struct{}) + go p.loop(ctx) +} + +func (p *Poller) Stop(ctx context.Context) error { + p.cancel() + select { + case <-p.done: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +func (p *Poller) Status() utils.BootstrappedResponse { + p.mtx.RLock() + defer p.mtx.RUnlock() + return p.status +} + +func (p *Poller) loop(ctx context.Context) { + t := time.NewTicker(p.cfg.Interval) + defer func() { + t.Stop() + close(p.done) + }() + + pollers := []func(context.Context, chan<- error){ + p.pollBootstrapped, + p.pollConnections, + p.pollMempoolOperations, + } + errCh := make(chan error, len(pollers)) + + for { + for _, poller := range pollers { + go poller(ctx, errCh) + } + done := false + for range pollers { + err := <-errCh + if err != nil { + if errors.Is(err, context.Canceled) { + done = true + } else { + log.WithField("chain_id", p.cfg.ChainID).Warn(err) + } + } + } + if done { + return + } + + select { + case <-t.C: + case <-ctx.Done(): + return + } + } +} + +func (p *Poller) pollBootstrapped(ctx context.Context, errCh chan<- error) { + var err error + defer func() { errCh <- err }() + + c, cancel := context.WithTimeout(ctx, p.cfg.Timeout) + defer cancel() + resp, err := utils.IsBootstrapped(c, p.cfg.Client, p.cfg.ChainID) + if err != nil { + return + } + p.mtx.Lock() + p.status = *resp + p.mtx.Unlock() + v := 0.0 + if resp.SyncState == utils.SyncStateSynced && resp.Bootstrapped { + v = 1 + } + p.bsGauge.Set(v) +} + +func (p *Poller) pollConnections(ctx context.Context, errCh chan<- error) { + var err error + defer func() { errCh <- err }() + + c, cancel := context.WithTimeout(ctx, p.cfg.Timeout) + defer cancel() + resp, err := network.Connections(c, p.cfg.Client) + if err != nil { + return + } + + p.connGauge.Reset() + for _, conn := range resp.Connections { + var dir string + if conn.Incoming { + dir = "incoming" + } else { + dir = "outgoing" + } + p.connGauge.With(prometheus.Labels{"direction": dir, "private": fmt.Sprintf("%t", conn.Private)}).Inc() + } +} + +func updatePool(g *prometheus.GaugeVec, list []*proto_016_PtMumbai.OperationWithoutMetadata[latest.OperationContents]) { + for _, grp := range list { + for _, op := range grp.Operations() { + g.With(prometheus.Labels{"kind": op.OperationKind()}).Inc() + } + } +} + +func (p *Poller) pollMempoolOperations(ctx context.Context, errCh chan<- error) { + var err error + defer func() { errCh <- err }() + + c, cancel := context.WithTimeout(ctx, p.cfg.Timeout) + defer cancel() + resp, err := mempool.PendingOperations(c, p.cfg.Client, p.cfg.ChainID) + if err != nil { + return + } + + p.opsGauge.Reset() + gauge := p.opsGauge.MustCurryWith(prometheus.Labels{"proto": p.cfg.NextProtocolFunc().String()}) + + g := gauge.MustCurryWith(prometheus.Labels{"pool": "validated"}) + for _, list := range resp.Validated { + updatePool(g, list.Contents) + } + + pools := [][]*mempool.PendingOperationsListWithError{ + resp.Refused, + resp.Outdated, + resp.BranchRefused, + resp.BranchDelayed, + } + poolNames := []string{"refused", "outdated", "branch_refused", "branch_delayed"} + for i, pool := range pools { + g := gauge.MustCurryWith(prometheus.Labels{"pool": poolNames[i]}) + for _, list := range pool { + updatePool(g, list.Contents) + } + } + + g = gauge.MustCurryWith(prometheus.Labels{"pool": "unprocessed"}) + for _, list := range resp.Unprocessed { + updatePool(g, list.Contents) + } +}