Skip to content

Commit

Permalink
Merge pull request #548 from ozontech/metric-error-open-file
Browse files Browse the repository at this point in the history
add metric file_open_error_total
  • Loading branch information
DmitryRomanov authored Dec 6, 2023
2 parents 4b85093 + 2d1b82c commit 044151e
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 5 deletions.
4 changes: 3 additions & 1 deletion plugin/input/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type Plugin struct {

possibleOffsetCorruptionMetric *prometheus.CounterVec
alreadyWrittenEventsSkippedMetric *prometheus.CounterVec
errorOpenFileMetric *prometheus.CounterVec
}

type persistenceMode int
Expand Down Expand Up @@ -194,7 +195,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginPa

p.config.OffsetsFileTmp = p.config.OffsetsFile + ".atomic"

p.jobProvider = NewJobProvider(p.config, p.possibleOffsetCorruptionMetric, p.logger)
p.jobProvider = NewJobProvider(p.config, p.possibleOffsetCorruptionMetric, p.errorOpenFileMetric, p.logger)

ResetterRegistryInstance.AddResetter(params.PipelineName, p)

Expand All @@ -205,6 +206,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginPa
func (p *Plugin) registerMetrics(ctl *metric.Ctl) {
p.possibleOffsetCorruptionMetric = ctl.RegisterCounter("input_file_possible_offset_corruptions_total", "Total number of possible offset corruptions")
p.alreadyWrittenEventsSkippedMetric = ctl.RegisterCounter("input_file_already_written_event_skipped_total", "Total number of skipped events that was already written")
p.errorOpenFileMetric = ctl.RegisterCounter("input_file_open_error_total", "Total number of file opening errors")
}

func (p *Plugin) startWorkers() {
Expand Down
5 changes: 4 additions & 1 deletion plugin/input/file/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type jobProvider struct {
// provider metrics

possibleOffsetCorruptionMetric *prometheus.CounterVec
errorOpenFileMetric *prometheus.CounterVec
}

type Job struct {
Expand Down Expand Up @@ -99,7 +100,7 @@ type symlinkInfo struct {
inode inodeID
}

func NewJobProvider(config *Config, possibleOffsetCorruptionMetric *prometheus.CounterVec, sugLogger *zap.SugaredLogger) *jobProvider {
func NewJobProvider(config *Config, possibleOffsetCorruptionMetric, errorOpenFileMetric *prometheus.CounterVec, sugLogger *zap.SugaredLogger) *jobProvider {
jp := &jobProvider{
config: config,
offsetDB: newOffsetDB(config.OffsetsFile, config.OffsetsFileTmp),
Expand All @@ -121,6 +122,7 @@ func NewJobProvider(config *Config, possibleOffsetCorruptionMetric *prometheus.C

logger: sugLogger,
possibleOffsetCorruptionMetric: possibleOffsetCorruptionMetric,
errorOpenFileMetric: errorOpenFileMetric,
}

jp.watcher = NewWatcher(
Expand Down Expand Up @@ -282,6 +284,7 @@ func (jp *jobProvider) refreshFile(stat os.FileInfo, filename string, symlink st
file, err := os.Open(filename)
if err != nil {
jp.logger.Warnf("file was already moved from creation place %s: %s", filename, err.Error())
jp.errorOpenFileMetric.WithLabelValues().Inc()
return
}

Expand Down
2 changes: 1 addition & 1 deletion plugin/input/file/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (w *watcher) start() {
w.logger.Fatalf("wrong dir name pattern %q: %s", w.dirPattern, err.Error())
}

eventsCh := make(chan notify.EventInfo, 128)
eventsCh := make(chan notify.EventInfo, 256)
w.watcherCh = eventsCh

events := []notify.Event{notify.Create, notify.Rename, notify.Remove}
Expand Down
6 changes: 4 additions & 2 deletions plugin/input/file/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ func TestWorkerWork(t *testing.T) {
}
ctl := metric.New("test", prometheus.NewRegistry())
possibleOffsetCorruptionMetric := ctl.RegisterCounter("worker", "help_test")
jp := NewJobProvider(&Config{}, possibleOffsetCorruptionMetric, &zap.SugaredLogger{})
errorOpenFileMetric := ctl.RegisterCounter("worker", "help_test")
jp := NewJobProvider(&Config{}, possibleOffsetCorruptionMetric, errorOpenFileMetric, &zap.SugaredLogger{})
jp.jobsChan = make(chan *Job, 2)
jp.jobs = map[pipeline.SourceID]*Job{
1: job,
Expand Down Expand Up @@ -223,7 +224,8 @@ func TestWorkerWorkMultiData(t *testing.T) {

ctl := metric.New("test", prometheus.NewRegistry())
possibleOffsetCorruptionMetric := ctl.RegisterCounter("worker", "help_test")
jp := NewJobProvider(&Config{}, possibleOffsetCorruptionMetric, &zap.SugaredLogger{})
errorOpenFileMetric := ctl.RegisterCounter("worker", "help_test")
jp := NewJobProvider(&Config{}, possibleOffsetCorruptionMetric, errorOpenFileMetric, &zap.SugaredLogger{})
jp.jobsChan = make(chan *Job, 2)
jp.jobs = map[pipeline.SourceID]*Job{
1: job,
Expand Down

0 comments on commit 044151e

Please sign in to comment.