diff --git a/README.md b/README.md
index 37e9a06..f323b6a 100644
--- a/README.md
+++ b/README.md
@@ -61,7 +61,7 @@ In each host group the `interval`, `network`, and `protocol` are optional.
 
 The interval Duration is in [Go time.ParseDuration()](https://golang.org/pkg/time/#ParseDuration) syntax.
 
-NOTE: The config is only read on startup, SIGHUP is not supported (yet).
+The config is read on startup, and can be reloaded with the SIGHUP signal, or with an HTTP POST to the URI path `/-/reload`.
 
 ## Building and running
 
@@ -100,3 +100,8 @@ The Smokeping Prober supports TLS and basic authentication.
 To use TLS and/or basic authentication, you need to pass a configuration file
 using the `--web.config.file` parameter. The format of the file is described
 [in the exporter-toolkit repository](https://github.com/prometheus/exporter-toolkit/blob/master/docs/web-configuration.md).
+
+
+### Health check
+
+A health check can be requested at the URI path `/-/healthy`.
diff --git a/collector.go b/collector.go
index 9330f2e..fc9ca3b 100644
--- a/collector.go
+++ b/collector.go
@@ -84,8 +84,32 @@ type SmokepingCollector struct {
 	requestsSent *prometheus.Desc
 }
 
-func NewSmokepingCollector(pingers *[]*probing.Pinger, pingResponseSeconds prometheus.HistogramVec) *SmokepingCollector {
-	for _, pinger := range *pingers {
+// NewSmokepingCollector builds a collector with its requests_total descriptor
+// and installs the given pingers through updatePingers.
+func NewSmokepingCollector(pingers []*probing.Pinger, pingResponseSeconds prometheus.HistogramVec) *SmokepingCollector {
+	instance := SmokepingCollector{
+		requestsSent: prometheus.NewDesc(
+			prometheus.BuildFQName(namespace, "", "requests_total"),
+			"Number of ping requests sent",
+			labelNames,
+			nil,
+		),
+	}
+
+	instance.updatePingers(pingers, pingResponseSeconds)
+
+	return &instance
+}
+
+// updatePingers resets the ping metric vectors, runs the per-pinger setup in
+// the loop below for every given pinger, and finally stores pingers as the set
+// held by the collector. It is called on construction and on config reload.
+func (s *SmokepingCollector) updatePingers(pingers []*probing.Pinger, pingResponseSeconds prometheus.HistogramVec) {
+	pingResponseDuplicates.Reset()
+	pingResponseSeconds.Reset()
+	pingResponseTTL.Reset()
+	pingSendErrors.Reset()
+	for _, pinger := range pingers {
 		// Init all metrics to 0s.
 		ipAddr := pinger.IPAddr().String()
 		host := pinger.Addr()
@@ -129,16 +153,7 @@ func NewSmokepingCollector(pingers *[]*probing.Pinger, pingResponseSeconds prome
 				"bytes_received", pkt.Nbytes, "icmp_seq", pkt.Seq, "time", pkt.Rtt, "ttl", pkt.TTL, "error", err)
 		}
 	}
-
-	return &SmokepingCollector{
-		pingers: pingers,
-		requestsSent: prometheus.NewDesc(
-			prometheus.BuildFQName(namespace, "", "requests_total"),
-			"Number of ping requests sent",
-			labelNames,
-			nil,
-		),
-	}
+	s.pingers = &pingers
 }
 
 func (s *SmokepingCollector) Describe(ch chan<- *prometheus.Desc) {
diff --git a/main.go b/main.go
index e2cef1b..9fba938 100644
--- a/main.go
+++ b/main.go
@@ -20,8 +20,10 @@ import (
 	"net/http"
 	_ "net/http/pprof"
 	"os"
+	"os/signal"
 	"strconv"
 	"strings"
+	"syscall"
 	"time"
 
 	"github.com/prometheus-community/pro-bing"
@@ -76,8 +78,121 @@ func HostList(s kingpin.Settings) (target *[]string) {
 	return
 }
 
-func init() {
-	prometheus.MustRegister(versioncollector.NewCollector("smokeping_prober"))
+// smokePingers tracks the pingers that are currently running (started) and
+// the set staged by prepare for the next call to start (prepared).
+type smokePingers struct {
+	started     []*probing.Pinger
+	prepared    []*probing.Pinger
+	g           *errgroup.Group
+	maxInterval time.Duration
+}
+
+// sizeOfPrepared returns the number of pingers staged by prepare.
+func (s *smokePingers) sizeOfPrepared() int {
+	// len of a nil slice is 0, so no nil check is needed.
+	return len(s.prepared)
+}
+
+// start stops the previously started pingers (if any), then runs every
+// prepared pinger in a fresh errgroup, sleeping maxInterval/len(prepared)
+// after each launch. It does nothing when no pingers are prepared.
+func (s *smokePingers) start() {
+	if s.sizeOfPrepared() == 0 {
+		return
+	}
+	if s.g != nil {
+		err := s.stop()
+		if err != nil {
+			level.Warn(logger).Log("msg", "At least one previous pinger failed to run", "err", err)
+		}
+	}
+	s.g = new(errgroup.Group)
+	s.started = s.prepared
+	splay := time.Duration(s.maxInterval.Nanoseconds() / int64(len(s.started)))
+	for _, pinger := range s.started {
+		level.Info(logger).Log("msg", "Starting prober", "address", pinger.Addr(), "interval", pinger.Interval, "size_bytes", pinger.Size, "source", pinger.Source)
+		s.g.Go(pinger.Run)
+		time.Sleep(splay)
+	}
+	s.prepared = nil
+}
+
+// stop stops all started pingers and waits for their goroutines to return. It
+// returns a wrapped error if the errgroup reports a failure.
+func (s *smokePingers) stop() error {
+	if s.g == nil {
+		return nil
+	}
+	if s.started == nil {
+		return nil
+	}
+	for _, pinger := range s.started {
+		pinger.Stop()
+	}
+	if err := s.g.Wait(); err != nil {
+		return fmt.Errorf("pingers failed: %w", err)
+	}
+	return nil
+}
+
+// prepare builds and resolves one pinger per command-line host and one per
+// host of every target group in sc.C.Targets, then stages them for start.
+// On error s.prepared and s.maxInterval are left unchanged.
+func (s *smokePingers) prepare(hosts *[]string, interval *time.Duration, privileged *bool, sizeBytes *int) error {
+	pingers := make([]*probing.Pinger, len(*hosts))
+	var pinger *probing.Pinger
+	var host string
+	for i, host := range *hosts {
+		pinger = probing.New(host)
+
+		err := pinger.Resolve()
+		if err != nil {
+			return fmt.Errorf("failed to resolve pinger: %w", err)
+		}
+
+		level.Info(logger).Log("msg", "Pinger resolved", "host", host, "ip_addr", pinger.IPAddr())
+
+		pinger.Interval = *interval
+		pinger.RecordRtts = false
+		pinger.SetPrivileged(*privileged)
+		pinger.Size = *sizeBytes
+
+		pingers[i] = pinger
+	}
+
+	maxInterval := *interval
+	sc.Lock()
+	// Unlock via defer so the early error returns below cannot leave sc
+	// locked, which would block any later attempt to take the lock.
+	defer sc.Unlock()
+	for _, targetGroup := range sc.C.Targets {
+		packetSize := targetGroup.Size
+		if packetSize < 24 || packetSize > 65535 {
+			return fmt.Errorf("packet size must be in the range 24-65535, but found '%d' bytes", packetSize)
+		}
+		if targetGroup.Interval > maxInterval {
+			maxInterval = targetGroup.Interval
+		}
+		for _, host = range targetGroup.Hosts {
+			pinger = probing.New(host)
+			pinger.Interval = targetGroup.Interval
+			pinger.RecordRtts = false
+			pinger.SetNetwork(targetGroup.Network)
+			pinger.Size = packetSize
+			pinger.Source = targetGroup.Source
+			if targetGroup.Protocol == "icmp" {
+				pinger.SetPrivileged(true)
+			}
+			err := pinger.Resolve()
+			if err != nil {
+				return fmt.Errorf("failed to resolve pinger: %w", err)
+			}
+			pingers = append(pingers, pinger)
+		}
+	}
+	s.prepared = pingers
+	s.maxInterval = maxInterval
+	return nil
 }
 
 func parseBuckets(buckets string) ([]float64, error) {
@@ -93,6 +208,10 @@ func parseBuckets(buckets string) ([]float64, error) {
 	return bucketlist, nil
 }
 
+func init() {
+	prometheus.MustRegister(versioncollector.NewCollector("smokeping_prober"))
+}
+
 func main() {
 	var (
 		configFile  = kingpin.Flag("config.file", "Optional smokeping_prober configuration yaml file.").String()
@@ -107,6 +226,9 @@ func main() {
 		hosts      = HostList(kingpin.Arg("hosts", "List of hosts to ping"))
 	)
 
+	var smokePingers smokePingers
+	var smokepingCollector *SmokepingCollector
+
 	promlogConfig := &promlog.Config{}
 	flag.AddFlags(kingpin.CommandLine, promlogConfig)
 	kingpin.Version(version.Print("smokeping_prober"))
@@ -139,76 +261,88 @@ func main() {
 	pingResponseSeconds := newPingResponseHistogram(bucketlist, *factor)
 	prometheus.MustRegister(pingResponseSeconds)
 
-	pingers := make([]*probing.Pinger, len(*hosts))
-	var pinger *probing.Pinger
-	var host string
-	for i, host := range *hosts {
-		pinger = probing.New(host)
-
-		err := pinger.Resolve()
-		if err != nil {
-			level.Error(logger).Log("msg", "Failed to resolve pinger", "err", err)
-			os.Exit(1)
-		}
-
-		level.Info(logger).Log("msg", "Pinger resolved", "host", host, "ip_addr", pinger.IPAddr())
-
-		pinger.Interval = *interval
-		pinger.RecordRtts = false
-		pinger.SetPrivileged(*privileged)
-		pinger.Size = *sizeBytes
+	err = smokePingers.prepare(hosts, interval, privileged, sizeBytes)
+	if err != nil {
+		level.Error(logger).Log("msg", "Unable to create ping", "err", err)
+		os.Exit(1)
+	}
 
-		pingers[i] = pinger
+	if smokePingers.sizeOfPrepared() == 0 {
+		level.Error(logger).Log("msg", "no targets specified on command line or in config file")
+		os.Exit(1)
 	}
 
-	maxInterval := *interval
-	sc.Lock()
-	for _, targetGroup := range sc.C.Targets {
-		if targetGroup.Interval > maxInterval {
-			maxInterval = targetGroup.Interval
-		}
-		packetSize := targetGroup.Size
-		if packetSize < 24 || packetSize > 65535 {
-			level.Error(logger).Log("msg", "Invalid packet size. (24-65535)", "bytes", packetSize)
-			os.Exit(1)
-		}
-		for _, host = range targetGroup.Hosts {
-			pinger = probing.New(host)
-			pinger.Interval = targetGroup.Interval
-			pinger.RecordRtts = false
-			pinger.SetNetwork(targetGroup.Network)
-			pinger.Size = packetSize
-			pinger.Source = targetGroup.Source
-			if targetGroup.Protocol == "icmp" {
-				pinger.SetPrivileged(true)
+	smokePingers.start()
+	smokepingCollector = NewSmokepingCollector(smokePingers.started, *pingResponseSeconds)
+	prometheus.MustRegister(smokepingCollector)
+
+	hup := make(chan os.Signal, 1)
+	signal.Notify(hup, syscall.SIGHUP)
+	reloadCh := make(chan chan error)
+	// Reload the configuration on SIGHUP or on a request from the /-/reload
+	// handler; only the handler path reports the outcome back through rc.
+	go func() {
+		for {
+			var errCallback func(e error)
+			var successCallback func()
+			select {
+			case <-hup:
+				errCallback = func(e error) {}
+				successCallback = func() {}
+			case rc := <-reloadCh:
+				errCallback = func(e error) {
+					rc <- e
+				}
+				successCallback = func() {
+					rc <- nil
+				}
 			}
-			err := pinger.Resolve()
+			if err := sc.ReloadConfig(*configFile); err != nil {
+				level.Error(logger).Log("msg", "Error reloading config", "err", err)
+				errCallback(err)
+				continue
+			}
+			// Use a goroutine-local err: assigning to main's err from here
+			// would be an unsynchronised write shared with the main goroutine.
+			err := smokePingers.prepare(hosts, interval, privileged, sizeBytes)
 			if err != nil {
-				level.Error(logger).Log("msg", "failed to resolve pinger", "error", err.Error())
-				os.Exit(1)
+				level.Error(logger).Log("msg", "Unable to create ping from config", "err", err)
+				errCallback(err)
+				continue
+			}
+			if smokePingers.sizeOfPrepared() == 0 {
+				level.Error(logger).Log("msg", "No targets specified on command line or in config file")
+				errCallback(fmt.Errorf("no targets specified"))
+				continue
 			}
-			pingers = append(pingers, pinger)
-		}
-	}
-	sc.Unlock()
-
-	if len(pingers) == 0 {
-		level.Error(logger).Log("msg", "no targets specified on command line or in config file")
-		os.Exit(1)
-	}
 
-	splay := time.Duration(interval.Nanoseconds() / int64(len(pingers)))
-	level.Info(logger).Log("msg", fmt.Sprintf("Waiting %s between starting pingers", splay))
-	g := new(errgroup.Group)
-	for _, pinger := range pingers {
-		level.Info(logger).Log("msg", "Starting prober", "address", pinger.Addr(), "interval", pinger.Interval, "size_bytes", pinger.Size, "source", pinger.Source)
-		g.Go(pinger.Run)
-		time.Sleep(splay)
-	}
+			smokePingers.start()
+			smokepingCollector.updatePingers(smokePingers.started, *pingResponseSeconds)
 
-	prometheus.MustRegister(NewSmokepingCollector(&pingers, *pingResponseSeconds))
+			level.Info(logger).Log("msg", "Reloaded config file")
+			successCallback()
+		}
+	}()
 
 	http.Handle(*metricsPath, promhttp.Handler())
+	http.HandleFunc("/-/healthy", func(w http.ResponseWriter, r *http.Request) {
+		w.WriteHeader(http.StatusOK)
+		w.Write([]byte("Healthy"))
+	})
+	http.HandleFunc("/-/reload",
+		func(w http.ResponseWriter, r *http.Request) {
+			if r.Method != http.MethodPost {
+				w.WriteHeader(http.StatusMethodNotAllowed)
+				fmt.Fprintf(w, "This endpoint requires a POST request.\n")
+				return
+			}
+
+			rc := make(chan error)
+			reloadCh <- rc
+			if err := <-rc; err != nil {
+				http.Error(w, fmt.Sprintf("Failed to reload config: %s", err), http.StatusInternalServerError)
+			}
+		})
 	if *metricsPath != "/" && *metricsPath != "" {
 		landingConfig := web.LandingConfig{
 			Name:        "Smokeping Prober",
@@ -235,11 +369,8 @@ func main() {
 		os.Exit(1)
 	}
 
-	for _, pinger := range pingers {
-		pinger.Stop()
-	}
-	if err = g.Wait(); err != nil {
-		level.Error(logger).Log("msg", "pingers failed", "error", err)
-		os.Exit(1)
+	err = smokePingers.stop()
+	if err != nil {
+		level.Error(logger).Log("err", err)
 	}
 }