Skip to content

Commit

Permalink
make error handling more robust in docker provider
Browse files Browse the repository at this point in the history
  • Loading branch information
frenchzed committed Apr 29, 2024
1 parent 2b4a7e5 commit 865fca5
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 78 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
require (
github.com/Microsoft/go-winio v0.4.14 // indirect
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
github.com/cenkalti/backoff/v4 v4.3.0
github.com/containerd/log v0.1.0 // indirect
github.com/distribution/reference v0.6.0 // indirect
github.com/docker/go-connections v0.5.0 // indirect
Expand Down
143 changes: 90 additions & 53 deletions internal/core/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,21 @@ import (
"regexp"
"strconv"
"strings"
"time"

"github.com/aacebedo/dnsdock/internal/servers"
"github.com/aacebedo/dnsdock/internal/utils"
"github.com/cenkalti/backoff/v4"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/events"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/client"
)

// DockerProvider is the name of the provider used for services added by the Docker client
const DockerProvider = "docker"

// DockerManager is the entrypoint to the docker daemon
type DockerManager struct {
config *utils.Config
Expand All @@ -50,119 +55,151 @@ func (d *DockerManager) Start() (err error) {
ctx, cancel := context.WithCancel(context.Background())
d.cancel = cancel

messageChan, _ := d.client.Events(ctx, types.EventsOptions{
go backoff.RetryNotify(func() error {
return d.run(ctx)
}, backoff.WithContext(backoff.NewExponentialBackOff(), ctx), func(err error, d time.Duration) {
logger.Errorf("Error running docker manager, retrying in %v: %s", d, err)
})

return nil
}

func (d *DockerManager) run(ctx context.Context) error {
messageChan, errorChan := d.client.Events(ctx, types.EventsOptions{
Filters: filters.NewArgs(filters.Arg("type", "container")),
})
go d.watch(messageChan)

containers, err := d.client.ContainerList(context.Background(), container.ListOptions{})
containers, err := d.client.ContainerList(ctx, container.ListOptions{})
if err != nil {
return errors.New("Error getting containers: " + err.Error())
return fmt.Errorf("error getting containers: %w", err)
}

services := make(map[string]struct{})
for _, container := range containers {
service, err := d.getService(container.ID)
if err != nil {
logger.Errorf("%s", err)
continue
return fmt.Errorf("error getting service: %w", err)
}
err = d.list.AddService(container.ID, *service)
if err != nil {
return fmt.Errorf("Error adding service: %s", err)
return fmt.Errorf("error adding service: %w", err)
}
services[container.ID] = struct{}{}
}

return nil
}
for id, srv := range d.list.GetAllServices() {
if _, ok := services[id]; !ok && srv.Provider == DockerProvider {
err := d.list.RemoveService(id)
if err != nil {
return fmt.Errorf("error removing service: %w", err)
}
}
}

func (d *DockerManager) watch(messageChan <-chan events.Message) {
for m := range messageChan {
switch m.Action {
case "create":
go d.createHandler(m)
case "start":
go d.startHandler(m)
case "unpause":
go d.startHandler(m)
case "die":
go d.stopHandler(m)
case "pause":
go d.stopHandler(m)
case "destroy":
go d.destroyHandler(m)
case "rename":
go d.renameHandler(m)
for {
select {
case m := <-messageChan:
err := d.handler(m)
if err != nil {
return err
}
case err := <-errorChan:
return err
case <-ctx.Done():
return nil
}
}
}

func (d *DockerManager) createHandler(m events.Message) {
func (d *DockerManager) handler(m events.Message) error {
switch m.Action {
case "create":
return d.createHandler(m)
case "start":
return d.startHandler(m)
case "unpause":
return d.startHandler(m)
case "die":
return d.stopHandler(m)
case "pause":
return d.stopHandler(m)
case "destroy":
return d.destroyHandler(m)
case "rename":
return d.renameHandler(m)
}
return nil
}

func (d *DockerManager) createHandler(m events.Message) error {
logger.Debugf("Created container '%s'", m.ID)
if d.config.All {
service, err := d.getService(m.ID)
if err != nil {
logger.Errorf("%s", err)
} else {
err = d.list.AddService(m.ID, *service)
if err != nil {
logger.Errorf("Error adding service: %s", err)
}
return fmt.Errorf("error getting service: %w", err)
}
err = d.list.AddService(m.ID, *service)
if err != nil {
return fmt.Errorf("error adding service: %w", err)
}
}
return nil
}

func (d *DockerManager) startHandler(m events.Message) {
func (d *DockerManager) startHandler(m events.Message) error {
logger.Debugf("Started container '%s'", m.ID)
if !d.config.All {
service, err := d.getService(m.ID)
if err != nil {
logger.Errorf("%s", err)
} else {
err = d.list.AddService(m.ID, *service)
if err != nil {
logger.Errorf("Error adding service: %s", err)
}
return fmt.Errorf("error getting service: %w", err)
}
err = d.list.AddService(m.ID, *service)
if err != nil {
return fmt.Errorf("error adding service: %w", err)
}
}
return nil
}

func (d *DockerManager) stopHandler(m events.Message) {
func (d *DockerManager) stopHandler(m events.Message) error {
logger.Debugf("Stopped container '%s'", m.ID)
if !d.config.All {
err := d.list.RemoveService(m.ID)
if err != nil {
logger.Errorf("%s", err)
return fmt.Errorf("error removing service: %w", err)
}
} else {
logger.Debugf("Stopped container '%s' not removed as --all argument is true", m.ID)
}
return nil
}

func (d *DockerManager) renameHandler(m events.Message) {
func (d *DockerManager) renameHandler(m events.Message) error {
logger.Debugf("Renamed container '%s'", m.ID)
err := d.list.RemoveService(m.ID)
if err != nil {
logger.Errorf("%s", err)
return fmt.Errorf("error removing service: %w", err)
}
service, err := d.getService(m.ID)
if err != nil {
logger.Errorf("%s", err)
} else {
res := d.list.AddService(m.ID, *service)
if res != nil {
logger.Errorf("Error adding service: %s", res)
}
return fmt.Errorf("error getting service: %w", err)
}
res := d.list.AddService(m.ID, *service)
if res != nil {
return fmt.Errorf("error removing service: %w", err)
}
return nil
}

func (d *DockerManager) destroyHandler(m events.Message) {
func (d *DockerManager) destroyHandler(m events.Message) error {
logger.Debugf("Destroy container '%s'", m.ID)
if d.config.All {
err := d.list.RemoveService(m.ID)
if err != nil {
logger.Errorf("%s", err)
return fmt.Errorf("error removing service: %w", err)
}
}
return nil
}

// Stop stops the DockerManager
Expand All @@ -176,7 +213,7 @@ func (d *DockerManager) getService(id string) (*servers.Service, error) {
return nil, err
}

service := servers.NewService()
service := servers.NewService(DockerProvider)
service.Aliases = make([]string, 0)

service.Image = getImageName(desc.Config.Image)
Expand Down
5 changes: 3 additions & 2 deletions internal/core/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@
package core

import (
"github.com/aacebedo/dnsdock/internal/servers"
"reflect"
"testing"

"github.com/aacebedo/dnsdock/internal/servers"
)

func TestGetImageName(t *testing.T) {
Expand Down Expand Up @@ -78,7 +79,7 @@ func TestSplitEnv(t *testing.T) {

func TestOverrideFromEnv(t *testing.T) {
getService := func() *servers.Service {
service := servers.NewService()
service := servers.NewService(DockerProvider)
service.Name = "myfoo"
service.Image = "mybar"
return service
Expand Down
32 changes: 19 additions & 13 deletions internal/servers/dnsserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ package servers
import (
"errors"
"fmt"
"github.com/aacebedo/dnsdock/internal/utils"
"github.com/miekg/dns"
"net"
"regexp"
"strings"
"sync"
"time"

"github.com/aacebedo/dnsdock/internal/utils"
"github.com/miekg/dns"
)

// Service represents a container and an attached DNS record
Expand All @@ -27,19 +28,23 @@ type Service struct {
IPs []net.IP
TTL int
Aliases []string

// Provider tracks the creator of a service
Provider string `json:"-"`
}

// NewService creates a new service
func NewService() (s *Service) {
s = &Service{TTL: -1}
func NewService(provider string) (s *Service) {
s = &Service{TTL: -1, Provider: provider}
return
}
func (s Service) String() string {
return fmt.Sprintf(` Name: %s
Aliases: %s
IPs: %s
TTL: %d
`, s.Name, s.Aliases, s.IPs, s.TTL)
return fmt.Sprintf(` Name: %s
Aliases: %s
IPs: %s
TTL: %d
Provider: %s
`, s.Name, s.Aliases, s.IPs, s.TTL, s.Provider)
}

// ServiceListProvider represents the entrypoint to get containers
Expand Down Expand Up @@ -504,7 +509,7 @@ func (s *DNSServer) getExpandedID(in string) (out string, err error) {
return
}

re, err := regexp.Compile("^[0-9a-f]+$");
re, err := regexp.Compile("^[0-9a-f]+$")
if err != nil {
return "", err
}
Expand Down Expand Up @@ -554,9 +559,10 @@ func (s *DNSServer) createSOA() []dns.RR {
// namely, the query may be longer than "name" and still be a valid prefix
// query for "name".
// Examples:
// foo.bar.baz.qux is a valid query for bar.baz.qux (longer prefix is okay)
// foo.*.baz.qux is a valid query for bar.baz.qux (wildcards okay)
// *.baz.qux is a valid query for baz.baz.qux (wildcard prefix okay)
//
// foo.bar.baz.qux is a valid query for bar.baz.qux (longer prefix is okay)
// foo.*.baz.qux is a valid query for bar.baz.qux (wildcards okay)
// *.baz.qux is a valid query for baz.baz.qux (wildcard prefix okay)
func isPrefixQuery(query, name []string) bool {
for i, j := len(query)-1, len(name)-1; i >= 0 && j >= 0; i, j = i-1, j-1 {
if query[i] != name[j] && query[i] != "*" {
Expand Down
24 changes: 14 additions & 10 deletions internal/servers/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,15 @@ package servers

import (
"encoding/json"
"net/http"

"github.com/aacebedo/dnsdock/internal/utils"
"github.com/gorilla/mux"
"net/http"
)

// HTTPProvider is the name of the provider used for services added by the HTTP server
const HTTPProvider = "http"

// HTTPServer represents the http endpoint
type HTTPServer struct {
config *utils.Config
Expand Down Expand Up @@ -89,7 +93,7 @@ func (s *HTTPServer) addService(w http.ResponseWriter, req *http.Request) {
return
}

service := NewService()
service := NewService(HTTPProvider)
if err := json.NewDecoder(req.Body).Decode(&service); err != nil {
logger.Errorf("JSON decoding error: %s", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand All @@ -112,10 +116,10 @@ func (s *HTTPServer) addService(w http.ResponseWriter, req *http.Request) {
}

err := s.list.AddService(id, *service)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}

func (s *HTTPServer) removeService(w http.ResponseWriter, req *http.Request) {
Expand Down Expand Up @@ -182,10 +186,10 @@ func (s *HTTPServer) updateService(w http.ResponseWriter, req *http.Request) {
// todo: this probably needs to be moved. consider stop event in the
// middle of sending PATCH. container would not be removed.
err = s.list.AddService(id, service)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}

func (s *HTTPServer) setTTL(w http.ResponseWriter, req *http.Request) {
Expand Down

0 comments on commit 865fca5

Please sign in to comment.