From a22a2947d71829675cd733b0fb396438dc093bad Mon Sep 17 00:00:00 2001 From: Carlos Rodriguez-Fernandez Date: Thu, 28 Mar 2024 08:47:20 -0700 Subject: [PATCH 1/4] reload config on SIGHUP Fixes: #118 Signed-off-by: Carlos Rodriguez-Fernandez --- README.md | 2 +- collector.go | 37 +++++---- main.go | 216 +++++++++++++++++++++++++++++++++++---------------- 3 files changed, 172 insertions(+), 83 deletions(-) diff --git a/README.md b/README.md index 37e9a06..1ab61ad 100644 --- a/README.md +++ b/README.md @@ -61,7 +61,7 @@ In each host group the `interval`, `network`, and `protocol` are optional. The interval Duration is in [Go time.ParseDuration()](https://golang.org/pkg/time/#ParseDuration) syntax. -NOTE: The config is only read on startup, SIGHUP is not supported (yet). +The config is read on startup, and can be reloaded with the SIGHUP signal. ## Building and running diff --git a/collector.go b/collector.go index 9330f2e..9b7d528 100644 --- a/collector.go +++ b/collector.go @@ -17,7 +17,7 @@ package main import ( "net" - "github.com/prometheus-community/pro-bing" + probing "github.com/prometheus-community/pro-bing" "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" @@ -84,8 +84,28 @@ type SmokepingCollector struct { requestsSent *prometheus.Desc } -func NewSmokepingCollector(pingers *[]*probing.Pinger, pingResponseSeconds prometheus.HistogramVec) *SmokepingCollector { - for _, pinger := range *pingers { +func NewSmokepingCollector(pingers []*probing.Pinger, pingResponseSeconds prometheus.HistogramVec) *SmokepingCollector { + + instance := SmokepingCollector{ + requestsSent: prometheus.NewDesc( + prometheus.BuildFQName(namespace, "", "requests_total"), + "Number of ping requests sent", + labelNames, + nil, + ), + } + + instance.updatePingers(pingers, pingResponseSeconds) + + return &instance +} + +func (s *SmokepingCollector) updatePingers(pingers []*probing.Pinger, pingResponseSeconds prometheus.HistogramVec) { + pingResponseDuplicates.Reset() + pingResponseSeconds.Reset() + pingResponseTTL.Reset() + pingSendErrors.Reset() + for _, pinger := range pingers { // Init all metrics to 0s. ipAddr := pinger.IPAddr().String() host := pinger.Addr() @@ -129,16 +149,7 @@ func NewSmokepingCollector(pingers *[]*probing.Pinger, pingResponseSeconds prome "bytes_received", pkt.Nbytes, "icmp_seq", pkt.Seq, "time", pkt.Rtt, "ttl", pkt.TTL, "error", err) } } - - return &SmokepingCollector{ - pingers: pingers, - requestsSent: prometheus.NewDesc( - prometheus.BuildFQName(namespace, "", "requests_total"), - "Number of ping requests sent", - labelNames, - nil, - ), - } + s.pingers = &pingers } func (s *SmokepingCollector) Describe(ch chan<- *prometheus.Desc) { diff --git a/main.go b/main.go index e2cef1b..a358384 100644 --- a/main.go +++ b/main.go @@ -20,11 +20,13 @@ import ( "net/http" _ "net/http/pprof" "os" + "os/signal" "strconv" "strings" + "syscall" "time" - "github.com/prometheus-community/pro-bing" + probing "github.com/prometheus-community/pro-bing" "github.com/superq/smokeping_prober/config" "github.com/alecthomas/kingpin/v2" @@ -76,8 +78,107 @@ func HostList(s kingpin.Settings) (target *[]string) { return } -func init() { - prometheus.MustRegister(versioncollector.NewCollector("smokeping_prober")) +type smokePingers struct { + started []*probing.Pinger + prepared []*probing.Pinger + g *errgroup.Group + maxInterval time.Duration +} + +func (s *smokePingers) sizeOfPrepared() int { + if s.prepared != nil { + return len(s.prepared) + } + return 0 +} + +func (s *smokePingers) start() { + if s.sizeOfPrepared() == 0 { + return + } + if s.g != nil { + s.stop() + } + s.g = new(errgroup.Group) + s.started = s.prepared + splay := time.Duration(s.maxInterval.Nanoseconds() / int64(len(s.started))) + for _, pinger := range s.started { + level.Info(logger).Log("msg", "Starting prober", "address", pinger.Addr(), "interval", pinger.Interval, "size_bytes", pinger.Size, "source", pinger.Source) + s.g.Go(pinger.Run) + time.Sleep(splay) + } + s.prepared = nil +} + +func (s *smokePingers) stop() error { + if s.g == nil { + return nil + } + if s.started == nil { + return nil + } + for _, pinger := range s.started { + pinger.Stop() + } + if err := s.g.Wait(); err != nil { + return fmt.Errorf("pingers failed: %v", err) + } + return nil +} + +func (s *smokePingers) prepare(hosts *[]string, interval *time.Duration, privileged *bool, sizeBytes *int) error { + pingers := make([]*probing.Pinger, len(*hosts)) + var pinger *probing.Pinger + var host string + for i, host := range *hosts { + pinger = probing.New(host) + + err := pinger.Resolve() + if err != nil { + return fmt.Errorf("failed to resolve pinger: %v", err) + } + + level.Info(logger).Log("msg", "Pinger resolved", "host", host, "ip_addr", pinger.IPAddr()) + + pinger.Interval = *interval + pinger.RecordRtts = false + pinger.SetPrivileged(*privileged) + pinger.Size = *sizeBytes + + pingers[i] = pinger + } + + maxInterval := *interval + sc.Lock() + for _, targetGroup := range sc.C.Targets { + packetSize := targetGroup.Size + if packetSize < 24 || packetSize > 65535 { + return fmt.Errorf("packet size must be in the range 24-65535, but found '%d' bytes", packetSize) + } + if targetGroup.Interval > maxInterval { + maxInterval = targetGroup.Interval + } + for _, host = range targetGroup.Hosts { + pinger = probing.New(host) + pinger.Interval = targetGroup.Interval + pinger.RecordRtts = false + pinger.SetNetwork(targetGroup.Network) + pinger.Size = packetSize + pinger.Source = targetGroup.Source + if targetGroup.Protocol == "icmp" { + pinger.SetPrivileged(true) + } + err := pinger.Resolve() + if err != nil { + return fmt.Errorf("failed to resolve pinger: %v", err) + } + pingers = append(pingers, pinger) + } + } + sc.Unlock() + s.prepared = pingers + s.maxInterval = maxInterval + return nil } func parseBuckets(buckets string) ([]float64, error) { @@ -93,6 +194,10 @@ func parseBuckets(buckets string) ([]float64, error) { return bucketlist, nil } +func init() { + prometheus.MustRegister(versioncollector.NewCollector("smokeping_prober")) +} + func main() { var ( configFile = kingpin.Flag("config.file", "Optional smokeping_prober configuration yaml file.").String() @@ -107,6 +212,9 @@ func main() { hosts = HostList(kingpin.Arg("hosts", "List of hosts to ping")) ) + var smokePingers smokePingers + var smokepingCollector *SmokepingCollector + promlogConfig := &promlog.Config{} flag.AddFlags(kingpin.CommandLine, promlogConfig) kingpin.Version(version.Print("smokeping_prober")) @@ -139,74 +247,46 @@ func main() { pingResponseSeconds := newPingResponseHistogram(bucketlist, *factor) prometheus.MustRegister(pingResponseSeconds) - pingers := make([]*probing.Pinger, len(*hosts)) - var pinger *probing.Pinger - var host string - for i, host := range *hosts { - pinger = probing.New(host) - - err := pinger.Resolve() - if err != nil { - level.Error(logger).Log("msg", "Failed to resolve pinger", "err", err) - os.Exit(1) - } - - level.Info(logger).Log("msg", "Pinger resolved", "host", host, "ip_addr", pinger.IPAddr()) - - pinger.Interval = *interval - pinger.RecordRtts = false - pinger.SetPrivileged(*privileged) - pinger.Size = *sizeBytes + err = smokePingers.prepare(hosts, interval, privileged, sizeBytes) + if err != nil { + level.Error(logger).Log("err", "Unable to create ping", err) + os.Exit(1) + } - pingers[i] = pinger + if smokePingers.sizeOfPrepared() == 0 { + level.Error(logger).Log("msg", "no targets specified on command line or in config file") + os.Exit(1) } - maxInterval := *interval - sc.Lock() - for _, targetGroup := range sc.C.Targets { - if targetGroup.Interval > maxInterval { - maxInterval = targetGroup.Interval - } - packetSize := targetGroup.Size - if packetSize < 24 || packetSize > 65535 { - level.Error(logger).Log("msg", "Invalid packet size. (24-65535)", "bytes", packetSize) - os.Exit(1) - } - for _, host = range targetGroup.Hosts { - pinger = probing.New(host) - pinger.Interval = targetGroup.Interval - pinger.RecordRtts = false - pinger.SetNetwork(targetGroup.Network) - pinger.Size = packetSize - pinger.Source = targetGroup.Source - if targetGroup.Protocol == "icmp" { - pinger.SetPrivileged(true) + smokePingers.start() + smokepingCollector = NewSmokepingCollector(smokePingers.started, *pingResponseSeconds) + prometheus.MustRegister(smokepingCollector) + + hup := make(chan os.Signal, 1) + signal.Notify(hup, syscall.SIGHUP) + go func() { + for { + <-hup + if err := sc.ReloadConfig(*configFile); err != nil { + level.Error(logger).Log("msg", "Error reloading config", "err", err) + continue } - err := pinger.Resolve() + err = smokePingers.prepare(hosts, interval, privileged, sizeBytes) if err != nil { - level.Error(logger).Log("msg", "failed to resolve pinger", "error", err.Error()) - os.Exit(1) + level.Error(logger).Log("msg", "Unable to create ping from config", "err", err) + continue + } + if smokePingers.sizeOfPrepared() == 0 { + level.Error(logger).Log("msg", "No targets specified on command line or in config file") + continue } - pingers = append(pingers, pinger) - } - } - sc.Unlock() - if len(pingers) == 0 { - level.Error(logger).Log("msg", "no targets specified on command line or in config file") - os.Exit(1) - } + smokePingers.start() + smokepingCollector.updatePingers(smokePingers.started, *pingResponseSeconds) - splay := time.Duration(interval.Nanoseconds() / int64(len(pingers))) - level.Info(logger).Log("msg", fmt.Sprintf("Waiting %s between starting pingers", splay)) - g := new(errgroup.Group) - for _, pinger := range pingers { - level.Info(logger).Log("msg", "Starting prober", "address", pinger.Addr(), "interval", pinger.Interval, "size_bytes", pinger.Size, "source", pinger.Source) - g.Go(pinger.Run) - time.Sleep(splay) - } - - prometheus.MustRegister(NewSmokepingCollector(&pingers, *pingResponseSeconds)) + level.Info(logger).Log("msg", "Reloaded config file") + } + }() http.Handle(*metricsPath, promhttp.Handler()) if *metricsPath != "/" && *metricsPath != "" { @@ -235,11 +315,9 @@ func main() { os.Exit(1) } - for _, pinger := range pingers { - pinger.Stop() - } - if err = g.Wait(); err != nil { - level.Error(logger).Log("msg", "pingers failed", "error", err) - os.Exit(1) + err = smokePingers.stop() + if err != nil { + level.Error(logger).Log("err", err) } + } From c2529c1ea81958d8b87d5d7c8ebe9a66eec7b991 Mon Sep 17 00:00:00 2001 From: Carlos Rodriguez-Fernandez Date: Thu, 28 Mar 2024 21:56:22 -0700 Subject: [PATCH 2/4] set up `healthy` and `reload` endpoints * `/-/healthy` indicates that the http server successfully started, and consequentially the config was successfully parsed. * `/-/reload` allows to attempt to reload the config, which is expected to be called by a sidecar that watches for config file changes Signed-off-by: Carlos Rodriguez-Fernandez --- README.md | 7 ++++++- main.go | 38 +++++++++++++++++++++++++++++++++++++- 2 files changed, 43 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 1ab61ad..f323b6a 100644 --- a/README.md +++ b/README.md @@ -61,7 +61,7 @@ In each host group the `interval`, `network`, and `protocol` are optional. The interval Duration is in [Go time.ParseDuration()](https://golang.org/pkg/time/#ParseDuration) syntax. -The config is read on startup, and can be reloaded with the SIGHUP signal. +The config is read on startup, and can be reloaded with the SIGHUP signal, or with an HTTP POST to the URI path `/-/reload`. ## Building and running @@ -100,3 +100,8 @@ The Smokeping Prober supports TLS and basic authentication. To use TLS and/or basic authentication, you need to pass a configuration file using the `--web.config.file` parameter. The format of the file is described [in the exporter-toolkit repository](https://github.com/prometheus/exporter-toolkit/blob/master/docs/web-configuration.md). + + +### Health check + +A health check can be requested in the URI path `/-/healthy`. diff --git a/main.go b/main.go index a358384..b8d23d6 100644 --- a/main.go +++ b/main.go @@ -264,20 +264,37 @@ func main() { hup := make(chan os.Signal, 1) signal.Notify(hup, syscall.SIGHUP) + reloadCh := make(chan chan error) go func() { for { - <-hup + var errCallback func(e error) + var successCallback func() + select { + case <-hup: + errCallback = func(e error) {} + successCallback = func() {} + case rc := <-reloadCh: + errCallback = func(e error) { + rc <- e + } + successCallback = func() { + rc <- nil + } + } if err := sc.ReloadConfig(*configFile); err != nil { level.Error(logger).Log("msg", "Error reloading config", "err", err) + errCallback(err) continue } err = smokePingers.prepare(hosts, interval, privileged, sizeBytes) if err != nil { level.Error(logger).Log("msg", "Unable to create ping from config", "err", err) + errCallback(err) continue } if smokePingers.sizeOfPrepared() == 0 { level.Error(logger).Log("msg", "No targets specified on command line or in config file") + errCallback(fmt.Errorf("no targets specified")) continue } @@ -285,10 +302,29 @@ func main() { smokepingCollector.updatePingers(smokePingers.started, *pingResponseSeconds) level.Info(logger).Log("msg", "Reloaded config file") + successCallback() } }() http.Handle(*metricsPath, promhttp.Handler()) + http.HandleFunc("/-/healthy", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte("Healthy")) + }) + http.HandleFunc("/-/reload", + func(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + w.WriteHeader(http.StatusMethodNotAllowed) + fmt.Fprintf(w, "This endpoint requires a POST request.\n") + return + } + + rc := make(chan error) + reloadCh <- rc + if err := <-rc; err != nil { + http.Error(w, fmt.Sprintf("Failed to reload config: %s", err), http.StatusInternalServerError) + } + }) if *metricsPath != "/" && *metricsPath != "" { landingConfig := web.LandingConfig{ Name: "Smokeping Prober", From 14644ded2958617c964ce3c3a1c28fe18a6d2f13 Mon Sep 17 00:00:00 2001 From: Carlos Rodriguez-Fernandez Date: Mon, 1 Apr 2024 06:07:56 -0700 Subject: [PATCH 3/4] warn on reload when previous pinger had failed Signed-off-by: Carlos Rodriguez-Fernandez --- main.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/main.go b/main.go index b8d23d6..9f4589f 100644 --- a/main.go +++ b/main.go @@ -97,7 +97,10 @@ func (s *smokePingers) start() { return } if s.g != nil { - s.stop() + err := s.stop() + if err != nil { + level.Warn(logger).Log("msg", "At least one previous pinger failed to run", "err", err) + } } s.g = new(errgroup.Group) s.started = s.prepared From 9c6a78c0fd9a0d8f648b7b72f06d4dfb4888ef5c Mon Sep 17 00:00:00 2001 From: Carlos Rodriguez-Fernandez Date: Mon, 1 Apr 2024 06:54:35 -0700 Subject: [PATCH 4/4] remove unnecessary alias Signed-off-by: Carlos Rodriguez-Fernandez --- collector.go | 2 +- main.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/collector.go b/collector.go index 9b7d528..fc9ca3b 100644 --- a/collector.go +++ b/collector.go @@ -17,7 +17,7 @@ package main import ( "net" - probing "github.com/prometheus-community/pro-bing" + "github.com/prometheus-community/pro-bing" "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" diff --git a/main.go b/main.go index 9f4589f..9fba938 100644 --- a/main.go +++ b/main.go @@ -26,7 +26,7 @@ import ( "syscall" "time" - probing "github.com/prometheus-community/pro-bing" + "github.com/prometheus-community/pro-bing" "github.com/superq/smokeping_prober/config" "github.com/alecthomas/kingpin/v2"