Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

file input plugin: define watching_dir from include patterns #567

Merged
merged 11 commits into from
Oct 9, 2024
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137
github.com/alicebob/miniredis/v2 v2.30.5
github.com/bitly/go-simplejson v0.5.1
github.com/bmatcuk/doublestar/v4 v4.0.2
github.com/bufbuild/protocompile v0.13.0
github.com/cenkalti/backoff/v4 v4.2.1
github.com/cespare/xxhash/v2 v2.2.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ github.com/bitly/go-simplejson v0.5.1 h1:xgwPbetQScXt1gh9BmoJ6j9JMr3TElvuIyjR8pg
github.com/bitly/go-simplejson v0.5.1/go.mod h1:YOPVLzCfwK14b4Sff3oP1AmGhI9T9Vsg84etUnlyp+Q=
github.com/bufbuild/protocompile v0.13.0 h1:6cwUB0Y2tSvmNxsbunwzmIto3xOlJOV7ALALuVOs92M=
github.com/bufbuild/protocompile v0.13.0/go.mod h1:dr++fGGeMPWHv7jPeT06ZKukm45NJscd7rUxQVzEKRk=
github.com/bmatcuk/doublestar/v4 v4.0.2 h1:X0krlUVAVmtr2cRoTqR8aDMrDqnB36ht8wpWTiQ3jsA=
github.com/bmatcuk/doublestar/v4 v4.0.2/go.mod h1:xBQ8jztBU6kakFMg+8WGxn0c6z1fTSPVIjEY1Wr7jzc=
github.com/cenkalti/backoff/v3 v3.0.0 h1:ske+9nBpD9qZsTBoF41nW5L+AIuFBKMeze18XQ3eG1c=
github.com/cenkalti/backoff/v3 v3.0.0/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
Expand Down
12 changes: 0 additions & 12 deletions plugin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,6 @@ But update events don't work with symlinks, so watcher also periodically manuall

> ⚠ Use add_file_name plugin if you want to add filename to events.

**Reading docker container log files:**
```yaml
pipelines:
example_docker_pipeline:
input:
type: file
watching_dir: /var/lib/docker/containers
offsets_file: /data/offsets.yaml
filename_pattern: "*-json.log"
persistence_mode: async
```

[More details...](plugin/input/file/README.md)
## http
Reads events from HTTP requests with the body delimited by a new line.
Expand Down
12 changes: 0 additions & 12 deletions plugin/input/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,6 @@ But update events don't work with symlinks, so watcher also periodically manuall

> ⚠ Use add_file_name plugin if you want to add filename to events.

**Reading docker container log files:**
```yaml
pipelines:
example_docker_pipeline:
input:
type: file
watching_dir: /var/lib/docker/containers
offsets_file: /data/offsets.yaml
filename_pattern: "*-json.log"
persistence_mode: async
```

[More details...](plugin/input/file/README.md)
## http
Reads events from HTTP requests with the body delimited by a new line.
Expand Down
15 changes: 15 additions & 0 deletions plugin/input/file/README.idoc.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,21 @@
# File plugin
@introduction

**Reading docker container log files:**
```yaml
pipelines:
example_docker_pipeline:
input:
type: file
paths:
include:
- '/var/lib/docker/containers/**/*-json.log'
exclude:
- '/var/lib/docker/containers/19aa5027343f4*/*-json.log'
offsets_file: /data/offsets.yaml
persistence_mode: async
```

### Config params
@config-params|description

Expand Down
54 changes: 33 additions & 21 deletions plugin/input/file/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,22 @@ pipelines:
example_docker_pipeline:
input:
type: file
watching_dir: /var/lib/docker/containers
paths:
include:
- '/var/lib/docker/containers/**/*-json.log'
exclude:
- '/var/lib/docker/containers/19aa5027343f4*/*-json.log'
offsets_file: /data/offsets.yaml
filename_pattern: "*-json.log"
persistence_mode: async
```

### Config params
**`watching_dir`** *`string`* *`required`*
**`paths`** *`Paths`*

The source directory to watch for files to process. All subdirectories also will be watched. E.g. if files have
`/var/my-logs/$YEAR/$MONTH/$DAY/$HOST/$FACILITY-$PROGRAM.log` structure, `watching_dir` should be `/var/my-logs`.
Also the `filename_pattern`/`dir_pattern` is useful to filter needless files/subdirectories. In the case of using two or more
different directories, it's recommended to setup separate pipelines for each.
Set paths in glob format

* `include` *`[]string`*
* `exclude` *`[]string`*

<br>

Expand All @@ -48,20 +51,6 @@ The filename to store offsets of processed files. Offsets are loaded only on ini

<br>

**`filename_pattern`** *`string`* *`default=*`*

Files that don't meet this pattern will be ignored.
> Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.

<br>

**`dir_pattern`** *`string`* *`default=*`*

Dirs that don't meet this pattern will be ignored.
> Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.

<br>

**`persistence_mode`** *`string`* *`default=async`* *`options=async|sync`*

It defines how to save the offsets file:
Expand Down Expand Up @@ -147,6 +136,29 @@ Example: ```filename: '{{ .filename }}'```

<br>

**`watching_dir`** **Deprecated format**

The source directory to watch for files to process. All subdirectories also will be watched. E.g. if files have
`/var/my-logs/$YEAR/$MONTH/$DAY/$HOST/$FACILITY-$PROGRAM.log` structure, `watching_dir` should be `/var/my-logs`.
Also the `filename_pattern`/`dir_pattern` is useful to filter needless files/subdirectories. In the case of using two or more
different directories, it's recommended to setup separate pipelines for each.

<br>

**`filename_pattern`** *`string`* *`default=*`*

Files that don't meet this pattern will be ignored.
> Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.

<br>

**`dir_pattern`** *`string`* *`default=*`*

Dirs that don't meet this pattern will be ignored.
> Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.

<br>


### Meta params
**`filename`**
Expand Down
74 changes: 45 additions & 29 deletions plugin/input/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"path/filepath"
"time"

"github.com/bmatcuk/doublestar/v4"
"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/metric"
Expand Down Expand Up @@ -34,18 +35,6 @@ But update events don't work with symlinks, so watcher also periodically manuall
> By default the plugin is notified only on file creations. Note that following for changes is more CPU intensive.

> ⚠ Use add_file_name plugin if you want to add filename to events.

**Reading docker container log files:**
```yaml
pipelines:
example_docker_pipeline:
input:
type: file
watching_dir: /var/lib/docker/containers
offsets_file: /data/offsets.yaml
filename_pattern: "*-json.log"
persistence_mode: async
```
}*/

type Plugin struct {
Expand Down Expand Up @@ -81,17 +70,22 @@ const (
offsetsOpReset // * `reset` – resets an offset to the beginning of the file
)

type Paths struct {
Include []string `json:"include"`
Exclude []string `json:"exclude"`
}

type Config struct {
// ! config-params
// ^ config-params

// > @3@4@5@6
// >
// > The source directory to watch for files to process. All subdirectories also will be watched. E.g. if files have
// > `/var/my-logs/$YEAR/$MONTH/$DAY/$HOST/$FACILITY-$PROGRAM.log` structure, `watching_dir` should be `/var/my-logs`.
// > Also the `filename_pattern`/`dir_pattern` is useful to filter needless files/subdirectories. In the case of using two or more
// > different directories, it's recommended to setup separate pipelines for each.
WatchingDir string `json:"watching_dir" required:"true"` // *
// > Set paths in glob format
// >
// > * `include` *`[]string`*
// > * `exclude` *`[]string`*
Paths Paths `json:"paths"` // *

// > @3@4@5@6
// >
Expand All @@ -100,18 +94,6 @@ type Config struct {
OffsetsFile string `json:"offsets_file" required:"true"` // *
OffsetsFileTmp string

// > @3@4@5@6
// >
// > Files that don't meet this pattern will be ignored.
// > > Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.
FilenamePattern string `json:"filename_pattern" default:"*"` // *

// > @3@4@5@6
// >
// > Dirs that don't meet this pattern will be ignored.
// > > Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.
DirPattern string `json:"dir_pattern" default:"*"` // *

// > @3@4@5@6
// >
// > It defines how to save the offsets file:
Expand Down Expand Up @@ -184,6 +166,26 @@ type Config struct {
// >
// > Example: ```filename: '{{ .filename }}'```
Meta cfg.MetaTemplates `json:"meta"` // *

// > **Deprecated format**
// >
// > The source directory to watch for files to process. All subdirectories also will be watched. E.g. if files have
// > `/var/my-logs/$YEAR/$MONTH/$DAY/$HOST/$FACILITY-$PROGRAM.log` structure, `watching_dir` should be `/var/my-logs`.
// > Also the `filename_pattern`/`dir_pattern` is useful to filter needless files/subdirectories. In the case of using two or more
// > different directories, it's recommended to setup separate pipelines for each.
WatchingDir string `json:"watching_dir"` // *

// > @3@4@5@6
// >
// > Files that don't meet this pattern will be ignored.
// > > Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.
FilenamePattern string `json:"filename_pattern" default:"*"` // *

// > @3@4@5@6
// >
// > Dirs that don't meet this pattern will be ignored.
// > > Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.
DirPattern string `json:"dir_pattern" default:"*"` // *
}

var offsetFiles = make(map[string]string)
Expand Down Expand Up @@ -222,6 +224,20 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginPa
offsetFiles[offsetFilePath] = params.PipelineName
}

for _, pattern := range p.config.Paths.Include {
_, err := doublestar.PathMatch(pattern, ".")
if err != nil {
p.logger.Fatalf("wrong paths include pattern %q: %s", pattern, err.Error())
}
}

for _, pattern := range p.config.Paths.Exclude {
_, err := doublestar.PathMatch(pattern, ".")
if err != nil {
p.logger.Fatalf("wrong paths exclude pattern %q: %s", pattern, err.Error())
}
}

p.jobProvider = NewJobProvider(
p.config,
newMetricCollection(
Expand Down
12 changes: 9 additions & 3 deletions plugin/input/file/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,16 @@ func (ir *InfoRegistry) Info(w http.ResponseWriter, r *http.Request) {

_, _ = w.Write([]byte(jobsInfo))

watcherInfo := logger.Header("watch_dirs")
watcherInfo := logger.Header("watch_paths")
for _, source := range plugin.jobProvider.watcher.basePaths {
watcherInfo += fmt.Sprintf(
"%s\n",
source,
)
}
watcherInfo += fmt.Sprintf(
"%s\n",
plugin.jobProvider.watcher.path,
"commonPath: %s\n",
plugin.jobProvider.watcher.commonPath,
)
_, _ = w.Write([]byte(watcherInfo))

Expand Down
17 changes: 15 additions & 2 deletions plugin/input/file/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,23 @@ func NewJobProvider(config *Config, metrics *metricCollection, sugLogger *zap.Su
numberOfCurrentJobsMetric: metrics.numberOfCurrentJobsMetric,
}

if len(config.Paths.Include) == 0 {
if config.DirPattern == "*" {
config.Paths.Include = append(
config.Paths.Include,
filepath.Join(config.WatchingDir, filepath.Join("**", config.FilenamePattern)),
)
} else {
config.Paths.Include = append(
config.Paths.Include,
filepath.Join(config.WatchingDir, filepath.Join(config.DirPattern, config.FilenamePattern)),
)
}
}

jp.watcher = NewWatcher(
config.WatchingDir,
config.FilenamePattern,
config.DirPattern,
config.Paths,
jp.processNotification,
config.ShouldWatchChanges,
metrics.notifyChannelLengthMetric,
Expand Down
Loading
Loading