Skip to content

Commit

Permalink
Merge pull request #121 from carlosrodfern/master
Browse files Browse the repository at this point in the history
reload config on `SIGHUP` & `/-/reload`, implement `/-/healthy`
  • Loading branch information
SuperQ authored Apr 2, 2024
2 parents be30c29 + 9c6a78c commit 584df24
Show file tree
Hide file tree
Showing 3 changed files with 214 additions and 81 deletions.
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ In each host group the `interval`, `network`, and `protocol` are optional.

The interval Duration is in [Go time.ParseDuration()](https://golang.org/pkg/time/#ParseDuration) syntax.

NOTE: The config is only read on startup, SIGHUP is not supported (yet).
The config is read on startup, and can be reloaded with the SIGHUP signal, or with an HTTP POST to the URI path `/-/reload`.

## Building and running

Expand Down Expand Up @@ -100,3 +100,8 @@ The Smokeping Prober supports TLS and basic authentication.
To use TLS and/or basic authentication, you need to pass a configuration file
using the `--web.config.file` parameter. The format of the file is described
[in the exporter-toolkit repository](https://github.com/prometheus/exporter-toolkit/blob/master/docs/web-configuration.md).


### Health check

A health check can be requested at the URI path `/-/healthy`.
35 changes: 23 additions & 12 deletions collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,28 @@ type SmokepingCollector struct {
requestsSent *prometheus.Desc
}

func NewSmokepingCollector(pingers *[]*probing.Pinger, pingResponseSeconds prometheus.HistogramVec) *SmokepingCollector {
for _, pinger := range *pingers {
// NewSmokepingCollector returns a SmokepingCollector whose requestsSent
// descriptor describes the "requests_total" metric, and which has been
// populated with the given pingers through updatePingers.
//
// pingers is the set of pingers handed to updatePingers; pingResponseSeconds
// is the histogram vector that updatePingers resets and uses.
func NewSmokepingCollector(pingers []*probing.Pinger, pingResponseSeconds prometheus.HistogramVec) *SmokepingCollector {
	requestsSentDesc := prometheus.NewDesc(
		prometheus.BuildFQName(namespace, "", "requests_total"),
		"Number of ping requests sent",
		labelNames,
		nil,
	)

	collector := &SmokepingCollector{requestsSent: requestsSentDesc}
	collector.updatePingers(pingers, pingResponseSeconds)
	return collector
}

func (s *SmokepingCollector) updatePingers(pingers []*probing.Pinger, pingResponseSeconds prometheus.HistogramVec) {
pingResponseDuplicates.Reset()
pingResponseSeconds.Reset()
pingResponseTTL.Reset()
pingSendErrors.Reset()
for _, pinger := range pingers {
// Init all metrics to 0s.
ipAddr := pinger.IPAddr().String()
host := pinger.Addr()
Expand Down Expand Up @@ -129,16 +149,7 @@ func NewSmokepingCollector(pingers *[]*probing.Pinger, pingResponseSeconds prome
"bytes_received", pkt.Nbytes, "icmp_seq", pkt.Seq, "time", pkt.Rtt, "ttl", pkt.TTL, "error", err)
}
}

return &SmokepingCollector{
pingers: pingers,
requestsSent: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "requests_total"),
"Number of ping requests sent",
labelNames,
nil,
),
}
s.pingers = &pingers
}

func (s *SmokepingCollector) Describe(ch chan<- *prometheus.Desc) {
Expand Down
253 changes: 185 additions & 68 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"

"github.com/prometheus-community/pro-bing"
Expand Down Expand Up @@ -76,8 +78,110 @@ func HostList(s kingpin.Settings) (target *[]string) {
return
}

func init() {
prometheus.MustRegister(versioncollector.NewCollector("smokeping_prober"))
// smokePingers tracks the pingers of the prober across config reloads.
type smokePingers struct {
	// started is the set of pingers handed to the errgroup by start and
	// stopped by stop; prepared is the set built by prepare that the next
	// call to start will launch (start clears it afterwards).
	started, prepared []*probing.Pinger
	// g is the errgroup in which each started pinger's Run executes.
	g *errgroup.Group
	// maxInterval is the largest ping interval seen by prepare; start divides
	// it by the number of pingers to space out their start times.
	maxInterval time.Duration
}

// sizeOfPrepared reports how many pingers are waiting to be started.
// A nil prepared slice counts as zero, since len of a nil slice is 0.
func (s *smokePingers) sizeOfPrepared() int {
	return len(s.prepared)
}

// start launches every prepared pinger in a fresh errgroup.
//
// It is a no-op when nothing has been prepared. If a previous group exists,
// the pingers it runs are stopped first; a failure reported by them is only
// logged as a warning. The pingers are started one at a time, sleeping
// maxInterval/len(pingers) between launches, so this call blocks for roughly
// maxInterval in total. Afterwards the prepared set is cleared.
func (s *smokePingers) start() {
	count := s.sizeOfPrepared()
	if count == 0 {
		return
	}

	if s.g != nil {
		if err := s.stop(); err != nil {
			level.Warn(logger).Log("msg", "At least one previous pinger failed to run", "err", err)
		}
	}

	s.g = &errgroup.Group{}
	s.started = s.prepared

	// Spread the start times evenly across the longest interval.
	splay := s.maxInterval / time.Duration(count)
	for i := range s.started {
		p := s.started[i]
		level.Info(logger).Log("msg", "Starting prober", "address", p.Addr(), "interval", p.Interval, "size_bytes", p.Size, "source", p.Source)
		s.g.Go(p.Run)
		time.Sleep(splay)
	}

	s.prepared = nil
}

// stop calls Stop on every started pinger and waits for the errgroup to
// finish. It returns nil when there is no group or nothing was started;
// otherwise it returns the group's error, if any, prefixed with
// "pingers failed".
func (s *smokePingers) stop() error {
	if s.g == nil || s.started == nil {
		return nil
	}

	for i := range s.started {
		s.started[i].Stop()
	}

	err := s.g.Wait()
	if err == nil {
		return nil
	}
	return fmt.Errorf("pingers failed: %v", err)
}

// prepare builds, but does not start, the set of pingers for the next call
// to start.
//
// Pingers for the command-line hosts come first and use interval, privileged
// and sizeBytes. They are followed by one pinger per host of every target
// group in the loaded config (sc.C.Targets), configured from that group's
// interval, network, size, source and protocol. Every pinger is resolved
// before it is accepted.
//
// On success the result is stored in s.prepared, and s.maxInterval is set to
// the largest of *interval and the target-group intervals. On failure (a
// host that does not resolve, or a target-group packet size outside
// 24-65535) an error is returned and s.prepared / s.maxInterval are left
// untouched.
func (s *smokePingers) prepare(hosts *[]string, interval *time.Duration, privileged *bool, sizeBytes *int) error {
	pingers := make([]*probing.Pinger, len(*hosts))
	for i, host := range *hosts {
		pinger := probing.New(host)

		if err := pinger.Resolve(); err != nil {
			return fmt.Errorf("failed to resolve pinger: %w", err)
		}

		level.Info(logger).Log("msg", "Pinger resolved", "host", host, "ip_addr", pinger.IPAddr())

		pinger.Interval = *interval
		pinger.RecordRtts = false
		pinger.SetPrivileged(*privileged)
		pinger.Size = *sizeBytes

		pingers[i] = pinger
	}

	maxInterval := *interval

	// Hold the config lock while reading the targets. The deferred unlock
	// guarantees the lock is released on the early error returns below;
	// otherwise a single failed reload would leave the config locked and
	// block every later reload.
	sc.Lock()
	defer sc.Unlock()
	for _, targetGroup := range sc.C.Targets {
		packetSize := targetGroup.Size
		if packetSize < 24 || packetSize > 65535 {
			return fmt.Errorf("packet size must be in the range 24-65535, but found '%d' bytes", packetSize)
		}
		if targetGroup.Interval > maxInterval {
			maxInterval = targetGroup.Interval
		}
		for _, host := range targetGroup.Hosts {
			pinger := probing.New(host)
			pinger.Interval = targetGroup.Interval
			pinger.RecordRtts = false
			pinger.SetNetwork(targetGroup.Network)
			pinger.Size = packetSize
			pinger.Source = targetGroup.Source
			if targetGroup.Protocol == "icmp" {
				pinger.SetPrivileged(true)
			}
			if err := pinger.Resolve(); err != nil {
				return fmt.Errorf("failed to resolve pinger: %w", err)
			}
			pingers = append(pingers, pinger)
		}
	}

	s.prepared = pingers
	s.maxInterval = maxInterval
	return nil
}

func parseBuckets(buckets string) ([]float64, error) {
Expand All @@ -93,6 +197,10 @@ func parseBuckets(buckets string) ([]float64, error) {
return bucketlist, nil
}

// init registers the version collector for "smokeping_prober" through
// prometheus.MustRegister at package initialisation.
func init() {
prometheus.MustRegister(versioncollector.NewCollector("smokeping_prober"))
}

func main() {
var (
configFile = kingpin.Flag("config.file", "Optional smokeping_prober configuration yaml file.").String()
Expand All @@ -107,6 +215,9 @@ func main() {
hosts = HostList(kingpin.Arg("hosts", "List of hosts to ping"))
)

var smokePingers smokePingers
var smokepingCollector *SmokepingCollector

promlogConfig := &promlog.Config{}
flag.AddFlags(kingpin.CommandLine, promlogConfig)
kingpin.Version(version.Print("smokeping_prober"))
Expand Down Expand Up @@ -139,76 +250,84 @@ func main() {
pingResponseSeconds := newPingResponseHistogram(bucketlist, *factor)
prometheus.MustRegister(pingResponseSeconds)

pingers := make([]*probing.Pinger, len(*hosts))
var pinger *probing.Pinger
var host string
for i, host := range *hosts {
pinger = probing.New(host)

err := pinger.Resolve()
if err != nil {
level.Error(logger).Log("msg", "Failed to resolve pinger", "err", err)
os.Exit(1)
}

level.Info(logger).Log("msg", "Pinger resolved", "host", host, "ip_addr", pinger.IPAddr())

pinger.Interval = *interval
pinger.RecordRtts = false
pinger.SetPrivileged(*privileged)
pinger.Size = *sizeBytes
err = smokePingers.prepare(hosts, interval, privileged, sizeBytes)
if err != nil {
level.Error(logger).Log("err", "Unable to create ping", err)
os.Exit(1)
}

pingers[i] = pinger
if smokePingers.sizeOfPrepared() == 0 {
level.Error(logger).Log("msg", "no targets specified on command line or in config file")
os.Exit(1)
}

maxInterval := *interval
sc.Lock()
for _, targetGroup := range sc.C.Targets {
if targetGroup.Interval > maxInterval {
maxInterval = targetGroup.Interval
}
packetSize := targetGroup.Size
if packetSize < 24 || packetSize > 65535 {
level.Error(logger).Log("msg", "Invalid packet size. (24-65535)", "bytes", packetSize)
os.Exit(1)
}
for _, host = range targetGroup.Hosts {
pinger = probing.New(host)
pinger.Interval = targetGroup.Interval
pinger.RecordRtts = false
pinger.SetNetwork(targetGroup.Network)
pinger.Size = packetSize
pinger.Source = targetGroup.Source
if targetGroup.Protocol == "icmp" {
pinger.SetPrivileged(true)
smokePingers.start()
smokepingCollector = NewSmokepingCollector(smokePingers.started, *pingResponseSeconds)
prometheus.MustRegister(smokepingCollector)

hup := make(chan os.Signal, 1)
signal.Notify(hup, syscall.SIGHUP)
reloadCh := make(chan chan error)
go func() {
for {
var errCallback func(e error)
var successCallback func()
select {
case <-hup:
errCallback = func(e error) {}
successCallback = func() {}
case rc := <-reloadCh:
errCallback = func(e error) {
rc <- e
}
successCallback = func() {
rc <- nil
}
}
err := pinger.Resolve()
if err := sc.ReloadConfig(*configFile); err != nil {
level.Error(logger).Log("msg", "Error reloading config", "err", err)
errCallback(err)
continue
}
err = smokePingers.prepare(hosts, interval, privileged, sizeBytes)
if err != nil {
level.Error(logger).Log("msg", "failed to resolve pinger", "error", err.Error())
os.Exit(1)
level.Error(logger).Log("msg", "Unable to create ping from config", "err", err)
errCallback(err)
continue
}
if smokePingers.sizeOfPrepared() == 0 {
level.Error(logger).Log("msg", "No targets specified on command line or in config file")
errCallback(fmt.Errorf("no targets specified"))
continue
}
pingers = append(pingers, pinger)
}
}
sc.Unlock()

if len(pingers) == 0 {
level.Error(logger).Log("msg", "no targets specified on command line or in config file")
os.Exit(1)
}

splay := time.Duration(interval.Nanoseconds() / int64(len(pingers)))
level.Info(logger).Log("msg", fmt.Sprintf("Waiting %s between starting pingers", splay))
g := new(errgroup.Group)
for _, pinger := range pingers {
level.Info(logger).Log("msg", "Starting prober", "address", pinger.Addr(), "interval", pinger.Interval, "size_bytes", pinger.Size, "source", pinger.Source)
g.Go(pinger.Run)
time.Sleep(splay)
}
smokePingers.start()
smokepingCollector.updatePingers(smokePingers.started, *pingResponseSeconds)

prometheus.MustRegister(NewSmokepingCollector(&pingers, *pingResponseSeconds))
level.Info(logger).Log("msg", "Reloaded config file")
successCallback()
}
}()

http.Handle(*metricsPath, promhttp.Handler())
http.HandleFunc("/-/healthy", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("Healthy"))
})
http.HandleFunc("/-/reload",
func(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
fmt.Fprintf(w, "This endpoint requires a POST request.\n")
return
}

rc := make(chan error)
reloadCh <- rc
if err := <-rc; err != nil {
http.Error(w, fmt.Sprintf("Failed to reload config: %s", err), http.StatusInternalServerError)
}
})
if *metricsPath != "/" && *metricsPath != "" {
landingConfig := web.LandingConfig{
Name: "Smokeping Prober",
Expand All @@ -235,11 +354,9 @@ func main() {
os.Exit(1)
}

for _, pinger := range pingers {
pinger.Stop()
}
if err = g.Wait(); err != nil {
level.Error(logger).Log("msg", "pingers failed", "error", err)
os.Exit(1)
err = smokePingers.stop()
if err != nil {
level.Error(logger).Log("err", err)
}

}

0 comments on commit 584df24

Please sign in to comment.