From 865fca55f8f964c5fba7a276db761b051d7033fd Mon Sep 17 00:00:00 2001 From: frenchzed Date: Mon, 29 Apr 2024 08:39:47 -0400 Subject: [PATCH] make error handling more robust in docker provider --- go.mod | 1 + internal/core/docker.go | 143 +++++++++++++++++++++------------- internal/core/docker_test.go | 5 +- internal/servers/dnsserver.go | 32 ++++---- internal/servers/http.go | 24 +++--- 5 files changed, 127 insertions(+), 78 deletions(-) diff --git a/go.mod b/go.mod index a2755a8..e2ac317 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( require ( github.com/Microsoft/go-winio v0.4.14 // indirect github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect + github.com/cenkalti/backoff/v4 v4.3.0 github.com/containerd/log v0.1.0 // indirect github.com/distribution/reference v0.6.0 // indirect github.com/docker/go-connections v0.5.0 // indirect diff --git a/internal/core/docker.go b/internal/core/docker.go index 171db65..bc40adc 100644 --- a/internal/core/docker.go +++ b/internal/core/docker.go @@ -17,9 +17,11 @@ import ( "regexp" "strconv" "strings" + "time" "github.com/aacebedo/dnsdock/internal/servers" "github.com/aacebedo/dnsdock/internal/utils" + "github.com/cenkalti/backoff/v4" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/events" @@ -27,6 +29,9 @@ import ( "github.com/docker/docker/client" ) +// DockerProvider is the name of the provider used for services added by the Docker client +const DockerProvider = "docker" + // DockerManager is the entrypoint to the docker daemon type DockerManager struct { config *utils.Config @@ -50,119 +55,151 @@ func (d *DockerManager) Start() (err error) { ctx, cancel := context.WithCancel(context.Background()) d.cancel = cancel - messageChan, _ := d.client.Events(ctx, types.EventsOptions{ + go backoff.RetryNotify(func() error { + return d.run(ctx) + }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx), func(err error, d time.Duration) { + logger.Errorf("Error running docker manager, retrying in %v: %s", d, err) + }) + + return nil +} + +func (d *DockerManager) run(ctx context.Context) error { + messageChan, errorChan := d.client.Events(ctx, types.EventsOptions{ Filters: filters.NewArgs(filters.Arg("type", "container")), }) - go d.watch(messageChan) - containers, err := d.client.ContainerList(context.Background(), container.ListOptions{}) + containers, err := d.client.ContainerList(ctx, container.ListOptions{}) if err != nil { - return errors.New("Error getting containers: " + err.Error()) + return fmt.Errorf("error getting containers: %w", err) } + services := make(map[string]struct{}) for _, container := range containers { service, err := d.getService(container.ID) if err != nil { - logger.Errorf("%s", err) - continue + return fmt.Errorf("error getting service: %w", err) } err = d.list.AddService(container.ID, *service) if err != nil { - return fmt.Errorf("Error adding service: %s", err) + return fmt.Errorf("error adding service: %w", err) } + services[container.ID] = struct{}{} } - return nil -} + for id, srv := range d.list.GetAllServices() { + if _, ok := services[id]; !ok && srv.Provider == DockerProvider { + err := d.list.RemoveService(id) + if err != nil { + return fmt.Errorf("error removing service: %w", err) + } + } + } -func (d *DockerManager) watch(messageChan <-chan events.Message) { - for m := range messageChan { - switch m.Action { - case "create": - go d.createHandler(m) - case "start": - go d.startHandler(m) - case "unpause": - go d.startHandler(m) - case "die": - go d.stopHandler(m) - case "pause": - go d.stopHandler(m) - case "destroy": - go d.destroyHandler(m) - case "rename": - go d.renameHandler(m) + for { + select { + case m := <-messageChan: + err := d.handler(m) + if err != nil { + return err + } + case err := <-errorChan: + return err + case <-ctx.Done(): + return nil } } } -func (d *DockerManager) createHandler(m events.Message) { +func (d *DockerManager) handler(m events.Message) error { + switch m.Action { + case "create": + return d.createHandler(m) + case "start": + return d.startHandler(m) + case "unpause": + return d.startHandler(m) + case "die": + return d.stopHandler(m) + case "pause": + return d.stopHandler(m) + case "destroy": + return d.destroyHandler(m) + case "rename": + return d.renameHandler(m) + } + return nil +} + +func (d *DockerManager) createHandler(m events.Message) error { logger.Debugf("Created container '%s'", m.ID) if d.config.All { service, err := d.getService(m.ID) if err != nil { - logger.Errorf("%s", err) - } else { - err = d.list.AddService(m.ID, *service) - if err != nil { - logger.Errorf("Error adding service: %s", err) - } + return fmt.Errorf("error getting service: %w", err) + } + err = d.list.AddService(m.ID, *service) + if err != nil { + return fmt.Errorf("error adding service: %w", err) } } + return nil } -func (d *DockerManager) startHandler(m events.Message) { +func (d *DockerManager) startHandler(m events.Message) error { logger.Debugf("Started container '%s'", m.ID) if !d.config.All { service, err := d.getService(m.ID) if err != nil { - logger.Errorf("%s", err) - } else { - err = d.list.AddService(m.ID, *service) - if err != nil { - logger.Errorf("Error adding service: %s", err) - } + return fmt.Errorf("error getting service: %w", err) + } + err = d.list.AddService(m.ID, *service) + if err != nil { + return fmt.Errorf("error adding service: %w", err) } } + return nil } -func (d *DockerManager) stopHandler(m events.Message) { +func (d *DockerManager) stopHandler(m events.Message) error { logger.Debugf("Stopped container '%s'", m.ID) if !d.config.All { err := d.list.RemoveService(m.ID) if err != nil { - logger.Errorf("%s", err) + return fmt.Errorf("error removing service: %w", err) } } else { logger.Debugf("Stopped container '%s' not removed as --all argument is true", m.ID) } + return nil } -func (d *DockerManager) renameHandler(m events.Message) { +func (d *DockerManager) renameHandler(m events.Message) error { logger.Debugf("Renamed container '%s'", m.ID) err := d.list.RemoveService(m.ID) if err != nil { - logger.Errorf("%s", err) + return fmt.Errorf("error removing service: %w", err) } service, err := d.getService(m.ID) if err != nil { - logger.Errorf("%s", err) - } else { - res := d.list.AddService(m.ID, *service) - if res != nil { - logger.Errorf("Error adding service: %s", res) - } + return fmt.Errorf("error getting service: %w", err) + } + res := d.list.AddService(m.ID, *service) + if res != nil { + return fmt.Errorf("error removing service: %w", err) } + return nil } -func (d *DockerManager) destroyHandler(m events.Message) { +func (d *DockerManager) destroyHandler(m events.Message) error { logger.Debugf("Destroy container '%s'", m.ID) if d.config.All { err := d.list.RemoveService(m.ID) if err != nil { - logger.Errorf("%s", err) + return fmt.Errorf("error removing service: %w", err) } } + return nil } // Stop stops the DockerManager @@ -176,7 +213,7 @@ func (d *DockerManager) getService(id string) (*servers.Service, error) { return nil, err } - service := servers.NewService() + service := servers.NewService(DockerProvider) service.Aliases = make([]string, 0) service.Image = getImageName(desc.Config.Image) diff --git a/internal/core/docker_test.go b/internal/core/docker_test.go index f575878..c7f24bf 100644 --- a/internal/core/docker_test.go +++ b/internal/core/docker_test.go @@ -9,9 +9,10 @@ package core import ( - "github.com/aacebedo/dnsdock/internal/servers" "reflect" "testing" + + "github.com/aacebedo/dnsdock/internal/servers" ) func TestGetImageName(t *testing.T) { @@ -78,7 +79,7 @@ func TestSplitEnv(t *testing.T) { func TestOverrideFromEnv(t *testing.T) { getService := func() *servers.Service { - service := servers.NewService() + service := servers.NewService(DockerProvider) service.Name = "myfoo" service.Image = "mybar" return service diff --git a/internal/servers/dnsserver.go b/internal/servers/dnsserver.go index 4ea681c..a7ba869 100644 --- a/internal/servers/dnsserver.go +++ b/internal/servers/dnsserver.go @@ -11,13 +11,14 @@ package servers import ( "errors" "fmt" - "github.com/aacebedo/dnsdock/internal/utils" - "github.com/miekg/dns" "net" "regexp" "strings" "sync" "time" + + "github.com/aacebedo/dnsdock/internal/utils" + "github.com/miekg/dns" ) // Service represents a container and an attached DNS record @@ -27,19 +28,23 @@ type Service struct { IPs []net.IP TTL int Aliases []string + + // Provider tracks the creator of a service + Provider string `json:"-"` } // NewService creates a new service -func NewService() (s *Service) { - s = &Service{TTL: -1} +func NewService(provider string) (s *Service) { + s = &Service{TTL: -1, Provider: provider} return } func (s Service) String() string { - return fmt.Sprintf(` Name: %s - Aliases: %s - IPs: %s - TTL: %d - `, s.Name, s.Aliases, s.IPs, s.TTL) + return fmt.Sprintf(` Name: %s + Aliases: %s + IPs: %s + TTL: %d + Provider: %s + `, s.Name, s.Aliases, s.IPs, s.TTL, s.Provider) } // ServiceListProvider represents the entrypoint to get containers @@ -504,7 +509,7 @@ func (s *DNSServer) getExpandedID(in string) (out string, err error) { return } - re, err := regexp.Compile("^[0-9a-f]+$"); + re, err := regexp.Compile("^[0-9a-f]+$") if err != nil { return "", err } @@ -554,9 +559,10 @@ func (s *DNSServer) createSOA() []dns.RR { // namely, the query may be longer than "name" and still be a valid prefix // query for "name". // Examples: -// foo.bar.baz.qux is a valid query for bar.baz.qux (longer prefix is okay) -// foo.*.baz.qux is a valid query for bar.baz.qux (wildcards okay) -// *.baz.qux is a valid query for baz.baz.qux (wildcard prefix okay) +// +// foo.bar.baz.qux is a valid query for bar.baz.qux (longer prefix is okay) +// foo.*.baz.qux is a valid query for bar.baz.qux (wildcards okay) +// *.baz.qux is a valid query for baz.baz.qux (wildcard prefix okay) func isPrefixQuery(query, name []string) bool { for i, j := len(query)-1, len(name)-1; i >= 0 && j >= 0; i, j = i-1, j-1 { if query[i] != name[j] && query[i] != "*" { diff --git a/internal/servers/http.go b/internal/servers/http.go index 0bfc360..2b4c834 100644 --- a/internal/servers/http.go +++ b/internal/servers/http.go @@ -10,11 +10,15 @@ package servers import ( "encoding/json" + "net/http" + "github.com/aacebedo/dnsdock/internal/utils" "github.com/gorilla/mux" - "net/http" ) +// HTTPProvider is the name of the provider used for services added by the HTTP server +const HTTPProvider = "http" + // HTTPServer represents the http endpoint type HTTPServer struct { config *utils.Config @@ -89,7 +93,7 @@ func (s *HTTPServer) addService(w http.ResponseWriter, req *http.Request) { return } - service := NewService() + service := NewService(HTTPProvider) if err := json.NewDecoder(req.Body).Decode(&service); err != nil { logger.Errorf("JSON decoding error: %s", err) http.Error(w, err.Error(), http.StatusInternalServerError) @@ -112,10 +116,10 @@ func (s *HTTPServer) addService(w http.ResponseWriter, req *http.Request) { } err := s.list.AddService(id, *service) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } } func (s *HTTPServer) removeService(w http.ResponseWriter, req *http.Request) { @@ -182,10 +186,10 @@ func (s *HTTPServer) updateService(w http.ResponseWriter, req *http.Request) { // todo: this probably needs to be moved. consider stop event in the // middle of sending PATCH. container would not be removed. err = s.list.AddService(id, service) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } } func (s *HTTPServer) setTTL(w http.ResponseWriter, req *http.Request) {