From 21095e5cdbb0586e46b900fcb19e05d1a343836d Mon Sep 17 00:00:00 2001 From: David Juhasz Date: Thu, 22 Feb 2024 16:24:09 -0800 Subject: [PATCH] Standardize the watcher Download() interface Refs #867 - Update the existing watcher `Download()` method and interface signatures to take a destination filepath (string) instead of an `io.Writer` stream - Add a `filewatcher.Download()` method - Create a `minio.Download()` method and move implementation specific code from `serviceImpl.Download()` to it - Add `MinioConfig.URL` field to set bucket config by URL for testing - Add unit tests - Update mocks --- internal/filenotify/filenotify.go | 16 +- .../{poller_test.go => filenotify_test.go} | 14 +- internal/filenotify/poller.go | 9 +- internal/watcher/config.go | 23 +- internal/watcher/fake/mock_service.go | 7 +- internal/watcher/fake/mock_watcher.go | 38 ++++ internal/watcher/filesystem.go | 69 ++++-- internal/watcher/filesystem_test.go | 198 ++++++++++++++++++ internal/watcher/minio.go | 33 +++ internal/watcher/minio_test.go | 53 ++++- internal/watcher/watcher.go | 30 +-- internal/workflow/activities/download.go | 12 +- internal/workflow/activities/download_test.go | 20 +- 13 files changed, 431 insertions(+), 91 deletions(-) rename internal/filenotify/{poller_test.go => filenotify_test.go} (76%) create mode 100644 internal/watcher/filesystem_test.go diff --git a/internal/filenotify/filenotify.go b/internal/filenotify/filenotify.go index f15130003..1a9bf4c5f 100644 --- a/internal/filenotify/filenotify.go +++ b/internal/filenotify/filenotify.go @@ -4,11 +4,17 @@ package filenotify import ( + "time" + "github.com/fsnotify/fsnotify" "github.com/radovskyb/watcher" ) -// FileWatcher is an interface for implementing file notification watchers +type Config struct { + PollInterval time.Duration +} + +// FileWatcher is an interface for implementing file notification watchers. type FileWatcher interface { Events() <-chan fsnotify.Event Errors() <-chan error @@ -18,11 +24,11 @@ type FileWatcher interface { } // New tries to use an fs-event watcher, and falls back to the poller if there is an error -func New() (FileWatcher, error) { +func New(cfg Config) (FileWatcher, error) { if watcher, err := NewEventWatcher(); err == nil { return watcher, nil } - return NewPollingWatcher() + return NewPollingWatcher(cfg) } // NewEventWatcher returns an fs-event based file watcher @@ -37,7 +43,7 @@ func NewEventWatcher() (FileWatcher, error) { } // NewPollingWatcher returns a poll-based file watcher -func NewPollingWatcher() (FileWatcher, error) { +func NewPollingWatcher(cfg Config) (FileWatcher, error) { poller := &filePoller{ wr: watcher.New(), events: make(chan fsnotify.Event), @@ -49,7 +55,7 @@ func NewPollingWatcher() (FileWatcher, error) { done := make(chan error) { go func() { - err := poller.wr.Start(watchWaitTime) + err := poller.wr.Start(cfg.PollInterval) if err != nil { done <- err } diff --git a/internal/filenotify/poller_test.go b/internal/filenotify/filenotify_test.go similarity index 76% rename from internal/filenotify/poller_test.go rename to internal/filenotify/filenotify_test.go index 089463d07..9c1827736 100644 --- a/internal/filenotify/poller_test.go +++ b/internal/filenotify/filenotify_test.go @@ -1,4 +1,4 @@ -package filenotify +package filenotify_test import ( "fmt" @@ -8,10 +8,16 @@ import ( "time" "github.com/fsnotify/fsnotify" + + "github.com/artefactual-sdps/enduro/internal/filenotify" ) +const pollInterval = time.Millisecond * 5 + func TestPollerEvent(t *testing.T) { - w, err := NewPollingWatcher() + w, err := filenotify.NewPollingWatcher( + filenotify.Config{PollInterval: pollInterval}, + ) if err != nil { t.Fatal("error creating poller") } @@ -44,7 +50,7 @@ func TestPollerEvent(t *testing.T) { } } -func assertEvent(w FileWatcher, eType fsnotify.Op) error { +func assertEvent(w filenotify.FileWatcher, eType fsnotify.Op) error { var err error select { case e := <-w.Events(): @@ -53,7 +59,7 @@ func assertEvent(w FileWatcher, eType fsnotify.Op) error { } case e := <-w.Errors(): err = fmt.Errorf("got unexpected error waiting for events %v: %v", eType, e) - case <-time.After(watchWaitTime * 3): + case <-time.After(pollInterval * 2): err = fmt.Errorf("timeout waiting for event %v", eType) } return err diff --git a/internal/filenotify/poller.go b/internal/filenotify/poller.go index 156ea0b9d..9470d3ff5 100644 --- a/internal/filenotify/poller.go +++ b/internal/filenotify/poller.go @@ -1,15 +1,10 @@ package filenotify import ( - "time" - "github.com/fsnotify/fsnotify" "github.com/radovskyb/watcher" ) -// watchWaitTime is the time to wait between file poll loops -const watchWaitTime = 200 * time.Millisecond - // filePoller is used to poll files for changes, especially in cases where fsnotify // can't be run (e.g. when inotify handles are exhausted) // filePoller satisfies the FileWatcher interface @@ -33,9 +28,9 @@ func (w *filePoller) loop() { switch event.Op { case watcher.Create: op = fsnotify.Create - case watcher.Rename: - fallthrough case watcher.Move: + fallthrough + case watcher.Rename: op = fsnotify.Rename default: continue diff --git a/internal/watcher/config.go b/internal/watcher/config.go index afd4bfbf1..8537d4c5a 100644 --- a/internal/watcher/config.go +++ b/internal/watcher/config.go @@ -5,6 +5,8 @@ import ( "time" ) +const defaultPollInterval = 200 * time.Millisecond + type Config struct { Filesystem []*FilesystemConfig Minio []*MinioConfig @@ -29,14 +31,24 @@ func (c Config) CompletedDirs() []string { // See filesystem.go for more. type FilesystemConfig struct { - Name string - Path string - Inotify bool - Ignore string + Name string + Path string + CompletedDir string + Ignore string + Inotify bool RetentionPeriod *time.Duration - CompletedDir string StripTopLevelDir bool + + // PollInterval sets the length of time between filesystem polls (default: + // 200ms). If Inotify is true then PollInterval is ignored. + PollInterval time.Duration +} + +func (cfg *FilesystemConfig) setDefaults() { + if cfg.PollInterval == 0 { + cfg.PollInterval = defaultPollInterval + } } // See minio.go for more. @@ -53,6 +65,7 @@ type MinioConfig struct { Secret string Token string Bucket string + URL string RetentionPeriod *time.Duration StripTopLevelDir bool diff --git a/internal/watcher/fake/mock_service.go b/internal/watcher/fake/mock_service.go index 20dff0fc7..0006482ac 100644 --- a/internal/watcher/fake/mock_service.go +++ b/internal/watcher/fake/mock_service.go @@ -11,7 +11,6 @@ package fake import ( context "context" - io "io" reflect "reflect" watcher "github.com/artefactual-sdps/enduro/internal/watcher" @@ -157,7 +156,7 @@ func (c *MockServiceDisposeCall) DoAndReturn(f func(context.Context, string, str } // Download mocks base method. -func (m *MockService) Download(arg0 context.Context, arg1 io.Writer, arg2, arg3 string) error { +func (m *MockService) Download(arg0 context.Context, arg1, arg2, arg3 string) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Download", arg0, arg1, arg2, arg3) ret0, _ := ret[0].(error) @@ -183,13 +182,13 @@ func (c *MockServiceDownloadCall) Return(arg0 error) *MockServiceDownloadCall { } // Do rewrite *gomock.Call.Do -func (c *MockServiceDownloadCall) Do(f func(context.Context, io.Writer, string, string) error) *MockServiceDownloadCall { +func (c *MockServiceDownloadCall) Do(f func(context.Context, string, string, string) error) *MockServiceDownloadCall { c.Call = c.Call.Do(f) return c } // DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c *MockServiceDownloadCall) DoAndReturn(f func(context.Context, io.Writer, string, string) error) *MockServiceDownloadCall { +func (c *MockServiceDownloadCall) DoAndReturn(f func(context.Context, string, string, string) error) *MockServiceDownloadCall { c.Call = c.Call.DoAndReturn(f) return c } diff --git a/internal/watcher/fake/mock_watcher.go b/internal/watcher/fake/mock_watcher.go index 7bdbf07f8..5e10da037 100644 --- a/internal/watcher/fake/mock_watcher.go +++ b/internal/watcher/fake/mock_watcher.go @@ -80,6 +80,44 @@ func (c *MockWatcherCompletedDirCall) DoAndReturn(f func() string) *MockWatcherC return c } +// Download mocks base method. +func (m *MockWatcher) Download(arg0 context.Context, arg1, arg2 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Download", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// Download indicates an expected call of Download. +func (mr *MockWatcherMockRecorder) Download(arg0, arg1, arg2 any) *MockWatcherDownloadCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Download", reflect.TypeOf((*MockWatcher)(nil).Download), arg0, arg1, arg2) + return &MockWatcherDownloadCall{Call: call} +} + +// MockWatcherDownloadCall wrap *gomock.Call +type MockWatcherDownloadCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *MockWatcherDownloadCall) Return(arg0 error) *MockWatcherDownloadCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *MockWatcherDownloadCall) Do(f func(context.Context, string, string) error) *MockWatcherDownloadCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *MockWatcherDownloadCall) DoAndReturn(f func(context.Context, string, string) error) *MockWatcherDownloadCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + // OpenBucket mocks base method. func (m *MockWatcher) OpenBucket(arg0 context.Context) (*blob.Bucket, error) { m.ctrl.T.Helper() diff --git a/internal/watcher/filesystem.go b/internal/watcher/filesystem.go index aa8d34696..8e4d05677 100644 --- a/internal/watcher/filesystem.go +++ b/internal/watcher/filesystem.go @@ -10,9 +10,11 @@ import ( "runtime" "github.com/fsnotify/fsnotify" + cp "github.com/otiai10/copy" "gocloud.dev/blob" - "gocloud.dev/blob/fileblob" + _ "gocloud.dev/blob/fileblob" + "github.com/artefactual-sdps/enduro/internal/bucket" "github.com/artefactual-sdps/enduro/internal/filenotify" "github.com/artefactual-sdps/enduro/internal/fsutil" ) @@ -20,7 +22,8 @@ import ( // filesystemWatcher implements a Watcher for watching paths in a local filesystem. type filesystemWatcher struct { ctx context.Context - fsw filenotify.FileWatcher + cfg *FilesystemConfig + fw filenotify.FileWatcher ch chan *fsnotify.Event path string regex *regexp.Regexp @@ -30,6 +33,8 @@ type filesystemWatcher struct { var _ Watcher = (*filesystemWatcher)(nil) func NewFilesystemWatcher(ctx context.Context, config *FilesystemConfig) (*filesystemWatcher, error) { + config.setDefaults() + stat, err := os.Stat(config.Path) if err != nil { return nil, fmt.Errorf("error looking up stat info: %w", err) @@ -53,20 +58,15 @@ func NewFilesystemWatcher(ctx context.Context, config *FilesystemConfig) (*files return nil, errors.New("cannot use completedDir and retentionPeriod simultaneously") } - // The inotify API isn't always available, fall back to polling. - var fsw filenotify.FileWatcher - if config.Inotify && runtime.GOOS != "windows" { - fsw, err = filenotify.New() - } else { - fsw, err = filenotify.NewPollingWatcher() - } + fw, err := fileWatcher(config) if err != nil { - return nil, fmt.Errorf("error creating filesystem watcher: %w", err) + return nil, err } w := &filesystemWatcher{ ctx: ctx, - fsw: fsw, + cfg: config, + fw: fw, ch: make(chan *fsnotify.Event, 100), path: abspath, regex: regex, @@ -80,17 +80,38 @@ func NewFilesystemWatcher(ctx context.Context, config *FilesystemConfig) (*files go w.loop() - if err := fsw.Add(abspath); err != nil { + if err := fw.Add(abspath); err != nil { return nil, fmt.Errorf("error configuring filesystem watcher: %w", err) } return w, nil } +func fileWatcher(cfg *FilesystemConfig) (filenotify.FileWatcher, error) { + var ( + fsw filenotify.FileWatcher + err error + ) + + // The inotify API isn't always available, fall back to polling. + if cfg.Inotify && runtime.GOOS != "windows" { + fsw, err = filenotify.New(filenotify.Config{PollInterval: cfg.PollInterval}) + } else { + fsw, err = filenotify.NewPollingWatcher( + filenotify.Config{PollInterval: cfg.PollInterval}, + ) + } + if err != nil { + return nil, fmt.Errorf("error creating filesystem watcher: %w", err) + } + + return fsw, nil +} + func (w *filesystemWatcher) loop() { for { select { - case event, ok := <-w.fsw.Events(): + case event, ok := <-w.fw.Events(): if !ok { continue } @@ -104,12 +125,12 @@ func (w *filesystemWatcher) loop() { continue } w.ch <- &event - case _, ok := <-w.fsw.Errors(): + case _, ok := <-w.fw.Errors(): if !ok { continue } case <-w.ctx.Done(): - _ = w.fsw.Close() + _ = w.fw.Close() close(w.ch) return } @@ -136,8 +157,10 @@ func (w *filesystemWatcher) Path() string { return w.path } -func (w *filesystemWatcher) OpenBucket(context.Context) (*blob.Bucket, error) { - return fileblob.OpenBucket(w.path, nil) +func (w *filesystemWatcher) OpenBucket(ctx context.Context) (*blob.Bucket, error) { + return bucket.Open(ctx, &bucket.Config{ + URL: fmt.Sprintf("file://%s", w.path), + }) } func (w *filesystemWatcher) RemoveAll(key string) error { @@ -154,3 +177,15 @@ func (w *filesystemWatcher) Dispose(key string) error { return fsutil.Move(src, dst) } + +// Download recursively copies the contents of key to dest. Key may be the name +// of a directory or file. +func (w *filesystemWatcher) Download(ctx context.Context, dest, key string) error { + src := filepath.Clean(filepath.Join(w.path, key)) + dest = filepath.Clean(filepath.Join(dest, key)) + if err := cp.Copy(src, dest); err != nil { + return fmt.Errorf("filesystem watcher: download: %v", err) + } + + return nil +} diff --git a/internal/watcher/filesystem_test.go b/internal/watcher/filesystem_test.go new file mode 100644 index 000000000..1a93cdf6a --- /dev/null +++ b/internal/watcher/filesystem_test.go @@ -0,0 +1,198 @@ +package watcher_test + +import ( + "context" + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "gotest.tools/v3/assert" + "gotest.tools/v3/fs" + "gotest.tools/v3/poll" + + "github.com/artefactual-sdps/enduro/internal/watcher" +) + +type file struct { + name string + contents []byte +} + +func TestFileSystemWatcher(t *testing.T) { + td := fs.NewDir(t, "enduro-test-fs-watcher") + type test struct { + name string + config *watcher.FilesystemConfig + file file + want *watcher.BlobEvent + } + for _, tt := range []test{ + { + name: "Polling watcher returns a blob event", + config: &watcher.FilesystemConfig{ + Name: "filesystem", + Path: t.TempDir(), + PollInterval: time.Millisecond * 5, + }, + file: file{name: "test.txt"}, + want: &watcher.BlobEvent{Key: "test.txt"}, + }, + { + name: "Inotify watcher returns a blob event", + config: &watcher.FilesystemConfig{ + Name: "filesystem", + Path: t.TempDir(), + Inotify: true, + }, + file: file{name: "test.txt"}, + want: &watcher.BlobEvent{Key: "test.txt"}, + }, + } { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + w, err := watcher.NewFilesystemWatcher(ctx, tt.config) + assert.NilError(t, err) + + check := func(t poll.LogT) poll.Result { + got, _, err := w.Watch(ctx) + if err != nil { + return poll.Error(fmt.Errorf("watcher error: %w", err)) + } + if got.Key != tt.want.Key || got.IsDir != tt.want.IsDir { + return poll.Error(fmt.Errorf( + "expected: *watcher.BlobEvent(Key: %q, IsDir: %t); got: *watcher.BlobEvent(Key: %q, IsDir: %t)", + tt.want.Key, tt.want.IsDir, got.Key, got.IsDir, + )) + } + + return poll.Success() + } + + if err = os.WriteFile( + filepath.Join(tt.config.Path, tt.file.name), + tt.file.contents, + 0o600, + ); err != nil { + t.Fatalf("Couldn't create text.txt in %q", td.Path()) + } + + poll.WaitOn(t, check, poll.WithTimeout(time.Millisecond*15)) + }) + } + + t.Run("Path returns the watcher path", func(t *testing.T) { + t.Parallel() + + td := t.TempDir() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + w, err := watcher.NewFilesystemWatcher(ctx, &watcher.FilesystemConfig{ + Name: "filesystem", + Path: td, + }) + assert.NilError(t, err) + assert.Equal(t, w.Path(), td) + }) + + t.Run("OpenBucket returns a bucket", func(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + w, err := watcher.NewFilesystemWatcher(ctx, &watcher.FilesystemConfig{ + Name: "filesystem", + Path: t.TempDir(), + }) + assert.NilError(t, err) + + b, err := w.OpenBucket(ctx) + assert.NilError(t, err) + assert.Equal(t, fmt.Sprintf("%T", b), "*blob.Bucket") + b.Close() + }) + + t.Run("RemoveAll deletes a directory", func(t *testing.T) { + t.Parallel() + + td := fs.NewDir(t, "enduro-test-fswatcher", + fs.WithDir("transfer", fs.WithFile("test.txt", "A test file.")), + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + w, err := watcher.NewFilesystemWatcher(ctx, &watcher.FilesystemConfig{ + Name: "filesystem", + Path: td.Path(), + }) + assert.NilError(t, err) + + err = w.RemoveAll("transfer") + assert.NilError(t, err) + assert.Assert(t, fs.Equal(w.Path(), fs.Expected(t))) + }) + + t.Run("Dispose moves transfer to CompletedDir", func(t *testing.T) { + t.Parallel() + + src := fs.NewDir(t, "enduro-test-fswatcher", + fs.WithDir("transfer", fs.WithFile("test.txt", "A test file.")), + ) + dest := fs.NewDir(t, "enduro-test-fswatcher") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + w, err := watcher.NewFilesystemWatcher(ctx, &watcher.FilesystemConfig{ + Name: "filesystem", + Path: src.Path(), + CompletedDir: dest.Path(), + }) + assert.NilError(t, err) + + err = w.Dispose("transfer") + assert.NilError(t, err) + assert.Assert(t, fs.Equal(dest.Path(), fs.Expected(t, + fs.WithDir("transfer", fs.WithFile("test.txt", "A test file.")), + ))) + }) + + t.Run("Download copies a directory", func(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + src := fs.NewDir(t, "enduro-test-fswatcher", + fs.WithDir("transfer", + fs.WithFile("test.txt", "A test file."), + fs.WithFile("test2", "Another test file."), + ), + ) + dest := fs.NewDir(t, "enduro-test-fswatcher") + + w, err := watcher.NewFilesystemWatcher(ctx, &watcher.FilesystemConfig{ + Name: "filesystem", + Path: src.Path(), + Inotify: true, + }) + assert.NilError(t, err) + + err = w.Download(context.Background(), dest.Path(), "transfer") + assert.NilError(t, err) + assert.Assert(t, fs.Equal(dest.Path(), fs.Expected(t, fs.WithMode(0o700), + fs.WithDir("transfer", fs.WithMode(0o755), + fs.WithFile("test.txt", "A test file.", fs.WithMode(0o644)), + fs.WithFile("test2", "Another test file.", fs.WithMode(0o644)), + ), + ))) + }) +} diff --git a/internal/watcher/minio.go b/internal/watcher/minio.go index 726522811..b16df4315 100644 --- a/internal/watcher/minio.go +++ b/internal/watcher/minio.go @@ -5,7 +5,9 @@ import ( "encoding/json" "errors" "fmt" + "io" "net/url" + "os" "time" "github.com/go-logr/logr" @@ -50,6 +52,7 @@ func NewMinioWatcher(ctx context.Context, logger logr.Logger, config *MinioConfi Profile: config.Profile, Region: config.Region, PathStyle: config.PathStyle, + URL: config.URL, } if config.RedisFailedList == "" { @@ -146,3 +149,33 @@ func (w *minioWatcher) event(blob string) (*BlobEvent, error) { func (w *minioWatcher) OpenBucket(ctx context.Context) (*blob.Bucket, error) { return bucket.Open(ctx, w.bucketConfig) } + +// Download copies the contents of the blob identified by key to dest. +func (w *minioWatcher) Download(ctx context.Context, dest, key string) error { + bucket, err := w.OpenBucket(ctx) + if err != nil { + return fmt.Errorf("error opening bucket: %w", err) + } + defer bucket.Close() + + reader, err := bucket.NewReader(ctx, key, nil) + if err != nil { + return fmt.Errorf("error creating reader: %w", err) + } + defer reader.Close() + + writer, err := os.Create(dest) // #nosec G304 -- trusted file path. + if err != nil { + return fmt.Errorf("error creating writer: %w", err) + } + defer writer.Close() + + if _, err := io.Copy(writer, reader); err != nil { + return fmt.Errorf("error copying blob: %w", err) + } + + // Try to set the file mode but ignore any errors. + _ = os.Chmod(dest, 0o600) + + return nil +} diff --git a/internal/watcher/minio_test.go b/internal/watcher/minio_test.go index 9832ef852..60f971bb0 100644 --- a/internal/watcher/minio_test.go +++ b/internal/watcher/minio_test.go @@ -10,12 +10,14 @@ import ( "github.com/alicebob/miniredis/v2" "github.com/go-logr/logr" + "gotest.tools/v3/assert" + "gotest.tools/v3/fs" "gotest.tools/v3/poll" "github.com/artefactual-sdps/enduro/internal/watcher" ) -func newWatcher(t *testing.T) (*miniredis.Miniredis, watcher.Watcher) { +func newWatcher(t *testing.T, updateCfg func(c *watcher.MinioConfig)) (*miniredis.Miniredis, watcher.Watcher) { t.Helper() m, err := miniredis.Run() @@ -24,7 +26,9 @@ func newWatcher(t *testing.T) (*miniredis.Miniredis, watcher.Watcher) { } dur := time.Duration(time.Second) - config := watcher.MinioConfig{ + + // Default config. + config := &watcher.MinioConfig{ Name: "minio-watcher", RedisAddress: fmt.Sprintf("redis://%s", m.Addr()), RedisList: "minio-events", @@ -40,9 +44,12 @@ func newWatcher(t *testing.T) (*miniredis.Miniredis, watcher.Watcher) { StripTopLevelDir: true, } - var w watcher.Watcher - logger := logr.Discard() - w, err = watcher.NewMinioWatcher(context.Background(), logger, &config) + // Modify default config. + if updateCfg != nil { + updateCfg(config) + } + + w, err := watcher.NewMinioWatcher(context.Background(), logr.Discard(), config) if err != nil { t.Fatal(err) } @@ -57,7 +64,7 @@ func cleanup(t *testing.T, m *miniredis.Miniredis) { } func TestWatcherReturnsErrWhenNoMessages(t *testing.T) { - m, w := newWatcher(t) + m, w := newWatcher(t, nil) defer cleanup(t, m) // TODO: slow test, should inject smaller timeout. @@ -80,7 +87,7 @@ func TestWatcherReturnsErrWhenNoMessages(t *testing.T) { } func TestWatcherReturnsErrOnInvalidMessages(t *testing.T) { - m, w := newWatcher(t) + m, w := newWatcher(t, nil) defer cleanup(t, m) m.Lpush("minio-events", "{}") @@ -103,7 +110,7 @@ func TestWatcherReturnsErrOnInvalidMessages(t *testing.T) { } func TestWatcherReturnsErrOnMessageInWrongBucket(t *testing.T) { - m, w := newWatcher(t) + m, w := newWatcher(t, nil) defer cleanup(t, m) // Message with a bucket we're not watching. @@ -180,7 +187,7 @@ func TestWatcherReturnsErrOnMessageInWrongBucket(t *testing.T) { } func TestWatcherReturnsOnValidMessage(t *testing.T) { - m, w := newWatcher(t) + m, w := newWatcher(t, nil) defer cleanup(t, m) m.Lpush("minio-events", `[ @@ -254,7 +261,7 @@ func TestWatcherReturnsOnValidMessage(t *testing.T) { } func TestWatcherReturnsDecodedObjectKey(t *testing.T) { - m, w := newWatcher(t) + m, w := newWatcher(t, nil) defer cleanup(t, m) // Message with an encoded object key @@ -292,7 +299,7 @@ func TestWatcherReturnsDecodedObjectKey(t *testing.T) { } func TestWatcherReturnsErrOnInvalidObjectKey(t *testing.T) { - m, w := newWatcher(t) + m, w := newWatcher(t, nil) defer cleanup(t, m) // Message with an invalid encoded object key @@ -328,3 +335,27 @@ func TestWatcherReturnsErrOnInvalidObjectKey(t *testing.T) { poll.WaitOn(t, check, poll.WithTimeout(time.Second*3)) } + +func TestMinioWatcherDownload(t *testing.T) { + t.Run("Downloads a file", func(t *testing.T) { + t.Parallel() + + wd := fs.NewDir(t, "enduro-test-minio-watcher", + fs.WithFile("test", "A test file."), + ) + m, w := newWatcher(t, func(c *watcher.MinioConfig) { + c.URL = fmt.Sprintf("file://%s", wd.Path()) + }) + defer cleanup(t, m) + + dest := fs.NewDir(t, "enduro-test-minio-watcher") + err := w.Download(context.Background(), dest.Join("test"), "test") + assert.NilError(t, err) + assert.Assert(t, fs.Equal( + dest.Path(), + fs.Expected(t, + fs.WithFile("test", "A test file.", fs.WithMode(0o600)), + ), + )) + }) +} diff --git a/internal/watcher/watcher.go b/internal/watcher/watcher.go index 4e986d12b..3f59d6b62 100644 --- a/internal/watcher/watcher.go +++ b/internal/watcher/watcher.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "io" "os" "sync" "time" @@ -31,8 +30,12 @@ type Watcher interface { // Implementors must not return a nil function. Watch(ctx context.Context) (*BlobEvent, Cleanup, error) + // Download copies the file or directory identified by key to dest. + Download(ctx context.Context, dest, key string) error + // OpenBucket returns the bucket where the blobs can be found. OpenBucket(ctx context.Context) (*blob.Bucket, error) + RetentionPeriod() *time.Duration CompletedDir() string StripTopLevelDir() bool @@ -73,8 +76,9 @@ type Service interface { // Return a watcher given its name. ByName(name string) (Watcher, error) - // Download blob given an event. - Download(ctx context.Context, w io.Writer, watcherName, key string) error + // Download copies the watcherName file or directory identified by key to + // dest. + Download(ctx context.Context, dest, watcherName, key string) error // Delete blob given an event. Delete(ctx context.Context, watcherName, key string) error @@ -147,29 +151,13 @@ func (svc *serviceImpl) ByName(name string) (Watcher, error) { return svc.watcher(name) } -func (svc *serviceImpl) Download(ctx context.Context, writer io.Writer, watcherName, key string) error { +func (svc *serviceImpl) Download(ctx context.Context, dest, watcherName, key string) error { w, err := svc.watcher(watcherName) if err != nil { return err } - bucket, err := w.OpenBucket(ctx) - if err != nil { - return fmt.Errorf("error opening bucket: %w", err) - } - defer bucket.Close() - - reader, err := bucket.NewReader(ctx, key, nil) - if err != nil { - return fmt.Errorf("error creating reader: %w", err) - } - defer reader.Close() - - if _, err := io.Copy(writer, reader); err != nil { - return fmt.Errorf("error copying blob: %w", err) - } - - return nil + return w.Download(ctx, dest, key) } func (svc *serviceImpl) Delete(ctx context.Context, watcherName, key string) error { diff --git a/internal/workflow/activities/download.go b/internal/workflow/activities/download.go index dbb6e0584..ce094b950 100644 --- a/internal/workflow/activities/download.go +++ b/internal/workflow/activities/download.go @@ -35,8 +35,6 @@ func NewDownloadActivity(logger logr.Logger, wsvc watcher.Service) *DownloadActi } func (a *DownloadActivity) Execute(ctx context.Context, params *DownloadActivityParams) (*DownloadActivityResult, error) { - var err error - a.logger.V(1).Info("Executing DownloadActivity", "Key", params.Key, "WatcherName", params.WatcherName, @@ -48,14 +46,8 @@ func (a *DownloadActivity) Execute(ctx context.Context, params *DownloadActivity } dest := filepath.Clean(filepath.Join(destDir, params.Key)) - writer, err := os.Create(dest) - if err != nil { - return nil, fmt.Errorf("create file: %v", err) - } - defer writer.Close() //#nosec G307 -- Errors returned by Close() here do not require specific handling. - - if err := a.wsvc.Download(ctx, writer, params.WatcherName, params.Key); err != nil { - return nil, temporal_tools.NewNonRetryableError(fmt.Errorf("download blob: %v", err)) + if err := a.wsvc.Download(ctx, dest, params.WatcherName, params.Key); err != nil { + return nil, temporal_tools.NewNonRetryableError(fmt.Errorf("download: %v", err)) } return &DownloadActivityResult{Path: dest}, nil diff --git a/internal/workflow/activities/download_test.go b/internal/workflow/activities/download_test.go index 4c9f2a781..55b65b1d8 100644 --- a/internal/workflow/activities/download_test.go +++ b/internal/workflow/activities/download_test.go @@ -3,7 +3,6 @@ package activities_test import ( "context" "fmt" - "io" "os" "testing" @@ -20,7 +19,7 @@ import ( ) func TestDownloadActivity(t *testing.T) { - key := "transfer.zip" + key := "jabber.txt" watcherName := "watcher" type test struct { @@ -38,9 +37,15 @@ func TestDownloadActivity(t *testing.T) { WatcherName: watcherName, }, rec: func(r *watcherfake.MockServiceMockRecorder) { - r.Download(mockutil.Context(), gomock.AssignableToTypeOf((*os.File)(nil)), watcherName, key). - DoAndReturn(func(ctx context.Context, w io.Writer, watcherName, key string) error { - _, err := w.Write([]byte("’Twas brillig, and the slithy toves Did gyre and gimble in the wabe:")) + var T string + r.Download(mockutil.Context(), gomock.AssignableToTypeOf(T), watcherName, key). + DoAndReturn(func(ctx context.Context, dest, watcherName, key string) error { + w, err := os.Create(dest) + if err != nil { + return err + } + + _, err = w.Write([]byte("’Twas brillig, and the slithy toves Did gyre and gimble in the wabe:")) return err }) }, @@ -53,11 +58,12 @@ func TestDownloadActivity(t *testing.T) { WatcherName: watcherName, }, rec: func(r *watcherfake.MockServiceMockRecorder) { - r.Download(mockutil.Context(), gomock.AssignableToTypeOf((*os.File)(nil)), watcherName, key).Return( + var T string + r.Download(mockutil.Context(), gomock.AssignableToTypeOf(T), watcherName, key).Return( fmt.Errorf("error loading watcher: unknown watcher %s", watcherName), ) }, - wantErr: fmt.Sprintf("activity error (type: download-activity, scheduledEventID: 0, startedEventID: 0, identity: ): download blob: error loading watcher: unknown watcher %s", watcherName), + wantErr: fmt.Sprintf("activity error (type: download-activity, scheduledEventID: 0, startedEventID: 0, identity: ): download: error loading watcher: unknown watcher %s", watcherName), }, } { t.Run(tt.name, func(t *testing.T) {