diff --git a/go.mod b/go.mod
index 0712d6598..db3e904ba 100644
--- a/go.mod
+++ b/go.mod
@@ -12,6 +12,7 @@ require (
 	github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137
 	github.com/alicebob/miniredis/v2 v2.30.5
 	github.com/bitly/go-simplejson v0.5.1
+	github.com/bmatcuk/doublestar/v4 v4.0.2
 	github.com/bufbuild/protocompile v0.13.0
 	github.com/cenkalti/backoff/v4 v4.2.1
 	github.com/cespare/xxhash/v2 v2.2.0
diff --git a/go.sum b/go.sum
index bc79b75e2..1a46bcbb8 100644
--- a/go.sum
+++ b/go.sum
@@ -29,6 +29,8 @@ github.com/bitly/go-simplejson v0.5.1 h1:xgwPbetQScXt1gh9BmoJ6j9JMr3TElvuIyjR8pg
 github.com/bitly/go-simplejson v0.5.1/go.mod h1:YOPVLzCfwK14b4Sff3oP1AmGhI9T9Vsg84etUnlyp+Q=
 github.com/bufbuild/protocompile v0.13.0 h1:6cwUB0Y2tSvmNxsbunwzmIto3xOlJOV7ALALuVOs92M=
 github.com/bufbuild/protocompile v0.13.0/go.mod h1:dr++fGGeMPWHv7jPeT06ZKukm45NJscd7rUxQVzEKRk=
+github.com/bmatcuk/doublestar/v4 v4.0.2 h1:X0krlUVAVmtr2cRoTqR8aDMrDqnB36ht8wpWTiQ3jsA=
+github.com/bmatcuk/doublestar/v4 v4.0.2/go.mod h1:xBQ8jztBU6kakFMg+8WGxn0c6z1fTSPVIjEY1Wr7jzc=
 github.com/cenkalti/backoff/v3 v3.0.0 h1:ske+9nBpD9qZsTBoF41nW5L+AIuFBKMeze18XQ3eG1c=
 github.com/cenkalti/backoff/v3 v3.0.0/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
 github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
diff --git a/plugin/README.md b/plugin/README.md
index 4f94109fd..b5554e32f 100755
--- a/plugin/README.md
+++ b/plugin/README.md
@@ -30,18 +30,6 @@ But update events don't work with symlinks, so watcher also periodically manuall
 > ⚠ Use add_file_name plugin if you want to add filename to events.
 
-**Reading docker container log files:**
-```yaml
-pipelines:
-  example_docker_pipeline:
-    input:
-      type: file
-      watching_dir: /var/lib/docker/containers
-      offsets_file: /data/offsets.yaml
-      filename_pattern: "*-json.log"
-      persistence_mode: async
-```
-
 [More details...](plugin/input/file/README.md)
 ## http
 Reads events from HTTP requests with the body delimited by a new line.
diff --git a/plugin/input/README.md b/plugin/input/README.md
index 8afbeb3a2..cd1db25d2 100755
--- a/plugin/input/README.md
+++ b/plugin/input/README.md
@@ -29,18 +29,6 @@ But update events don't work with symlinks, so watcher also periodically manuall
 > ⚠ Use add_file_name plugin if you want to add filename to events.
 
-**Reading docker container log files:**
-```yaml
-pipelines:
-  example_docker_pipeline:
-    input:
-      type: file
-      watching_dir: /var/lib/docker/containers
-      offsets_file: /data/offsets.yaml
-      filename_pattern: "*-json.log"
-      persistence_mode: async
-```
-
 [More details...](plugin/input/file/README.md)
 ## http
 Reads events from HTTP requests with the body delimited by a new line.
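The `doublestar` dependency added above provides `**` (globstar) matching, which the standard library's `path/filepath.Match` does not support and which the new `paths` config introduced below relies on. A minimal sketch of the matching semantics, assuming doublestar v4's `PathMatch`; the file paths are illustrative:

```go
package main

import (
	"fmt"

	"github.com/bmatcuk/doublestar/v4"
)

func main() {
	pattern := "/var/lib/docker/containers/**/*-json.log"
	for _, p := range []string{
		"/var/lib/docker/containers/19aa5027343f4/19aa5027343f4-json.log", // matches
		"/var/lib/docker/containers/a/b/c-json.log",                       // `**` spans several directories
		"/var/lib/docker/containers/a/container.log",                      // no match: wrong suffix
	} {
		ok, err := doublestar.PathMatch(pattern, p)
		if err != nil {
			panic(err) // only a malformed pattern produces an error
		}
		fmt.Println(p, ok)
	}
}
```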
diff --git a/plugin/input/file/README.idoc.md b/plugin/input/file/README.idoc.md
index 7c700b4ac..83ea9cc00 100644
--- a/plugin/input/file/README.idoc.md
+++ b/plugin/input/file/README.idoc.md
@@ -1,6 +1,21 @@
 # File plugin
 @introduction
 
+**Reading docker container log files:**
+```yaml
+pipelines:
+  example_docker_pipeline:
+    input:
+      type: file
+      paths:
+        include:
+          - '/var/lib/docker/containers/**/*-json.log'
+        exclude:
+          - '/var/lib/docker/containers/19aa5027343f4*/*-json.log'
+      offsets_file: /data/offsets.yaml
+      persistence_mode: async
+```
+
 ### Config params
 @config-params|description
diff --git a/plugin/input/file/README.md b/plugin/input/file/README.md
index 37ff65d34..f5ee6b8d2 100755
--- a/plugin/input/file/README.md
+++ b/plugin/input/file/README.md
@@ -25,19 +25,22 @@ pipelines:
   example_docker_pipeline:
     input:
       type: file
-      watching_dir: /var/lib/docker/containers
+      paths:
+        include:
+          - '/var/lib/docker/containers/**/*-json.log'
+        exclude:
+          - '/var/lib/docker/containers/19aa5027343f4*/*-json.log'
       offsets_file: /data/offsets.yaml
-      filename_pattern: "*-json.log"
       persistence_mode: async
 ```
 
 ### Config params
 
-**`watching_dir`** *`string`* *`required`*
+**`paths`** *`Paths`*
 
-The source directory to watch for files to process. All subdirectories also will be watched. E.g. if files have
-`/var/my-logs/$YEAR/$MONTH/$DAY/$HOST/$FACILITY-$PROGRAM.log` structure, `watching_dir` should be `/var/my-logs`.
-Also the `filename_pattern`/`dir_pattern` is useful to filter needless files/subdirectories. In the case of using two or more
-different directories, it's recommended to setup separate pipelines for each.
+Set paths in glob format
+
+* `include` *`[]string`*
+* `exclude` *`[]string`*
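For orientation, a hedged sketch of how a config in the deprecated `watching_dir`/`dir_pattern`/`filename_pattern` form maps onto the new `paths` parameter; the directory and pattern values are illustrative, and the fallback conversion itself is the one added to `NewJobProvider` further down in this diff:

```yaml
# Deprecated form, still accepted:
pipelines:
  legacy_pipeline:
    input:
      type: file
      watching_dir: /var/log
      dir_pattern: "nginx-ingress-*"
      filename_pattern: "error.log"
      offsets_file: /data/offsets.yaml

# Rough equivalent using the new parameter:
pipelines:
  glob_pipeline:
    input:
      type: file
      paths:
        include:
          - '/var/log/nginx-ingress-*/error.log'
      offsets_file: /data/offsets.yaml
```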
@@ -48,20 +51,6 @@ The filename to store offsets of processed files. Offsets are loaded only on ini
-**`filename_pattern`** *`string`* *`default=*`*
-
-Files that don't meet this pattern will be ignored.
-> Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.
-
-<br>
-
-**`dir_pattern`** *`string`* *`default=*`*
-
-Dirs that don't meet this pattern will be ignored.
-> Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.
-
-<br>
-
 **`persistence_mode`** *`string`* *`default=async`* *`options=async|sync`*
 
 It defines how to save the offsets file:
@@ -147,6 +136,29 @@ Example: ```filename: '{{ .filename }}'```
+**`watching_dir`** **Deprecated format**
+
+The source directory to watch for files to process. All subdirectories also will be watched. E.g. if files have
+`/var/my-logs/$YEAR/$MONTH/$DAY/$HOST/$FACILITY-$PROGRAM.log` structure, `watching_dir` should be `/var/my-logs`.
+Also the `filename_pattern`/`dir_pattern` is useful to filter needless files/subdirectories. In the case of using two or more
+different directories, it's recommended to setup separate pipelines for each.
+
+<br>
+
+**`filename_pattern`** *`string`* *`default=*`*
+
+Files that don't meet this pattern will be ignored.
+> Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.
+
+<br>
+
+**`dir_pattern`** *`string`* *`default=*`*
+
+Dirs that don't meet this pattern will be ignored.
+> Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.
+
+<br>
+
 ### Meta params
 
 **`filename`**
diff --git a/plugin/input/file/file.go b/plugin/input/file/file.go
index 36928eaad..7eea49e47 100644
--- a/plugin/input/file/file.go
+++ b/plugin/input/file/file.go
@@ -5,6 +5,7 @@ import (
 	"path/filepath"
 	"time"
 
+	"github.com/bmatcuk/doublestar/v4"
 	"github.com/ozontech/file.d/cfg"
 	"github.com/ozontech/file.d/fd"
 	"github.com/ozontech/file.d/metric"
@@ -34,18 +35,6 @@ But update events don't work with symlinks, so watcher also periodically manuall
 > By default the plugin is notified only on file creations. Note that following for changes is more CPU intensive.
 > ⚠ Use add_file_name plugin if you want to add filename to events.
-
-**Reading docker container log files:**
-```yaml
-pipelines:
-  example_docker_pipeline:
-    input:
-      type: file
-      watching_dir: /var/lib/docker/containers
-      offsets_file: /data/offsets.yaml
-      filename_pattern: "*-json.log"
-      persistence_mode: async
-```
 }*/
 
 type Plugin struct {
@@ -81,17 +70,22 @@ const (
 	offsetsOpReset // * `reset` – resets an offset to the beginning of the file
 )
 
+type Paths struct {
+	Include []string `json:"include"`
+	Exclude []string `json:"exclude"`
+}
+
 type Config struct {
 	// ! config-params
 	// ^ config-params
 
 	// > @3@4@5@6
 	// >
-	// > The source directory to watch for files to process. All subdirectories also will be watched. E.g. if files have
-	// > `/var/my-logs/$YEAR/$MONTH/$DAY/$HOST/$FACILITY-$PROGRAM.log` structure, `watching_dir` should be `/var/my-logs`.
-	// > Also the `filename_pattern`/`dir_pattern` is useful to filter needless files/subdirectories. In the case of using two or more
-	// > different directories, it's recommended to setup separate pipelines for each.
-	WatchingDir string `json:"watching_dir" required:"true"` // *
+	// > Set paths in glob format
+	// >
+	// > * `include` *`[]string`*
+	// > * `exclude` *`[]string`*
+	Paths Paths `json:"paths"` // *
 
 	// > @3@4@5@6
 	// >
@@ -100,18 +94,6 @@ type Config struct {
 	OffsetsFile    string `json:"offsets_file" required:"true"` // *
 	OffsetsFileTmp string
 
-	// > @3@4@5@6
-	// >
-	// > Files that don't meet this pattern will be ignored.
-	// > > Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.
-	FilenamePattern string `json:"filename_pattern" default:"*"` // *
-
-	// > @3@4@5@6
-	// >
-	// > Dirs that don't meet this pattern will be ignored.
-	// > > Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.
-	DirPattern string `json:"dir_pattern" default:"*"` // *
-
 	// > @3@4@5@6
 	// >
 	// > It defines how to save the offsets file:
@@ -184,6 +166,26 @@ type Config struct {
 	// >
 	// > Example: ```filename: '{{ .filename }}'```
 	Meta cfg.MetaTemplates `json:"meta"` // *
+
+	// > **Deprecated format**
+	// >
+	// > The source directory to watch for files to process. All subdirectories also will be watched. E.g. if files have
+	// > `/var/my-logs/$YEAR/$MONTH/$DAY/$HOST/$FACILITY-$PROGRAM.log` structure, `watching_dir` should be `/var/my-logs`.
+	// > Also the `filename_pattern`/`dir_pattern` is useful to filter needless files/subdirectories. In the case of using two or more
+	// > different directories, it's recommended to setup separate pipelines for each.
+	WatchingDir string `json:"watching_dir"` // *
+
+	// > @3@4@5@6
+	// >
+	// > Files that don't meet this pattern will be ignored.
+	// > > Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.
+	FilenamePattern string `json:"filename_pattern" default:"*"` // *
+
+	// > @3@4@5@6
+	// >
+	// > Dirs that don't meet this pattern will be ignored.
+	// > > Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.
+	DirPattern string `json:"dir_pattern" default:"*"` // *
 }
 
 var offsetFiles = make(map[string]string)
@@ -222,6 +224,20 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginPa
 		offsetFiles[offsetFilePath] = params.PipelineName
 	}
 
+	for _, pattern := range p.config.Paths.Include {
+		_, err := doublestar.PathMatch(pattern, ".")
+		if err != nil {
+			p.logger.Fatalf("wrong paths include pattern %q: %s", pattern, err.Error())
+		}
+	}
+
+	for _, pattern := range p.config.Paths.Exclude {
+		_, err := doublestar.PathMatch(pattern, ".")
+		if err != nil {
+			p.logger.Fatalf("wrong paths exclude pattern %q: %s", pattern, err.Error())
+		}
+	}
+
 	p.jobProvider = NewJobProvider(
 		p.config,
 		newMetricCollection(
diff --git a/plugin/input/file/info.go b/plugin/input/file/info.go
index 3aa3403c2..a478b5cb6 100644
--- a/plugin/input/file/info.go
+++ b/plugin/input/file/info.go
@@ -45,10 +45,16 @@ func (ir *InfoRegistry) Info(w http.ResponseWriter, r *http.Request) {
 
 	_, _ = w.Write([]byte(jobsInfo))
 
-	watcherInfo := logger.Header("watch_dirs")
+	watcherInfo := logger.Header("watch_paths")
+	for _, source := range plugin.jobProvider.watcher.basePaths {
+		watcherInfo += fmt.Sprintf(
+			"%s\n",
+			source,
+		)
+	}
 	watcherInfo += fmt.Sprintf(
-		"%s\n",
-		plugin.jobProvider.watcher.path,
+		"commonPath: %s\n",
+		plugin.jobProvider.watcher.commonPath,
 	)
 
 	_, _ = w.Write([]byte(watcherInfo))
diff --git a/plugin/input/file/provider.go b/plugin/input/file/provider.go
index d2fb412e2..0784938a4 100644
--- a/plugin/input/file/provider.go
+++ b/plugin/input/file/provider.go
@@ -145,10 +145,23 @@ func NewJobProvider(config *Config, metrics *metricCollection, sugLogger *zap.Su
 		numberOfCurrentJobsMetric: metrics.numberOfCurrentJobsMetric,
 	}
 
+	if len(config.Paths.Include) == 0 {
+		if config.DirPattern == "*" {
+			config.Paths.Include = append(
+				config.Paths.Include,
+				filepath.Join(config.WatchingDir, filepath.Join("**", config.FilenamePattern)),
+			)
+		} else {
+			config.Paths.Include = append(
+				config.Paths.Include,
+				filepath.Join(config.WatchingDir, filepath.Join(config.DirPattern, config.FilenamePattern)),
+			)
+		}
+	}
+
 	jp.watcher = NewWatcher(
 		config.WatchingDir,
-		config.FilenamePattern,
-		config.DirPattern,
+		config.Paths,
 		jp.processNotification,
 		config.ShouldWatchChanges,
 		metrics.notifyChannelLengthMetric,
diff --git a/plugin/input/file/provider_test.go b/plugin/input/file/provider_test.go
index 7973bea71..adc4545b2 100644
--- a/plugin/input/file/provider_test.go
+++ b/plugin/input/file/provider_test.go
@@ -5,11 +5,17 @@ import (
 	"path/filepath"
 	"testing"
 
+	"runtime"
+
 	"github.com/ozontech/file.d/logger"
 	"github.com/ozontech/file.d/metric"
 	"github.com/prometheus/client_golang/prometheus"
 	uuid "github.com/satori/go.uuid"
 	"github.com/stretchr/testify/require"
+
+	"github.com/ozontech/file.d/test"
+	"github.com/stretchr/testify/assert"
+	"go.uber.org/zap"
 )
 
 func TestRefreshSymlinkOnBrokenLink(t *testing.T) {
@@ -48,3 +54,87 @@ func TestRefreshSymlinkOnBrokenLink(t *testing.T) {
 	jp.maintenanceSymlinks()
 	require.Equal(t, 0, len(jp.symlinks))
 }
+
+func TestProvierWatcherPaths(t *testing.T) {
+	tests := []struct {
+		name           string
+		config         *Config
+		expectedPathes Paths
+	}{
+		{
+			name: "default config",
+			config: &Config{
+				WatchingDir: "/var/log/",
"offset.json", + }, + expectedPathes: Paths{ + Include: []string{"/var/log/**/*"}, + }, + }, + { + name: "filename pattern config", + config: &Config{ + WatchingDir: "/var/log/", + FilenamePattern: "error.log", + OffsetsFile: "offset.json", + }, + expectedPathes: Paths{ + Include: []string{"/var/log/**/error.log"}, + }, + }, + { + name: "filename and dir pattern config", + config: &Config{ + WatchingDir: "/var/log/", + FilenamePattern: "error.log", + DirPattern: "nginx-ingress-*", + OffsetsFile: "offset.json", + }, + expectedPathes: Paths{ + Include: []string{"/var/log/nginx-ingress-*/error.log"}, + }, + }, + { + name: "dir pattern config", + config: &Config{ + WatchingDir: "/var/log/", + DirPattern: "nginx-ingress-*", + OffsetsFile: "offset.json", + }, + expectedPathes: Paths{ + Include: []string{"/var/log/nginx-ingress-*/*"}, + }, + }, + { + name: "ignore filename and dir pattern", + config: &Config{ + WatchingDir: "/var/log/", + FilenamePattern: "error.log", + DirPattern: "nginx-ingress-*", + OffsetsFile: "offset.json", + Paths: Paths{ + Include: []string{"/var/log/access.log"}, + }, + }, + expectedPathes: Paths{ + Include: []string{"/var/log/access.log"}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctl := metric.NewCtl("test", prometheus.NewRegistry()) + config := tt.config + test.NewConfig(config, map[string]int{"gomaxprocs": runtime.GOMAXPROCS(0)}) + metrics := newMetricCollection( + ctl.RegisterCounter("worker1", "help_test"), + ctl.RegisterCounter("worker2", "help_test"), + ctl.RegisterGauge("worker3", "help_test"), + ctl.RegisterGauge("worker4", "help_test"), + ) + jp := NewJobProvider(config, metrics, &zap.SugaredLogger{}) + + assert.Equal(t, tt.expectedPathes, jp.watcher.paths) + }) + } +} diff --git a/plugin/input/file/watcher.go b/plugin/input/file/watcher.go index 6f69aa792..d88ae9066 100644 --- a/plugin/input/file/watcher.go +++ b/plugin/input/file/watcher.go @@ -3,16 +3,18 @@ package file import ( "os" "path/filepath" + "strings" + "github.com/bmatcuk/doublestar/v4" "github.com/prometheus/client_golang/prometheus" "github.com/rjeczalik/notify" "go.uber.org/zap" ) type watcher struct { - path string // dir in which watch for files - filenamePattern string // files which match this pattern will be watched - dirPattern string // dirs which match this pattern will be watched + commonPath string + basePaths []string + paths Paths notifyFn notifyFn // function to receive notifications watcherCh chan notify.EventInfo shouldWatchWrites bool @@ -25,18 +27,15 @@ type notifyFn func(e notify.Event, filename string, stat os.FileInfo) // NewWatcher creates a watcher that see file creations in the path // and if they match filePattern and dirPattern, pass them to notifyFn. 
 func NewWatcher(
-	path string,
-	filenamePattern string,
-	dirPattern string,
+	dir string,
+	paths Paths,
 	notifyFn notifyFn,
 	shouldWatchWrites bool,
 	notifyChannelLengthMetric prometheus.Gauge,
 	logger *zap.SugaredLogger,
 ) *watcher {
 	return &watcher{
-		path:            path,
-		filenamePattern: filenamePattern,
-		dirPattern:      dirPattern,
+		paths: paths,
 		notifyFn: notifyFn,
 		shouldWatchWrites: shouldWatchWrites,
 		notifyChannelLengthMetric: notifyChannelLengthMetric,
@@ -45,15 +44,17 @@
 }
 
 func (w *watcher) start() {
-	w.logger.Infof("starting watcher path=%s, pattern=%s", w.path, w.filenamePattern)
-
-	if _, err := filepath.Match(w.filenamePattern, "_"); err != nil {
-		w.logger.Fatalf("wrong file name pattern %q: %s", w.filenamePattern, err.Error())
+	for _, pattern := range w.paths.Include {
+		// /var/lib/docker/containers/**/*-json.log -> /var/lib/docker/containers
+		basePattern, _ := doublestar.SplitPattern(pattern)
+		w.basePaths = append(w.basePaths, basePattern)
 	}
+	w.commonPath = commonPathPrefix(w.basePaths)
 
-	if _, err := filepath.Match(w.dirPattern, "_"); err != nil {
-		w.logger.Fatalf("wrong dir name pattern %q: %s", w.dirPattern, err.Error())
-	}
+	w.logger.Infof(
+		"starting watcher path=%s, pattern_included=%q, pattern_excluded=%q",
+		w.commonPath, w.paths.Include, w.paths.Exclude,
+	)
 
 	eventsCh := make(chan notify.EventInfo, 256)
 	w.watcherCh = eventsCh
@@ -64,7 +65,7 @@
 	}
 
 	// watch recursively.
-	err := notify.Watch(filepath.Join(w.path, "..."), eventsCh, events...)
+	err := notify.Watch(filepath.Join(w.commonPath, "..."), eventsCh, events...)
 	if err != nil {
 		w.logger.Warnf("can't create fs watcher: %s", err.Error())
 		return
@@ -73,7 +74,33 @@
 
 	go w.watch()
 
-	w.tryAddPath(w.path)
+	w.tryAddPath(w.commonPath)
+}
+
+func commonPathPrefix(paths []string) string {
+	results := make([][]string, 0, len(paths))
+	results = append(results, strings.Split(paths[0], string(os.PathSeparator)))
+	longest := results[0]
+
+	cmpWithLongest := func(a []string) {
+		if len(a) < len(longest) {
+			longest = longest[:len(a)]
+		}
+		for i := 0; i < len(longest); i++ {
+			if a[i] != longest[i] {
+				longest = longest[:i]
+				return
+			}
+		}
+	}
+
+	for i := 1; i < len(paths); i++ {
+		r := strings.Split(paths[i], string(os.PathSeparator))
+		results = append(results, r)
+		cmpWithLongest(r)
+	}
+
+	return filepath.Join(string(os.PathSeparator), filepath.Join(longest...))
 }
 
 func (w *watcher) stop() {
@@ -84,19 +111,21 @@
 }
 
 func (w *watcher) tryAddPath(path string) {
-	files, err := os.ReadDir(path)
-	if err != nil {
-		return
-	}
-
 	w.logger.Infof("starting path watch: %s ", path)
-	for _, file := range files {
-		if file.Name() == "" || file.Name() == "." || file.Name() == ".." {
-			continue
-		}
-
-		w.notify(notify.Create, filepath.Join(path, file.Name()))
+	err := filepath.Walk(path,
+		func(path string, info os.FileInfo, err error) error {
+			if err != nil {
+				return err
+			}
+			if !info.IsDir() {
+				w.notify(notify.Create, path)
+			}
+			return nil
+		},
+	)
+	if err != nil {
+		return
 	}
 }
@@ -106,10 +135,18 @@ func (w *watcher) notify(e notify.Event, path string) {
 		return
 	}
 
-	filename, err := filepath.Abs(filename)
-	if err != nil {
-		w.logger.Fatalf("can't get abs file name: %s", err.Error())
-		return
+	w.logger.Infof("notify %s %s", e, path)
+
+	for _, pattern := range w.paths.Exclude {
+		match, err := doublestar.PathMatch(pattern, path)
+		if err != nil {
+			w.logger.Errorf("wrong paths exclude pattern %q: %s", pattern, err.Error())
+			return
+		}
+		if match {
+			w.logger.Infof("excluded %s by pattern %s", path, pattern)
+			return
+		}
 	}
 
 	stat, err := os.Lstat(filename)
@@ -117,14 +154,49 @@
 	}
 
-	match, _ := filepath.Match(w.filenamePattern, filepath.Base(filename))
-	if match {
-		w.notifyFn(e, filename, stat)
+	if stat.IsDir() {
+		dirFilename := filename
+	check_dir:
+		for {
+			for _, path := range w.basePaths {
+				if path == dirFilename {
+					w.tryAddPath(filename)
+					break check_dir
+				}
+			}
+			if dirFilename == w.commonPath {
+				break
+			}
+			dirFilename = filepath.Dir(dirFilename)
+		}
+		return
 	}
 
-	match, _ = filepath.Match(w.dirPattern, filepath.Base(filename))
-	if stat.IsDir() && match {
-		w.tryAddPath(filename)
+	dirFilename := filepath.Dir(filename)
+check_file:
+	for {
+		for _, path := range w.basePaths {
+			if path == dirFilename {
+				break check_file
+			}
+		}
+		if dirFilename == w.commonPath {
+			return
+		}
+		dirFilename = filepath.Dir(dirFilename)
+	}
+
+	for _, pattern := range w.paths.Include {
+		match, err := doublestar.PathMatch(pattern, path)
+		if err != nil {
+			w.logger.Errorf("wrong paths include pattern %q: %s", pattern, err.Error())
+			return
+		}
+
+		if match {
+			w.logger.Infof("path %s matched by pattern %s", filename, pattern)
+			w.notifyFn(e, filename, stat)
+		}
 	}
 }
diff --git a/plugin/input/file/watcher_test.go b/plugin/input/file/watcher_test.go
index 3b7715bc4..56b353ba5 100644
--- a/plugin/input/file/watcher_test.go
+++ b/plugin/input/file/watcher_test.go
@@ -10,6 +10,7 @@ import (
 	"github.com/ozontech/file.d/metric"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/rjeczalik/notify"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/atomic"
 	"go.uber.org/zap"
 )
@@ -19,7 +20,6 @@ func TestWatcher(t *testing.T) {
 	tests := []struct {
 		name            string
 		filenamePattern string
-		dirPattern      string
 	}{
 		{
 			name: "should_ok_and_count_only_creation",
@@ -29,16 +29,22 @@
 	for _, tt := range tests {
 		tt := tt
 		t.Run(tt.name, func(t *testing.T) {
-			path := t.TempDir()
+			dir := t.TempDir()
 			shouldCreate := atomic.Int64{}
 			notifyFn := func(_ notify.Event, _ string, _ os.FileInfo) {
 				shouldCreate.Inc()
 			}
 			ctl := metric.NewCtl("test", prometheus.NewRegistry())
 			w := NewWatcher(
-				path,
-				tt.filenamePattern,
-				tt.dirPattern,
+				dir,
+				Paths{
+					Include: []string{
+						filepath.Join(
+							dir,
+							tt.filenamePattern,
+						),
+					},
+				},
 				notifyFn,
 				false,
 				ctl.RegisterGauge("worker", "help_test"),
@@ -49,14 +55,14 @@
 
 			// create, write, remove files and ensure events are only passed for creation events.
-			f1Name := filepath.Join(path, "watch1.log")
+			f1Name := filepath.Join(dir, "watch1.log")
 			f1, err := os.Create(f1Name)
 			require.NoError(t, err)
 			err = f1.Close()
 			require.NoError(t, err)
 
-			f2, err := os.Create(filepath.Join(path, "watch2.log"))
+			f2, err := os.Create(filepath.Join(dir, "watch2.log"))
 			require.NoError(t, err)
 			err = f2.Close()
 			require.NoError(t, err)
@@ -81,3 +87,120 @@
 		})
 	}
 }
+
+// nolint:gocritic
+func TestWatcherPaths(t *testing.T) {
+	dir := t.TempDir()
+	shouldCreate := atomic.Int64{}
+	notifyFn := func(_ notify.Event, _ string, _ os.FileInfo) {
+		shouldCreate.Inc()
+	}
+	ctl := metric.NewCtl("test", prometheus.NewRegistry())
+	w := NewWatcher(
+		dir,
+		Paths{
+			Include: []string{
+				filepath.Join(dir, "nginx-ingress-*/error.log"),
+				filepath.Join(dir, "log/**/*"),
+				filepath.Join(dir, "access.log"),
+				filepath.Join(dir, "**/sub_access.log"),
+			},
+			Exclude: []string{
+				filepath.Join(dir, "log/payments/**"),
+				filepath.Join(dir, "nginx-ingress-payments/error.log"),
+			},
+		},
+		notifyFn,
+		false,
+		ctl.RegisterGauge("worker", "help_test"),
+		zap.L().Sugar(),
+	)
+	w.start()
+	defer w.stop()
+
+	tests := []struct {
+		name         string
+		filename     string
+		shouldNotify bool
+	}{
+		{
+			filename:     filepath.Join(dir, "nginx-ingress-0/error.log"),
+			shouldNotify: true,
+		},
+		{
+			filename:     filepath.Join(dir, "log/errors.log"),
+			shouldNotify: true,
+		},
+		{
+			filename:     filepath.Join(dir, "log/0/errors.log"),
+			shouldNotify: true,
+		},
+		{
+			filename:     filepath.Join(dir, "log/0/0/errors.log"),
+			shouldNotify: true,
+		},
+		{
+			filename:     filepath.Join(dir, "access.log"),
+			shouldNotify: true,
+		},
+		{
+			filename:     filepath.Join(dir, "sub_access.log"),
+			shouldNotify: true,
+		},
+		{
+			filename:     filepath.Join(dir, "access1.log"),
+			shouldNotify: false,
+		},
+		{
+			filename:     filepath.Join(dir, "nginx/errors.log"),
+			shouldNotify: false,
+		},
+		{
+			filename:     filepath.Join(dir, "log/payments/errors.log"),
+			shouldNotify: false,
+		},
+		{
+			filename:     filepath.Join(dir, "log/payments/nginx-ingress-0/errors.log"),
+			shouldNotify: false,
+		},
+		{
+			filename:     filepath.Join(dir, "nginx-ingress-payments/error.log"),
+			shouldNotify: false,
+		},
+	}
+	for _, tt := range tests {
+		tt := tt
+		relFilename, _ := filepath.Rel(dir, tt.filename)
+		t.Run(relFilename, func(t *testing.T) {
+			filename := tt.filename
+
+			err := os.MkdirAll(filepath.Dir(filename), 0o700)
+			require.NoError(t, err)
+
+			f1, err := os.Create(filename)
+			require.NoError(t, err)
+			err = f1.Close()
+			require.NoError(t, err)
+
+			time.Sleep(10 * time.Millisecond)
+			before := shouldCreate.Load()
+			w.notify(notify.Create, filename)
+			after := shouldCreate.Load()
+
+			isNotified := after-before != 0
+			require.Equal(t, tt.shouldNotify, isNotified)
+		})
+	}
+}
+
+func TestCommonPathPrefix(t *testing.T) {
+	a := assert.New(t)
+
+	paths := []string{
+		"/var/log/",
+		"/var/lib/docker/",
+	}
+
+	result := commonPathPrefix(paths)
+	a.Equal("/var", result)
+}
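To tie the watcher changes above together, here is a small usage sketch of how include globs become watch roots: `doublestar.SplitPattern` strips the glob part of each pattern, and a `commonPathPrefix` helper (restated below so the sketch is self-contained, mirroring the one added in watcher.go) reduces the bases to the single directory handed to `notify.Watch`. The patterns are illustrative:

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"

	"github.com/bmatcuk/doublestar/v4"
)

// commonPathPrefix mirrors the helper added in watcher.go: it returns the
// longest common directory prefix of the given absolute paths.
func commonPathPrefix(paths []string) string {
	longest := strings.Split(paths[0], string(filepath.Separator))
	for _, p := range paths[1:] {
		parts := strings.Split(p, string(filepath.Separator))
		if len(parts) < len(longest) {
			longest = longest[:len(parts)]
		}
		for i := range longest {
			if parts[i] != longest[i] {
				longest = longest[:i]
				break
			}
		}
	}
	return filepath.Join(string(filepath.Separator), filepath.Join(longest...))
}

func main() {
	include := []string{
		"/var/lib/docker/containers/**/*-json.log",
		"/var/log/nginx-ingress-*/error.log",
	}

	var basePaths []string
	for _, pattern := range include {
		// SplitPattern separates the static prefix from the glob part,
		// e.g. "/var/log/nginx-ingress-*/error.log" -> "/var/log".
		base, _ := doublestar.SplitPattern(pattern)
		basePaths = append(basePaths, base)
	}

	fmt.Println(basePaths)                   // [/var/lib/docker/containers /var/log]
	fmt.Println(commonPathPrefix(basePaths)) // /var
}
```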