diff --git a/go.mod b/go.mod
index 0712d6598..db3e904ba 100644
--- a/go.mod
+++ b/go.mod
@@ -12,6 +12,7 @@ require (
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137
github.com/alicebob/miniredis/v2 v2.30.5
github.com/bitly/go-simplejson v0.5.1
+ github.com/bmatcuk/doublestar/v4 v4.0.2
github.com/bufbuild/protocompile v0.13.0
github.com/cenkalti/backoff/v4 v4.2.1
github.com/cespare/xxhash/v2 v2.2.0
diff --git a/go.sum b/go.sum
index bc79b75e2..1a46bcbb8 100644
--- a/go.sum
+++ b/go.sum
@@ -29,6 +29,8 @@ github.com/bitly/go-simplejson v0.5.1 h1:xgwPbetQScXt1gh9BmoJ6j9JMr3TElvuIyjR8pg
github.com/bitly/go-simplejson v0.5.1/go.mod h1:YOPVLzCfwK14b4Sff3oP1AmGhI9T9Vsg84etUnlyp+Q=
+github.com/bmatcuk/doublestar/v4 v4.0.2 h1:X0krlUVAVmtr2cRoTqR8aDMrDqnB36ht8wpWTiQ3jsA=
+github.com/bmatcuk/doublestar/v4 v4.0.2/go.mod h1:xBQ8jztBU6kakFMg+8WGxn0c6z1fTSPVIjEY1Wr7jzc=
github.com/bufbuild/protocompile v0.13.0 h1:6cwUB0Y2tSvmNxsbunwzmIto3xOlJOV7ALALuVOs92M=
github.com/bufbuild/protocompile v0.13.0/go.mod h1:dr++fGGeMPWHv7jPeT06ZKukm45NJscd7rUxQVzEKRk=
github.com/cenkalti/backoff/v3 v3.0.0 h1:ske+9nBpD9qZsTBoF41nW5L+AIuFBKMeze18XQ3eG1c=
github.com/cenkalti/backoff/v3 v3.0.0/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
diff --git a/plugin/README.md b/plugin/README.md
index 4f94109fd..b5554e32f 100755
--- a/plugin/README.md
+++ b/plugin/README.md
@@ -30,18 +30,6 @@ But update events don't work with symlinks, so watcher also periodically manuall
> ⚠ Use add_file_name plugin if you want to add filename to events.
-**Reading docker container log files:**
-```yaml
-pipelines:
- example_docker_pipeline:
- input:
- type: file
- watching_dir: /var/lib/docker/containers
- offsets_file: /data/offsets.yaml
- filename_pattern: "*-json.log"
- persistence_mode: async
-```
-
[More details...](plugin/input/file/README.md)
## http
Reads events from HTTP requests with the body delimited by a new line.
diff --git a/plugin/input/README.md b/plugin/input/README.md
index 8afbeb3a2..cd1db25d2 100755
--- a/plugin/input/README.md
+++ b/plugin/input/README.md
@@ -29,18 +29,6 @@ But update events don't work with symlinks, so watcher also periodically manuall
> ⚠ Use add_file_name plugin if you want to add filename to events.
-**Reading docker container log files:**
-```yaml
-pipelines:
- example_docker_pipeline:
- input:
- type: file
- watching_dir: /var/lib/docker/containers
- offsets_file: /data/offsets.yaml
- filename_pattern: "*-json.log"
- persistence_mode: async
-```
-
[More details...](plugin/input/file/README.md)
## http
Reads events from HTTP requests with the body delimited by a new line.
diff --git a/plugin/input/file/README.idoc.md b/plugin/input/file/README.idoc.md
index 7c700b4ac..83ea9cc00 100644
--- a/plugin/input/file/README.idoc.md
+++ b/plugin/input/file/README.idoc.md
@@ -1,6 +1,21 @@
# File plugin
@introduction
+**Reading docker container log files:**
+```yaml
+pipelines:
+ example_docker_pipeline:
+ input:
+ type: file
+ paths:
+ include:
+ - '/var/lib/docker/containers/**/*-json.log'
+ exclude:
+ - '/var/lib/docker/containers/19aa5027343f4*/*-json.log'
+ offsets_file: /data/offsets.yaml
+ persistence_mode: async
+```
+
### Config params
@config-params|description
diff --git a/plugin/input/file/README.md b/plugin/input/file/README.md
index 37ff65d34..f5ee6b8d2 100755
--- a/plugin/input/file/README.md
+++ b/plugin/input/file/README.md
@@ -25,19 +25,22 @@ pipelines:
example_docker_pipeline:
input:
type: file
- watching_dir: /var/lib/docker/containers
+ paths:
+ include:
+ - '/var/lib/docker/containers/**/*-json.log'
+ exclude:
+ - '/var/lib/docker/containers/19aa5027343f4*/*-json.log'
offsets_file: /data/offsets.yaml
- filename_pattern: "*-json.log"
persistence_mode: async
```
### Config params
-**`watching_dir`** *`string`* *`required`*
+**`paths`** *`Paths`*
-The source directory to watch for files to process. All subdirectories also will be watched. E.g. if files have
-`/var/my-logs/$YEAR/$MONTH/$DAY/$HOST/$FACILITY-$PROGRAM.log` structure, `watching_dir` should be `/var/my-logs`.
-Also the `filename_pattern`/`dir_pattern` is useful to filter needless files/subdirectories. In the case of using two or more
-different directories, it's recommended to setup separate pipelines for each.
+Set paths in glob format
+
+* `include` *`[]string`*
+* `exclude` *`[]string`*
@@ -48,20 +51,6 @@ The filename to store offsets of processed files. Offsets are loaded only on ini
-**`filename_pattern`** *`string`* *`default=*`*
-
-Files that don't meet this pattern will be ignored.
-> Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.
-
-
-
-**`dir_pattern`** *`string`* *`default=*`*
-
-Dirs that don't meet this pattern will be ignored.
-> Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.
-
-
-
**`persistence_mode`** *`string`* *`default=async`* *`options=async|sync`*
It defines how to save the offsets file:
@@ -147,6 +136,29 @@ Example: ```filename: '{{ .filename }}'```
+**`watching_dir`** **Deprecated format**
+
+The source directory to watch for files to process. All subdirectories also will be watched. E.g. if files have
+`/var/my-logs/$YEAR/$MONTH/$DAY/$HOST/$FACILITY-$PROGRAM.log` structure, `watching_dir` should be `/var/my-logs`.
+Also the `filename_pattern`/`dir_pattern` is useful to filter needless files/subdirectories. In the case of using two or more
+different directories, it's recommended to setup separate pipelines for each.
+
+
+
+**`filename_pattern`** *`string`* *`default=*`*
+
+Files that don't meet this pattern will be ignored.
+> Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.
+
+
+
+**`dir_pattern`** *`string`* *`default=*`*
+
+Dirs that don't meet this pattern will be ignored.
+> Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.
+
+
+
### Meta params
**`filename`**
diff --git a/plugin/input/file/file.go b/plugin/input/file/file.go
index 36928eaad..7eea49e47 100644
--- a/plugin/input/file/file.go
+++ b/plugin/input/file/file.go
@@ -5,6 +5,7 @@ import (
"path/filepath"
"time"
+ "github.com/bmatcuk/doublestar/v4"
"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/metric"
@@ -34,18 +35,6 @@ But update events don't work with symlinks, so watcher also periodically manuall
> By default the plugin is notified only on file creations. Note that following for changes is more CPU intensive.
> ⚠ Use add_file_name plugin if you want to add filename to events.
-
-**Reading docker container log files:**
-```yaml
-pipelines:
- example_docker_pipeline:
- input:
- type: file
- watching_dir: /var/lib/docker/containers
- offsets_file: /data/offsets.yaml
- filename_pattern: "*-json.log"
- persistence_mode: async
-```
}*/
type Plugin struct {
@@ -81,17 +70,22 @@ const (
offsetsOpReset // * `reset` – resets an offset to the beginning of the file
)
+type Paths struct {
+ Include []string `json:"include"`
+ Exclude []string `json:"exclude"`
+}
+
type Config struct {
// ! config-params
// ^ config-params
// > @3@4@5@6
// >
- // > The source directory to watch for files to process. All subdirectories also will be watched. E.g. if files have
- // > `/var/my-logs/$YEAR/$MONTH/$DAY/$HOST/$FACILITY-$PROGRAM.log` structure, `watching_dir` should be `/var/my-logs`.
- // > Also the `filename_pattern`/`dir_pattern` is useful to filter needless files/subdirectories. In the case of using two or more
- // > different directories, it's recommended to setup separate pipelines for each.
- WatchingDir string `json:"watching_dir" required:"true"` // *
+ // > Set paths in glob format
+ // >
+ // > * `include` *`[]string`*
+ // > * `exclude` *`[]string`*
+ Paths Paths `json:"paths"` // *
// > @3@4@5@6
// >
@@ -100,18 +94,6 @@ type Config struct {
OffsetsFile string `json:"offsets_file" required:"true"` // *
OffsetsFileTmp string
- // > @3@4@5@6
- // >
- // > Files that don't meet this pattern will be ignored.
- // > > Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.
- FilenamePattern string `json:"filename_pattern" default:"*"` // *
-
- // > @3@4@5@6
- // >
- // > Dirs that don't meet this pattern will be ignored.
- // > > Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.
- DirPattern string `json:"dir_pattern" default:"*"` // *
-
// > @3@4@5@6
// >
// > It defines how to save the offsets file:
@@ -184,6 +166,26 @@ type Config struct {
// >
// > Example: ```filename: '{{ .filename }}'```
Meta cfg.MetaTemplates `json:"meta"` // *
+
+ // > **Deprecated format**
+ // >
+ // > The source directory to watch for files to process. All subdirectories also will be watched. E.g. if files have
+ // > `/var/my-logs/$YEAR/$MONTH/$DAY/$HOST/$FACILITY-$PROGRAM.log` structure, `watching_dir` should be `/var/my-logs`.
+ // > Also the `filename_pattern`/`dir_pattern` is useful to filter needless files/subdirectories. In the case of using two or more
+ // > different directories, it's recommended to setup separate pipelines for each.
+ WatchingDir string `json:"watching_dir"` // *
+
+ // > @3@4@5@6
+ // >
+ // > Files that don't meet this pattern will be ignored.
+ // > > Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.
+ FilenamePattern string `json:"filename_pattern" default:"*"` // *
+
+ // > @3@4@5@6
+ // >
+ // > Dirs that don't meet this pattern will be ignored.
+ // > > Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.
+ DirPattern string `json:"dir_pattern" default:"*"` // *
}
var offsetFiles = make(map[string]string)
@@ -222,6 +224,20 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginPa
offsetFiles[offsetFilePath] = params.PipelineName
}
+ for _, pattern := range p.config.Paths.Include {
+ _, err := doublestar.PathMatch(pattern, ".")
+ if err != nil {
+ p.logger.Fatalf("wrong paths include pattern %q: %s", pattern, err.Error())
+ }
+ }
+
+ for _, pattern := range p.config.Paths.Exclude {
+ _, err := doublestar.PathMatch(pattern, ".")
+ if err != nil {
+ p.logger.Fatalf("wrong paths exclude pattern %q: %s", pattern, err.Error())
+ }
+ }
+
p.jobProvider = NewJobProvider(
p.config,
newMetricCollection(
diff --git a/plugin/input/file/info.go b/plugin/input/file/info.go
index 3aa3403c2..a478b5cb6 100644
--- a/plugin/input/file/info.go
+++ b/plugin/input/file/info.go
@@ -45,10 +45,16 @@ func (ir *InfoRegistry) Info(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(jobsInfo))
- watcherInfo := logger.Header("watch_dirs")
+ watcherInfo := logger.Header("watch_paths")
+ for _, source := range plugin.jobProvider.watcher.basePaths {
+ watcherInfo += fmt.Sprintf(
+ "%s\n",
+ source,
+ )
+ }
watcherInfo += fmt.Sprintf(
- "%s\n",
- plugin.jobProvider.watcher.path,
+ "commonPath: %s\n",
+ plugin.jobProvider.watcher.commonPath,
)
_, _ = w.Write([]byte(watcherInfo))
diff --git a/plugin/input/file/provider.go b/plugin/input/file/provider.go
index d2fb412e2..0784938a4 100644
--- a/plugin/input/file/provider.go
+++ b/plugin/input/file/provider.go
@@ -145,10 +145,23 @@ func NewJobProvider(config *Config, metrics *metricCollection, sugLogger *zap.Su
numberOfCurrentJobsMetric: metrics.numberOfCurrentJobsMetric,
}
+ if len(config.Paths.Include) == 0 {
+ if config.DirPattern == "*" {
+ config.Paths.Include = append(
+ config.Paths.Include,
+ filepath.Join(config.WatchingDir, filepath.Join("**", config.FilenamePattern)),
+ )
+ } else {
+ config.Paths.Include = append(
+ config.Paths.Include,
+ filepath.Join(config.WatchingDir, filepath.Join(config.DirPattern, config.FilenamePattern)),
+ )
+ }
+ }
+
jp.watcher = NewWatcher(
config.WatchingDir,
- config.FilenamePattern,
- config.DirPattern,
+ config.Paths,
jp.processNotification,
config.ShouldWatchChanges,
metrics.notifyChannelLengthMetric,
diff --git a/plugin/input/file/provider_test.go b/plugin/input/file/provider_test.go
index 7973bea71..adc4545b2 100644
--- a/plugin/input/file/provider_test.go
+++ b/plugin/input/file/provider_test.go
@@ -5,11 +5,17 @@ import (
"path/filepath"
"testing"
+ "runtime"
+
"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/metric"
"github.com/prometheus/client_golang/prometheus"
uuid "github.com/satori/go.uuid"
"github.com/stretchr/testify/require"
+
+ "github.com/ozontech/file.d/test"
+ "github.com/stretchr/testify/assert"
+ "go.uber.org/zap"
)
func TestRefreshSymlinkOnBrokenLink(t *testing.T) {
@@ -48,3 +54,87 @@ func TestRefreshSymlinkOnBrokenLink(t *testing.T) {
jp.maintenanceSymlinks()
require.Equal(t, 0, len(jp.symlinks))
}
+
+func TestProviderWatcherPaths(t *testing.T) {
+	tests := []struct {
+		name          string
+		config        *Config
+		expectedPaths Paths
+	}{
+		{
+			name: "default config",
+			config: &Config{
+				WatchingDir: "/var/log/",
+				OffsetsFile: "offset.json",
+			},
+			expectedPaths: Paths{
+				Include: []string{"/var/log/**/*"},
+			},
+		},
+		{
+			name: "filename pattern config",
+			config: &Config{
+				WatchingDir:     "/var/log/",
+				FilenamePattern: "error.log",
+				OffsetsFile:     "offset.json",
+			},
+			expectedPaths: Paths{
+				Include: []string{"/var/log/**/error.log"},
+			},
+		},
+		{
+			name: "filename and dir pattern config",
+			config: &Config{
+				WatchingDir:     "/var/log/",
+				FilenamePattern: "error.log",
+				DirPattern:      "nginx-ingress-*",
+				OffsetsFile:     "offset.json",
+			},
+			expectedPaths: Paths{
+				Include: []string{"/var/log/nginx-ingress-*/error.log"},
+			},
+		},
+		{
+			name: "dir pattern config",
+			config: &Config{
+				WatchingDir: "/var/log/",
+				DirPattern:  "nginx-ingress-*",
+				OffsetsFile: "offset.json",
+			},
+			expectedPaths: Paths{
+				Include: []string{"/var/log/nginx-ingress-*/*"},
+			},
+		},
+		{
+			name: "ignore filename and dir pattern",
+			config: &Config{
+				WatchingDir:     "/var/log/",
+				FilenamePattern: "error.log",
+				DirPattern:      "nginx-ingress-*",
+				OffsetsFile:     "offset.json",
+				Paths: Paths{
+					Include: []string{"/var/log/access.log"},
+				},
+			},
+			expectedPaths: Paths{
+				Include: []string{"/var/log/access.log"},
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			ctl := metric.NewCtl("test", prometheus.NewRegistry())
+			config := tt.config
+			test.NewConfig(config, map[string]int{"gomaxprocs": runtime.GOMAXPROCS(0)})
+			metrics := newMetricCollection(
+				ctl.RegisterCounter("worker1", "help_test"),
+				ctl.RegisterCounter("worker2", "help_test"),
+				ctl.RegisterGauge("worker3", "help_test"),
+				ctl.RegisterGauge("worker4", "help_test"),
+			)
+			jp := NewJobProvider(config, metrics, zap.NewNop().Sugar())
+
+			assert.Equal(t, tt.expectedPaths, jp.watcher.paths)
+		})
+	}
+}
diff --git a/plugin/input/file/watcher.go b/plugin/input/file/watcher.go
index 6f69aa792..d88ae9066 100644
--- a/plugin/input/file/watcher.go
+++ b/plugin/input/file/watcher.go
@@ -3,16 +3,18 @@ package file
import (
"os"
"path/filepath"
+ "strings"
+ "github.com/bmatcuk/doublestar/v4"
"github.com/prometheus/client_golang/prometheus"
"github.com/rjeczalik/notify"
"go.uber.org/zap"
)
type watcher struct {
- path string // dir in which watch for files
- filenamePattern string // files which match this pattern will be watched
- dirPattern string // dirs which match this pattern will be watched
+ commonPath string
+ basePaths []string
+ paths Paths
notifyFn notifyFn // function to receive notifications
watcherCh chan notify.EventInfo
shouldWatchWrites bool
@@ -25,18 +27,15 @@ type notifyFn func(e notify.Event, filename string, stat os.FileInfo)
// NewWatcher creates a watcher that see file creations in the path
// and if they match filePattern and dirPattern, pass them to notifyFn.
func NewWatcher(
- path string,
- filenamePattern string,
- dirPattern string,
+ dir string,
+ paths Paths,
notifyFn notifyFn,
shouldWatchWrites bool,
notifyChannelLengthMetric prometheus.Gauge,
logger *zap.SugaredLogger,
) *watcher {
return &watcher{
- path: path,
- filenamePattern: filenamePattern,
- dirPattern: dirPattern,
+ paths: paths,
notifyFn: notifyFn,
shouldWatchWrites: shouldWatchWrites,
notifyChannelLengthMetric: notifyChannelLengthMetric,
@@ -45,15 +44,17 @@ func NewWatcher(
}
func (w *watcher) start() {
- w.logger.Infof("starting watcher path=%s, pattern=%s", w.path, w.filenamePattern)
-
- if _, err := filepath.Match(w.filenamePattern, "_"); err != nil {
- w.logger.Fatalf("wrong file name pattern %q: %s", w.filenamePattern, err.Error())
+ for _, pattern := range w.paths.Include {
+ // /var/lib/docker/containers/**/*-json.log -> /var/lib/docker/containers
+ basePattern, _ := doublestar.SplitPattern(pattern)
+ w.basePaths = append(w.basePaths, basePattern)
}
+ w.commonPath = commonPathPrefix(w.basePaths)
- if _, err := filepath.Match(w.dirPattern, "_"); err != nil {
- w.logger.Fatalf("wrong dir name pattern %q: %s", w.dirPattern, err.Error())
- }
+ w.logger.Infof(
+ "starting watcher path=%s, pattern_included=%q, pattern_excluded=%q",
+ w.commonPath, w.paths.Include, w.paths.Exclude,
+ )
eventsCh := make(chan notify.EventInfo, 256)
w.watcherCh = eventsCh
@@ -64,7 +65,7 @@ func (w *watcher) start() {
}
// watch recursively.
- err := notify.Watch(filepath.Join(w.path, "..."), eventsCh, events...)
+ err := notify.Watch(filepath.Join(w.commonPath, "..."), eventsCh, events...)
if err != nil {
w.logger.Warnf("can't create fs watcher: %s", err.Error())
return
@@ -73,7 +74,33 @@ func (w *watcher) start() {
go w.watch()
- w.tryAddPath(w.path)
+ w.tryAddPath(w.commonPath)
+}
+
+func commonPathPrefix(paths []string) string {
+ results := make([][]string, 0, len(paths))
+ results = append(results, strings.Split(paths[0], string(os.PathSeparator)))
+ longest := results[0]
+
+ cmpWithLongest := func(a []string) {
+ if len(a) < len(longest) {
+ longest = longest[:len(a)]
+ }
+ for i := 0; i < len(longest); i++ {
+ if a[i] != longest[i] {
+ longest = longest[:i]
+ return
+ }
+ }
+ }
+
+ for i := 1; i < len(paths); i++ {
+ r := strings.Split(paths[i], string(os.PathSeparator))
+ results = append(results, r)
+ cmpWithLongest(r)
+ }
+
+ return filepath.Join(string(os.PathSeparator), filepath.Join(longest...))
}
func (w *watcher) stop() {
@@ -84,19 +111,21 @@ func (w *watcher) stop() {
}
func (w *watcher) tryAddPath(path string) {
- files, err := os.ReadDir(path)
- if err != nil {
- return
- }
-
w.logger.Infof("starting path watch: %s ", path)
- for _, file := range files {
- if file.Name() == "" || file.Name() == "." || file.Name() == ".." {
- continue
- }
-
- w.notify(notify.Create, filepath.Join(path, file.Name()))
+ err := filepath.Walk(path,
+ func(path string, info os.FileInfo, err error) error {
+ if err != nil {
+ return err
+ }
+ if !info.IsDir() {
+ w.notify(notify.Create, path)
+ }
+ return nil
+ },
+ )
+ if err != nil {
+ return
}
}
@@ -106,10 +135,18 @@ func (w *watcher) notify(e notify.Event, path string) {
return
}
- filename, err := filepath.Abs(filename)
- if err != nil {
- w.logger.Fatalf("can't get abs file name: %s", err.Error())
- return
+ w.logger.Infof("notify %s %s", e, path)
+
+ for _, pattern := range w.paths.Exclude {
+ match, err := doublestar.PathMatch(pattern, path)
+ if err != nil {
+ w.logger.Errorf("wrong paths exclude pattern %q: %s", pattern, err.Error())
+ return
+ }
+ if match {
+ w.logger.Infof("excluded %s by pattern %s", path, pattern)
+ return
+ }
}
stat, err := os.Lstat(filename)
@@ -117,14 +154,49 @@ func (w *watcher) notify(e notify.Event, path string) {
return
}
- match, _ := filepath.Match(w.filenamePattern, filepath.Base(filename))
- if match {
- w.notifyFn(e, filename, stat)
+ if stat.IsDir() {
+ dirFilename := filename
+ check_dir:
+ for {
+ for _, path := range w.basePaths {
+ if path == dirFilename {
+ w.tryAddPath(filename)
+ break check_dir
+ }
+ }
+ if dirFilename == w.commonPath {
+ break
+ }
+ dirFilename = filepath.Dir(dirFilename)
+ }
+ return
}
- match, _ = filepath.Match(w.dirPattern, filepath.Base(filename))
- if stat.IsDir() && match {
- w.tryAddPath(filename)
+ dirFilename := filepath.Dir(filename)
+check_file:
+ for {
+ for _, path := range w.basePaths {
+ if path == dirFilename {
+ break check_file
+ }
+ }
+ if dirFilename == w.commonPath {
+ return
+ }
+ dirFilename = filepath.Dir(dirFilename)
+ }
+
+ for _, pattern := range w.paths.Include {
+ match, err := doublestar.PathMatch(pattern, path)
+ if err != nil {
+ w.logger.Errorf("wrong paths include pattern %q: %s", pattern, err.Error())
+ return
+ }
+
+ if match {
+ w.logger.Infof("path %s matched by pattern %s", filename, pattern)
+ w.notifyFn(e, filename, stat)
+ }
}
}
diff --git a/plugin/input/file/watcher_test.go b/plugin/input/file/watcher_test.go
index 3b7715bc4..56b353ba5 100644
--- a/plugin/input/file/watcher_test.go
+++ b/plugin/input/file/watcher_test.go
@@ -10,6 +10,7 @@ import (
"github.com/ozontech/file.d/metric"
"github.com/prometheus/client_golang/prometheus"
"github.com/rjeczalik/notify"
+ "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"go.uber.org/zap"
@@ -19,7 +20,6 @@ func TestWatcher(t *testing.T) {
tests := []struct {
name string
filenamePattern string
- dirPattern string
}{
{
name: "should_ok_and_count_only_creation",
@@ -29,16 +29,22 @@ func TestWatcher(t *testing.T) {
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
- path := t.TempDir()
+ dir := t.TempDir()
shouldCreate := atomic.Int64{}
notifyFn := func(_ notify.Event, _ string, _ os.FileInfo) {
shouldCreate.Inc()
}
ctl := metric.NewCtl("test", prometheus.NewRegistry())
w := NewWatcher(
- path,
- tt.filenamePattern,
- tt.dirPattern,
+ dir,
+ Paths{
+ Include: []string{
+ filepath.Join(
+ dir,
+ tt.filenamePattern,
+ ),
+ },
+ },
notifyFn,
false,
ctl.RegisterGauge("worker", "help_test"),
@@ -49,14 +55,14 @@ func TestWatcher(t *testing.T) {
// create, write, remove files and ensure events are only passed for creation events.
- f1Name := filepath.Join(path, "watch1.log")
+ f1Name := filepath.Join(dir, "watch1.log")
f1, err := os.Create(f1Name)
require.NoError(t, err)
err = f1.Close()
require.NoError(t, err)
- f2, err := os.Create(filepath.Join(path, "watch2.log"))
+ f2, err := os.Create(filepath.Join(dir, "watch2.log"))
require.NoError(t, err)
err = f2.Close()
require.NoError(t, err)
@@ -81,3 +87,120 @@ func TestWatcher(t *testing.T) {
})
}
}
+
+// nolint:gocritic
+func TestWatcherPaths(t *testing.T) {
+ dir := t.TempDir()
+ shouldCreate := atomic.Int64{}
+ notifyFn := func(_ notify.Event, _ string, _ os.FileInfo) {
+ shouldCreate.Inc()
+ }
+ ctl := metric.NewCtl("test", prometheus.NewRegistry())
+ w := NewWatcher(
+ dir,
+ Paths{
+ Include: []string{
+ filepath.Join(dir, "nginx-ingress-*/error.log"),
+ filepath.Join(dir, "log/**/*"),
+ filepath.Join(dir, "access.log"),
+ filepath.Join(dir, "**/sub_access.log"),
+ },
+ Exclude: []string{
+ filepath.Join(dir, "log/payments/**"),
+ filepath.Join(dir, "nginx-ingress-payments/error.log"),
+ },
+ },
+ notifyFn,
+ false,
+ ctl.RegisterGauge("worker", "help_test"),
+ zap.L().Sugar(),
+ )
+ w.start()
+ defer w.stop()
+
+ tests := []struct {
+ name string
+ filename string
+ shouldNotify bool
+ }{
+ {
+ filename: filepath.Join(dir, "nginx-ingress-0/error.log"),
+ shouldNotify: true,
+ },
+ {
+ filename: filepath.Join(dir, "log/errors.log"),
+ shouldNotify: true,
+ },
+ {
+ filename: filepath.Join(dir, "log/0/errors.log"),
+ shouldNotify: true,
+ },
+ {
+ filename: filepath.Join(dir, "log/0/0/errors.log"),
+ shouldNotify: true,
+ },
+ {
+ filename: filepath.Join(dir, "access.log"),
+ shouldNotify: true,
+ },
+ {
+ filename: filepath.Join(dir, "sub_access.log"),
+ shouldNotify: true,
+ },
+ {
+ filename: filepath.Join(dir, "access1.log"),
+ shouldNotify: false,
+ },
+ {
+ filename: filepath.Join(dir, "nginx/errors.log"),
+ shouldNotify: false,
+ },
+ {
+ filename: filepath.Join(dir, "log/payments/errors.log"),
+ shouldNotify: false,
+ },
+ {
+ filename: filepath.Join(dir, "log/payments/nginx-ingress-0/errors.log"),
+ shouldNotify: false,
+ },
+ {
+ filename: filepath.Join(dir, "nginx-ingress-payments/error.log"),
+ shouldNotify: false,
+ },
+ }
+ for _, tt := range tests {
+ tt := tt
+ relFilename, _ := filepath.Rel(dir, tt.filename)
+ t.Run(relFilename, func(t *testing.T) {
+ filename := tt.filename
+
+ err := os.MkdirAll(filepath.Dir(filename), 0o700)
+ require.NoError(t, err)
+
+ f1, err := os.Create(filename)
+ require.NoError(t, err)
+ err = f1.Close()
+ require.NoError(t, err)
+
+ time.Sleep(10 * time.Millisecond)
+ before := shouldCreate.Load()
+ w.notify(notify.Create, filename)
+ after := shouldCreate.Load()
+
+ isNotified := after-before != 0
+ require.Equal(t, tt.shouldNotify, isNotified)
+ })
+ }
+}
+
+func TestCommonPathPrefix(t *testing.T) {
+ a := assert.New(t)
+
+ paths := []string{
+ "/var/log/",
+ "/var/lib/docker/",
+ }
+
+ result := commonPathPrefix(paths)
+ a.Equal("/var", result)
+}