From 2a1c65abda5c7faaf2a8e06aa7fa704bbe833520 Mon Sep 17 00:00:00 2001 From: Tomas Ebenlendr Date: Tue, 21 Feb 2023 10:16:38 +0100 Subject: [PATCH] exporter-exporter - add support for serving files directly --- README.md | 6 ++ config.go | 12 ++++ file.go | 190 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ main.go | 3 + 4 files changed, 211 insertions(+) create mode 100644 file.go diff --git a/README.md b/README.md index 3efe362..0390692 100644 --- a/README.md +++ b/README.md @@ -99,6 +99,12 @@ modules: env: THING: "1" THING2: "2" + + somefile: + method: file + file: + path: /tmp/myfile.prometheus.txt + use_mtime: true ``` In your prometheus configuration diff --git a/config.go b/config.go index f531764..7945704 100644 --- a/config.go +++ b/config.go @@ -45,6 +45,7 @@ type moduleConfig struct { Exec execConfig `yaml:"exec"` HTTP httpConfig `yaml:"http"` + File fileConfig `yaml:"file"` name string } @@ -78,6 +79,13 @@ type execConfig struct { mcfg *moduleConfig } +type fileConfig struct { + Path string `yaml:"path"` + UseMtime bool `yaml:"use_mtime"` + IsGlob bool `yaml:"glob"` + mcfg *moduleConfig +} + func readConfig(r io.Reader) (*config, error) { buf := bytes.Buffer{} io.Copy(&buf, r) @@ -168,6 +176,10 @@ func checkModuleConfig(name string, cfg *moduleConfig) error { if len(cfg.Exec.XXX) != 0 { return fmt.Errorf("unknown exec module configuration fields: %v", cfg.Exec.XXX) } + case "file": + if cfg.File.Path == "" { + return fmt.Errorf("Path argument for file module is mandatory") + } default: return fmt.Errorf("unknown module method: %v", cfg.Method) } diff --git a/file.go b/file.go new file mode 100644 index 0000000..595376e --- /dev/null +++ b/file.go @@ -0,0 +1,190 @@ +// Copyright 2016 Qubit Ltd. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "bytes" + "context" + "io" + "net/http" + "os" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" + log "github.com/sirupsen/logrus" +) + +var ( + fileStartsCount = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "expexp_command_starts_total", + Help: "Counts of command starts", + }, + []string{"module"}, + ) + fileFailsCount = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "expexp_command_fails_total", + Help: "Count of commands with non-zero exits", + }, + []string{"module"}, + ) +) + +func readFileWithDeadline(path string, t time.Time) ([]byte, time.Time, error) { + f, err := os.Open(path) + mtime := time.Time{} + if err != nil { + return nil, mtime, err + } + defer f.Close() + f.SetDeadline(t) + + var size int + if info, err := f.Stat(); err == nil { + size64 := info.Size() + if int64(int(size64)) == size64 { + size = int(size64) + } + if info.Mode().IsRegular() { + mtime = info.ModTime() + } + } + size++ // one byte for final read at EOF + + // If a file claims a small size, read at least 512 bytes. + // In particular, files in Linux's /proc claim size 0 but + // then do not work right if read in small pieces, + // so an initial read of 1 byte would not work correctly. + if size < 512 { + size = 512 + } + + data := make([]byte, 0, size) + for { + if time.Now().After(t) { + return data, mtime, os.ErrDeadlineExceeded + } + if len(data) >= cap(data) { + d := append(data[:cap(data)], 0) + data = d[:len(data)] + } + n, err := f.Read(data[len(data):cap(data)]) + data = data[:len(data)+n] + if err != nil { + if err == io.EOF { + err = nil + } + return data, mtime, err + } + } +} + +var ( + mtimeName = "expexp_file_mtime_timestamp" + mtimeHelp = "Time of modification of parsed file" + mtimeType = dto.MetricType_GAUGE + mtimeLabelModule = "module" + mtimeLabelPath = "path" +) + +func (c fileConfig) GatherWithContext(ctx context.Context, r *http.Request) prometheus.GathererFunc { + return func() ([]*dto.MetricFamily, error) { + + errc := make(chan error, 1) + datc := make(chan []byte, 1) + timec := make(chan time.Time, 1) + go func() { + deadline, ok := ctx.Deadline() + if ! ok { deadline = time.Now().Add(time.Minute * 5) } + dat, mtime, err := readFileWithDeadline(c.Path, deadline) + errc <- err + if err == nil { + datc <- dat + timec <- mtime + } + close(errc) + close(datc) + close(timec) + }() + + err := <- errc + if err != nil { + log.Warnf("File module %v failed to read file %v, %+v", c.mcfg.name, c.Path, err) + fileFailsCount.WithLabelValues(c.mcfg.name).Inc() + if err == context.DeadlineExceeded || err == os.ErrDeadlineExceeded { + proxyTimeoutCount.WithLabelValues(c.mcfg.name).Inc() + } + return nil, err + } + dat := <- datc + mtime := <- timec + var prsr expfmt.TextParser + + var mtimeBuf *int64 = nil + if ! mtime.IsZero() { + mtimeBuf = new(int64) + *mtimeBuf = mtime.UnixMilli() + } + + var result []*dto.MetricFamily + mfs, err := prsr.TextToMetricFamilies(bytes.NewReader(dat)) + if err != nil { + proxyMalformedCount.WithLabelValues(c.mcfg.name).Inc() + return nil, err + } + for _, mf := range mfs { + if c.UseMtime && mtimeBuf != nil { + for _, m := range mf.GetMetric() { + m.TimestampMs = mtimeBuf + } + } + result = append(result, mf) + } + if !mtime.IsZero() { + v := float64(mtime.Unix()) + g := dto.Gauge { Value: &v, } + l := make([]*dto.LabelPair, 2) + l[0] = &dto.LabelPair{ + Name:&mtimeLabelModule, + Value:&c.mcfg.name, + } + l[1] = &dto.LabelPair{ + Name:&mtimeLabelPath, + Value:&c.Path, + } + m := dto.Metric { + Label: l, + Gauge: &g, + } + mf := dto.MetricFamily{ + Name: &mtimeName, + Help: &mtimeHelp, + Type: &mtimeType, + } + mf.Metric = append(mf.Metric, &m) + result = append(result, &mf) + } + return result, nil + } +} + +func (c fileConfig) ServeHTTP(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + g := c.GatherWithContext(ctx, r) + promhttp.HandlerFor(g, promhttp.HandlerOpts{}).ServeHTTP(w, r) +} diff --git a/main.go b/main.go index 4aaf061..3c95733 100644 --- a/main.go +++ b/main.go @@ -502,6 +502,9 @@ func (m moduleConfig) ServeHTTP(w http.ResponseWriter, r *http.Request) { case "http": m.HTTP.mcfg = &m m.HTTP.ServeHTTP(w, nr) + case "file": + m.File.mcfg = &m + m.File.ServeHTTP(w, nr) default: log.Errorf("unknown module method %v\n", m.Method) proxyErrorCount.WithLabelValues(m.name).Inc()