From ec872fda6274b942a260815b7f7a0a0fb4830e5a Mon Sep 17 00:00:00 2001 From: robertlincecum Date: Wed, 9 Aug 2023 14:29:58 -0500 Subject: [PATCH] quaistats.go -> refactored to work with the new stats backend --- Makefile | 1 + go.mod | 1 + params/bootnodes.go | 12 +- quaistats/quaistats.go | 391 ++++++++++++++++++++++++++++++----------- 4 files changed, 301 insertions(+), 104 deletions(-) diff --git a/Makefile b/Makefile index d05198d1f1..e38f520c1e 100644 --- a/Makefile +++ b/Makefile @@ -124,6 +124,7 @@ run-all: ifeq (,$(wildcard nodelogs)) mkdir nodelogs endif + @nohup $(BASE_CMD) --port $(PRIME_PORT_TCP) --http.port $(PRIME_PORT_HTTP) --ws.port $(PRIME_PORT_WS) --sub.urls $(PRIME_SUB_URLS) >> nodelogs/prime.log 2>&1 & @nohup $(BASE_CMD) --port $(REGION_0_PORT_TCP) --http.port $(REGION_0_PORT_HTTP) --ws.port $(REGION_0_PORT_WS) --dom.url $(REGION_0_DOM_URL):$(PRIME_PORT_WS) --sub.urls $(REGION_0_SUB_URLS) --region 0 >> nodelogs/region-0.log 2>&1 & @nohup $(BASE_CMD) --port $(REGION_1_PORT_TCP) --http.port $(REGION_1_PORT_HTTP) --ws.port $(REGION_1_PORT_WS) --dom.url $(REGION_1_DOM_URL):$(PRIME_PORT_WS) --sub.urls $(REGION_1_SUB_URLS) --region 1 >> nodelogs/region-1.log 2>&1 & diff --git a/go.mod b/go.mod index 6ad431f1cf..18c39e2cda 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/cockroachdb/pebble v0.0.0-20230701135918-609ae80aea41 github.com/davecgh/go-spew v1.1.1 github.com/deckarep/golang-set v1.8.0 + github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/docker/docker v1.6.2 github.com/dominant-strategies/bn256 v0.0.0-20220930122411-fbf930a7493d github.com/edsrzf/mmap-go v1.1.0 diff --git a/params/bootnodes.go b/params/bootnodes.go index d601f3c694..75b4da6bea 100644 --- a/params/bootnodes.go +++ b/params/bootnodes.go @@ -29,9 +29,9 @@ var ColosseumBootnodes = []string{ // GardenBootnodes are the enode URLs of the P2P bootstrap nodes running on the // Garden test network. var GardenBootnodes = []string{ - "enode://f353567741755a4b35afb4641618b34c7e3c1666b49c9e519e8336c124ba8c2c556f00e9edcd87599994e9674ebdea3503a5ff1948a257c1540fe9f8c0fedf62@35.231.138.241", // us-east1-b - "enode://aa3d4daa8ea958c8a4fe3c56a7c5c0509754eec9b2e4eb45ac5fe9eb16bea442da0b50a0ac09e5fdf7711eb6b64df76dcd22e48d69f2f938d19fc60b9a8fd579@34.118.24.175", // europe-central2-a - "enode://13ed279b8013b61ef41466d4f07679fe17c407af4f6ae09b34042c72635eef314e9020f732a96a739abaa88f51108b76698f6b7335397be99418c371e5c3bcce@34.68.104.77", // us-central1-a + "enode://f353567741755a4b35afb4641618b34c7e3c1666b49c9e519e8336c124ba8c2c556f00e9edcd87599994e9674ebdea3503a5ff1948a257c1540fe9f8c0fedf62@35.231.138.241", // us-east1-b + "enode://aa3d4daa8ea958c8a4fe3c56a7c5c0509754eec9b2e4eb45ac5fe9eb16bea442da0b50a0ac09e5fdf7711eb6b64df76dcd22e48d69f2f938d19fc60b9a8fd579@34.118.24.175", // europe-central2-a + "enode://13ed279b8013b61ef41466d4f07679fe17c407af4f6ae09b34042c72635eef314e9020f732a96a739abaa88f51108b76698f6b7335397be99418c371e5c3bcce@34.68.104.77", // us-central1-a } // OrchardBootnodes are the enode URLs of the P2P bootstrap nodes running on the @@ -45,9 +45,9 @@ var OrchardBootnodes = []string{ // LighthouseBootnodes are the enode URLs of the P2P bootstrap nodes running on the // Lighthouse test network var LighthouseBootnodes = []string{ - "enode://ee89c22bff79d040fcf3dbaea3bcbe429e68b0ca9e32671027554e96aea3f132f6abed8cf5be514c50b76cf2cab96c7d9064a93f0bbd0903f26df4be01ce0e6a@35.196.124.28", // europe-southwest1-a - "enode://b39cf3080c8c9165bf0b50a7f6c8ff5a3568649b0c57ae786f630a054722fccfec7e3232594eb37a62a04e7c310a4d4e899ea42c0bd5a5043a248510715e2af9@35.187.55.110", // southamerica-east1-b - "enode://ce3daf05c462b36bc1a6261b8edb0cd72bf041c1f8fb59100d6a09dc3415d1f136cc8768b12129db73c48085c8e01d7a606be447d3f681f405ab427641599235@34.92.50.205", // asia-northeast3-a + "enode://ee89c22bff79d040fcf3dbaea3bcbe429e68b0ca9e32671027554e96aea3f132f6abed8cf5be514c50b76cf2cab96c7d9064a93f0bbd0903f26df4be01ce0e6a@34.148.120.168", // europe-southwest1-a + "enode://b39cf3080c8c9165bf0b50a7f6c8ff5a3568649b0c57ae786f630a054722fccfec7e3232594eb37a62a04e7c310a4d4e899ea42c0bd5a5043a248510715e2af9@34.209.106.192", // southamerica-east1-b + "enode://ce3daf05c462b36bc1a6261b8edb0cd72bf041c1f8fb59100d6a09dc3415d1f136cc8768b12129db73c48085c8e01d7a606be447d3f681f405ab427641599235@34.116.214.14", // asia-northeast3-a } var V5Bootnodes = []string{} diff --git a/quaistats/quaistats.go b/quaistats/quaistats.go index 688b591e89..2f90276b81 100644 --- a/quaistats/quaistats.go +++ b/quaistats/quaistats.go @@ -18,10 +18,12 @@ package quaistats import ( + "bytes" "context" "encoding/json" "errors" "fmt" + "io/ioutil" "math/big" "net/http" "runtime" @@ -30,6 +32,8 @@ import ( "sync" "time" + "github.com/dgrijalva/jwt-go" + lru "github.com/hashicorp/golang-lru" "github.com/shirou/gopsutil/cpu" "github.com/shirou/gopsutil/mem" @@ -272,96 +276,153 @@ func (s *Service) loop(chainHeadCh chan core.ChainHeadEvent, chainSideCh chan co }() // Resolve the URL, defaulting to TLS, but falling back to none too - path := fmt.Sprintf("%s/api", s.host) - urls := []string{path} + paths := map[string]string{ + "block": fmt.Sprintf("%s/block", s.host), + "peerStats": fmt.Sprintf("%s/peerStats", s.host), + "transactions": fmt.Sprintf("%s/transactions", s.host), + "nodeStats": fmt.Sprintf("%s/nodeStats", s.host), + "pendStats": fmt.Sprintf("%s/pendStats", s.host), + "login": fmt.Sprintf("%s/auth/login", s.host), + } + + // urls := []string{path} + + urlMap := make(map[string][]string) - // url.Parse and url.IsAbs is unsuitable (https://github.com/golang/go/issues/19779) - if !strings.Contains(path, "://") { - urls = []string{"wss://" + path, "ws://" + path} + for key, path := range paths { + // url.Parse and url.IsAbs is unsuitable (https://github.com/golang/go/issues/19779) + if !strings.Contains(path, "://") { + // Append both secure (wss) and non-secure (ws) URLs + if key == "login" { + urlMap[key] = []string{"http://" + path} + } else { + urlMap[key] = []string{"wss://" + path, "ws://" + path} + } + } else { + urlMap[key] = []string{path} + } } errTimer := time.NewTimer(0) defer errTimer.Stop() + var authJwt = "" // Loop reporting until termination for { select { case <-quitCh: return case <-errTimer.C: + // If we don't have a JWT or it's expired, get a new one + + isJwtExpiredResult, jwtIsExpiredErr := s.isJwtExpired(authJwt) + if authJwt == "" || isJwtExpiredResult || jwtIsExpiredErr != nil { + var err error + authJwt, err = s.login2(urlMap["login"][0]) + if err != nil { + log.Warn("Stats login failed", "err", err) + errTimer.Reset(10 * time.Second) + continue + } + } // Establish a websocket connection to the server on any supported URL - var ( - conn *connWrapper - err error - ) dialer := websocket.Dialer{HandshakeTimeout: 5 * time.Second} header := make(http.Header) header.Set("origin", "http://localhost") - for _, url := range urls { - c, _, e := dialer.Dial(url, header) - err = e - if err == nil { - conn = newConnectionWrapper(c) - break + header.Set("sec-websocket-protocol", authJwt) + + conns := make(map[string]*connWrapper) + errs := make(map[string]error) + + for key, urls := range urlMap { + if key == "login" { + continue + } + for _, url := range urls { + c, _, e := dialer.Dial(url, header) + err := e + if err == nil { + conns[key] = newConnectionWrapper(c) + break + } + if err != nil { + log.Warn(key+" stats server unreachable", "err", err) + errs[key] = err + errTimer.Reset(10 * time.Second) + continue + } + go s.readLoop(conns[key]) } } - if err != nil { - log.Warn("Stats server unreachable", "err", err) - errTimer.Reset(10 * time.Second) - continue - } + // Authenticate the client with the server - if err = s.login(conn); err != nil { - log.Warn("Stats login failed", "err", err) - conn.Close() - errTimer.Reset(10 * time.Second) - continue - } - go s.readLoop(conn) + for key, conn := range conns { + if errs[key] = s.report(key, conn); errs[key] != nil { + log.Warn("Initial stats report failed", "err", errs[key]) + conn.Close() + errTimer.Reset(0) + continue + } - // Send the initial stats so our node looks decent from the get go - if err = s.report(conn); err != nil { - log.Warn("Initial stats report failed", "err", err) - conn.Close() - errTimer.Reset(0) - continue } + // Keep sending status updates until the connection breaks fullReport := time.NewTicker(reportInterval * time.Second) - for err == nil { + var noErrs = true + for noErrs { + var err error select { case <-quitCh: fullReport.Stop() // Make sure the connection is closed - conn.Close() + for _, conn := range conns { + conn.Close() + } + return case <-fullReport.C: - if err = s.report(conn); err != nil { - log.Warn("Full stats report failed", "err", err) + if err = s.report("nodeStats", conns["nodeStats"]); err != nil { + noErrs = false + log.Warn("nodeStats full stats report failed", "err", err) + } + if err = s.report("peerStats", conns["peerStats"]); err != nil { + noErrs = false + log.Warn("peerStats full stats report failed", "err", err) + } + if err = s.report("transactions", conns["transactions"]); err != nil { + noErrs = false + log.Warn("transactions full stats report failed", "err", err) } case list := <-s.histCh: - if err = s.reportHistory(conn, list); err != nil { + if err = s.reportHistory(conns["block"], list); err != nil { + noErrs = false log.Warn("Requested history report failed", "err", err) } case head := <-headCh: - if err = s.reportBlock(conn, head); err != nil { + if err = s.reportBlock(conns["block"], head); err != nil { + noErrs = false log.Warn("Block stats report failed", "err", err) } if nodeCtx == common.ZONE_CTX && s.backend.ProcessingState() { - if err = s.reportPending(conn); err != nil { + if err = s.reportPending(conns["pendStats"]); err != nil { + noErrs = false log.Warn("Post-block transaction stats report failed", "err", err) } } case sideEvent := <-sideCh: - if err = s.reportSideBlock(conn, sideEvent); err != nil { + if err = s.reportSideBlock(conns["block"], sideEvent); err != nil { + noErrs = false log.Warn("Block stats report failed", "err", err) } } } fullReport.Stop() // Close the current connection and establish a new one - conn.Close() + for _, conn := range conns { + conn.Close() + } + errTimer.Reset(0) } } @@ -476,9 +537,14 @@ type nodeInfo struct { // authMsg is the authentication infos needed to login to a monitoring server. type authMsg struct { - ID string `json:"id"` - Info nodeInfo `json:"info"` - Secret string `json:"secret"` + ID string `json:"id"` + Info nodeInfo `json:"info"` + Secret loginSecret `json:"secret"` +} + +type loginSecret struct { + Name string `json:"name"` + Password string `json:"password"` } // login tries to authorize the client at the remote server. @@ -510,7 +576,10 @@ func (s *Service) login(conn *connWrapper) error { Chain: common.NodeLocation.Name(), ChainID: s.chainID.Uint64(), }, - Secret: s.pass, + Secret: loginSecret{ + Name: "admin", + Password: s.pass, + }, } login := map[string][]interface{}{ "emit": {"hello", auth}, @@ -526,21 +595,129 @@ func (s *Service) login(conn *connWrapper) error { return nil } +type Credentials struct { + Name string `json:"name"` + Password string `json:"password"` +} + +type AuthResponse struct { + Success bool `json:"success"` + Token string `json:"token"` +} + +func (s *Service) login2(url string) (string, error) { + // Substitute with your actual service address and port + + infos := s.server.NodeInfo() + + var protocols []string + for _, proto := range s.server.Protocols { + protocols = append(protocols, fmt.Sprintf("%s/%d", proto.Name, proto.Version)) + } + var network string + if info := infos.Protocols["eth"]; info != nil { + network = fmt.Sprintf("%d", info.(*ethproto.NodeInfo).Network) + } + + auth := &authMsg{ + ID: s.node, + Info: nodeInfo{ + Name: s.node, + Node: infos.Name, + Port: infos.Ports.Listener, + Network: network, + Protocol: strings.Join(protocols, ", "), + API: "No", + Os: runtime.GOOS, + OsVer: runtime.GOARCH, + Client: "0.1.1", + History: true, + Chain: common.NodeLocation.Name(), + ChainID: s.chainID.Uint64(), + }, + Secret: loginSecret{ + Name: "admin", + Password: s.pass, + }, + } + + authJson, err := json.Marshal(auth) + if err != nil { + return "", err + } + + resp, err := http.Post(url, "application/json", bytes.NewBuffer(authJson)) + if err != nil { + return "", err + } + defer resp.Body.Close() + + body, _ := ioutil.ReadAll(resp.Body) + + var authResponse AuthResponse + err = json.Unmarshal(body, &authResponse) + if err != nil { + return "", err + } + + if authResponse.Success { + return authResponse.Token, nil + } + + return "", fmt.Errorf("login failed") +} + +// isJwtExpired checks if the JWT token is expired +func (s *Service) isJwtExpired(authJwt string) (bool, error) { + if authJwt == "" { + return false, errors.New("token is nil") + } + + parts := strings.Split(authJwt, ".") + if len(parts) != 3 { + return false, errors.New("invalid token") + } + + claims := jwt.MapClaims{} + _, _, err := new(jwt.Parser).ParseUnverified(authJwt, claims) + if err != nil { + return false, err + } + + if exp, ok := claims["exp"].(float64); ok { + return time.Now().Unix() >= int64(exp), nil + } + + return false, errors.New("exp claim not found in token") +} + // report collects all possible data to report and send it to the stats server. // This should only be used on reconnects or rarely to avoid overloading the // server. Use the individual methods for reporting subscribed events. -func (s *Service) report(conn *connWrapper) error { - nodeCtx := common.NodeLocation.Context() - if nodeCtx == common.ZONE_CTX && s.backend.ProcessingState() { - if err := s.reportPending(conn); err != nil { +func (s *Service) report(dataType string, conn *connWrapper) error { + if conn == nil || conn.conn == nil { + log.Warn(dataType + " connection is nil") + return errors.New(dataType + " connection is nil") + } + + switch dataType { + case "nodeStats": + if err := s.reportStats(conn); err != nil { return err } - } - if err := s.reportStats(conn); err != nil { - return err - } - if err := s.reportPeers(conn); err != nil { - return err + case "peerStats": + if err := s.reportPeers(conn); err != nil { + return err + } + case "transactions": + nodeCtx := common.NodeLocation.Context() + if nodeCtx == common.ZONE_CTX && s.backend.ProcessingState() { + if err := s.reportPending(conn); err != nil { + return err + } + } + default: + return nil } return nil } @@ -594,27 +771,27 @@ func (s *Service) reportLatency(conn *connWrapper) error { // blockStats is the information to report about individual blocks. type blockStats struct { - Number *big.Int `json:"number"` - Hash common.Hash `json:"hash"` - ParentHash common.Hash `json:"parentHash"` - Timestamp *big.Int `json:"timestamp"` - Miner common.Address `json:"miner"` - GasUsed uint64 `json:"gasUsed"` - GasLimit uint64 `json:"gasLimit"` - Diff string `json:"difficulty"` - Entropy string `json:"entropy"` - Txs []txStats `json:"transactions"` - TxHash common.Hash `json:"transactionsRoot"` - EtxHash common.Hash `json:"extTransactionsRoot"` - EtxRollupHash common.Hash `json:"extRollupRoot"` - ManifestHash common.Hash `json:"manifestHash"` - Root common.Hash `json:"stateRoot"` - Uncles uncleStats `json:"uncles"` - Chain string `json:"chain"` - ChainID uint64 `json:"chainId"` - Tps int64 `json:"tps"` - AppendTime time.Duration `json:"appendTime"` - AvgGasPerSec int64 `json:"avgGasPerSec"` + Number *big.Int `json:"number"` + Hash common.Hash `json:"hash"` + ParentHash common.Hash `json:"parentHash"` + Timestamp *big.Int `json:"timestamp"` + Miner common.Address `json:"miner"` + GasUsed uint64 `json:"gasUsed"` + GasLimit uint64 `json:"gasLimit"` + Diff string `json:"difficulty"` + Entropy string `json:"entropy"` + NoTransactions int `json:"noTransactions"` + TxHash common.Hash `json:"transactionsRoot"` + EtxHash common.Hash `json:"extTransactionsRoot"` + EtxRollupHash common.Hash `json:"extRollupRoot"` + ManifestHash common.Hash `json:"manifestHash"` + Root common.Hash `json:"stateRoot"` + Uncles uncleStats `json:"uncles"` + Chain string `json:"chain"` + ChainID uint64 `json:"chainId"` + Tps int64 `json:"tps"` + AppendTime time.Duration `json:"appendTime"` + AvgGasPerSec int64 `json:"avgGasPerSec"` } type blockTpsCacheDto struct { @@ -730,6 +907,10 @@ func (s *Service) computeTps(block *types.Block) int64 { // reportSideBlock retrieves the current chain side event and reports it to the stats server. func (s *Service) reportSideBlock(conn *connWrapper, block *types.Block) error { log.Trace("Sending new side block to quaistats", "number", block.Number(), "hash", block.Hash()) + if conn == nil || conn.conn == nil { + log.Warn("block connection is nil") + return errors.New("block connection is nil") + } stats := map[string]interface{}{ "id": s.node, @@ -749,6 +930,11 @@ func (s *Service) reportBlock(conn *connWrapper, block *types.Block) error { // Assemble the block report and send it to the server log.Trace("Sending new block to quaistats", "number", details.Number, "hash", details.Hash) + if conn == nil || conn.conn == nil { + log.Warn("block connection is nil") + return errors.New("block connection is nil") + } + stats := map[string]interface{}{ "id": s.node, "block": details, @@ -787,33 +973,38 @@ func (s *Service) assembleBlockStats(block *types.Block) *blockStats { appendTime := block.GetAppendTime() return &blockStats{ - Number: header.Number(), - Hash: header.Hash(), - ParentHash: header.ParentHash(), - Timestamp: new(big.Int).SetUint64(header.Time()), - Miner: author, - GasUsed: header.GasUsed(), - GasLimit: header.GasLimit(), - Diff: header.Difficulty().String(), - Entropy: entropy.String(), - Txs: txs, - TxHash: header.TxHash(), - EtxHash: header.EtxHash(), - EtxRollupHash: header.EtxRollupHash(), - ManifestHash: header.ManifestHash(), - Root: header.Root(), - Uncles: uncles, - Chain: common.NodeLocation.Name(), - ChainID: s.chainID.Uint64(), - Tps: tps, - AppendTime: appendTime, - AvgGasPerSec: avgGasPerSec, + Number: header.Number(), + Hash: header.Hash(), + ParentHash: header.ParentHash(), + Timestamp: new(big.Int).SetUint64(header.Time()), + Miner: author, + GasUsed: header.GasUsed(), + GasLimit: header.GasLimit(), + Diff: header.Difficulty().String(), + Entropy: entropy.String(), + NoTransactions: len(block.Transactions()), + TxHash: header.TxHash(), + EtxHash: header.EtxHash(), + EtxRollupHash: header.EtxRollupHash(), + ManifestHash: header.ManifestHash(), + Root: header.Root(), + Uncles: uncles, + Chain: common.NodeLocation.Name(), + ChainID: s.chainID.Uint64(), + Tps: tps, + AppendTime: appendTime, + AvgGasPerSec: avgGasPerSec, } } // reportHistory retrieves the most recent batch of blocks and reports it to the // stats server. func (s *Service) reportHistory(conn *connWrapper, list []uint64) error { + if conn == nil || conn.conn == nil { + log.Warn("history connection is nil") + return errors.New("history connection is nil") + } + // Figure out the indexes that need reporting indexes := make([]uint64, 0, historyUpdateRange) if len(list) > 0 { @@ -880,6 +1071,10 @@ func (s *Service) reportPending(conn *connWrapper) error { pending, _ := s.backend.Stats() // Assemble the transaction stats and send it to the server log.Trace("Sending pending transactions to quaistats", "count", strconv.Itoa(pending)) + if conn == nil || conn.conn == nil { + log.Warn("pending connection is nil") + return errors.New("pending connection is nil") + } stats := map[string]interface{}{ "id": s.node,