diff --git a/go.mod b/go.mod
index 5ff1b23a931..869c0eb54f0 100644
--- a/go.mod
+++ b/go.mod
@@ -27,6 +27,7 @@ require (
 	github.com/go-logr/logr v1.4.2
 	github.com/go-logr/zapr v1.3.0
 	github.com/goccy/go-json v0.10.3
+	github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3
 	github.com/google/go-cmp v0.6.0
 	github.com/google/uuid v1.6.0
 	github.com/jpillora/backoff v1.0.0
diff --git a/go.sum b/go.sum
index e8e01479b1c..1a91a6d61dc 100644
--- a/go.sum
+++ b/go.sum
@@ -138,6 +138,8 @@ github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA=
 github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
 github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
 github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
+github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3 h1:zN2lZNZRflqFyxVaTIU61KNKQ9C0055u9CAfpmqUvo4=
+github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3/go.mod h1:nPpo7qLxd6XL3hWJG/O60sR8ZKfMCiIoNap5GvD12KU=
 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
 github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
 github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
diff --git a/internal/dataplane/sendconfig/dbmode.go b/internal/dataplane/sendconfig/dbmode.go
index 2ecc0c7687e..692186037e6 100644
--- a/internal/dataplane/sendconfig/dbmode.go
+++ b/internal/dataplane/sendconfig/dbmode.go
@@ -165,6 +165,8 @@ func (s *UpdateStrategyDBMode) HandleEvents(
 		case event := <-events:
 			if event.Error == nil {
 				s.logger.V(logging.DebugLevel).Info("updated gateway entity", "action", event.Action, "kind", event.Entity.Kind, "name", event.Entity.Name)
+				eventDiff := diagnostics.NewEntityDiff(event.Diff, string(event.Action), event.Entity)
+				diff.Entities = append(diff.Entities, eventDiff)
 			} else {
 				s.logger.Error(event.Error, "failed updating gateway entity", "action", event.Action, "kind", event.Entity.Kind, "name", event.Entity.Name)
 				parsed, err := resourceErrorFromEntityAction(event)
diff --git a/internal/diagnostics/diff.go b/internal/diagnostics/diff.go
index 60bd3c5debc..acb50a8dc15 100644
--- a/internal/diagnostics/diff.go
+++ b/internal/diagnostics/diff.go
@@ -1,5 +1,10 @@
 package diagnostics
 
+import (
+	"github.com/golang-collections/collections/queue"
+	"github.com/kong/go-database-reconciler/pkg/diff"
+)
+
 // TRR TODO this holds guts for diff endpoints. need to reorg the types.go into something that splits out the
 // subcomponents into separate files and move the base stuff into server.go probably.
 // the /config/{successful|failed} API endpoint structure isn't great for anything other than "current failure, last
@@ -61,6 +66,21 @@ type EntityDiff struct {
 	Diff      string          `json:"diff,omitempty"`
 }
 
+// NewEntityDiff creates a diagnostic entity diff.
+func NewEntityDiff(diff string, action string, entity diff.Entity) EntityDiff {
+	return EntityDiff{
+		// TODO this is mostly a stub at present. Need to either derive the source from tags or just omit it for now with
+		// a nice to have feature issue, or a simpler YAGNI but if someone asks add it TODO here.
+		Source: sourceResource{},
+		Generated: generatedEntity{
+			Name: entity.Name,
+			Kind: entity.Kind,
+		},
+		Action: action,
+		Diff:   diff,
+	}
+}
+
 // TRR TODO this is stolen from the error event builder, which parses regurgitated entity tags into k8s parents.
 // we want the same here, minus the additional error info. could probably make it a function in
 // internal/util/k8s.go along with sourceResource, but for now it's just sitting here for reference.
@@ -91,3 +111,46 @@ type EntityDiff struct {
 //		}
 //	}
 //}
+
+// diffMap holds DB mode diff history: diffs stores each ConfigDiff under its hash, hashQueue tracks the order the
+// hashes were added so the oldest can be evicted, and length is the maximum number of diffs retained.
+type diffMap struct {
+	diffs     map[string]ConfigDiff
+	hashQueue *queue.Queue
+	length    int
+}
+
+// newDiffMap creates an empty diffMap that retains at most length diffs.
+func newDiffMap(length int) diffMap {
+	return diffMap{
+		diffs:     map[string]ConfigDiff{},
+		length:    length,
+		hashQueue: queue.New(),
+	}
+}
+
+// Update adds a diff to the diffMap. If the diffMap holds the maximum number of diffs in history, it removes the
+// oldest diff. A diff whose hash is already stored replaces the stored diff and becomes the newest history entry.
+// A diffMap with a non-positive length stores nothing.
+func (d *diffMap) Update(diff ConfigDiff) {
+	if d.length <= 0 {
+		return
+	}
+	if _, tracked := d.diffs[diff.Hash]; tracked {
+		// Cycle through the queue once and drop the stale entry for this hash. Enqueueing it a second time would
+		// leave two queue entries for one map key, and evicting the older one would delete the newer diff.
+		for remaining := d.hashQueue.Len(); remaining > 0; remaining-- {
+			if hash := d.hashQueue.Dequeue(); hash != diff.Hash {
+				d.hashQueue.Enqueue(hash)
+			}
+		}
+	}
+	// length is positive here, so the loop only dequeues from a non-empty queue.
+	for d.hashQueue.Len() >= d.length {
+		if oldest, ok := d.hashQueue.Dequeue().(string); ok {
+			delete(d.diffs, oldest)
+		}
+	}
+	d.hashQueue.Enqueue(diff.Hash)
+	d.diffs[diff.Hash] = diff
+}
diff --git a/internal/diagnostics/server.go b/internal/diagnostics/server.go
index 90f6997153c..4210face7ec 100644
--- a/internal/diagnostics/server.go
+++ b/internal/diagnostics/server.go
@@ -26,6 +26,9 @@ const (
 	// variable) but do want a small amount of leeway to account for goroutine scheduling, so it
 	// is not zero.
 	diagnosticConfigBufferDepth = 3
+
+	// diffHistorySize is the number of diffs to keep in history.
+	diffHistorySize = 5
 )
 
 // Server is an HTTP server running exposing the pprof profiling tool, and processing diagnostic dumps of Kong configurations.
@@ -43,8 +46,11 @@ type Server struct {
 
 	currentFallbackCacheMetadata *fallback.GeneratedCacheMetadata
 
+	diffs diffMap
+
 	configLock   *sync.RWMutex
 	fallbackLock *sync.RWMutex
+	diffLock     *sync.RWMutex
 }
 
 // ServerConfig contains configuration for the diagnostics server.
@@ -66,6 +72,8 @@ func NewServer(logger logr.Logger, cfg ServerConfig) Server {
 		profilingEnabled: cfg.ProfilingEnabled,
 		configLock:       &sync.RWMutex{},
 		fallbackLock:     &sync.RWMutex{},
+		diffLock:         &sync.RWMutex{},
+		diffs:            newDiffMap(diffHistorySize),
 	}
 
 	if cfg.ConfigDumpsEnabled {
@@ -103,7 +111,7 @@ func (s *Server) Listen(ctx context.Context, port int) error {
 	}
 
 	errChan := make(chan error)
-	go s.receiveConfig(ctx)
+	go s.receiveDiagnostics(ctx)
 
 	go func() {
 		err := httpServer.ListenAndServe()
@@ -126,20 +134,22 @@ func (s *Server) Listen(ctx context.Context, port int) error {
 	}
 }
 
-// receiveConfig watches the config update channel.
-func (s *Server) receiveConfig(ctx context.Context) {
+// receiveDiagnostics watches the diagnostic update channels.
+func (s *Server) receiveDiagnostics(ctx context.Context) {
 	for {
 		select {
 		case dump := <-s.clientDiagnostic.Configs:
 			s.onConfigDump(dump)
 		case meta := <-s.clientDiagnostic.FallbackCacheMetadata:
 			s.onFallbackCacheMetadata(meta)
+		case diff := <-s.clientDiagnostic.Diffs:
+			s.onDiff(diff)
 		case <-ctx.Done():
 			if err := ctx.Err(); err != nil && !errors.Is(err, context.Canceled) {
-				s.logger.Error(err, "Shutting down diagnostic config collection: context completed with error")
+				s.logger.Error(err, "Shutting down diagnostic collection: context completed with error")
 				return
 			}
-			s.logger.V(logging.InfoLevel).Info("Shutting down diagnostic config collection: context completed")
+			s.logger.V(logging.InfoLevel).Info("Shutting down diagnostic collection: context completed")
 			return
 		}
 	}
@@ -175,6 +185,13 @@ func (s *Server) onFallbackCacheMetadata(meta fallback.GeneratedCacheMetadata) {
 	s.currentFallbackCacheMetadata = &meta
 }
 
+// onDiff records a received config diff in the diff history while holding diffLock for writing.
+func (s *Server) onDiff(diff ConfigDiff) {
+	s.diffLock.Lock()
+	defer s.diffLock.Unlock()
+	s.diffs.Update(diff)
+}
+
 // installProfilingHandlers adds the Profiling webservice to the given mux.
 func installProfilingHandlers(mux *http.ServeMux) {
 	mux.HandleFunc("/debug/pprof", redirectTo("/debug/pprof/"))