diff --git a/Makefile b/Makefile index ed5eae18..73ea05e1 100755 --- a/Makefile +++ b/Makefile @@ -42,3 +42,6 @@ rapid-crash: sudo docker run --restart=always --name test_crash debian:bookworm-slim /bin/cat &&\ sleep 3 &&\ sudo docker rm -f test_crash + +debug-list-containers: + bash -c 'echo -e "GET /containers/json HTTP/1.0\r\n" | sudo netcat -U /var/run/docker.sock | tail -n +9 | jq' \ No newline at end of file diff --git a/src/docker/client.go b/src/docker/client.go index e6f654ad..baebb3f4 100644 --- a/src/docker/client.go +++ b/src/docker/client.go @@ -16,6 +16,8 @@ type Client struct { key string refCount *atomic.Int32 *client.Client + + l logrus.FieldLogger } func (c Client) DaemonHostname() string { @@ -23,10 +25,13 @@ func (c Client) DaemonHostname() string { return url.Hostname() } +func (c Client) Connected() bool { + return c.Client != nil +} + // if the client is still referenced, this is no-op -func (c Client) Close() error { - if c.refCount.Load() > 0 { - c.refCount.Add(-1) +func (c *Client) Close() error { + if c.refCount.Add(-1) > 0 { return nil } @@ -34,7 +39,15 @@ func (c Client) Close() error { defer clientMapMu.Unlock() delete(clientMap, c.key) - return c.Client.Close() + client := c.Client + c.Client = nil + + c.l.Debugf("client closed") + + if client != nil { + return client.Close() + } + return nil } // ConnectClient creates a new Docker client connection to the specified host. @@ -94,12 +107,16 @@ func ConnectClient(host string) (Client, E.NestedError) { return Client{}, err } - clientMap[host] = Client{ + c := Client{ Client: client, key: host, refCount: &atomic.Int32{}, + l: logger.WithField("docker_client", client.DaemonHost()), } - clientMap[host].refCount.Add(1) + c.refCount.Add(1) + c.l.Debugf("client connected") + + clientMap[host] = c return clientMap[host], nil } diff --git a/src/docker/container.go b/src/docker/container.go index 97e5e8a4..9a7aa3ba 100644 --- a/src/docker/container.go +++ b/src/docker/container.go @@ -10,17 +10,18 @@ import ( ) type ProxyProperties struct { - DockerHost string `yaml:"docker_host" json:"docker_host"` - ContainerName string `yaml:"container_name" json:"container_name"` - ImageName string `yaml:"image_name" json:"image_name"` - Aliases []string `yaml:"aliases" json:"aliases"` - IsExcluded bool `yaml:"is_excluded" json:"is_excluded"` - FirstPort string `yaml:"first_port" json:"first_port"` - IdleTimeout string `yaml:"idle_timeout" json:"idle_timeout"` - WakeTimeout string `yaml:"wake_timeout" json:"wake_timeout"` - StopMethod string `yaml:"stop_method" json:"stop_method"` - StopTimeout string `yaml:"stop_timeout" json:"stop_timeout"` // stop_method = "stop" only - StopSignal string `yaml:"stop_signal" json:"stop_signal"` // stop_method = "stop" | "kill" only + DockerHost string `yaml:"-" json:"docker_host"` + ContainerName string `yaml:"-" json:"container_name"` + ImageName string `yaml:"-" json:"image_name"` + Aliases []string `yaml:"-" json:"aliases"` + IsExcluded bool `yaml:"-" json:"is_excluded"` + FirstPort string `yaml:"-" json:"first_port"` + IdleTimeout string `yaml:"-" json:"idle_timeout"` + WakeTimeout string `yaml:"-" json:"wake_timeout"` + StopMethod string `yaml:"-" json:"stop_method"` + StopTimeout string `yaml:"-" json:"stop_timeout"` // stop_method = "stop" only + StopSignal string `yaml:"-" json:"stop_signal"` // stop_method = "stop" | "kill" only + Running bool `yaml:"-" json:"running"` } type Container struct { @@ -42,6 +43,7 @@ func FromDocker(c *types.Container, dockerHost string) (res Container) { StopMethod: res.getDeleteLabel(LabelStopMethod), StopTimeout: res.getDeleteLabel(LabelStopTimeout), StopSignal: res.getDeleteLabel(LabelStopSignal), + Running: c.Status == "running", } return } diff --git a/src/docker/idlewatcher/watcher.go b/src/docker/idlewatcher/watcher.go index e02dd3c0..a42a2e8f 100644 --- a/src/docker/idlewatcher/watcher.go +++ b/src/docker/idlewatcher/watcher.go @@ -15,10 +15,13 @@ import ( E "github.com/yusing/go-proxy/error" P "github.com/yusing/go-proxy/proxy" PT "github.com/yusing/go-proxy/proxy/fields" + W "github.com/yusing/go-proxy/watcher" + event "github.com/yusing/go-proxy/watcher/events" ) type watcher struct { *P.ReverseProxyEntry + client D.Client refCount atomic.Int32 @@ -26,6 +29,7 @@ type watcher struct { stopByMethod StopCallback wakeCh chan struct{} wakeDone chan E.NestedError + running atomic.Bool ctx context.Context cancel context.CancelFunc @@ -36,7 +40,7 @@ type watcher struct { type ( WakeDone <-chan error WakeFunc func() WakeDone - StopCallback func() (bool, E.NestedError) + StopCallback func() E.NestedError ) func Register(entry *P.ReverseProxyEntry) (*watcher, E.NestedError) { @@ -51,6 +55,7 @@ func Register(entry *P.ReverseProxyEntry) (*watcher, E.NestedError) { if w, ok := watcherMap[entry.ContainerName]; ok { w.refCount.Add(1) + w.ReverseProxyEntry = entry return w, nil } @@ -67,8 +72,9 @@ func Register(entry *P.ReverseProxyEntry) (*watcher, E.NestedError) { l: logger.WithField("container", entry.ContainerName), } w.refCount.Add(1) - + w.running.Store(entry.ContainerRunning) w.stopByMethod = w.getStopCallback() + watcherMap[w.ContainerName] = w go func() { @@ -84,13 +90,14 @@ func Unregister(containerName string) { defer watcherMapMu.Unlock() if w, ok := watcherMap[containerName]; ok { - if w.refCount.Load() == 0 { + if w.refCount.Add(-1) > 0 { + return + } + if w.cancel != nil { w.cancel() - close(w.wakeCh) - delete(watcherMap, containerName) - } else { - w.refCount.Add(-1) } + w.client.Close() + delete(watcherMap, containerName) } } @@ -131,19 +138,26 @@ func (w *watcher) PatchRoundTripper(rtp http.RoundTripper) roundTripper { } func (w *watcher) roundTrip(origRoundTrip roundTripFunc, req *http.Request) (*http.Response, error) { - timeout := time.After(w.WakeTimeout) w.wakeCh <- struct{}{} + + if w.running.Load() { + return origRoundTrip(req) + } + timeout := time.After(w.WakeTimeout) + for { + if w.running.Load() { + return origRoundTrip(req) + } select { + case <-req.Context().Done(): + return nil, req.Context().Err() case err := <-w.wakeDone: if err != nil { return nil, err.Error() } - return origRoundTrip(req) case <-timeout: - resp := loadingResponse - resp.TLS = req.TLS - return &resp, nil + return getLoadingResponse(), nil } } } @@ -178,36 +192,23 @@ func (w *watcher) containerStatus() (string, E.NestedError) { return json.State.Status, nil } -func (w *watcher) wakeIfStopped() (bool, E.NestedError) { - failure := E.Failure("wake") +func (w *watcher) wakeIfStopped() E.NestedError { status, err := w.containerStatus() if err.HasError() { - return false, failure.With(err) + return err } // "created", "running", "paused", "restarting", "removing", "exited", or "dead" switch status { case "exited", "dead": - err = E.From(w.containerStart()) + return E.From(w.containerStart()) case "paused": - err = E.From(w.containerUnpause()) + return E.From(w.containerUnpause()) case "running": - return false, nil + w.running.Store(true) + return nil default: - return false, failure.With(E.Unexpected("container state", status)) - } - - if err.HasError() { - return false, failure.With(err) - } - - status, err = w.containerStatus() - if err.HasError() { - return false, failure.With(err) - } else if status != "running" { - return false, failure.With(E.Unexpected("container state", status)) - } else { - return true, nil + return E.Unexpected("container state", status) } } @@ -223,19 +224,15 @@ func (w *watcher) getStopCallback() StopCallback { default: panic("should not reach here") } - return func() (bool, E.NestedError) { + return func() E.NestedError { status, err := w.containerStatus() if err.HasError() { - return false, E.FailWith("stop", err) + return err } if status != "running" { - return false, nil - } - err = E.From(cb()) - if err.HasError() { - return false, E.FailWith("stop", err) + return nil } - return true, nil + return E.From(cb()) } } @@ -244,55 +241,69 @@ func (w *watcher) watch() { w.ctx = watcherCtx w.cancel = watcherCancel + dockerWatcher := W.NewDockerWatcherWithClient(w.client) + + defer close(w.wakeCh) + + dockerEventCh, dockerEventErrCh := dockerWatcher.EventsWithOptions(w.ctx, W.DockerListOptions{ + Filters: W.NewDockerFilter( + W.DockerFilterContainer, + W.DockerrFilterContainerName(w.ContainerName), + W.DockerFilterStart, + W.DockerFilterStop, + W.DockerFilterDie, + W.DockerFilterKill, + W.DockerFilterPause, + W.DockerFilterUnpause, + ), + }) + ticker := time.NewTicker(w.IdleTimeout) defer ticker.Stop() for { select { case <-mainLoopCtx.Done(): - watcherCancel() + w.cancel() case <-watcherCtx.Done(): w.l.Debug("stopped") return + case err := <-dockerEventErrCh: + if err != nil && err.IsNot(context.Canceled) { + w.l.Error(E.FailWith("docker watcher", err)) + } + case e := <-dockerEventCh: + switch e.Action { + case event.ActionDockerStartUnpause: + w.running.Store(true) + w.l.Infof("%s %s", e.ActorName, e.Action) + case event.ActionDockerStopPause: + w.running.Store(false) + w.l.Infof("%s %s", e.ActorName, e.Action) + } case <-ticker.C: w.l.Debug("timeout") - stopped, err := w.stopByMethod() - if err.HasError() { - w.l.Error(err.Extraf("stop method: %s", w.StopMethod)) - } else if stopped { - w.l.Infof("%s: ok", w.StopMethod) - } else { - ticker.Stop() + ticker.Stop() + if err := w.stopByMethod(); err != nil && err.IsNot(context.Canceled) { + w.l.Error(E.FailWith("stop", err).Extraf("stop method: %s", w.StopMethod)) } case <-w.wakeCh: - w.l.Debug("wake received") - go func() { - started, err := w.wakeIfStopped() - if err != nil { - w.l.Error(err) - } else if started { - w.l.Infof("awaken") - ticker.Reset(w.IdleTimeout) - } - w.wakeDone <- err // this is passed to roundtrip - }() + w.l.Debug("wake signal received") + ticker.Reset(w.IdleTimeout) + err := w.wakeIfStopped() + if err != nil && err.IsNot(context.Canceled) { + w.l.Error(E.FailWith("wake", err)) + } + select { + case w.wakeDone <- err: // this is passed to roundtrip + default: + } } } } -var ( - mainLoopCtx context.Context - mainLoopCancel context.CancelFunc - mainLoopWg sync.WaitGroup - - watcherMap = make(map[string]*watcher) - watcherMapMu sync.Mutex - - newWatcherCh = make(chan *watcher) - - logger = logrus.WithField("module", "idle_watcher") - - loadingResponse = http.Response{ +func getLoadingResponse() *http.Response { + return &http.Response{ StatusCode: http.StatusAccepted, Header: http.Header{ "Content-Type": {"text/html"}, @@ -305,6 +316,19 @@ var ( Body: io.NopCloser(bytes.NewReader((loadingPage))), ContentLength: int64(len(loadingPage)), } +} + +var ( + mainLoopCtx context.Context + mainLoopCancel context.CancelFunc + mainLoopWg sync.WaitGroup + + watcherMap = make(map[string]*watcher) + watcherMapMu sync.Mutex + + newWatcherCh = make(chan *watcher) + + logger = logrus.WithField("module", "idle_watcher") loadingPage = []byte(` @@ -317,12 +341,16 @@ var ( -

Container is starting... Please wait

+

Container is starting... Please wait

`[1:]) diff --git a/src/go.mod b/src/go.mod index 2d471d22..5d7980d9 100644 --- a/src/go.mod +++ b/src/go.mod @@ -3,8 +3,8 @@ module github.com/yusing/go-proxy go 1.22.0 require ( - github.com/docker/cli v27.2.1+incompatible - github.com/docker/docker v27.2.1+incompatible + github.com/docker/cli v27.3.1+incompatible + github.com/docker/docker v27.3.1+incompatible github.com/fsnotify/fsnotify v1.7.0 github.com/go-acme/lego/v4 v4.18.0 github.com/puzpuzpuz/xsync/v3 v3.4.0 diff --git a/src/go.sum b/src/go.sum index 8a92f6d4..e4b70538 100644 --- a/src/go.sum +++ b/src/go.sum @@ -13,10 +13,10 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk= github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= -github.com/docker/cli v27.2.1+incompatible h1:U5BPtiD0viUzjGAjV1p0MGB8eVA3L3cbIrnyWmSJI70= -github.com/docker/cli v27.2.1+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8= -github.com/docker/docker v27.2.1+incompatible h1:fQdiLfW7VLscyoeYEBz7/J8soYFDZV1u6VW6gJEjNMI= -github.com/docker/docker v27.2.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= +github.com/docker/cli v27.3.1+incompatible h1:qEGdFBF3Xu6SCvCYhc7CzaQTlBmqDuzxPDpigSyeKQQ= +github.com/docker/cli v27.3.1+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8= +github.com/docker/docker v27.3.1+incompatible h1:KttF0XoteNTicmUtBO0L2tP+J7FGRFTjaEF4k6WdhfI= +github.com/docker/docker v27.3.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c= github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= diff --git a/src/proxy/entry.go b/src/proxy/entry.go index 896367b6..e1ce85ff 100644 --- a/src/proxy/entry.go +++ b/src/proxy/entry.go @@ -22,13 +22,14 @@ type ( HideHeaders []string /* Docker only */ - IdleTimeout time.Duration - WakeTimeout time.Duration - StopMethod T.StopMethod - StopTimeout int - StopSignal T.Signal - DockerHost string - ContainerName string + IdleTimeout time.Duration + WakeTimeout time.Duration + StopMethod T.StopMethod + StopTimeout int + StopSignal T.Signal + DockerHost string + ContainerName string + ContainerRunning bool } StreamEntry struct { Alias T.Alias `json:"alias"` @@ -102,20 +103,21 @@ func validateRPEntry(m *M.ProxyEntry, s T.Scheme, b E.Builder) *ReverseProxyEntr } return &ReverseProxyEntry{ - Alias: T.NewAlias(m.Alias), - Scheme: s, - URL: url, - NoTLSVerify: m.NoTLSVerify, - PathPatterns: pathPatterns, - SetHeaders: setHeaders, - HideHeaders: m.HideHeaders, - IdleTimeout: idleTimeout, - WakeTimeout: wakeTimeout, - StopMethod: stopMethod, - StopTimeout: int(stopTimeOut.Seconds()), // docker api takes integer seconds for timeout argument - StopSignal: stopSignal, - DockerHost: m.DockerHost, - ContainerName: m.ContainerName, + Alias: T.NewAlias(m.Alias), + Scheme: s, + URL: url, + NoTLSVerify: m.NoTLSVerify, + PathPatterns: pathPatterns, + SetHeaders: setHeaders, + HideHeaders: m.HideHeaders, + IdleTimeout: idleTimeout, + WakeTimeout: wakeTimeout, + StopMethod: stopMethod, + StopTimeout: int(stopTimeOut.Seconds()), // docker api takes integer seconds for timeout argument + StopSignal: stopSignal, + DockerHost: m.DockerHost, + ContainerName: m.ContainerName, + ContainerRunning: m.Running, } } diff --git a/src/proxy/provider/docker_provider.go b/src/proxy/provider/docker_provider.go index 25986524..71284974 100755 --- a/src/proxy/provider/docker_provider.go +++ b/src/proxy/provider/docker_provider.go @@ -1,12 +1,13 @@ package provider import ( + "strconv" + D "github.com/yusing/go-proxy/docker" E "github.com/yusing/go-proxy/error" M "github.com/yusing/go-proxy/models" R "github.com/yusing/go-proxy/route" W "github.com/yusing/go-proxy/watcher" - . "github.com/yusing/go-proxy/watcher/event" ) type DockerProvider struct { @@ -60,7 +61,7 @@ func (p *DockerProvider) LoadRoutesImpl() (routes R.Routes, err E.NestedError) { return routes, errors.Build() } -func (p *DockerProvider) OnEvent(event Event, routes R.Routes) (res EventResult) { +func (p *DockerProvider) OnEvent(event W.Event, routes R.Routes) (res EventResult) { b := E.NewBuilder("event %s error", event) defer b.To(&res.err) @@ -72,36 +73,33 @@ func (p *DockerProvider) OnEvent(event Event, routes R.Routes) (res EventResult) } }) - switch event.Action { - case ActionStarted, ActionCreated, ActionModified: - client, err := D.ConnectClient(p.dockerHost) - if err.HasError() { - b.Add(E.FailWith("connect to docker", err)) - return - } - defer client.Close() - cont, err := client.Inspect(event.ActorID) - if err.HasError() { - b.Add(E.FailWith("inspect container", err)) - return - } - entries, err := p.entriesFromContainerLabels(cont) - b.Add(err) - - entries.RangeAll(func(alias string, entry *M.ProxyEntry) { - if routes.Has(alias) { - b.Add(E.AlreadyExist("alias", alias)) + client, err := D.ConnectClient(p.dockerHost) + if err.HasError() { + b.Add(E.FailWith("connect to docker", err)) + return + } + defer client.Close() + cont, err := client.Inspect(event.ActorID) + if err.HasError() { + b.Add(E.FailWith("inspect container", err)) + return + } + entries, err := p.entriesFromContainerLabels(cont) + b.Add(err) + + entries.RangeAll(func(alias string, entry *M.ProxyEntry) { + if routes.Has(alias) { + b.Add(E.AlreadyExist("alias", alias)) + } else { + if route, err := R.NewRoute(entry); err.HasError() { + b.Add(err) } else { - if route, err := R.NewRoute(entry); err.HasError() { - b.Add(err) - } else { - routes.Store(alias, route) - b.Add(route.Start()) - res.nAdded++ - } + routes.Store(alias, route) + b.Add(route.Start()) + res.nAdded++ } - }) - } + } + }) return } @@ -125,6 +123,22 @@ func (p *DockerProvider) entriesFromContainerLabels(container D.Container) (M.Pr errors.Add(p.applyLabel(entries, key, val)) } + // selecting correct host port + if container.HostConfig.NetworkMode != "host" { + for _, a := range container.Aliases { + entry, ok := entries.Load(a) + if !ok { + continue + } + for _, p := range container.Ports { + containerPort := strconv.Itoa(int(p.PrivatePort)) + if containerPort == entry.Port { + entry.Port = strconv.Itoa(int(p.PublicPort)) + } + } + } + } + return entries, errors.Build().Subject(container.ContainerName) } diff --git a/src/proxy/provider/file_provider.go b/src/proxy/provider/file_provider.go index 10e9763e..aeb54193 100644 --- a/src/proxy/provider/file_provider.go +++ b/src/proxy/provider/file_provider.go @@ -10,7 +10,6 @@ import ( R "github.com/yusing/go-proxy/route" U "github.com/yusing/go-proxy/utils" W "github.com/yusing/go-proxy/watcher" - . "github.com/yusing/go-proxy/watcher/event" ) type FileProvider struct { @@ -29,7 +28,7 @@ func Validate(data []byte) E.NestedError { return U.ValidateYaml(U.GetSchema(common.ProvidersSchemaPath), data) } -func (p FileProvider) OnEvent(event Event, routes R.Routes) (res EventResult) { +func (p FileProvider) OnEvent(event W.Event, routes R.Routes) (res EventResult) { b := E.NewBuilder("event %s error", event) defer b.To(&res.err) diff --git a/src/proxy/provider/provider.go b/src/proxy/provider/provider.go index 0063a6ef..a34c17d2 100644 --- a/src/proxy/provider/provider.go +++ b/src/proxy/provider/provider.go @@ -9,7 +9,6 @@ import ( E "github.com/yusing/go-proxy/error" R "github.com/yusing/go-proxy/route" W "github.com/yusing/go-proxy/watcher" - . "github.com/yusing/go-proxy/watcher/event" ) type ( @@ -29,7 +28,7 @@ type ( ProviderImpl interface { NewWatcher() W.Watcher LoadRoutesImpl() (R.Routes, E.NestedError) - OnEvent(event Event, routes R.Routes) EventResult + OnEvent(event W.Event, routes R.Routes) EventResult } ProviderType string EventResult struct { diff --git a/src/watcher/docker_watcher.go b/src/watcher/docker_watcher.go index 72f8f97b..a6229141 100644 --- a/src/watcher/docker_watcher.go +++ b/src/watcher/docker_watcher.go @@ -4,72 +4,101 @@ import ( "context" "time" - "github.com/docker/docker/api/types/events" + docker_events "github.com/docker/docker/api/types/events" "github.com/docker/docker/api/types/filters" + "github.com/sirupsen/logrus" D "github.com/yusing/go-proxy/docker" E "github.com/yusing/go-proxy/error" - . "github.com/yusing/go-proxy/watcher/event" + "github.com/yusing/go-proxy/watcher/events" ) -type DockerWatcher struct { - host string +type ( + DockerWatcher struct { + host string + client D.Client + logrus.FieldLogger + } + DockerListOptions = docker_events.ListOptions +) + +// https://docs.docker.com/reference/api/engine/version/v1.47/#tag/System/operation/SystemPingHead +var ( + DockerFilterContainer = filters.Arg("type", string(docker_events.ContainerEventType)) + DockerFilterStart = filters.Arg("event", string(docker_events.ActionStart)) + DockerFilterStop = filters.Arg("event", string(docker_events.ActionStop)) + DockerFilterDie = filters.Arg("event", string(docker_events.ActionDie)) + DockerFilterKill = filters.Arg("event", string(docker_events.ActionKill)) + DockerFilterPause = filters.Arg("event", string(docker_events.ActionPause)) + DockerFilterUnpause = filters.Arg("event", string(docker_events.ActionUnPause)) + + NewDockerFilter = filters.NewArgs +) + +func DockerrFilterContainerName(name string) filters.KeyValuePair { + return filters.Arg("container", name) +} + +func NewDockerWatcher(host string) DockerWatcher { + return DockerWatcher{host: host, FieldLogger: logrus.WithField("module", "docker_watcher")} } -func NewDockerWatcher(host string) *DockerWatcher { - return &DockerWatcher{host: host} +func NewDockerWatcherWithClient(client D.Client) DockerWatcher { + return DockerWatcher{client: client, FieldLogger: logrus.WithField("module", "docker_watcher")} } -func (w *DockerWatcher) Events(ctx context.Context) (<-chan Event, <-chan E.NestedError) { +func (w DockerWatcher) Events(ctx context.Context) (<-chan Event, <-chan E.NestedError) { + return w.EventsWithOptions(ctx, optionsWatchAll) +} + +func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerListOptions) (<-chan Event, <-chan E.NestedError) { eventCh := make(chan Event) errCh := make(chan E.NestedError) started := make(chan struct{}) go func() { + defer close(eventCh) defer close(errCh) - var cl D.Client - var err E.NestedError - for range 3 { - cl, err = D.ConnectClient(w.host) - if err.NoError() { - break + if !w.client.Connected() { + var err E.NestedError + for range 3 { + w.client, err = D.ConnectClient(w.host) + if err != nil { + defer w.client.Close() + break + } + time.Sleep(1 * time.Second) + } + if err.HasError() { + errCh <- E.FailWith("docker connection", err) + return } - errCh <- err - time.Sleep(1 * time.Second) - } - if err.HasError() { - errCh <- E.Failure("connecting to docker") - return } - defer cl.Close() - cEventCh, cErrCh := cl.Events(ctx, dwOptions) + cEventCh, cErrCh := w.client.Events(ctx, options) started <- struct{}{} for { select { case <-ctx.Done(): - if err := <-cErrCh; err != nil { - errCh <- E.From(err) + if err := E.From(ctx.Err()); err != nil && err.IsNot(context.Canceled) { + errCh <- err } return case msg := <-cEventCh: - var Action Action - switch msg.Action { - case events.ActionStart: - Action = ActionCreated - case events.ActionDie: - Action = ActionStopped - default: // NOTE: should not happen - Action = ActionModified + action, ok := events.DockerEventMap[msg.Action] + if !ok { + w.Debugf("ignored unknown docker event: %s for container %s", msg.Action, msg.Actor.Attributes["name"]) + continue } - eventCh <- Event{ - Type: EventTypeDocker, + event := Event{ + Type: events.EventTypeDocker, ActorID: msg.Actor.ID, ActorAttributes: msg.Actor.Attributes, // labels ActorName: msg.Actor.Attributes["name"], - Action: Action, + Action: action, } + eventCh <- event case err := <-cErrCh: if err == nil { continue @@ -81,7 +110,7 @@ func (w *DockerWatcher) Events(ctx context.Context) (<-chan Event, <-chan E.Nest default: if D.IsErrConnectionFailed(err) { time.Sleep(100 * time.Millisecond) - cEventCh, cErrCh = cl.Events(ctx, dwOptions) + cEventCh, cErrCh = w.client.Events(ctx, options) } } } @@ -92,8 +121,9 @@ func (w *DockerWatcher) Events(ctx context.Context) (<-chan Event, <-chan E.Nest return eventCh, errCh } -var dwOptions = events.ListOptions{Filters: filters.NewArgs( - filters.Arg("type", string(events.ContainerEventType)), - filters.Arg("event", string(events.ActionStart)), - filters.Arg("event", string(events.ActionDie)), // 'stop' already triggering 'die' +var optionsWatchAll = DockerListOptions{Filters: NewDockerFilter( + DockerFilterContainer, + DockerFilterStart, + DockerFilterStop, + DockerFilterDie, )} diff --git a/src/watcher/event/event.go b/src/watcher/event/event.go deleted file mode 100644 index 8dc1213f..00000000 --- a/src/watcher/event/event.go +++ /dev/null @@ -1,34 +0,0 @@ -package event - -import "fmt" - -type ( - Event struct { - Type EventType - ActorName string - ActorID string - ActorAttributes map[string]string - Action Action - } - Action string - EventType string -) - -const ( - ActionModified Action = "modified" - ActionCreated Action = "created" - ActionStarted Action = "started" - ActionDeleted Action = "deleted" - ActionStopped Action = "stopped" - - EventTypeDocker EventType = "docker" - EventTypeFile EventType = "file" -) - -func (e Event) String() string { - return fmt.Sprintf("%s %s", e.ActorName, e.Action) -} - -func (a Action) IsDelete() bool { - return a == ActionDeleted -} diff --git a/src/watcher/events/events.go b/src/watcher/events/events.go new file mode 100644 index 00000000..87abcd20 --- /dev/null +++ b/src/watcher/events/events.go @@ -0,0 +1,49 @@ +package events + +import ( + "fmt" + + dockerEvents "github.com/docker/docker/api/types/events" +) + +type ( + Event struct { + Type EventType + ActorName string + ActorID string + ActorAttributes map[string]string + Action Action + } + Action string + EventType string +) + +const ( + ActionFileModified Action = "modified" + ActionFileCreated Action = "created" + ActionFileDeleted Action = "deleted" + + ActionDockerStartUnpause Action = "start" + ActionDockerStopPause Action = "stop" + + EventTypeDocker EventType = "docker" + EventTypeFile EventType = "file" +) + +var DockerEventMap = map[dockerEvents.Action]Action{ + dockerEvents.ActionCreate: ActionDockerStartUnpause, + dockerEvents.ActionStart: ActionDockerStartUnpause, + dockerEvents.ActionPause: ActionDockerStartUnpause, + dockerEvents.ActionDie: ActionDockerStopPause, + dockerEvents.ActionStop: ActionDockerStopPause, + dockerEvents.ActionUnPause: ActionDockerStopPause, + dockerEvents.ActionKill: ActionDockerStopPause, +} + +func (e Event) String() string { + return fmt.Sprintf("%s %s", e.ActorName, e.Action) +} + +func (a Action) IsDelete() bool { + return a == ActionFileDeleted +} diff --git a/src/watcher/file_watcher.go b/src/watcher/file_watcher.go index 1da16a24..f4cda622 100644 --- a/src/watcher/file_watcher.go +++ b/src/watcher/file_watcher.go @@ -6,7 +6,6 @@ import ( "github.com/yusing/go-proxy/common" E "github.com/yusing/go-proxy/error" - . "github.com/yusing/go-proxy/watcher/event" ) type fileWatcher struct { diff --git a/src/watcher/file_watcher_helper.go b/src/watcher/file_watcher_helper.go index 98f9802a..351b7035 100644 --- a/src/watcher/file_watcher_helper.go +++ b/src/watcher/file_watcher_helper.go @@ -9,7 +9,7 @@ import ( "github.com/fsnotify/fsnotify" "github.com/sirupsen/logrus" E "github.com/yusing/go-proxy/error" - . "github.com/yusing/go-proxy/watcher/event" + "github.com/yusing/go-proxy/watcher/events" ) type fileWatcherHelper struct { @@ -81,30 +81,30 @@ func (h *fileWatcherHelper) start() { for { select { - case event, ok := <-h.w.Events: + case fsEvent, ok := <-h.w.Events: if !ok { // closed manually? fsLogger.Error("channel closed") return } // retrieve the watcher - w, ok := h.m[path.Base(event.Name)] + w, ok := h.m[path.Base(fsEvent.Name)] if !ok { // watcher for this file does not exist continue } msg := Event{ - Type: EventTypeFile, + Type: events.EventTypeFile, ActorName: w.filename, } switch { - case event.Has(fsnotify.Create): - msg.Action = ActionCreated - case event.Has(fsnotify.Write): - msg.Action = ActionModified - case event.Has(fsnotify.Remove), event.Has(fsnotify.Rename): - msg.Action = ActionDeleted + case fsEvent.Has(fsnotify.Create): + msg.Action = events.ActionFileCreated + case fsEvent.Has(fsnotify.Write): + msg.Action = events.ActionFileModified + case fsEvent.Has(fsnotify.Remove), fsEvent.Has(fsnotify.Rename): + msg.Action = events.ActionFileDeleted default: // ignore other events continue } diff --git a/src/watcher/watcher.go b/src/watcher/watcher.go index 9869ee19..17b70dd8 100644 --- a/src/watcher/watcher.go +++ b/src/watcher/watcher.go @@ -4,9 +4,11 @@ import ( "context" E "github.com/yusing/go-proxy/error" - . "github.com/yusing/go-proxy/watcher/event" + "github.com/yusing/go-proxy/watcher/events" ) +type Event = events.Event + type Watcher interface { Events(ctx context.Context) (<-chan Event, <-chan E.NestedError) }