Skip to content

Commit

Permalink
fix(updater): retry process rounds on network failure; add health check
Browse files (browse the repository at this point in the history)
  • Loading branch information
oyyblin committed Nov 21, 2024
1 parent a614b27 commit d2daed6
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 4 deletions.
2 changes: 2 additions & 0 deletions charts/updater/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ spec:
ports:
- containerPort: 4014
name: http-prometheus
- containerPort: 8080
name: http
envFrom:
- configMapRef:
name: {{ include "updater.fullname" . }}
Expand Down
4 changes: 4 additions & 0 deletions charts/updater/templates/service.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,8 @@ spec:
targetPort: http-prometheus
protocol: TCP
name: http-prometheus
- port: 8080
targetPort: http
protocol: TCP
name: http
selector: {{- include "updater.selectorLabels" . | nindent 4}}
43 changes: 41 additions & 2 deletions updater/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,34 @@ func main() {
errGroup, ctx := errgroup.WithContext(context.Background())
errGroup.Go(func() error {
if err := updater.Start(ctx); err != nil {
log.Error().Err(err).Msg("error running updater")
return err
}
return nil
})

// Start health check server
errGroup.Go(func() error {
log.Info().Int("port", cfg.HttpPort).Msg("Starting health check server...")

healthMux := http.NewServeMux()
healthMux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("OK"))
})

healthServer := &http.Server{
Addr: fmt.Sprintf(":%d", cfg.HttpPort),
Handler: healthMux,
}

go func() {
<-ctx.Done()
healthServer.Shutdown(context.Background())
}()

if err := healthServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Error().Err(err).Msg("error running health check server")
return err
}
return nil
Expand All @@ -105,8 +133,19 @@ func main() {
// Start metrics server
errGroup.Go(func() error {
log.Info().Int("port", cfg.MetricsPort).Msg("Starting metrics server...")
http.Handle("/metrics", promhttp.Handler())
if err := http.ListenAndServe(fmt.Sprintf(":%d", cfg.MetricsPort), nil); err != nil {

metricsServer := &http.Server{
Addr: fmt.Sprintf(":%d", cfg.MetricsPort),
Handler: promhttp.Handler(),
}

go func() {
<-ctx.Done()
metricsServer.Shutdown(context.Background())
}()

if err := metricsServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Error().Err(err).Msg("error running metrics server")
return err
}
return nil
Expand Down
1 change: 1 addition & 0 deletions updater/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ type Config struct {
SenderPrivateKey string `envconfig:"SENDER_PRIVATE_KEY" required:"true"`
GenesisRound uint64 `envconfig:"GENESIS_ROUND" required:"true"`
MetricsPort int `envconfig:"METRICS_PORT" default:"4014"`
HttpPort int `envconfig:"HTTP_PORT" default:"8080"`
}
35 changes: 33 additions & 2 deletions updater/internal/service/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"drand-oracle-updater/signer"
"encoding/hex"
"errors"
"math"
"sync"
"time"

Expand Down Expand Up @@ -236,8 +237,38 @@ func (u *Updater) processRounds(ctx context.Context) error {
case <-ctx.Done():
return nil
case rd := <-u.roundChan:
if err := u.processRound(ctx, rd.round, rd.randomness, rd.signature); err != nil {
log.Error().Err(err).Uint64("round", rd.round).Msg("Failed to process round")
// Add retry logic with exponential backoff
maxRetries := 5
var err error
for attempt := 0; attempt < maxRetries; attempt++ {
err = u.processRound(ctx, rd.round, rd.randomness, rd.signature)
if err == nil {
break
}

if attempt < maxRetries-1 {
backoffDuration := time.Duration(math.Pow(2, float64(attempt))) * time.Second
log.Warn().
Err(err).
Uint64("round", rd.round).
Int("attempt", attempt+1).
Dur("backoff", backoffDuration).
Msg("Retrying round processing after backoff")

select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(backoffDuration):
continue
}
}
}

if err != nil {
log.Error().
Err(err).
Uint64("round", rd.round).
Msg("Failed to process round after all retries")
return err
}
}
Expand Down

0 comments on commit d2daed6

Please sign in to comment.