Skip to content

Commit

Permalink
Standardize watcher Download() interface
Browse files Browse the repository at this point in the history
Refs #867

- Update existing watcher Download() method and interface signatures to
  take a destination filepath (string) instead of an io.Writer stream
- Add a `filewatcher.Download()` method
- Create a `minio.Download()` method and move implementation specific
  code from `serviceImpl.Download()` to it
- Update mocks
  • Loading branch information
djjuhasz committed Feb 28, 2024
1 parent b0e5362 commit 72b2994
Show file tree
Hide file tree
Showing 12 changed files with 230 additions and 77 deletions.
16 changes: 11 additions & 5 deletions internal/filenotify/filenotify.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,17 @@
package filenotify

import (
"time"

"github.com/fsnotify/fsnotify"
"github.com/radovskyb/watcher"
)

// FileWatcher is an interface for implementing file notification watchers
type Config struct {
PollInterval time.Duration
}

// FileWatcher is an interface for implementing file notification watchers.
type FileWatcher interface {
Events() <-chan fsnotify.Event
Errors() <-chan error
Expand All @@ -18,11 +24,11 @@ type FileWatcher interface {
}

// New tries to use an fs-event watcher, and falls back to the poller if there is an error
func New() (FileWatcher, error) {
func New(cfg Config) (FileWatcher, error) {

Check warning on line 27 in internal/filenotify/filenotify.go

View check run for this annotation

Codecov / codecov/patch

internal/filenotify/filenotify.go#L27

Added line #L27 was not covered by tests
if watcher, err := NewEventWatcher(); err == nil {
return watcher, nil
}
return NewPollingWatcher()
return NewPollingWatcher(cfg)
}

Check warning on line 33 in internal/filenotify/filenotify.go

View check run for this annotation

Codecov / codecov/patch

internal/filenotify/filenotify.go#L31-L33

Added lines #L31 - L33 were not covered by tests
// NewEventWatcher returns an fs-event based file watcher
Expand All @@ -37,7 +43,7 @@ func NewEventWatcher() (FileWatcher, error) {
}

// NewPollingWatcher returns a poll-based file watcher
func NewPollingWatcher() (FileWatcher, error) {
func NewPollingWatcher(cfg Config) (FileWatcher, error) {
poller := &filePoller{
wr: watcher.New(),
events: make(chan fsnotify.Event),
Expand All @@ -49,7 +55,7 @@ func NewPollingWatcher() (FileWatcher, error) {
done := make(chan error)
{
go func() {
err := poller.wr.Start(watchWaitTime)
err := poller.wr.Start(cfg.PollInterval)
if err != nil {
done <- err
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package filenotify
package filenotify_test

import (
"fmt"
Expand All @@ -8,10 +8,16 @@ import (
"time"

"github.com/fsnotify/fsnotify"

"github.com/artefactual-sdps/enduro/internal/filenotify"
)

const pollInterval = time.Millisecond * 5

func TestPollerEvent(t *testing.T) {
w, err := NewPollingWatcher()
w, err := filenotify.NewPollingWatcher(
filenotify.Config{PollInterval: pollInterval},
)
if err != nil {
t.Fatal("error creating poller")
}
Expand Down Expand Up @@ -44,7 +50,7 @@ func TestPollerEvent(t *testing.T) {
}
}

func assertEvent(w FileWatcher, eType fsnotify.Op) error {
func assertEvent(w filenotify.FileWatcher, eType fsnotify.Op) error {
var err error
select {
case e := <-w.Events():
Expand All @@ -53,7 +59,7 @@ func assertEvent(w FileWatcher, eType fsnotify.Op) error {
}
case e := <-w.Errors():
err = fmt.Errorf("got unexpected error waiting for events %v: %v", eType, e)
case <-time.After(watchWaitTime * 3):
case <-time.After(pollInterval * 2):
err = fmt.Errorf("timeout waiting for event %v", eType)
}
return err
Expand Down
9 changes: 2 additions & 7 deletions internal/filenotify/poller.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
package filenotify

import (
"time"

"github.com/fsnotify/fsnotify"
"github.com/radovskyb/watcher"
)

// watchWaitTime is the time to wait between file poll loops
const watchWaitTime = 200 * time.Millisecond

// filePoller is used to poll files for changes, especially in cases where fsnotify
// can't be run (e.g. when inotify handles are exhausted)
// filePoller satisfies the FileWatcher interface
Expand All @@ -33,9 +28,9 @@ func (w *filePoller) loop() {
switch event.Op {
case watcher.Create:
op = fsnotify.Create
case watcher.Rename:
fallthrough
case watcher.Move:
fallthrough
case watcher.Rename:
op = fsnotify.Rename
default:
continue
Expand Down
22 changes: 17 additions & 5 deletions internal/watcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"time"
)

const defaultPollInterval = 200 * time.Millisecond

type Config struct {
Filesystem []*FilesystemConfig
Minio []*MinioConfig
Expand All @@ -29,14 +31,24 @@ func (c Config) CompletedDirs() []string {

// See filesystem.go for more.
type FilesystemConfig struct {
Name string
Path string
Inotify bool
Ignore string
Name string
Path string
CompletedDir string
Ignore string
Inotify bool

RetentionPeriod *time.Duration
CompletedDir string
StripTopLevelDir bool

// PollInterval sets the length of time between filesystem polls (default:
// 200ms). If Inotify is true then PollInterval is ignored.
PollInterval time.Duration
}

func (cfg *FilesystemConfig) SetDefaults() {
if cfg.PollInterval == 0 {
cfg.PollInterval = defaultPollInterval

Check warning on line 50 in internal/watcher/config.go

View check run for this annotation

Codecov / codecov/patch

internal/watcher/config.go#L50

Added line #L50 was not covered by tests
}
}

// See minio.go for more.
Expand Down
7 changes: 3 additions & 4 deletions internal/watcher/fake/mock_service.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 38 additions & 0 deletions internal/watcher/fake/mock_watcher.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

59 changes: 45 additions & 14 deletions internal/watcher/filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"runtime"

"github.com/fsnotify/fsnotify"
cp "github.com/otiai10/copy"
"gocloud.dev/blob"
"gocloud.dev/blob/fileblob"

Expand All @@ -20,7 +21,8 @@ import (
// filesystemWatcher implements a Watcher for watching paths in a local filesystem.
type filesystemWatcher struct {
ctx context.Context
fsw filenotify.FileWatcher
cfg *FilesystemConfig
fw filenotify.FileWatcher
ch chan *fsnotify.Event
path string
regex *regexp.Regexp
Expand All @@ -30,6 +32,8 @@ type filesystemWatcher struct {
var _ Watcher = (*filesystemWatcher)(nil)

func NewFilesystemWatcher(ctx context.Context, config *FilesystemConfig) (*filesystemWatcher, error) {
config.SetDefaults()

stat, err := os.Stat(config.Path)
if err != nil {
return nil, fmt.Errorf("error looking up stat info: %w", err)
Expand All @@ -53,20 +57,15 @@ func NewFilesystemWatcher(ctx context.Context, config *FilesystemConfig) (*files
return nil, errors.New("cannot use completedDir and retentionPeriod simultaneously")
}

// The inotify API isn't always available, fall back to polling.
var fsw filenotify.FileWatcher
if config.Inotify && runtime.GOOS != "windows" {
fsw, err = filenotify.New()
} else {
fsw, err = filenotify.NewPollingWatcher()
}
fw, err := fileWatcher(config)
if err != nil {
return nil, fmt.Errorf("error creating filesystem watcher: %w", err)
return nil, err

Check warning on line 62 in internal/watcher/filesystem.go

View check run for this annotation

Codecov / codecov/patch

internal/watcher/filesystem.go#L62

Added line #L62 was not covered by tests
}

w := &filesystemWatcher{
ctx: ctx,
fsw: fsw,
cfg: config,
fw: fw,
ch: make(chan *fsnotify.Event, 100),
path: abspath,
regex: regex,
Expand All @@ -80,17 +79,38 @@ func NewFilesystemWatcher(ctx context.Context, config *FilesystemConfig) (*files

go w.loop()

if err := fsw.Add(abspath); err != nil {
if err := fw.Add(abspath); err != nil {
return nil, fmt.Errorf("error configuring filesystem watcher: %w", err)
}

return w, nil
}

func fileWatcher(cfg *FilesystemConfig) (filenotify.FileWatcher, error) {
var (
fsw filenotify.FileWatcher
err error
)

// The inotify API isn't always available, fall back to polling.
if cfg.Inotify && runtime.GOOS != "windows" {
fsw, err = filenotify.New(filenotify.Config{PollInterval: cfg.PollInterval})

Check warning on line 97 in internal/watcher/filesystem.go

View check run for this annotation

Codecov / codecov/patch

internal/watcher/filesystem.go#L97

Added line #L97 was not covered by tests
} else {
fsw, err = filenotify.NewPollingWatcher(
filenotify.Config{PollInterval: cfg.PollInterval},
)
}
if err != nil {
return nil, fmt.Errorf("error creating filesystem watcher: %w", err)

Check warning on line 104 in internal/watcher/filesystem.go

View check run for this annotation

Codecov / codecov/patch

internal/watcher/filesystem.go#L104

Added line #L104 was not covered by tests
}

return fsw, nil
}

func (w *filesystemWatcher) loop() {
for {
select {
case event, ok := <-w.fsw.Events():
case event, ok := <-w.fw.Events():
if !ok {
continue
}
Expand All @@ -104,12 +124,12 @@ func (w *filesystemWatcher) loop() {
continue
}
w.ch <- &event
case _, ok := <-w.fsw.Errors():
case _, ok := <-w.fw.Errors():
if !ok {
continue
}
case <-w.ctx.Done():
_ = w.fsw.Close()
_ = w.fw.Close()

Check warning on line 132 in internal/watcher/filesystem.go

View check run for this annotation

Codecov / codecov/patch

internal/watcher/filesystem.go#L132

Added line #L132 was not covered by tests
close(w.ch)
return
}
Expand Down Expand Up @@ -154,3 +174,14 @@ func (w *filesystemWatcher) Dispose(key string) error {

return fsutil.Move(src, dst)
}

// Download recursively copies the contents of key to dest. Key may be the name
// of a directory or file.
func (w *filesystemWatcher) Download(ctx context.Context, dest, key string) error {
src := filepath.Clean(filepath.Join(w.path, key))
if err := cp.Copy(src, dest); err != nil {
return fmt.Errorf("filesystem watcher: download: %v", err)

Check warning on line 183 in internal/watcher/filesystem.go

View check run for this annotation

Codecov / codecov/patch

internal/watcher/filesystem.go#L180-L183

Added lines #L180 - L183 were not covered by tests
}

return nil

Check warning on line 186 in internal/watcher/filesystem.go

View check run for this annotation

Codecov / codecov/patch

internal/watcher/filesystem.go#L186

Added line #L186 was not covered by tests
}
Loading

0 comments on commit 72b2994

Please sign in to comment.