Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reload config on SIGHUP & /-/reload, implement /-/healthy #121

Merged
merged 4 commits into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ In each host group the `interval`, `network`, and `protocol` are optional.

The interval Duration is in [Go time.ParseDuration()](https://golang.org/pkg/time/#ParseDuration) syntax.

NOTE: The config is only read on startup, SIGHUP is not supported (yet).
The config is read on startup, and can be reloaded with the SIGHUP signal, or with an HTTP POST to the URI path `/-/reload`.

## Building and running

Expand Down Expand Up @@ -100,3 +100,8 @@ The Smokeping Prober supports TLS and basic authentication.
To use TLS and/or basic authentication, you need to pass a configuration file
using the `--web.config.file` parameter. The format of the file is described
[in the exporter-toolkit repository](https://github.com/prometheus/exporter-toolkit/blob/master/docs/web-configuration.md).


### Health check

A health check can be requested at the URI path `/-/healthy`.
35 changes: 23 additions & 12 deletions collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,28 @@ type SmokepingCollector struct {
requestsSent *prometheus.Desc
}

func NewSmokepingCollector(pingers *[]*probing.Pinger, pingResponseSeconds prometheus.HistogramVec) *SmokepingCollector {
for _, pinger := range *pingers {
// NewSmokepingCollector builds a SmokepingCollector whose requestsSent
// descriptor is "<namespace>_requests_total", then hands the given pingers
// and response histogram to updatePingers before returning the collector.
func NewSmokepingCollector(pingers []*probing.Pinger, pingResponseSeconds prometheus.HistogramVec) *SmokepingCollector {
	requestsSentDesc := prometheus.NewDesc(
		prometheus.BuildFQName(namespace, "", "requests_total"),
		"Number of ping requests sent",
		labelNames,
		nil,
	)

	collector := &SmokepingCollector{requestsSent: requestsSentDesc}
	collector.updatePingers(pingers, pingResponseSeconds)
	return collector
}

func (s *SmokepingCollector) updatePingers(pingers []*probing.Pinger, pingResponseSeconds prometheus.HistogramVec) {
pingResponseDuplicates.Reset()
pingResponseSeconds.Reset()
pingResponseTTL.Reset()
pingSendErrors.Reset()
for _, pinger := range pingers {
// Init all metrics to 0s.
ipAddr := pinger.IPAddr().String()
host := pinger.Addr()
Expand Down Expand Up @@ -129,16 +149,7 @@ func NewSmokepingCollector(pingers *[]*probing.Pinger, pingResponseSeconds prome
"bytes_received", pkt.Nbytes, "icmp_seq", pkt.Seq, "time", pkt.Rtt, "ttl", pkt.TTL, "error", err)
}
}

return &SmokepingCollector{
pingers: pingers,
requestsSent: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "requests_total"),
"Number of ping requests sent",
labelNames,
nil,
),
}
s.pingers = &pingers
}

func (s *SmokepingCollector) Describe(ch chan<- *prometheus.Desc) {
Expand Down
253 changes: 185 additions & 68 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"

"github.com/prometheus-community/pro-bing"
Expand Down Expand Up @@ -76,8 +78,110 @@ func HostList(s kingpin.Settings) (target *[]string) {
return
}

func init() {
prometheus.MustRegister(versioncollector.NewCollector("smokeping_prober"))
// smokePingers tracks the set of pingers across config (re)loads: prepare
// fills prepared, start promotes prepared to started and runs them, and stop
// halts whatever is in started.
type smokePingers struct {
	// started holds the pingers launched by the most recent start call;
	// prepared holds the pingers built by prepare that have not been
	// started yet (start resets it to nil).
	started, prepared []*probing.Pinger
	// g is the errgroup running each started pinger's Run method.
	g *errgroup.Group
	// maxInterval is the largest ping interval seen by prepare; start
	// divides it by the number of pingers to space out their launches.
	maxInterval time.Duration
}

// sizeOfPrepared returns the number of pingers that have been prepared but
// not yet started.
func (s *smokePingers) sizeOfPrepared() int {
	// len of a nil slice is 0, so no explicit nil check is required.
	return len(s.prepared)
}

// start launches every prepared pinger in its own errgroup goroutine.
//
// It is a no-op when nothing has been prepared. If a previous group exists,
// the previously started pingers are stopped first and any error from them is
// logged as a warning. Launches are spaced by maxInterval divided by the
// number of pingers, and the call blocks for those sleeps. On return the
// prepared list is cleared and the pingers are available in s.started.
func (s *smokePingers) start() {
	if s.sizeOfPrepared() == 0 {
		return
	}

	if s.g != nil {
		if stopErr := s.stop(); stopErr != nil {
			level.Warn(logger).Log("msg", "At least one previous pinger failed to run", "err", stopErr)
		}
	}

	s.g = new(errgroup.Group)
	s.started = s.prepared

	// Delay between consecutive launches so the probes do not fire in lockstep.
	splay := s.maxInterval / time.Duration(len(s.started))
	for i := range s.started {
		p := s.started[i]
		level.Info(logger).Log("msg", "Starting prober", "address", p.Addr(), "interval", p.Interval, "size_bytes", p.Size, "source", p.Source)
		s.g.Go(p.Run)
		time.Sleep(splay)
	}

	s.prepared = nil
}

// stop halts every started pinger and waits for their goroutines to finish.
//
// It returns nil when no group or no started pingers exist. Otherwise it
// returns the first non-nil error reported by the errgroup, wrapped so that
// callers can still inspect the underlying error with errors.Is / errors.As.
func (s *smokePingers) stop() error {
	if s.g == nil || s.started == nil {
		return nil
	}
	for _, pinger := range s.started {
		pinger.Stop()
	}
	if err := s.g.Wait(); err != nil {
		return fmt.Errorf("pingers failed: %w", err)
	}
	return nil
}

// prepare builds and resolves one pinger per command-line host and one per
// host in every configured target group, then records them (together with the
// largest interval seen) so that a later start call can launch them.
//
// hosts, interval, privileged and sizeBytes are the command-line values and
// apply only to the command-line hosts; target-group pingers take their
// settings from the config held in sc.
//
// It returns an error if any host fails to resolve or if a target group's
// packet size is outside 24-65535 bytes.
func (s *smokePingers) prepare(hosts *[]string, interval *time.Duration, privileged *bool, sizeBytes *int) error {
	pingers := make([]*probing.Pinger, len(*hosts))
	for i, host := range *hosts {
		pinger := probing.New(host)

		if err := pinger.Resolve(); err != nil {
			return fmt.Errorf("failed to resolve pinger: %w", err)
		}

		level.Info(logger).Log("msg", "Pinger resolved", "host", host, "ip_addr", pinger.IPAddr())

		pinger.Interval = *interval
		pinger.RecordRtts = false
		pinger.SetPrivileged(*privileged)
		pinger.Size = *sizeBytes

		pingers[i] = pinger
	}

	maxInterval := *interval
	sc.Lock()
	// Deferred so that the early error returns below cannot leave sc locked,
	// which would deadlock the next call to prepare (e.g. on config reload).
	defer sc.Unlock()
	for _, targetGroup := range sc.C.Targets {
		packetSize := targetGroup.Size
		if packetSize < 24 || packetSize > 65535 {
			return fmt.Errorf("packet size must be in the range 24-65535, but found '%d' bytes", packetSize)
		}
		if targetGroup.Interval > maxInterval {
			maxInterval = targetGroup.Interval
		}
		for _, host := range targetGroup.Hosts {
			pinger := probing.New(host)
			pinger.Interval = targetGroup.Interval
			pinger.RecordRtts = false
			pinger.SetNetwork(targetGroup.Network)
			pinger.Size = packetSize
			pinger.Source = targetGroup.Source
			if targetGroup.Protocol == "icmp" {
				pinger.SetPrivileged(true)
			}
			if err := pinger.Resolve(); err != nil {
				return fmt.Errorf("failed to resolve pinger: %w", err)
			}
			pingers = append(pingers, pinger)
		}
	}
Copy link
Contributor

@dswarbrick dswarbrick Apr 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The prepare function can return (with error) before it unlocks the mutex, leaving it perpetually locked. Since smokePingers is reused within the main function, this could result in a deadlock when the prepare function is called a subsequent time. I suggest a defer sc.Unlock() immediately after the sc.Lock() line earlier.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, anyone want to open a PR for this fix?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, indeed. I'll fix it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opened PR #148 with the fix.

Thank you!

s.prepared = pingers
s.maxInterval = maxInterval
return nil
}

func parseBuckets(buckets string) ([]float64, error) {
Expand All @@ -93,6 +197,10 @@ func parseBuckets(buckets string) ([]float64, error) {
return bucketlist, nil
}

func init() {
prometheus.MustRegister(versioncollector.NewCollector("smokeping_prober"))
}

func main() {
var (
configFile = kingpin.Flag("config.file", "Optional smokeping_prober configuration yaml file.").String()
Expand All @@ -107,6 +215,9 @@ func main() {
hosts = HostList(kingpin.Arg("hosts", "List of hosts to ping"))
)

var smokePingers smokePingers
var smokepingCollector *SmokepingCollector

promlogConfig := &promlog.Config{}
flag.AddFlags(kingpin.CommandLine, promlogConfig)
kingpin.Version(version.Print("smokeping_prober"))
Expand Down Expand Up @@ -139,76 +250,84 @@ func main() {
pingResponseSeconds := newPingResponseHistogram(bucketlist, *factor)
prometheus.MustRegister(pingResponseSeconds)

pingers := make([]*probing.Pinger, len(*hosts))
var pinger *probing.Pinger
var host string
for i, host := range *hosts {
pinger = probing.New(host)

err := pinger.Resolve()
if err != nil {
level.Error(logger).Log("msg", "Failed to resolve pinger", "err", err)
os.Exit(1)
}

level.Info(logger).Log("msg", "Pinger resolved", "host", host, "ip_addr", pinger.IPAddr())

pinger.Interval = *interval
pinger.RecordRtts = false
pinger.SetPrivileged(*privileged)
pinger.Size = *sizeBytes
err = smokePingers.prepare(hosts, interval, privileged, sizeBytes)
if err != nil {
level.Error(logger).Log("err", "Unable to create ping", err)
os.Exit(1)
}

pingers[i] = pinger
if smokePingers.sizeOfPrepared() == 0 {
level.Error(logger).Log("msg", "no targets specified on command line or in config file")
os.Exit(1)
}

maxInterval := *interval
sc.Lock()
for _, targetGroup := range sc.C.Targets {
if targetGroup.Interval > maxInterval {
maxInterval = targetGroup.Interval
}
packetSize := targetGroup.Size
if packetSize < 24 || packetSize > 65535 {
level.Error(logger).Log("msg", "Invalid packet size. (24-65535)", "bytes", packetSize)
os.Exit(1)
}
for _, host = range targetGroup.Hosts {
pinger = probing.New(host)
pinger.Interval = targetGroup.Interval
pinger.RecordRtts = false
pinger.SetNetwork(targetGroup.Network)
pinger.Size = packetSize
pinger.Source = targetGroup.Source
if targetGroup.Protocol == "icmp" {
pinger.SetPrivileged(true)
smokePingers.start()
smokepingCollector = NewSmokepingCollector(smokePingers.started, *pingResponseSeconds)
prometheus.MustRegister(smokepingCollector)

hup := make(chan os.Signal, 1)
signal.Notify(hup, syscall.SIGHUP)
reloadCh := make(chan chan error)
go func() {
for {
var errCallback func(e error)
var successCallback func()
select {
case <-hup:
errCallback = func(e error) {}
successCallback = func() {}
case rc := <-reloadCh:
errCallback = func(e error) {
rc <- e
}
successCallback = func() {
rc <- nil
}
}
err := pinger.Resolve()
if err := sc.ReloadConfig(*configFile); err != nil {
level.Error(logger).Log("msg", "Error reloading config", "err", err)
errCallback(err)
continue
}
err = smokePingers.prepare(hosts, interval, privileged, sizeBytes)
if err != nil {
level.Error(logger).Log("msg", "failed to resolve pinger", "error", err.Error())
os.Exit(1)
level.Error(logger).Log("msg", "Unable to create ping from config", "err", err)
errCallback(err)
continue
}
if smokePingers.sizeOfPrepared() == 0 {
level.Error(logger).Log("msg", "No targets specified on command line or in config file")
errCallback(fmt.Errorf("no targets specified"))
continue
}
pingers = append(pingers, pinger)
}
}
sc.Unlock()

if len(pingers) == 0 {
level.Error(logger).Log("msg", "no targets specified on command line or in config file")
os.Exit(1)
}

splay := time.Duration(interval.Nanoseconds() / int64(len(pingers)))
level.Info(logger).Log("msg", fmt.Sprintf("Waiting %s between starting pingers", splay))
g := new(errgroup.Group)
for _, pinger := range pingers {
level.Info(logger).Log("msg", "Starting prober", "address", pinger.Addr(), "interval", pinger.Interval, "size_bytes", pinger.Size, "source", pinger.Source)
g.Go(pinger.Run)
time.Sleep(splay)
}
smokePingers.start()
smokepingCollector.updatePingers(smokePingers.started, *pingResponseSeconds)

prometheus.MustRegister(NewSmokepingCollector(&pingers, *pingResponseSeconds))
level.Info(logger).Log("msg", "Reloaded config file")
successCallback()
}
}()

http.Handle(*metricsPath, promhttp.Handler())
http.HandleFunc("/-/healthy", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("Healthy"))
})
http.HandleFunc("/-/reload",
func(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
fmt.Fprintf(w, "This endpoint requires a POST request.\n")
return
}

rc := make(chan error)
reloadCh <- rc
if err := <-rc; err != nil {
http.Error(w, fmt.Sprintf("Failed to reload config: %s", err), http.StatusInternalServerError)
}
})
if *metricsPath != "/" && *metricsPath != "" {
landingConfig := web.LandingConfig{
Name: "Smokeping Prober",
Expand All @@ -235,11 +354,9 @@ func main() {
os.Exit(1)
}

for _, pinger := range pingers {
pinger.Stop()
}
if err = g.Wait(); err != nil {
level.Error(logger).Log("msg", "pingers failed", "error", err)
os.Exit(1)
err = smokePingers.stop()
if err != nil {
level.Error(logger).Log("err", err)
}

}
Loading