From 048ffc5825aeabf821719c220d8ea090dd777493 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Wed, 6 Mar 2024 18:26:59 +0700 Subject: [PATCH 01/11] file input plugin info url --- plugin/input/file/info.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/plugin/input/file/info.go b/plugin/input/file/info.go index 3aa3403c2..a478b5cb6 100644 --- a/plugin/input/file/info.go +++ b/plugin/input/file/info.go @@ -45,10 +45,16 @@ func (ir *InfoRegistry) Info(w http.ResponseWriter, r *http.Request) { _, _ = w.Write([]byte(jobsInfo)) - watcherInfo := logger.Header("watch_dirs") + watcherInfo := logger.Header("watch_paths") + for _, source := range plugin.jobProvider.watcher.basePaths { + watcherInfo += fmt.Sprintf( + "%s\n", + source, + ) + } watcherInfo += fmt.Sprintf( - "%s\n", - plugin.jobProvider.watcher.path, + "commonPath: %s\n", + plugin.jobProvider.watcher.commonPath, ) _, _ = w.Write([]byte(watcherInfo)) From 19d726fa0e68e0191ca09c8d7d490044e4bbabed Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Wed, 27 Dec 2023 19:47:52 +0700 Subject: [PATCH 02/11] file input plugin: new format for patterns --- go.mod | 1 + go.sum | 2 + plugin/README.md | 6 +- plugin/input/README.md | 6 +- plugin/input/file/README.md | 19 ++++- plugin/input/file/file.go | 24 +++++- plugin/input/file/provider.go | 17 +++- plugin/input/file/provider_test.go | 91 ++++++++++++++++++++++ plugin/input/file/watcher.go | 68 +++++++++------- plugin/input/file/watcher_test.go | 121 +++++++++++++++++++++++++++-- 10 files changed, 317 insertions(+), 38 deletions(-) diff --git a/go.mod b/go.mod index 0712d6598..fcaaa21e4 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/bufbuild/protocompile v0.13.0 github.com/cenkalti/backoff/v4 v4.2.1 github.com/cespare/xxhash/v2 v2.2.0 + github.com/bmatcuk/doublestar/v4 v4.0.2 github.com/euank/go-kmsg-parser v2.0.0+incompatible github.com/go-faster/jx v1.1.0 github.com/go-redis/redis v6.15.9+incompatible diff --git a/go.sum b/go.sum index bc79b75e2..1a46bcbb8 100644 --- a/go.sum +++ b/go.sum @@ -29,6 +29,8 @@ github.com/bitly/go-simplejson v0.5.1 h1:xgwPbetQScXt1gh9BmoJ6j9JMr3TElvuIyjR8pg github.com/bitly/go-simplejson v0.5.1/go.mod h1:YOPVLzCfwK14b4Sff3oP1AmGhI9T9Vsg84etUnlyp+Q= github.com/bufbuild/protocompile v0.13.0 h1:6cwUB0Y2tSvmNxsbunwzmIto3xOlJOV7ALALuVOs92M= github.com/bufbuild/protocompile v0.13.0/go.mod h1:dr++fGGeMPWHv7jPeT06ZKukm45NJscd7rUxQVzEKRk= +github.com/bmatcuk/doublestar/v4 v4.0.2 h1:X0krlUVAVmtr2cRoTqR8aDMrDqnB36ht8wpWTiQ3jsA= +github.com/bmatcuk/doublestar/v4 v4.0.2/go.mod h1:xBQ8jztBU6kakFMg+8WGxn0c6z1fTSPVIjEY1Wr7jzc= github.com/cenkalti/backoff/v3 v3.0.0 h1:ske+9nBpD9qZsTBoF41nW5L+AIuFBKMeze18XQ3eG1c= github.com/cenkalti/backoff/v3 v3.0.0/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= diff --git a/plugin/README.md b/plugin/README.md index 4f94109fd..9f6052d3e 100755 --- a/plugin/README.md +++ b/plugin/README.md @@ -37,8 +37,12 @@ pipelines: input: type: file watching_dir: /var/lib/docker/containers + paths: + include: + - '**\/*-json.log' # remove \ + exclude: + - ef933707fe551f512d0b240558fdd01771f7897cccab75eb4fab0e575393ab79 offsets_file: /data/offsets.yaml - filename_pattern: "*-json.log" persistence_mode: async ``` diff --git a/plugin/input/README.md b/plugin/input/README.md index 8afbeb3a2..2228a0c62 100755 --- a/plugin/input/README.md +++ b/plugin/input/README.md @@ -36,8 +36,12 @@ pipelines: input: type: file watching_dir: /var/lib/docker/containers + paths: + include: + - '**\/*-json.log' # remove \ + exclude: + - ef933707fe551f512d0b240558fdd01771f7897cccab75eb4fab0e575393ab79 offsets_file: /data/offsets.yaml - filename_pattern: "*-json.log" persistence_mode: async ``` diff --git a/plugin/input/file/README.md b/plugin/input/file/README.md index 37ff65d34..88d5b1b1e 100755 --- a/plugin/input/file/README.md +++ b/plugin/input/file/README.md @@ -26,14 +26,24 @@ pipelines: input: type: file watching_dir: /var/lib/docker/containers + paths: + include: + - '**\/*-json.log' # remove \ + exclude: + - ef933707fe551f512d0b240558fdd01771f7897cccab75eb4fab0e575393ab79 offsets_file: /data/offsets.yaml - filename_pattern: "*-json.log" persistence_mode: async ``` ### Config params **`watching_dir`** *`string`* *`required`* +List of included pathes +*`string`* *`required`* + +List of excluded pathes +*`string`* *`required`* + The source directory to watch for files to process. All subdirectories also will be watched. E.g. if files have `/var/my-logs/$YEAR/$MONTH/$DAY/$HOST/$FACILITY-$PROGRAM.log` structure, `watching_dir` should be `/var/my-logs`. Also the `filename_pattern`/`dir_pattern` is useful to filter needless files/subdirectories. In the case of using two or more @@ -41,6 +51,13 @@ different directories, it's recommended to setup separate pipelines for each.
+**`paths`** *`Paths`* + +Paths. +> Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details. + +
+ **`offsets_file`** *`string`* *`required`* The filename to store offsets of processed files. Offsets are loaded only on initialization. diff --git a/plugin/input/file/file.go b/plugin/input/file/file.go index 36928eaad..34c8e9532 100644 --- a/plugin/input/file/file.go +++ b/plugin/input/file/file.go @@ -42,8 +42,12 @@ pipelines: input: type: file watching_dir: /var/lib/docker/containers + paths: + include: + - '**\/*-json.log' # remove \ + exclude: + - ef933707fe551f512d0b240558fdd01771f7897cccab75eb4fab0e575393ab79 offsets_file: /data/offsets.yaml - filename_pattern: "*-json.log" persistence_mode: async ``` }*/ @@ -81,6 +85,18 @@ const ( offsetsOpReset // * `reset` – resets an offset to the beginning of the file ) +type Paths struct { + // > @3@4@5@6 + // > + // > List of included pathes + Include []string `json:"include"` + + // > @3@4@5@6 + // > + // > List of excluded pathes + Exclude []string `json:"exclude"` +} + type Config struct { // ! config-params // ^ config-params @@ -93,6 +109,12 @@ type Config struct { // > different directories, it's recommended to setup separate pipelines for each. WatchingDir string `json:"watching_dir" required:"true"` // * + // > @3@4@5@6 + // > + // > Paths. + // > > Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details. + Paths Paths `json:"paths"` // * + // > @3@4@5@6 // > // > The filename to store offsets of processed files. Offsets are loaded only on initialization. diff --git a/plugin/input/file/provider.go b/plugin/input/file/provider.go index d2fb412e2..5d4434df9 100644 --- a/plugin/input/file/provider.go +++ b/plugin/input/file/provider.go @@ -145,10 +145,23 @@ func NewJobProvider(config *Config, metrics *metricCollection, sugLogger *zap.Su numberOfCurrentJobsMetric: metrics.numberOfCurrentJobsMetric, } + if len(config.Paths.Include) == 0 { + if config.DirPattern == "*" { + config.Paths.Include = append( + config.Paths.Include, + filepath.Join("**", config.FilenamePattern), + ) + } else { + config.Paths.Include = append( + config.Paths.Include, + filepath.Join(config.DirPattern, config.FilenamePattern), + ) + } + } + jp.watcher = NewWatcher( config.WatchingDir, - config.FilenamePattern, - config.DirPattern, + config.Paths, jp.processNotification, config.ShouldWatchChanges, metrics.notifyChannelLengthMetric, diff --git a/plugin/input/file/provider_test.go b/plugin/input/file/provider_test.go index 7973bea71..f9b388b82 100644 --- a/plugin/input/file/provider_test.go +++ b/plugin/input/file/provider_test.go @@ -47,4 +47,95 @@ func TestRefreshSymlinkOnBrokenLink(t *testing.T) { os.Remove(linkName) jp.maintenanceSymlinks() require.Equal(t, 0, len(jp.symlinks)) + "runtime" + "testing" + + "github.com/ozontech/file.d/metric" + "github.com/ozontech/file.d/test" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" + "go.uber.org/zap" +) + +func TestProvierWatcherPaths(t *testing.T) { + tests := []struct { + name string + config *Config + expectedPathes Paths + }{ + { + name: "default config", + config: &Config{ + WatchingDir: "/var/log/", + OffsetsFile: "offset.json", + }, + expectedPathes: Paths{ + Include: []string{"**/*"}, + }, + }, + { + name: "filename pattern config", + config: &Config{ + WatchingDir: "/var/log/", + FilenamePattern: "error.log", + OffsetsFile: "offset.json", + }, + expectedPathes: Paths{ + Include: []string{"**/error.log"}, + }, + }, + { + name: "filename and dir pattern config", + config: &Config{ + WatchingDir: "/var/log/", + FilenamePattern: "error.log", + DirPattern: "nginx-ingress-*", + OffsetsFile: "offset.json", + }, + expectedPathes: Paths{ + Include: []string{"nginx-ingress-*/error.log"}, + }, + }, + { + name: "dir pattern config", + config: &Config{ + WatchingDir: "/var/log/", + DirPattern: "nginx-ingress-*", + OffsetsFile: "offset.json", + }, + expectedPathes: Paths{ + Include: []string{"nginx-ingress-*/*"}, + }, + }, + { + name: "ignore filename and dir pattern", + config: &Config{ + WatchingDir: "/var/log/", + FilenamePattern: "error.log", + DirPattern: "nginx-ingress-*", + OffsetsFile: "offset.json", + Paths: Paths{ + Include: []string{"access.log"}, + }, + }, + expectedPathes: Paths{ + Include: []string{"access.log"}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctl := metric.NewCtl("test", prometheus.NewRegistry()) + config := tt.config + test.NewConfig(config, map[string]int{"gomaxprocs": runtime.GOMAXPROCS(0)}) + metrics := newMetricCollection( + ctl.RegisterCounter("worker", "help_test"), + ctl.RegisterCounter("worker", "help_test"), + ctl.RegisterGauge("worker", "help_test"), + ) + jp := NewJobProvider(config, metrics, &zap.SugaredLogger{}) + + assert.Equal(t, tt.expectedPathes, jp.watcher.paths) + }) + } } diff --git a/plugin/input/file/watcher.go b/plugin/input/file/watcher.go index 6f69aa792..e4d707637 100644 --- a/plugin/input/file/watcher.go +++ b/plugin/input/file/watcher.go @@ -4,15 +4,15 @@ import ( "os" "path/filepath" + "github.com/bmatcuk/doublestar/v4" "github.com/prometheus/client_golang/prometheus" "github.com/rjeczalik/notify" "go.uber.org/zap" ) type watcher struct { - path string // dir in which watch for files - filenamePattern string // files which match this pattern will be watched - dirPattern string // dirs which match this pattern will be watched + dir string // dir in which watch for files + paths Paths notifyFn notifyFn // function to receive notifications watcherCh chan notify.EventInfo shouldWatchWrites bool @@ -25,18 +25,16 @@ type notifyFn func(e notify.Event, filename string, stat os.FileInfo) // NewWatcher creates a watcher that see file creations in the path // and if they match filePattern and dirPattern, pass them to notifyFn. func NewWatcher( - path string, - filenamePattern string, - dirPattern string, + dir string, + paths Paths, notifyFn notifyFn, shouldWatchWrites bool, notifyChannelLengthMetric prometheus.Gauge, logger *zap.SugaredLogger, ) *watcher { return &watcher{ - path: path, - filenamePattern: filenamePattern, - dirPattern: dirPattern, + dir: dir, + paths: paths, notifyFn: notifyFn, shouldWatchWrites: shouldWatchWrites, notifyChannelLengthMetric: notifyChannelLengthMetric, @@ -45,15 +43,10 @@ func NewWatcher( } func (w *watcher) start() { - w.logger.Infof("starting watcher path=%s, pattern=%s", w.path, w.filenamePattern) - - if _, err := filepath.Match(w.filenamePattern, "_"); err != nil { - w.logger.Fatalf("wrong file name pattern %q: %s", w.filenamePattern, err.Error()) - } - - if _, err := filepath.Match(w.dirPattern, "_"); err != nil { - w.logger.Fatalf("wrong dir name pattern %q: %s", w.dirPattern, err.Error()) - } + w.logger.Infof( + "starting watcher path=%s, pattern_included=%q, pattern_excluded=%q", + w.dir, w.paths.Include, w.paths.Exclude, + ) eventsCh := make(chan notify.EventInfo, 256) w.watcherCh = eventsCh @@ -64,7 +57,7 @@ func (w *watcher) start() { } // watch recursively. - err := notify.Watch(filepath.Join(w.path, "..."), eventsCh, events...) + err := notify.Watch(filepath.Join(w.dir, "..."), eventsCh, events...) if err != nil { w.logger.Warnf("can't create fs watcher: %s", err.Error()) return @@ -73,7 +66,7 @@ func (w *watcher) start() { go w.watch() - w.tryAddPath(w.path) + w.tryAddPath(w.dir) } func (w *watcher) stop() { @@ -112,19 +105,42 @@ func (w *watcher) notify(e notify.Event, path string) { return } + dirRel, _ := filepath.Abs(w.dir) + rel, _ := filepath.Rel(dirRel, filename) + + w.logger.Infof("%s %s", e, path) + + for _, pattern := range w.paths.Exclude { + match, err := doublestar.PathMatch(pattern, rel) + if err != nil { + w.logger.Fatalf("wrong paths exclude pattern %q: %s", pattern, err.Error()) + return + } + if match { + return + } + } + stat, err := os.Lstat(filename) if err != nil { return } - match, _ := filepath.Match(w.filenamePattern, filepath.Base(filename)) - if match { - w.notifyFn(e, filename, stat) + if stat.IsDir() { + w.tryAddPath(filename) + return } - match, _ = filepath.Match(w.dirPattern, filepath.Base(filename)) - if stat.IsDir() && match { - w.tryAddPath(filename) + for _, pattern := range w.paths.Include { + match, err := doublestar.PathMatch(pattern, rel) + if err != nil { + w.logger.Fatalf("wrong paths include pattern %q: %s", pattern, err.Error()) + return + } + + if match { + w.notifyFn(e, filename, stat) + } } } diff --git a/plugin/input/file/watcher_test.go b/plugin/input/file/watcher_test.go index 3b7715bc4..a195192e1 100644 --- a/plugin/input/file/watcher_test.go +++ b/plugin/input/file/watcher_test.go @@ -29,16 +29,23 @@ func TestWatcher(t *testing.T) { for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { - path := t.TempDir() + dir := t.TempDir() shouldCreate := atomic.Int64{} notifyFn := func(_ notify.Event, _ string, _ os.FileInfo) { shouldCreate.Inc() } ctl := metric.NewCtl("test", prometheus.NewRegistry()) w := NewWatcher( - path, - tt.filenamePattern, - tt.dirPattern, + dir, + Paths{ + Include: []string{ + tt.dirPattern, + filepath.Join( + tt.dirPattern, + tt.filenamePattern, + ), + }, + }, notifyFn, false, ctl.RegisterGauge("worker", "help_test"), @@ -49,14 +56,14 @@ func TestWatcher(t *testing.T) { // create, write, remove files and ensure events are only passed for creation events. - f1Name := filepath.Join(path, "watch1.log") + f1Name := filepath.Join(dir, "watch1.log") f1, err := os.Create(f1Name) require.NoError(t, err) err = f1.Close() require.NoError(t, err) - f2, err := os.Create(filepath.Join(path, "watch2.log")) + f2, err := os.Create(filepath.Join(dir, "watch2.log")) require.NoError(t, err) err = f2.Close() require.NoError(t, err) @@ -81,3 +88,105 @@ func TestWatcher(t *testing.T) { }) } } + +func TestWatcherPaths(t *testing.T) { + dir := t.TempDir() + shouldCreate := atomic.Int64{} + notifyFn := func(_ notify.Event, _ string, _ os.FileInfo) { + shouldCreate.Inc() + } + ctl := metric.NewCtl("test", prometheus.NewRegistry()) + w := NewWatcher( + dir, + Paths{ + Include: []string{ + "nginx-ingress-*/error.log", + "log/**/*", + "access.log", + "**/sub_access.log", + }, + Exclude: []string{ + "log/payments/**", + "nginx-ingress-payments/error.log", + }, + }, + notifyFn, + false, + ctl.RegisterGauge("worker", "help_test"), + zap.L().Sugar(), + ) + w.start() + defer w.stop() + + tests := []struct { + name string + filename string + shouldNotify bool + }{ + { + filename: "nginx-ingress-0/error.log", + shouldNotify: true, + }, + { + filename: "log/errors.log", + shouldNotify: true, + }, + { + filename: "log/0/errors.log", + shouldNotify: true, + }, + { + filename: "log/0/0/errors.log", + shouldNotify: true, + }, + { + filename: "access.log", + shouldNotify: true, + }, + { + filename: "sub_access.log", + shouldNotify: true, + }, + { + filename: "access1.log", + shouldNotify: false, + }, + { + filename: "nginx/errors.log", + shouldNotify: false, + }, + { + filename: "log/payments/errors.log", + shouldNotify: false, + }, + { + filename: "log/payments/nginx-ingress-0/errors.log", + shouldNotify: false, + }, + { + filename: "nginx-ingress-payments/error.log", + shouldNotify: false, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.filename, func(t *testing.T) { + filename := filepath.Join(dir, tt.filename) + + err := os.MkdirAll(filepath.Dir(filename), 0o700) + require.NoError(t, err) + + f1, err := os.Create(filename) + require.NoError(t, err) + err = f1.Close() + require.NoError(t, err) + + before := shouldCreate.Load() + w.notify(notify.Create, filename) + after := shouldCreate.Load() + + isNotified := after-before != 0 + require.Equal(t, tt.shouldNotify, isNotified) + }) + } +} From c2f9379b572aa67f5b60b536761c10dcfe69f980 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Wed, 27 Dec 2023 19:47:52 +0700 Subject: [PATCH 03/11] file input plugin: new format for patterns --- go.mod | 1 + 1 file changed, 1 insertion(+) diff --git a/go.mod b/go.mod index fcaaa21e4..bf8d633f3 100644 --- a/go.mod +++ b/go.mod @@ -57,6 +57,7 @@ require ( github.com/andybalholm/brotli v1.0.5 // indirect github.com/benbjohnson/clock v1.3.0 // indirect github.com/beorn7/perks v1.0.1 // indirect + github.com/bmatcuk/doublestar/v4 v4.0.2 // indirect github.com/cenkalti/backoff/v3 v3.0.0 // indirect github.com/cilium/ebpf v0.9.1 // indirect github.com/containerd/cgroups/v3 v3.0.1 // indirect From be2dc23bfc5274266987c95be83167beff1c6093 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Fri, 29 Dec 2023 16:06:11 +0700 Subject: [PATCH 04/11] define watching_dir for file watcher --- go.mod | 1 - plugin/README.md | 5 +- plugin/input/README.md | 5 +- plugin/input/file/README.md | 5 +- plugin/input/file/file.go | 5 +- plugin/input/file/provider.go | 4 +- plugin/input/file/provider_test.go | 12 ++--- plugin/input/file/watcher.go | 77 ++++++++++++++++++++++-------- plugin/input/file/watcher_test.go | 57 +++++++++++++--------- 9 files changed, 108 insertions(+), 63 deletions(-) diff --git a/go.mod b/go.mod index bf8d633f3..fcaaa21e4 100644 --- a/go.mod +++ b/go.mod @@ -57,7 +57,6 @@ require ( github.com/andybalholm/brotli v1.0.5 // indirect github.com/benbjohnson/clock v1.3.0 // indirect github.com/beorn7/perks v1.0.1 // indirect - github.com/bmatcuk/doublestar/v4 v4.0.2 // indirect github.com/cenkalti/backoff/v3 v3.0.0 // indirect github.com/cilium/ebpf v0.9.1 // indirect github.com/containerd/cgroups/v3 v3.0.1 // indirect diff --git a/plugin/README.md b/plugin/README.md index 9f6052d3e..f81990591 100755 --- a/plugin/README.md +++ b/plugin/README.md @@ -36,12 +36,11 @@ pipelines: example_docker_pipeline: input: type: file - watching_dir: /var/lib/docker/containers paths: include: - - '**\/*-json.log' # remove \ + - '/var/lib/docker/containers/**\/*-json.log' # remove \ exclude: - - ef933707fe551f512d0b240558fdd01771f7897cccab75eb4fab0e575393ab79 + - '/var/lib/docker/containers/19aa5027343f4*\/*-json.log' # remove \ offsets_file: /data/offsets.yaml persistence_mode: async ``` diff --git a/plugin/input/README.md b/plugin/input/README.md index 2228a0c62..09e951bfd 100755 --- a/plugin/input/README.md +++ b/plugin/input/README.md @@ -35,12 +35,11 @@ pipelines: example_docker_pipeline: input: type: file - watching_dir: /var/lib/docker/containers paths: include: - - '**\/*-json.log' # remove \ + - '/var/lib/docker/containers/**\/*-json.log' # remove \ exclude: - - ef933707fe551f512d0b240558fdd01771f7897cccab75eb4fab0e575393ab79 + - '/var/lib/docker/containers/19aa5027343f4*\/*-json.log' # remove \ offsets_file: /data/offsets.yaml persistence_mode: async ``` diff --git a/plugin/input/file/README.md b/plugin/input/file/README.md index 88d5b1b1e..9b918801f 100755 --- a/plugin/input/file/README.md +++ b/plugin/input/file/README.md @@ -25,12 +25,11 @@ pipelines: example_docker_pipeline: input: type: file - watching_dir: /var/lib/docker/containers paths: include: - - '**\/*-json.log' # remove \ + - '/var/lib/docker/containers/**\/*-json.log' # remove \ exclude: - - ef933707fe551f512d0b240558fdd01771f7897cccab75eb4fab0e575393ab79 + - '/var/lib/docker/containers/19aa5027343f4*\/*-json.log' # remove \ offsets_file: /data/offsets.yaml persistence_mode: async ``` diff --git a/plugin/input/file/file.go b/plugin/input/file/file.go index 34c8e9532..cc68513fa 100644 --- a/plugin/input/file/file.go +++ b/plugin/input/file/file.go @@ -41,12 +41,11 @@ pipelines: example_docker_pipeline: input: type: file - watching_dir: /var/lib/docker/containers paths: include: - - '**\/*-json.log' # remove \ + - '/var/lib/docker/containers/**\/*-json.log' # remove \ exclude: - - ef933707fe551f512d0b240558fdd01771f7897cccab75eb4fab0e575393ab79 + - '/var/lib/docker/containers/19aa5027343f4*\/*-json.log' # remove \ offsets_file: /data/offsets.yaml persistence_mode: async ``` diff --git a/plugin/input/file/provider.go b/plugin/input/file/provider.go index 5d4434df9..0784938a4 100644 --- a/plugin/input/file/provider.go +++ b/plugin/input/file/provider.go @@ -149,12 +149,12 @@ func NewJobProvider(config *Config, metrics *metricCollection, sugLogger *zap.Su if config.DirPattern == "*" { config.Paths.Include = append( config.Paths.Include, - filepath.Join("**", config.FilenamePattern), + filepath.Join(config.WatchingDir, filepath.Join("**", config.FilenamePattern)), ) } else { config.Paths.Include = append( config.Paths.Include, - filepath.Join(config.DirPattern, config.FilenamePattern), + filepath.Join(config.WatchingDir, filepath.Join(config.DirPattern, config.FilenamePattern)), ) } } diff --git a/plugin/input/file/provider_test.go b/plugin/input/file/provider_test.go index f9b388b82..7fa421a9e 100644 --- a/plugin/input/file/provider_test.go +++ b/plugin/input/file/provider_test.go @@ -70,7 +70,7 @@ func TestProvierWatcherPaths(t *testing.T) { OffsetsFile: "offset.json", }, expectedPathes: Paths{ - Include: []string{"**/*"}, + Include: []string{"/var/log/**/*"}, }, }, { @@ -81,7 +81,7 @@ func TestProvierWatcherPaths(t *testing.T) { OffsetsFile: "offset.json", }, expectedPathes: Paths{ - Include: []string{"**/error.log"}, + Include: []string{"/var/log/**/error.log"}, }, }, { @@ -93,7 +93,7 @@ func TestProvierWatcherPaths(t *testing.T) { OffsetsFile: "offset.json", }, expectedPathes: Paths{ - Include: []string{"nginx-ingress-*/error.log"}, + Include: []string{"/var/log/nginx-ingress-*/error.log"}, }, }, { @@ -104,7 +104,7 @@ func TestProvierWatcherPaths(t *testing.T) { OffsetsFile: "offset.json", }, expectedPathes: Paths{ - Include: []string{"nginx-ingress-*/*"}, + Include: []string{"/var/log/nginx-ingress-*/*"}, }, }, { @@ -115,11 +115,11 @@ func TestProvierWatcherPaths(t *testing.T) { DirPattern: "nginx-ingress-*", OffsetsFile: "offset.json", Paths: Paths{ - Include: []string{"access.log"}, + Include: []string{"/var/log/access.log"}, }, }, expectedPathes: Paths{ - Include: []string{"access.log"}, + Include: []string{"/var/log/access.log"}, }, }, } diff --git a/plugin/input/file/watcher.go b/plugin/input/file/watcher.go index e4d707637..a99dbe40d 100644 --- a/plugin/input/file/watcher.go +++ b/plugin/input/file/watcher.go @@ -3,6 +3,7 @@ package file import ( "os" "path/filepath" + "strings" "github.com/bmatcuk/doublestar/v4" "github.com/prometheus/client_golang/prometheus" @@ -11,7 +12,8 @@ import ( ) type watcher struct { - dir string // dir in which watch for files + commonPath string + basePaths []string paths Paths notifyFn notifyFn // function to receive notifications watcherCh chan notify.EventInfo @@ -33,7 +35,6 @@ func NewWatcher( logger *zap.SugaredLogger, ) *watcher { return &watcher{ - dir: dir, paths: paths, notifyFn: notifyFn, shouldWatchWrites: shouldWatchWrites, @@ -43,9 +44,16 @@ func NewWatcher( } func (w *watcher) start() { + for _, pattern := range w.paths.Include { + // /var/lib/docker/containers/**/*-json.log -> /var/lib/docker/containers + basePattern, _ := doublestar.SplitPattern(pattern) + w.basePaths = append(w.basePaths, basePattern) + } + w.commonPath = commonPathPrefix(w.basePaths) + w.logger.Infof( "starting watcher path=%s, pattern_included=%q, pattern_excluded=%q", - w.dir, w.paths.Include, w.paths.Exclude, + w.commonPath, w.paths.Include, w.paths.Exclude, ) eventsCh := make(chan notify.EventInfo, 256) @@ -57,7 +65,7 @@ func (w *watcher) start() { } // watch recursively. - err := notify.Watch(filepath.Join(w.dir, "..."), eventsCh, events...) + err := notify.Watch(filepath.Join(w.commonPath, "..."), eventsCh, events...) if err != nil { w.logger.Warnf("can't create fs watcher: %s", err.Error()) return @@ -66,7 +74,33 @@ func (w *watcher) start() { go w.watch() - w.tryAddPath(w.dir) + w.tryAddPath(w.commonPath) +} + +func commonPathPrefix(paths []string) string { + results := make([][]string, 0, len(paths)) + results = append(results, strings.Split(paths[0], string(os.PathSeparator))) + longest := results[0] + + cmpWithLongest := func(a []string) { + if len(a) < len(longest) { + longest = longest[:len(a)] + } + for i := 0; i < len(longest); i++ { + if a[i] != longest[i] { + longest = longest[:i] + return + } + } + } + + for i := 1; i < len(paths); i++ { + r := strings.Split(paths[i], string(os.PathSeparator)) + results = append(results, r) + cmpWithLongest(r) + } + + return filepath.Join(string(os.PathSeparator), filepath.Join(longest...)) } func (w *watcher) stop() { @@ -89,7 +123,8 @@ func (w *watcher) tryAddPath(path string) { continue } - w.notify(notify.Create, filepath.Join(path, file.Name())) + filename := filepath.Join(path, file.Name()) + w.notify(notify.Create, filename) } } @@ -99,19 +134,8 @@ func (w *watcher) notify(e notify.Event, path string) { return } - filename, err := filepath.Abs(filename) - if err != nil { - w.logger.Fatalf("can't get abs file name: %s", err.Error()) - return - } - - dirRel, _ := filepath.Abs(w.dir) - rel, _ := filepath.Rel(dirRel, filename) - - w.logger.Infof("%s %s", e, path) - for _, pattern := range w.paths.Exclude { - match, err := doublestar.PathMatch(pattern, rel) + match, err := doublestar.PathMatch(pattern, path) if err != nil { w.logger.Fatalf("wrong paths exclude pattern %q: %s", pattern, err.Error()) return @@ -127,12 +151,25 @@ func (w *watcher) notify(e notify.Event, path string) { } if stat.IsDir() { - w.tryAddPath(filename) + dirFilename := filename + for { + for _, path := range w.basePaths { + if path == dirFilename { + w.tryAddPath(filename) + } + } + if dirFilename == w.commonPath { + break + } + dirFilename = filepath.Dir(dirFilename) + } return } + w.logger.Infof("%s %s", e, path) + for _, pattern := range w.paths.Include { - match, err := doublestar.PathMatch(pattern, rel) + match, err := doublestar.PathMatch(pattern, path) if err != nil { w.logger.Fatalf("wrong paths include pattern %q: %s", pattern, err.Error()) return diff --git a/plugin/input/file/watcher_test.go b/plugin/input/file/watcher_test.go index a195192e1..cd2c559d1 100644 --- a/plugin/input/file/watcher_test.go +++ b/plugin/input/file/watcher_test.go @@ -10,6 +10,7 @@ import ( "github.com/ozontech/file.d/metric" "github.com/prometheus/client_golang/prometheus" "github.com/rjeczalik/notify" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/atomic" "go.uber.org/zap" @@ -19,7 +20,6 @@ func TestWatcher(t *testing.T) { tests := []struct { name string filenamePattern string - dirPattern string }{ { name: "should_ok_and_count_only_creation", @@ -39,9 +39,8 @@ func TestWatcher(t *testing.T) { dir, Paths{ Include: []string{ - tt.dirPattern, filepath.Join( - tt.dirPattern, + dir, tt.filenamePattern, ), }, @@ -100,14 +99,14 @@ func TestWatcherPaths(t *testing.T) { dir, Paths{ Include: []string{ - "nginx-ingress-*/error.log", - "log/**/*", - "access.log", - "**/sub_access.log", + filepath.Join(dir, "nginx-ingress-*/error.log"), + filepath.Join(dir, "log/**/*"), + filepath.Join(dir, "access.log"), + filepath.Join(dir, "**/sub_access.log"), }, Exclude: []string{ - "log/payments/**", - "nginx-ingress-payments/error.log", + filepath.Join(dir, "log/payments/**"), + filepath.Join(dir, "nginx-ingress-payments/error.log"), }, }, notifyFn, @@ -124,54 +123,55 @@ func TestWatcherPaths(t *testing.T) { shouldNotify bool }{ { - filename: "nginx-ingress-0/error.log", + filename: filepath.Join(dir, "nginx-ingress-0/error.log"), shouldNotify: true, }, { - filename: "log/errors.log", + filename: filepath.Join(dir, "log/errors.log"), shouldNotify: true, }, { - filename: "log/0/errors.log", + filename: filepath.Join(dir, "log/0/errors.log"), shouldNotify: true, }, { - filename: "log/0/0/errors.log", + filename: filepath.Join(dir, "log/0/0/errors.log"), shouldNotify: true, }, { - filename: "access.log", + filename: filepath.Join(dir, "access.log"), shouldNotify: true, }, { - filename: "sub_access.log", + filename: filepath.Join(dir, "sub_access.log"), shouldNotify: true, }, { - filename: "access1.log", + filename: filepath.Join(dir, "access1.log"), shouldNotify: false, }, { - filename: "nginx/errors.log", + filename: filepath.Join(dir, "nginx/errors.log"), shouldNotify: false, }, { - filename: "log/payments/errors.log", + filename: filepath.Join(dir, "log/payments/errors.log"), shouldNotify: false, }, { - filename: "log/payments/nginx-ingress-0/errors.log", + filename: filepath.Join(dir, "log/payments/nginx-ingress-0/errors.log"), shouldNotify: false, }, { - filename: "nginx-ingress-payments/error.log", + filename: filepath.Join(dir, "nginx-ingress-payments/error.log"), shouldNotify: false, }, } for _, tt := range tests { tt := tt - t.Run(tt.filename, func(t *testing.T) { - filename := filepath.Join(dir, tt.filename) + relFilename, _ := filepath.Rel(dir, tt.filename) + t.Run(relFilename, func(t *testing.T) { + filename := tt.filename err := os.MkdirAll(filepath.Dir(filename), 0o700) require.NoError(t, err) @@ -181,6 +181,7 @@ func TestWatcherPaths(t *testing.T) { err = f1.Close() require.NoError(t, err) + time.Sleep(10 * time.Millisecond) before := shouldCreate.Load() w.notify(notify.Create, filename) after := shouldCreate.Load() @@ -190,3 +191,15 @@ func TestWatcherPaths(t *testing.T) { }) } } + +func TestCommonPathPrefix(t *testing.T) { + a := assert.New(t) + + paths := []string{ + "/var/log/", + "/var/lib/docker/", + } + + result := commonPathPrefix(paths) + a.Equal("/var", result) +} From 7b2603ce1d4949531913e126f7c6ad84b27f8ebb Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Fri, 12 Jan 2024 15:35:04 +0700 Subject: [PATCH 05/11] check dir files in notify by base paths from include patterns --- go.mod | 2 +- plugin/input/file/provider_test.go | 6 +++--- plugin/input/file/watcher.go | 16 ++++++++++++++++ plugin/input/file/watcher_test.go | 1 + 4 files changed, 21 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index fcaaa21e4..0819bcdec 100644 --- a/go.mod +++ b/go.mod @@ -13,9 +13,9 @@ require ( github.com/alicebob/miniredis/v2 v2.30.5 github.com/bitly/go-simplejson v0.5.1 github.com/bufbuild/protocompile v0.13.0 + github.com/bmatcuk/doublestar/v4 v4.0.2 github.com/cenkalti/backoff/v4 v4.2.1 github.com/cespare/xxhash/v2 v2.2.0 - github.com/bmatcuk/doublestar/v4 v4.0.2 github.com/euank/go-kmsg-parser v2.0.0+incompatible github.com/go-faster/jx v1.1.0 github.com/go-redis/redis v6.15.9+incompatible diff --git a/plugin/input/file/provider_test.go b/plugin/input/file/provider_test.go index 7fa421a9e..02422b697 100644 --- a/plugin/input/file/provider_test.go +++ b/plugin/input/file/provider_test.go @@ -129,9 +129,9 @@ func TestProvierWatcherPaths(t *testing.T) { config := tt.config test.NewConfig(config, map[string]int{"gomaxprocs": runtime.GOMAXPROCS(0)}) metrics := newMetricCollection( - ctl.RegisterCounter("worker", "help_test"), - ctl.RegisterCounter("worker", "help_test"), - ctl.RegisterGauge("worker", "help_test"), + ctl.RegisterCounter("worker1", "help_test"), + ctl.RegisterCounter("worker2", "help_test"), + ctl.RegisterGauge("worker3", "help_test"), ) jp := NewJobProvider(config, metrics, &zap.SugaredLogger{}) diff --git a/plugin/input/file/watcher.go b/plugin/input/file/watcher.go index a99dbe40d..6f458a433 100644 --- a/plugin/input/file/watcher.go +++ b/plugin/input/file/watcher.go @@ -152,10 +152,12 @@ func (w *watcher) notify(e notify.Event, path string) { if stat.IsDir() { dirFilename := filename + check_dir: for { for _, path := range w.basePaths { if path == dirFilename { w.tryAddPath(filename) + break check_dir } } if dirFilename == w.commonPath { @@ -166,6 +168,20 @@ func (w *watcher) notify(e notify.Event, path string) { return } + dirFilename := filepath.Dir(filename) +check_file: + for { + for _, path := range w.basePaths { + if path == dirFilename { + break check_file + } + } + if dirFilename == w.commonPath { + return + } + dirFilename = filepath.Dir(dirFilename) + } + w.logger.Infof("%s %s", e, path) for _, pattern := range w.paths.Include { diff --git a/plugin/input/file/watcher_test.go b/plugin/input/file/watcher_test.go index cd2c559d1..56b353ba5 100644 --- a/plugin/input/file/watcher_test.go +++ b/plugin/input/file/watcher_test.go @@ -88,6 +88,7 @@ func TestWatcher(t *testing.T) { } } +// nolint:gocritic func TestWatcherPaths(t *testing.T) { dir := t.TempDir() shouldCreate := atomic.Int64{} From 5ebd892a5f8ec345e4a0040b26efa991e01a83dc Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Wed, 6 Mar 2024 21:28:12 +0700 Subject: [PATCH 06/11] use filepath.Walk --- plugin/input/file/watcher.go | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/plugin/input/file/watcher.go b/plugin/input/file/watcher.go index 6f458a433..0d2ac3cd9 100644 --- a/plugin/input/file/watcher.go +++ b/plugin/input/file/watcher.go @@ -111,20 +111,21 @@ func (w *watcher) stop() { } func (w *watcher) tryAddPath(path string) { - files, err := os.ReadDir(path) - if err != nil { - return - } - w.logger.Infof("starting path watch: %s ", path) - for _, file := range files { - if file.Name() == "" || file.Name() == "." || file.Name() == ".." { - continue - } - - filename := filepath.Join(path, file.Name()) - w.notify(notify.Create, filename) + err := filepath.Walk(path, + func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if !info.IsDir() { + w.notify(notify.Create, path) + } + return nil + }, + ) + if err != nil { + return } } From 5c77095cba740bf8b4d24da858b6fe8da49406fa Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Thu, 28 Mar 2024 15:24:56 +0700 Subject: [PATCH 07/11] watching_dir is not required --- plugin/input/file/README.md | 6 +++--- plugin/input/file/file.go | 2 +- plugin/input/file/provider_test.go | 1 + 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/plugin/input/file/README.md b/plugin/input/file/README.md index 9b918801f..94409da29 100755 --- a/plugin/input/file/README.md +++ b/plugin/input/file/README.md @@ -35,13 +35,13 @@ pipelines: ``` ### Config params -**`watching_dir`** *`string`* *`required`* +**`watching_dir`** *`string`* List of included pathes -*`string`* *`required`* +*`string`* List of excluded pathes -*`string`* *`required`* +*`string`* The source directory to watch for files to process. All subdirectories also will be watched. E.g. if files have `/var/my-logs/$YEAR/$MONTH/$DAY/$HOST/$FACILITY-$PROGRAM.log` structure, `watching_dir` should be `/var/my-logs`. diff --git a/plugin/input/file/file.go b/plugin/input/file/file.go index cc68513fa..df31e1a4c 100644 --- a/plugin/input/file/file.go +++ b/plugin/input/file/file.go @@ -106,7 +106,7 @@ type Config struct { // > `/var/my-logs/$YEAR/$MONTH/$DAY/$HOST/$FACILITY-$PROGRAM.log` structure, `watching_dir` should be `/var/my-logs`. // > Also the `filename_pattern`/`dir_pattern` is useful to filter needless files/subdirectories. In the case of using two or more // > different directories, it's recommended to setup separate pipelines for each. - WatchingDir string `json:"watching_dir" required:"true"` // * + WatchingDir string `json:"watching_dir"` // * // > @3@4@5@6 // > diff --git a/plugin/input/file/provider_test.go b/plugin/input/file/provider_test.go index 02422b697..f4d0934f5 100644 --- a/plugin/input/file/provider_test.go +++ b/plugin/input/file/provider_test.go @@ -132,6 +132,7 @@ func TestProvierWatcherPaths(t *testing.T) { ctl.RegisterCounter("worker1", "help_test"), ctl.RegisterCounter("worker2", "help_test"), ctl.RegisterGauge("worker3", "help_test"), + ctl.RegisterGauge("worker4", "help_test"), ) jp := NewJobProvider(config, metrics, &zap.SugaredLogger{}) From 90e780825495f8de9a2284ba1cd148a311c250ac Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Fri, 19 Apr 2024 12:28:31 +0700 Subject: [PATCH 08/11] fix doc for new format in file plugin --- plugin/README.md | 15 -------- plugin/input/README.md | 15 -------- plugin/input/file/README.idoc.md | 15 ++++++++ plugin/input/file/README.md | 62 ++++++++++++++---------------- plugin/input/file/file.go | 66 +++++++++++--------------------- 5 files changed, 67 insertions(+), 106 deletions(-) diff --git a/plugin/README.md b/plugin/README.md index f81990591..b5554e32f 100755 --- a/plugin/README.md +++ b/plugin/README.md @@ -30,21 +30,6 @@ But update events don't work with symlinks, so watcher also periodically manuall > ⚠ Use add_file_name plugin if you want to add filename to events. -**Reading docker container log files:** -```yaml -pipelines: - example_docker_pipeline: - input: - type: file - paths: - include: - - '/var/lib/docker/containers/**\/*-json.log' # remove \ - exclude: - - '/var/lib/docker/containers/19aa5027343f4*\/*-json.log' # remove \ - offsets_file: /data/offsets.yaml - persistence_mode: async -``` - [More details...](plugin/input/file/README.md) ## http Reads events from HTTP requests with the body delimited by a new line. diff --git a/plugin/input/README.md b/plugin/input/README.md index 09e951bfd..cd1db25d2 100755 --- a/plugin/input/README.md +++ b/plugin/input/README.md @@ -29,21 +29,6 @@ But update events don't work with symlinks, so watcher also periodically manuall > ⚠ Use add_file_name plugin if you want to add filename to events. -**Reading docker container log files:** -```yaml -pipelines: - example_docker_pipeline: - input: - type: file - paths: - include: - - '/var/lib/docker/containers/**\/*-json.log' # remove \ - exclude: - - '/var/lib/docker/containers/19aa5027343f4*\/*-json.log' # remove \ - offsets_file: /data/offsets.yaml - persistence_mode: async -``` - [More details...](plugin/input/file/README.md) ## http Reads events from HTTP requests with the body delimited by a new line. diff --git a/plugin/input/file/README.idoc.md b/plugin/input/file/README.idoc.md index 7c700b4ac..83ea9cc00 100644 --- a/plugin/input/file/README.idoc.md +++ b/plugin/input/file/README.idoc.md @@ -1,6 +1,21 @@ # File plugin @introduction +**Reading docker container log files:** +```yaml +pipelines: + example_docker_pipeline: + input: + type: file + paths: + include: + - '/var/lib/docker/containers/**/*-json.log' + exclude: + - '/var/lib/docker/containers/19aa5027343f4*/*-json.log' + offsets_file: /data/offsets.yaml + persistence_mode: async +``` + ### Config params @config-params|description diff --git a/plugin/input/file/README.md b/plugin/input/file/README.md index 94409da29..f5ee6b8d2 100755 --- a/plugin/input/file/README.md +++ b/plugin/input/file/README.md @@ -27,33 +27,20 @@ pipelines: type: file paths: include: - - '/var/lib/docker/containers/**\/*-json.log' # remove \ + - '/var/lib/docker/containers/**/*-json.log' exclude: - - '/var/lib/docker/containers/19aa5027343f4*\/*-json.log' # remove \ + - '/var/lib/docker/containers/19aa5027343f4*/*-json.log' offsets_file: /data/offsets.yaml persistence_mode: async ``` ### Config params -**`watching_dir`** *`string`* - -List of included pathes -*`string`* - -List of excluded pathes -*`string`* - -The source directory to watch for files to process. All subdirectories also will be watched. E.g. if files have -`/var/my-logs/$YEAR/$MONTH/$DAY/$HOST/$FACILITY-$PROGRAM.log` structure, `watching_dir` should be `/var/my-logs`. -Also the `filename_pattern`/`dir_pattern` is useful to filter needless files/subdirectories. In the case of using two or more -different directories, it's recommended to setup separate pipelines for each. - -
- **`paths`** *`Paths`* -Paths. -> Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details. +Set paths in glob format + +* `include` *`[]string`* +* `exclude` *`[]string`*
@@ -64,20 +51,6 @@ The filename to store offsets of processed files. Offsets are loaded only on ini
-**`filename_pattern`** *`string`* *`default=*`* - -Files that don't meet this pattern will be ignored. -> Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details. - -
- -**`dir_pattern`** *`string`* *`default=*`* - -Dirs that don't meet this pattern will be ignored. -> Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details. - -
- **`persistence_mode`** *`string`* *`default=async`* *`options=async|sync`* It defines how to save the offsets file: @@ -163,6 +136,29 @@ Example: ```filename: '{{ .filename }}'```
+**`watching_dir`** **Deprecated format** + +The source directory to watch for files to process. All subdirectories also will be watched. E.g. if files have +`/var/my-logs/$YEAR/$MONTH/$DAY/$HOST/$FACILITY-$PROGRAM.log` structure, `watching_dir` should be `/var/my-logs`. +Also the `filename_pattern`/`dir_pattern` is useful to filter needless files/subdirectories. In the case of using two or more +different directories, it's recommended to setup separate pipelines for each. + +
+ +**`filename_pattern`** *`string`* *`default=*`* + +Files that don't meet this pattern will be ignored. +> Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details. + +
+ +**`dir_pattern`** *`string`* *`default=*`* + +Dirs that don't meet this pattern will be ignored. +> Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details. + +
+ ### Meta params **`filename`** diff --git a/plugin/input/file/file.go b/plugin/input/file/file.go index df31e1a4c..b865a5591 100644 --- a/plugin/input/file/file.go +++ b/plugin/input/file/file.go @@ -34,21 +34,6 @@ But update events don't work with symlinks, so watcher also periodically manuall > By default the plugin is notified only on file creations. Note that following for changes is more CPU intensive. > ⚠ Use add_file_name plugin if you want to add filename to events. - -**Reading docker container log files:** -```yaml -pipelines: - example_docker_pipeline: - input: - type: file - paths: - include: - - '/var/lib/docker/containers/**\/*-json.log' # remove \ - exclude: - - '/var/lib/docker/containers/19aa5027343f4*\/*-json.log' # remove \ - offsets_file: /data/offsets.yaml - persistence_mode: async -``` }*/ type Plugin struct { @@ -85,14 +70,7 @@ const ( ) type Paths struct { - // > @3@4@5@6 - // > - // > List of included pathes Include []string `json:"include"` - - // > @3@4@5@6 - // > - // > List of excluded pathes Exclude []string `json:"exclude"` } @@ -102,16 +80,10 @@ type Config struct { // > @3@4@5@6 // > - // > The source directory to watch for files to process. All subdirectories also will be watched. E.g. if files have - // > `/var/my-logs/$YEAR/$MONTH/$DAY/$HOST/$FACILITY-$PROGRAM.log` structure, `watching_dir` should be `/var/my-logs`. - // > Also the `filename_pattern`/`dir_pattern` is useful to filter needless files/subdirectories. In the case of using two or more - // > different directories, it's recommended to setup separate pipelines for each. - WatchingDir string `json:"watching_dir"` // * - - // > @3@4@5@6 + // > Set paths in glob format // > - // > Paths. - // > > Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details. + // > * `include` *`[]string`* + // > * `exclude` *`[]string`* Paths Paths `json:"paths"` // * // > @3@4@5@6 @@ -121,18 +93,6 @@ type Config struct { OffsetsFile string `json:"offsets_file" required:"true"` // * OffsetsFileTmp string - // > @3@4@5@6 - // > - // > Files that don't meet this pattern will be ignored. - // > > Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details. - FilenamePattern string `json:"filename_pattern" default:"*"` // * - - // > @3@4@5@6 - // > - // > Dirs that don't meet this pattern will be ignored. - // > > Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details. - DirPattern string `json:"dir_pattern" default:"*"` // * - // > @3@4@5@6 // > // > It defines how to save the offsets file: @@ -205,6 +165,26 @@ type Config struct { // > // > Example: ```filename: '{{ .filename }}'``` Meta cfg.MetaTemplates `json:"meta"` // * + + // > **Deprecated format** + // > + // > The source directory to watch for files to process. All subdirectories also will be watched. E.g. if files have + // > `/var/my-logs/$YEAR/$MONTH/$DAY/$HOST/$FACILITY-$PROGRAM.log` structure, `watching_dir` should be `/var/my-logs`. + // > Also the `filename_pattern`/`dir_pattern` is useful to filter needless files/subdirectories. In the case of using two or more + // > different directories, it's recommended to setup separate pipelines for each. + WatchingDir string `json:"watching_dir"` // * + + // > @3@4@5@6 + // > + // > Files that don't meet this pattern will be ignored. + // > > Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details. + FilenamePattern string `json:"filename_pattern" default:"*"` // * + + // > @3@4@5@6 + // > + // > Dirs that don't meet this pattern will be ignored. + // > > Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details. + DirPattern string `json:"dir_pattern" default:"*"` // * } var offsetFiles = make(map[string]string) From 1d28baf6d93b53b6edfe546d5e098056da723f41 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Tue, 11 Jun 2024 18:47:50 +0700 Subject: [PATCH 09/11] add debug messages for watcher --- plugin/input/file/watcher.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/plugin/input/file/watcher.go b/plugin/input/file/watcher.go index 0d2ac3cd9..bcdf07754 100644 --- a/plugin/input/file/watcher.go +++ b/plugin/input/file/watcher.go @@ -135,6 +135,8 @@ func (w *watcher) notify(e notify.Event, path string) { return } + w.logger.Infof("notify %s %s", e, path) + for _, pattern := range w.paths.Exclude { match, err := doublestar.PathMatch(pattern, path) if err != nil { @@ -142,6 +144,7 @@ func (w *watcher) notify(e notify.Event, path string) { return } if match { + w.logger.Infof("excluded %s by pattern %s", path, pattern) return } } @@ -183,8 +186,6 @@ check_file: dirFilename = filepath.Dir(dirFilename) } - w.logger.Infof("%s %s", e, path) - for _, pattern := range w.paths.Include { match, err := doublestar.PathMatch(pattern, path) if err != nil { @@ -193,6 +194,7 @@ check_file: } if match { + w.logger.Infof("path %s matched by pattern %s", filename, pattern) w.notifyFn(e, filename, stat) } } From 9bea8d8c16ff031c19c47cb0c21758867208546b Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Mon, 7 Oct 2024 16:08:16 +0300 Subject: [PATCH 10/11] fix after rebase --- go.mod | 2 +- plugin/input/file/provider_test.go | 16 +++++++--------- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/go.mod b/go.mod index 0819bcdec..db3e904ba 100644 --- a/go.mod +++ b/go.mod @@ -12,8 +12,8 @@ require ( github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 github.com/alicebob/miniredis/v2 v2.30.5 github.com/bitly/go-simplejson v0.5.1 - github.com/bufbuild/protocompile v0.13.0 github.com/bmatcuk/doublestar/v4 v4.0.2 + github.com/bufbuild/protocompile v0.13.0 github.com/cenkalti/backoff/v4 v4.2.1 github.com/cespare/xxhash/v2 v2.2.0 github.com/euank/go-kmsg-parser v2.0.0+incompatible diff --git a/plugin/input/file/provider_test.go b/plugin/input/file/provider_test.go index f4d0934f5..adc4545b2 100644 --- a/plugin/input/file/provider_test.go +++ b/plugin/input/file/provider_test.go @@ -5,11 +5,17 @@ import ( "path/filepath" "testing" + "runtime" + "github.com/ozontech/file.d/logger" "github.com/ozontech/file.d/metric" "github.com/prometheus/client_golang/prometheus" uuid "github.com/satori/go.uuid" "github.com/stretchr/testify/require" + + "github.com/ozontech/file.d/test" + "github.com/stretchr/testify/assert" + "go.uber.org/zap" ) func TestRefreshSymlinkOnBrokenLink(t *testing.T) { @@ -47,15 +53,7 @@ func TestRefreshSymlinkOnBrokenLink(t *testing.T) { os.Remove(linkName) jp.maintenanceSymlinks() require.Equal(t, 0, len(jp.symlinks)) - "runtime" - "testing" - - "github.com/ozontech/file.d/metric" - "github.com/ozontech/file.d/test" - "github.com/prometheus/client_golang/prometheus" - "github.com/stretchr/testify/assert" - "go.uber.org/zap" -) +} func TestProvierWatcherPaths(t *testing.T) { tests := []struct { From bd686925041d1c84abd936bffb0c445b38ada01d Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Tue, 8 Oct 2024 11:37:53 +0300 Subject: [PATCH 11/11] check file path patterns on start --- plugin/input/file/file.go | 15 +++++++++++++++ plugin/input/file/watcher.go | 4 ++-- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/plugin/input/file/file.go b/plugin/input/file/file.go index b865a5591..7eea49e47 100644 --- a/plugin/input/file/file.go +++ b/plugin/input/file/file.go @@ -5,6 +5,7 @@ import ( "path/filepath" "time" + "github.com/bmatcuk/doublestar/v4" "github.com/ozontech/file.d/cfg" "github.com/ozontech/file.d/fd" "github.com/ozontech/file.d/metric" @@ -223,6 +224,20 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginPa offsetFiles[offsetFilePath] = params.PipelineName } + for _, pattern := range p.config.Paths.Include { + _, err := doublestar.PathMatch(pattern, ".") + if err != nil { + p.logger.Fatalf("wrong paths include pattern %q: %s", pattern, err.Error()) + } + } + + for _, pattern := range p.config.Paths.Exclude { + _, err := doublestar.PathMatch(pattern, ".") + if err != nil { + p.logger.Fatalf("wrong paths exclude pattern %q: %s", pattern, err.Error()) + } + } + p.jobProvider = NewJobProvider( p.config, newMetricCollection( diff --git a/plugin/input/file/watcher.go b/plugin/input/file/watcher.go index bcdf07754..d88ae9066 100644 --- a/plugin/input/file/watcher.go +++ b/plugin/input/file/watcher.go @@ -140,7 +140,7 @@ func (w *watcher) notify(e notify.Event, path string) { for _, pattern := range w.paths.Exclude { match, err := doublestar.PathMatch(pattern, path) if err != nil { - w.logger.Fatalf("wrong paths exclude pattern %q: %s", pattern, err.Error()) + w.logger.Errorf("wrong paths exclude pattern %q: %s", pattern, err.Error()) return } if match { @@ -189,7 +189,7 @@ check_file: for _, pattern := range w.paths.Include { match, err := doublestar.PathMatch(pattern, path) if err != nil { - w.logger.Fatalf("wrong paths include pattern %q: %s", pattern, err.Error()) + w.logger.Errorf("wrong paths include pattern %q: %s", pattern, err.Error()) return }