diff --git a/go.mod b/go.mod index 908f2884..fe98d88d 100644 --- a/go.mod +++ b/go.mod @@ -32,6 +32,7 @@ require ( github.com/maruel/panicparse/v2 v2.3.1 github.com/nathanaelle/syslog5424/v2 v2.0.5 github.com/rs/cors v1.10.1 + github.com/rubyist/circuitbreaker v2.2.1+incompatible github.com/shirou/gopsutil v3.21.11+incompatible github.com/sirupsen/logrus v1.9.3 github.com/stretchr/testify v1.8.4 @@ -53,10 +54,12 @@ require ( github.com/aws/aws-sdk-go-v2/service/ssooidc v1.22.0 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.27.0 // indirect github.com/aws/smithy-go v1.20.0 // indirect + github.com/cenk/backoff v2.2.1+incompatible // indirect github.com/cilium/ebpf v0.9.1 // indirect github.com/containerd/cgroups/v3 v3.0.1 // indirect github.com/coreos/go-systemd/v22 v22.3.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect github.com/go-ole/go-ole v1.3.0 // indirect github.com/go-openapi/analysis v0.22.2 // indirect github.com/go-openapi/jsonpointer v0.20.2 // indirect @@ -80,6 +83,7 @@ require ( github.com/opencontainers/runtime-spec v1.0.2 // indirect github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect github.com/perimeterx/marshmallow v1.1.5 // indirect + github.com/peterbourgon/g2s v0.0.0-20170223122336-d4e7ad98afea // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect diff --git a/go.sum b/go.sum index 21c801a8..ec66a09f 100644 --- a/go.sum +++ b/go.sum @@ -34,6 +34,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.27.0 h1:cjTRjh700H36MQ8M0LnDn33W3Jmw github.com/aws/aws-sdk-go-v2/service/sts v1.27.0/go.mod h1:nXfOBMWPokIbOY+Gi7a1psWMSvskUCemZzI+SMB7Akc= github.com/aws/smithy-go v1.20.0 h1:6+kZsCXZwKxZS9RfISnPc4EXlHoyAkm2hPuM8X2BrrQ= github.com/aws/smithy-go v1.20.0/go.mod h1:uo5RKksAl4PzhqaAbjd4rLgFoq5koTsQKYuGe7dklGc= +github.com/cenk/backoff v2.2.1+incompatible h1:djdFT7f4gF2ttuzRKPbMOWgZajgesItGLwG5FTQKmmE= +github.com/cenk/backoff v2.2.1+incompatible/go.mod h1:7FtoeaSnHoZnmZzz47cM35Y9nSW7tNyaidugnHTaFDE= github.com/cilium/ebpf v0.9.1 h1:64sn2K3UKw8NbP/blsixRpF3nXuyhz/VjRlRzvlBRu4= github.com/cilium/ebpf v0.9.1/go.mod h1:+OhNOIXx/Fnu1IE8bJz2dzOA+VSfyTfdNUVdlQnxUFY= github.com/containerd/cgroups/v3 v3.0.1 h1:4hfGvu8rfGIwVIDd+nLzn/B9ZXx4BcCjzt5ToenJRaE= @@ -158,6 +160,8 @@ github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2D github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y= github.com/perimeterx/marshmallow v1.1.5 h1:a2LALqQ1BlHM8PZblsDdidgv1mWi1DgC2UmX50IvK2s= github.com/perimeterx/marshmallow v1.1.5/go.mod h1:dsXbUu8CRzfYP5a87xpp0xq9S3u0Vchtcl8we9tYaXw= +github.com/peterbourgon/g2s v0.0.0-20170223122336-d4e7ad98afea h1:sKwxy1H95npauwu8vtF95vG/syrL0p8fSZo/XlDg5gk= +github.com/peterbourgon/g2s v0.0.0-20170223122336-d4e7ad98afea/go.mod h1:1VcHEd3ro4QMoHfiNl/j7Jkln9+KQuorp0PItHMJYNg= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -171,6 +175,8 @@ github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/rs/cors v1.10.1 h1:L0uuZVXIKlI1SShY2nhFfo44TYvDPQ1w4oFkUJNfhyo= github.com/rs/cors v1.10.1/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= +github.com/rubyist/circuitbreaker v2.2.1+incompatible h1:KUKd/pV8Geg77+8LNDwdow6rVCAYOp8+kHUyFvL6Mhk= +github.com/rubyist/circuitbreaker v2.2.1+incompatible/go.mod h1:Ycs3JgJADPuzJDwffe12k6BZT8hxVi6lFK+gWYJLN4A= github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI= github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= diff --git a/log/rfc5424.go b/log/rfc5424.go index 4254cbf5..d630aa6a 100644 --- a/log/rfc5424.go +++ b/log/rfc5424.go @@ -18,8 +18,10 @@ package log import ( "fmt" "strings" + "time" "github.com/nathanaelle/syslog5424/v2" + circuit "github.com/rubyist/circuitbreaker" "github.com/sirupsen/logrus" ) @@ -27,6 +29,10 @@ type RFC5424Hook struct { syslog *syslog5424.Syslog sender *syslog5424.Sender msgID string + + // Use a circuit breaker to pause sending messages to the syslog target + // in the presence of connection errors. + cb *circuit.Breaker } func (r RFC5424Hook) Levels() []logrus.Level { @@ -58,7 +64,13 @@ func (r RFC5424Hook) Fire(entry *logrus.Entry) (err error) { msg := strings.Join(messages, " ") - r.syslog.Channel(sev).Msgid(r.msgID).Log(msg) + // Do not perform any action unless the circuit breaker is either closed (reset), or is ready to retry. + if r.cb.Ready() { + r.syslog.Channel(sev).Msgid(r.msgID).Log(msg) + // Register any call as successful to enable automatic resets. + // Failures are registered asynchronously by the goroutine that consumes errors from the corresponding channel. + r.cb.Success() + } return } @@ -74,7 +86,9 @@ func NewRFC5424Hook(opts Target) (logrus.Hook, error) { return nil, err } - slConn, _, err := syslog5424.Dial(opts.SyslogProto, opts.SyslogAddr) + // syslog5424.Dial() returns an error channel, which needs to be drained + // in order to avoid blocking. + slConn, errCh, err := syslog5424.Dial(opts.SyslogProto, opts.SyslogAddr) if err != nil { return nil, err } @@ -84,10 +98,64 @@ func NewRFC5424Hook(opts Target) (logrus.Hook, error) { return nil, err } - return &RFC5424Hook{syslog: syslogServer, sender: slConn, msgID: opts.SyslogMsgID}, nil + r := &RFC5424Hook{ + syslog: syslogServer, sender: slConn, msgID: opts.SyslogMsgID, + // We can change the circuit breaker settings as desired - including making + // them configurable and/or dynamically adjustable based on runtime conditions. + // + // Please note, however, that a 3-failure threshold breaker with default settings + // was found to work well with varying load and different states of a log target. + // Specifically, the breaker will remain tripped when sending messages to the target + // that is consistently failing, and will reset quickly when delivery begins to succeed. + cb: circuit.NewThresholdBreaker(3), + } + + // A signal channel that is used to stop the goroutine reporting on circuit breaker state changes. + doneCh := make(chan struct{}) + + // Consume errors from errCh until it is closed. + go func() { + for { + err, ok := <-errCh + if err != nil { + r.cb.Fail() // Register a failure with the circuit breaker. + } + if !ok { + close(doneCh) + return + } + } + }() + + // Report on circuit breaker state changes. + cbStateCh := r.cb.Subscribe() + go func() { + for { + select { + case e, ok := <-cbStateCh: + if !ok { + return + } + var state string + switch e { + case circuit.BreakerTripped: + state = "too many connection errors, log delivery is stopped until this improves" + case circuit.BreakerReset: + state = "resuming log delivery" + default: + continue + } + fmt.Println(time.Now().Format(time.RFC3339), "syslog target", opts.SyslogAddr, "("+opts.SyslogTag+"):", state) + case <-doneCh: + return + } + } + }() + + return r, nil } func (r RFC5424Hook) Close() error { - r.sender.End() + r.sender.End() // This will also close errCh returned by syslog.Dial() in NewRFC5424Hook(), causing related goroutines to exit. return nil }