Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: node health check #11

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
- `AKASH_PROXY_SEED_REFRESH_INTERVAL` (default: `5m`) - How frequently to fetch SEED_URL for updates.
- `AKASH_PROXY_CHAIN_ID` (default: `akashnet-2`) - Expected chain ID.
- `AKASH_PROXY_HEALTHY_THRESHOLD` (default: `10s`) - Average response time above which a node is marked as unhealthy.
- `AKASH_PROXY_HEALTH_INTERVAL` (default: `5m`) - How often to run health checks on endpoints.
- `AKASH_PROXY_HEALTHY_ERROR_RATE_THRESHOLD` (default: `30`) - Percentage of request errors deemed acceptable.
- `AKASH_PROXY_HEALTHY_ERROR_RATE_BUCKET_TIMEOUT` (default: `1m`) - How long in the past requests are considered to check for status codes.
- `AKASH_PROXY_PROXY_REQUEST_TIMEOUT` (default: `15s`) - Request timeout for a proxied request.
Expand Down
3 changes: 3 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ type Config struct {
// How slow on average a node needs to be to be marked as unhealthy.
HealthyThreshold time.Duration `env:"HEALTHY_THRESHOLD" envDefault:"10s"`

// How often to run health checks on endpoints.
CheckHealthInterval time.Duration `env:"HEALTH_INTERVAL" envDefault:"5m"`

// Percentage of request errors deemed acceptable.
HealthyErrorRateThreshold float64 `env:"HEALTHY_ERROR_RATE_THRESHOLD" envDefault:"30"`

Expand Down
112 changes: 109 additions & 3 deletions internal/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package proxy

import (
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"math/rand"
"net/http"
Expand All @@ -10,6 +13,7 @@ import (
"strings"
"sync"
"sync/atomic"
"time"

"github.com/akash-network/rpc-proxy/internal/config"
"github.com/akash-network/rpc-proxy/internal/seed"
Expand All @@ -34,6 +38,36 @@ func New(
}
}

// RPCSyncInfo is the sync_info subset of an RPC node's /status response
// that the health check reads.
type RPCSyncInfo struct {
	LatestBlockTime time.Time `json:"latest_block_time"` // timestamp of the node's latest block
	CatchingUp      bool      `json:"catching_up"`       // true while the node is still syncing
}

// RPCNodeInfo is the node_info subset of an RPC node's /status response.
type RPCNodeInfo struct {
	ID      string `json:"id"`
	Network string `json:"network"`
	Version string `json:"version"`
}

// RPCStatusResponse models the JSON envelope returned by an RPC node's
// /status endpoint; only the fields needed by the health check are declared.
type RPCStatusResponse struct {
	Jsonrpc string `json:"jsonrpc"`
	Result  struct {
		NodeInfo RPCNodeInfo `json:"node_info"`
		SyncInfo RPCSyncInfo `json:"sync_info"`
	} `json:"result"`
}

// RestBlockHeader is the block-header subset of a REST node's
// /blocks/latest response.
type RestBlockHeader struct {
	ChainID string    `json:"chain_id"`
	Height  string    `json:"height"`
	Time    time.Time `json:"time"` // block timestamp used for the freshness check
}

// RestStatusResponse models the JSON returned by a REST node's
// /blocks/latest endpoint; only the fields needed by the health check
// are declared.
type RestStatusResponse struct {
	Block struct {
		Header RestBlockHeader `json:"header"`
	} `json:"block"`
}

type Proxy struct {
cfg config.Config
kind ProxyKind
Expand All @@ -59,7 +93,7 @@ func (p *Proxy) Stats() []ServerStat {
Name: s.name,
URL: s.url.String(),
Avg: s.pings.Last(),
Degraded: !s.Healthy(),
Degraded: !s.IsHealthy(),
Initialized: reqCount > 0,
Requests: reqCount,
ErrorRate: s.ErrorRate(),
Expand Down Expand Up @@ -91,6 +125,77 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
}

// checkEndpoint probes url using the health check that matches the proxy
// kind, returning nil when the endpoint looks healthy and an error otherwise.
func checkEndpoint(url string, kind ProxyKind) error {
	if kind == RPC {
		return checkRPC(url)
	}
	if kind == Rest {
		return checkREST(url)
	}
	return fmt.Errorf("unsupported proxy kind: %v", kind)
}

func performGetRequest(url string, timeout time.Duration) ([]byte, error) {
client := &http.Client{Timeout: timeout}
resp, err := client.Get(url)
if err != nil {
return nil, fmt.Errorf("error making request: %v", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}

body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("error reading response body: %v", err)
}

return body, nil
}

// checkRPC verifies an RPC node is healthy: its /status endpoint is
// reachable, the node is not catching up, and the latest block is no more
// than one minute old.
func checkRPC(url string) error {
	// 2s probe timeout keeps a slow node from stalling the health check.
	body, err := performGetRequest(url+"/status", 2*time.Second)
	if err != nil {
		return err
	}

	var status RPCStatusResponse
	if err := json.Unmarshal(body, &status); err != nil {
		// %w keeps the json error inspectable; the body aids debugging.
		return fmt.Errorf("unmarshaling JSON in RPC check: %w (response body: %s)", err, string(body))
	}

	if status.Result.SyncInfo.CatchingUp {
		return fmt.Errorf("node is still catching up")
	}

	if !status.Result.SyncInfo.LatestBlockTime.After(time.Now().UTC().Add(-time.Minute)) {
		return fmt.Errorf("latest block time is more than 1 minute old")
	}

	return nil
}

func checkREST(url string) error {
body, err := performGetRequest(url+"/blocks/latest", 2*time.Second)
if err != nil {
return err
}

var status RestStatusResponse
if err := json.Unmarshal(body, &status); err != nil {
return fmt.Errorf("error unmarshaling JSON in REST check: %v (response body: %s)", err, string(body))
}

if !status.Block.Header.Time.After(time.Now().UTC().Add(-time.Minute)) {
nick134-bit marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("latest block time is more than 1 minute old")
}

return nil
}

func (p *Proxy) next() *Server {
p.mu.Lock()
if len(p.servers) == 0 {
Expand All @@ -100,7 +205,7 @@ func (p *Proxy) next() *Server {
server := p.servers[p.round%len(p.servers)]
p.round++
p.mu.Unlock()
if server.Healthy() && server.ErrorRate() <= p.cfg.HealthyErrorRateThreshold {
if server.IsHealthy() && server.ErrorRate() <= p.cfg.HealthyErrorRateThreshold {
return server
}
if rand.Intn(99)+1 < p.cfg.UnhealthyServerRecoverChancePct {
Expand Down Expand Up @@ -136,6 +241,7 @@ func (p *Proxy) doUpdate(providers []seed.Provider) error {
provider.Provider,
provider.Address,
p.cfg,
p.kind,
)
if err != nil {
return err
Expand All @@ -147,7 +253,7 @@ func (p *Proxy) doUpdate(providers []seed.Provider) error {
// remove deleted servers
p.servers = slices.DeleteFunc(p.servers, func(srv *Server) bool {
for _, provider := range providers {
if provider.Provider == srv.name {
if provider.Provider == srv.name && srv.healthy.Load() {
return false
}
}
Expand Down
54 changes: 47 additions & 7 deletions internal/proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package proxy

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
Expand All @@ -28,14 +29,50 @@ func TestProxy(t *testing.T) {

func testProxy(tb testing.TB, kind ProxyKind) {
srv1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = io.WriteString(w, "srv1 replied")
if r.URL.Path == "/status" || r.URL.Path == "/blocks/latest" {
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(map[string]interface{}{
"result": map[string]interface{}{
"sync_info": map[string]interface{}{
"latest_block_time": time.Now().Format(time.RFC3339),
"catching_up": false,
},
},
"block": map[string]interface{}{
"header": map[string]interface{}{
"time": time.Now().Format(time.RFC3339),
},
},
})
} else {
_, _ = io.WriteString(w, "srv1 replied")
}
}))
tb.Cleanup(srv1.Close)

srv2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(time.Millisecond * 500)
_, _ = io.WriteString(w, "srv2 replied")
if r.URL.Path == "/status" || r.URL.Path == "/blocks/latest" {
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(map[string]interface{}{
"result": map[string]interface{}{
"sync_info": map[string]interface{}{
"latest_block_time": time.Now().Format(time.RFC3339),
"catching_up": false,
},
},
"block": map[string]interface{}{
"header": map[string]interface{}{
"time": time.Now().Format(time.RFC3339),
},
},
})
} else {
time.Sleep(time.Millisecond * 500)
_, _ = io.WriteString(w, "srv2 replied")
}
}))
tb.Cleanup(srv2.Close)

srv3 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusTeapot)
}))
Expand All @@ -48,6 +85,7 @@ func testProxy(tb testing.TB, kind ProxyKind) {
UnhealthyServerRecoverChancePct: 1,
HealthyErrorRateThreshold: 10,
HealthyErrorRateBucketTimeout: time.Second * 10,
CheckHealthInterval: time.Second * 5,
})

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -77,8 +115,7 @@ func testProxy(tb testing.TB, kind ProxyKind) {
}

require.Eventually(tb, func() bool { return proxy.initialized.Load() }, time.Second, time.Millisecond)

require.Len(tb, proxy.servers, 3)
require.Len(tb, proxy.servers, 2)

proxySrv := httptest.NewServer(proxy)
tb.Cleanup(proxySrv.Close)
Expand Down Expand Up @@ -111,7 +148,7 @@ func testProxy(tb testing.TB, kind ProxyKind) {
cancel()

stats := proxy.Stats()
require.Len(tb, stats, 3)
require.Len(tb, stats, 2)

var srv1Stats ServerStat
var srv2Stats ServerStat
Expand All @@ -129,11 +166,14 @@ func testProxy(tb testing.TB, kind ProxyKind) {
}
require.Zero(tb, srv1Stats.ErrorRate)
require.Zero(tb, srv2Stats.ErrorRate)
require.Equal(tb, float64(100), srv3Stats.ErrorRate)
require.Greater(tb, srv1Stats.Requests, srv2Stats.Requests)
require.Greater(tb, srv2Stats.Avg, srv1Stats.Avg)
require.False(tb, srv1Stats.Degraded)
require.True(tb, srv2Stats.Degraded)
require.True(tb, srv1Stats.Initialized)
require.True(tb, srv2Stats.Initialized)
require.False(tb, srv3Stats.Initialized)

require.Len(tb, proxy.servers, 2)
require.Equal(tb, int64(0), srv3Stats.Requests)
}
66 changes: 46 additions & 20 deletions internal/proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,63 @@ import (
"github.com/akash-network/rpc-proxy/internal/ttlslice"
)

func newServer(name, addr string, cfg config.Config) (*Server, error) {
func newServer(name, addr string, cfg config.Config, kind ProxyKind) (*Server, error) {
target, err := url.Parse(addr)
if err != nil {
return nil, fmt.Errorf("could not create new server: %w", err)
}
return &Server{
name: name,
url: target,
pings: avg.Moving(50),
cfg: cfg,
successes: ttlslice.New[int](),
failures: ttlslice.New[int](),
}, nil

server := &Server{
name: name,
url: target,
pings: avg.Moving(50),
cfg: cfg,
successes: ttlslice.New[int](),
failures: ttlslice.New[int](),
lastHealthCheck: time.Now().UTC(),
healthy: atomic.Bool{},
kind: kind,
}

err = checkEndpoint(addr, kind)
server.healthy.Store(err == nil)

return server, nil
}

// Server is a single upstream node the proxy can route requests to. It
// tracks a rolling latency average, recent success/failure buckets, and a
// cached health flag refreshed by IsHealthy.
type Server struct {
	name            string
	url             *url.URL
	kind            ProxyKind
	pings           *avg.MovingAverage
	lastHealthCheck time.Time   // last time the endpoint probe ran
	healthy         atomic.Bool // cached outcome of the last endpoint probe

	requestCount atomic.Int64
	cfg          config.Config
	successes    *ttlslice.Slice[int] // recent 2xx statuses (TTL-bounded)
	failures     *ttlslice.Slice[int] // recent error statuses (TTL-bounded)
}

// IsHealthy reports whether this server should receive traffic. When
// CheckHealthInterval has elapsed since the last probe it lazily re-runs the
// endpoint health check, then combines the cached probe outcome with the
// rolling latency average against HealthyThreshold.
//
// NOTE(review): lastHealthCheck is read and written here without any
// synchronization, so concurrent callers race on it (only healthy itself is
// atomic) — confirm whether callers serialize access, or guard it.
// NOTE(review): the probe is a blocking HTTP call performed inline on the
// caller's request path whenever the interval has elapsed.
func (s *Server) IsHealthy() bool {
	now := time.Now().UTC()
	// Probe at most once per CheckHealthInterval; a dedicated config knob
	// could be added if a different cadence is wanted.
	if now.Sub(s.lastHealthCheck) >= s.cfg.CheckHealthInterval {
		slog.Info("checking health", "name", s.name)
		err := checkEndpoint(s.url.String(), s.kind)
		healthy := err == nil
		s.healthy.Store(healthy)
		s.lastHealthCheck = now

		if healthy {
			slog.Info("server is healthy", "name", s.name)
		} else {
			slog.Error("server is unhealthy", "name", s.name, "err", err)
		}
	}

	// Fast path between probes: cached flag plus latency threshold.
	return s.pings.Last() < s.cfg.HealthyThreshold && s.healthy.Load()
}
func (s *Server) ErrorRate() float64 {
suss := len(s.successes.List())
fail := len(s.failures.List())
Expand All @@ -49,12 +81,6 @@ func (s *Server) ErrorRate() float64 {
}
return (float64(fail) * 100) / float64(total)
}

// Healthy reports whether the server's rolling latency average is under the
// configured threshold and its recent error rate is within the acceptable
// percentage.
func (s *Server) Healthy() bool {
	return s.pings.Last() < s.cfg.HealthyThreshold &&
		s.ErrorRate() < s.cfg.HealthyErrorRateThreshold
}

func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var status int = -1
start := time.Now()
Expand Down Expand Up @@ -108,7 +134,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.failures.Append(status, s.cfg.HealthyErrorRateBucketTimeout)
}

if !s.Healthy() && ctx.Err() == nil && err == nil {
if !s.IsHealthy() && ctx.Err() == nil && err == nil {
// if it's not healthy, this is a tryout to improve - if the request
// wasn't canceled, reset stats
slog.Info("resetting statistics", "name", s.name)
Expand Down