Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Standardize watcher Download() interface #868

Merged
merged 1 commit into from
Mar 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions internal/filenotify/filenotify.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,17 @@
package filenotify

import (
"time"

"github.com/fsnotify/fsnotify"
"github.com/radovskyb/watcher"
)

// FileWatcher is an interface for implementing file notification watchers
type Config struct {
PollInterval time.Duration
}

// FileWatcher is an interface for implementing file notification watchers.
type FileWatcher interface {
Events() <-chan fsnotify.Event
Errors() <-chan error
Expand All @@ -18,13 +24,13 @@
}

// New tries to use an fs-event watcher, and falls back to the poller if there is an error
func New() (FileWatcher, error) {
func New(cfg Config) (FileWatcher, error) {

Check warning on line 27 in internal/filenotify/filenotify.go

View check run for this annotation

Codecov / codecov/patch

internal/filenotify/filenotify.go#L27

Added line #L27 was not covered by tests
if watcher, err := NewEventWatcher(); err == nil {
return watcher, nil
}
return NewPollingWatcher()
return NewPollingWatcher(cfg)
}

Check warning on line 33 in internal/filenotify/filenotify.go

View check run for this annotation

Codecov / codecov/patch

internal/filenotify/filenotify.go#L31-L33

Added lines #L31 - L33 were not covered by tests
// NewEventWatcher returns an fs-event based file watcher
func NewEventWatcher() (FileWatcher, error) {
watcher, err := fsnotify.NewWatcher()
Expand All @@ -37,7 +43,7 @@
}

// NewPollingWatcher returns a poll-based file watcher
func NewPollingWatcher() (FileWatcher, error) {
func NewPollingWatcher(cfg Config) (FileWatcher, error) {
poller := &filePoller{
wr: watcher.New(),
events: make(chan fsnotify.Event),
Expand All @@ -49,7 +55,7 @@
done := make(chan error)
{
go func() {
err := poller.wr.Start(watchWaitTime)
err := poller.wr.Start(cfg.PollInterval)
if err != nil {
done <- err
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package filenotify
package filenotify_test

import (
"fmt"
Expand All @@ -8,10 +8,16 @@ import (
"time"

"github.com/fsnotify/fsnotify"

"github.com/artefactual-sdps/enduro/internal/filenotify"
)

const pollInterval = time.Millisecond * 5

func TestPollerEvent(t *testing.T) {
w, err := NewPollingWatcher()
w, err := filenotify.NewPollingWatcher(
filenotify.Config{PollInterval: pollInterval},
)
if err != nil {
t.Fatal("error creating poller")
}
Expand Down Expand Up @@ -44,7 +50,7 @@ func TestPollerEvent(t *testing.T) {
}
}

func assertEvent(w FileWatcher, eType fsnotify.Op) error {
func assertEvent(w filenotify.FileWatcher, eType fsnotify.Op) error {
var err error
select {
case e := <-w.Events():
Expand All @@ -53,7 +59,7 @@ func assertEvent(w FileWatcher, eType fsnotify.Op) error {
}
case e := <-w.Errors():
err = fmt.Errorf("got unexpected error waiting for events %v: %v", eType, e)
case <-time.After(watchWaitTime * 3):
case <-time.After(pollInterval * 2):
err = fmt.Errorf("timeout waiting for event %v", eType)
}
return err
Expand Down
9 changes: 2 additions & 7 deletions internal/filenotify/poller.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
package filenotify

import (
"time"

"github.com/fsnotify/fsnotify"
"github.com/radovskyb/watcher"
)

// watchWaitTime is the time to wait between file poll loops
const watchWaitTime = 200 * time.Millisecond

// filePoller is used to poll files for changes, especially in cases where fsnotify
// can't be run (e.g. when inotify handles are exhausted)
// filePoller satisfies the FileWatcher interface
Expand All @@ -33,9 +28,9 @@ func (w *filePoller) loop() {
switch event.Op {
case watcher.Create:
op = fsnotify.Create
case watcher.Rename:
fallthrough
case watcher.Move:
fallthrough
case watcher.Rename:
op = fsnotify.Rename
default:
continue
Expand Down
23 changes: 18 additions & 5 deletions internal/watcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"time"
)

const defaultPollInterval = 200 * time.Millisecond

type Config struct {
Filesystem []*FilesystemConfig
Minio []*MinioConfig
Expand All @@ -29,14 +31,24 @@ func (c Config) CompletedDirs() []string {

// See filesystem.go for more.
type FilesystemConfig struct {
Name string
Path string
Inotify bool
Ignore string
Name string
Path string
CompletedDir string
Ignore string
Inotify bool

RetentionPeriod *time.Duration
CompletedDir string
StripTopLevelDir bool

// PollInterval sets the length of time between filesystem polls (default:
// 200ms). If Inotify is true then PollInterval is ignored.
PollInterval time.Duration
}

func (cfg *FilesystemConfig) setDefaults() {
if cfg.PollInterval == 0 {
cfg.PollInterval = defaultPollInterval
}
}

// See minio.go for more.
Expand All @@ -53,6 +65,7 @@ type MinioConfig struct {
Secret string
Token string
Bucket string
URL string

RetentionPeriod *time.Duration
StripTopLevelDir bool
Expand Down
7 changes: 3 additions & 4 deletions internal/watcher/fake/mock_service.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 38 additions & 0 deletions internal/watcher/fake/mock_watcher.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

69 changes: 52 additions & 17 deletions internal/watcher/filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,20 @@
"runtime"

"github.com/fsnotify/fsnotify"
cp "github.com/otiai10/copy"
"gocloud.dev/blob"
"gocloud.dev/blob/fileblob"
_ "gocloud.dev/blob/fileblob"

"github.com/artefactual-sdps/enduro/internal/bucket"
"github.com/artefactual-sdps/enduro/internal/filenotify"
"github.com/artefactual-sdps/enduro/internal/fsutil"
)

// filesystemWatcher implements a Watcher for watching paths in a local filesystem.
type filesystemWatcher struct {
ctx context.Context
fsw filenotify.FileWatcher
cfg *FilesystemConfig
fw filenotify.FileWatcher
ch chan *fsnotify.Event
path string
regex *regexp.Regexp
Expand All @@ -30,6 +33,8 @@
var _ Watcher = (*filesystemWatcher)(nil)

func NewFilesystemWatcher(ctx context.Context, config *FilesystemConfig) (*filesystemWatcher, error) {
config.setDefaults()

stat, err := os.Stat(config.Path)
if err != nil {
return nil, fmt.Errorf("error looking up stat info: %w", err)
Expand All @@ -53,20 +58,15 @@
return nil, errors.New("cannot use completedDir and retentionPeriod simultaneously")
}

// The inotify API isn't always available, fall back to polling.
var fsw filenotify.FileWatcher
if config.Inotify && runtime.GOOS != "windows" {
fsw, err = filenotify.New()
} else {
fsw, err = filenotify.NewPollingWatcher()
}
fw, err := fileWatcher(config)
if err != nil {
return nil, fmt.Errorf("error creating filesystem watcher: %w", err)
return nil, err

Check warning on line 63 in internal/watcher/filesystem.go

View check run for this annotation

Codecov / codecov/patch

internal/watcher/filesystem.go#L63

Added line #L63 was not covered by tests
}

w := &filesystemWatcher{
ctx: ctx,
fsw: fsw,
cfg: config,
fw: fw,
ch: make(chan *fsnotify.Event, 100),
path: abspath,
regex: regex,
Expand All @@ -80,17 +80,38 @@

go w.loop()

if err := fsw.Add(abspath); err != nil {
if err := fw.Add(abspath); err != nil {
return nil, fmt.Errorf("error configuring filesystem watcher: %w", err)
}

return w, nil
}

func fileWatcher(cfg *FilesystemConfig) (filenotify.FileWatcher, error) {
var (
fsw filenotify.FileWatcher
err error
)

// The inotify API isn't always available, fall back to polling.
if cfg.Inotify && runtime.GOOS != "windows" {
fsw, err = filenotify.New(filenotify.Config{PollInterval: cfg.PollInterval})
} else {
fsw, err = filenotify.NewPollingWatcher(
filenotify.Config{PollInterval: cfg.PollInterval},
)
}
if err != nil {
return nil, fmt.Errorf("error creating filesystem watcher: %w", err)

Check warning on line 105 in internal/watcher/filesystem.go

View check run for this annotation

Codecov / codecov/patch

internal/watcher/filesystem.go#L105

Added line #L105 was not covered by tests
}

return fsw, nil
}

func (w *filesystemWatcher) loop() {
for {
select {
case event, ok := <-w.fsw.Events():
case event, ok := <-w.fw.Events():
if !ok {
continue
}
Expand All @@ -104,12 +125,12 @@
continue
}
w.ch <- &event
case _, ok := <-w.fsw.Errors():
case _, ok := <-w.fw.Errors():

Check warning on line 128 in internal/watcher/filesystem.go

View check run for this annotation

Codecov / codecov/patch

internal/watcher/filesystem.go#L128

Added line #L128 was not covered by tests
if !ok {
continue
}
case <-w.ctx.Done():
_ = w.fsw.Close()
_ = w.fw.Close()
close(w.ch)
return
}
Expand All @@ -136,8 +157,10 @@
return w.path
}

func (w *filesystemWatcher) OpenBucket(context.Context) (*blob.Bucket, error) {
return fileblob.OpenBucket(w.path, nil)
func (w *filesystemWatcher) OpenBucket(ctx context.Context) (*blob.Bucket, error) {
return bucket.Open(ctx, &bucket.Config{
URL: fmt.Sprintf("file://%s", w.path),
})
}

func (w *filesystemWatcher) RemoveAll(key string) error {
Expand All @@ -154,3 +177,15 @@

return fsutil.Move(src, dst)
}

// Download recursively copies the contents of key to dest. Key may be the name
// of a directory or file.
func (w *filesystemWatcher) Download(ctx context.Context, dest, key string) error {
src := filepath.Clean(filepath.Join(w.path, key))
dest = filepath.Clean(filepath.Join(dest, key))
if err := cp.Copy(src, dest); err != nil {
return fmt.Errorf("filesystem watcher: download: %v", err)

Check warning on line 187 in internal/watcher/filesystem.go

View check run for this annotation

Codecov / codecov/patch

internal/watcher/filesystem.go#L187

Added line #L187 was not covered by tests
}

return nil
}
Loading
Loading