Skip to content

Commit

Permalink
feat: Feat to vacuum prometheus tsdb
Browse files Browse the repository at this point in the history
* Ability to vacuum ts of ignored jobs

* Configure cutoff time of batch jobs via CLI

* Modify CLI flags to be more intruitive

* Split jobstats struct into separate logical structs

* Update tests and fixtures

Signed-off-by: Mahendra Paipuri <[email protected]>
  • Loading branch information
mahendrapaipuri committed Jan 7, 2024
1 parent fd15129 commit d0e85e6
Show file tree
Hide file tree
Showing 15 changed files with 362 additions and 118 deletions.
14 changes: 13 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,13 @@ ifeq ($(CGO_BUILD), 1)
./pkg/jobstats/db ./pkg/jobstats/helper \
./pkg/jobstats/schedulers ./pkg/jobstats/server \
./cmd/batchjob_stats_server
checkmetrics := skip-checkmetrics
checkrules := skip-checkrules
else
PROMU_CONF ?= .promu-go.yml
pkgs := ./pkg/collector ./pkg/emissions ./cmd/batchjob_exporter
checkmetrics := checkmetrics
checkrules := checkrules
endif

ifeq ($(GOHOSTOS), linux)
Expand Down Expand Up @@ -75,7 +79,7 @@ $(eval $(call goarch_pair,amd64,386))
$(eval $(call goarch_pair,mips64,mips))
$(eval $(call goarch_pair,mips64el,mipsel))

all:: vet checkmetrics checkrules common-all $(cross-test) $(test-docker) $(test-e2e)
all:: vet common-all $(cross-test) $(test-docker) $(checkmetrics) $(checkrules) $(test-e2e)

.PHONY: test
test: pkg/collector/fixtures/sys/.unpacked pkg/collector/fixtures/proc/.unpacked
Expand Down Expand Up @@ -133,11 +137,19 @@ checkmetrics: $(PROMTOOL)
@echo ">> checking metrics for correctness"
./scripts/checkmetrics.sh $(PROMTOOL) $(e2e-out)

.PHONY: skip-checkmetrics
skip-checkmetrics: $(PROMTOOL)
@echo ">> SKIP checking metrics for correctness"

.PHONY: checkrules
checkrules: $(PROMTOOL)
@echo ">> checking rules for correctness"
find . -name "*rules*.yml" | xargs -I {} $(PROMTOOL) check rules {}

.PHONY: skip-checkrules
skip-checkrules: $(PROMTOOL)
@echo ">> SKIP checking rules for correctness"

.PHONY: test-docker
test-docker:
@echo ">> testing docker image"
Expand Down
1 change: 1 addition & 0 deletions pkg/jobstats/base/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type BatchJob struct {
StartTS string `json:"startts"`
EndTS string `json:"endts"`
Elapsed string `json:"elapsed"`
ElapsedRaw string `json:"elapsedraw"`
Exitcode string `json:"exitcode"`
State string `json:"state"`
Nnodes string `json:"nnodes"`
Expand Down
105 changes: 83 additions & 22 deletions pkg/jobstats/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package cli

import (
"context"
"crypto/tls"
"fmt"
"net/http"
"net/url"
"os"
"os/signal"
"path/filepath"
Expand Down Expand Up @@ -55,29 +58,42 @@ func (b *BatchJobStatsServer) Main() {
"Comma separated list of admin users (example: \"admin1,admin2\").",
).Default("").String()
dataPath = b.App.Flag(
"data.path",
"Absolute path to a directory where job data is stored. SQLite DB that contains jobs stats will be saved to this directory.",
"storage.data.path",
"Base path for data storage.",
).Default("/var/lib/jobstats").String()
retentionPeriodString = b.App.Flag(
"data.retention.period",
"Period in days for which job stats data will be retained. Units Supported: y, w, d, h, m, s, ms.",
"storage.data.retention.period",
"How long to retain job data. Units Supported: y, w, d, h, m, s, ms.",
).Default("1y").String()
jobstatDBFile = b.App.Flag(
"db.name",
"Name of the SQLite DB file that contains job stats.",
).Default("jobstats.db").String()
jobstatDBTable = b.App.Flag(
"db.table.name",
"Name of the table in SQLite DB file that contains job stats.",
).Default("jobs").String()
lastUpdateTime = b.App.Flag(
"db.last.update.time",
"Last time the DB was updated. Job stats from this time will be added for new DB. Supported formate: YYYY-MM-DD.",
"storage.data.update.from",
"Job data from this day will be gathered. Supported format: YYYY-MM-DD.",
).Default(time.Now().Format("2006-01-02")).String()
updateIntervalString = b.App.Flag(
"db.update.interval",
"Time period at which DB will be updated with job stats. Units Supported: y, w, d, h, m, s, ms.",
"storage.data.update.interval",
"Job data will be updated at this interval. Units Supported: y, w, d, h, m, s, ms.",
).Default("15m").String()
jobDurationCutoffString = b.App.Flag(
"storage.data.job.duration.cutoff",
"Jobs with wall time less than this period will be ignored. Units Supported: y, w, d, h, m, s, ms.",
).Default("5m").String()
prometheusURL = b.App.Flag(
"prometheus.web.url",
"Prometheus URL. If basic auth is enabled for Prometheus consider providing this URL using environment variable PROMETHEUS_URL.",
).Default(os.Getenv("PROMETHEUS_URL")).String()
prometheusSkipTLSVerify = b.App.Flag(
"prometheus.web.skip-tls-verify",
"Whether to skip TLS verification when using self signed certificates (default is false).",
).Default("false").Bool()
vacuumTSDB = b.App.Flag(
"prometheus.data.vacuum.tsdb",
"Prometheus TSDB will be vacuumed to remove time series of ignored jobs based on value set for --storage.data.job.duration.cutoff."+
" --prometheus.web.url should be provided if this flag is set to true. (default is false)",
).Default("false").Bool()
skipDeleteOldJobs = b.App.Flag(
"storage.data.skip.delete.old.jobs",
"Skip deleting old jobs. Used only in testing. (default is false)",
).Hidden().Default("false").Bool()
maxProcs = b.App.Flag(
"runtime.gomaxprocs", "The target number of CPUs Go will run on (GOMAXPROCS)",
).Envar("GOMAXPROCS").Default("1").Int()
Expand Down Expand Up @@ -106,22 +122,62 @@ func (b *BatchJobStatsServer) Main() {
// Parse retentionPeriod and updateInterval
retentionPeriod, err := model.ParseDuration(*retentionPeriodString)
if err != nil {
fmt.Printf("Failed to parse --data.retention.period flag. Error: %s", err)
fmt.Printf("Failed to parse --storage.data.retention.period flag. Error: %s", err)
os.Exit(1)
}
updateInterval, err := model.ParseDuration(*updateIntervalString)
if err != nil {
fmt.Printf("Failed to parse --db.update.interval flag. Error: %s", err)
fmt.Printf("Failed to parse --storage.data.update.interval flag. Error: %s", err)
os.Exit(1)
}
jobDurationCutoff, err := model.ParseDuration(*jobDurationCutoffString)
if err != nil {
fmt.Printf("Failed to parse --storage.data.job.duration.cutoff flag. Error: %s", err)
os.Exit(1)
}

// Parse lastUpdateTime to check if it is in correct format
_, err = time.Parse("2006-01-02", *lastUpdateTime)
if err != nil {
fmt.Printf("Failed to parse --db.last.update.time flag. Error: %s", err)
fmt.Printf("Failed to parse --storage.data.update.from flag. Error: %s", err)
os.Exit(1)
}

// Check if TSDB vacuum flag is turned on, a valid prometheus URL is provided
var client *http.Client
var promURL *url.URL
if *vacuumTSDB {
promURL, err = url.Parse(*prometheusURL)
if err != nil {
fmt.Printf("Failed to parse --prometheus.web.url %s", err)
os.Exit(1)
}

// If skip verify is set to true for Prometheus add it to client
if *prometheusSkipTLSVerify {
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
client = &http.Client{Transport: tr, Timeout: time.Duration(1) * time.Second}
} else {
client = &http.Client{Timeout: time.Duration(1) * time.Second}
}

// Create a new GET request to reach out to Prometheus
req, err := http.NewRequest(http.MethodGet, promURL.String(), nil)
if err != nil {
fmt.Printf("Failed to make a new HTTP request for Prometheus %s", err)
os.Exit(1)
}

// Check if Prometheus is reachable
_, err = client.Do(req)
if err != nil {
fmt.Printf("--prometheus.data.vacuum.tsdb is set to true but Prometheus at %s is unreachable %s", promURL.Redacted(), err)
os.Exit(1)
}
}

// Set logger here after properly configuring promlog
logger := promlog.New(promlogConfig)

Expand All @@ -135,9 +191,9 @@ func (b *BatchJobStatsServer) Main() {

absDataPath, err := filepath.Abs(*dataPath)
if err != nil {
panic(fmt.Sprintf("Failed to get absolute path for --data.path=%s. Error: %s", *dataPath, err))
panic(fmt.Sprintf("Failed to get absolute path for --storage.data.path=%s. Error: %s", *dataPath, err))
}
jobstatDBPath := filepath.Join(absDataPath, *jobstatDBFile)
jobstatDBPath := filepath.Join(absDataPath, "jobstats.db")
jobsLastTimeStampFile := filepath.Join(absDataPath, "lastjobsupdatetime")

// Get slice of admin users
Expand All @@ -154,8 +210,13 @@ func (b *BatchJobStatsServer) Main() {
dbConfig := &db.Config{
Logger: logger,
JobstatsDBPath: jobstatDBPath,
JobstatsDBTable: *jobstatDBTable,
JobstatsDBTable: "jobs",
JobCutoffPeriod: time.Duration(jobDurationCutoff),
RetentionPeriod: time.Duration(retentionPeriod),
SkipDeleteOldJobs: *skipDeleteOldJobs,
VacuumTSDB: *vacuumTSDB,
PrometheusURL: promURL,
HTTPClient: client,
LastUpdateTimeString: *lastUpdateTime,
LastUpdateTimeStampFile: jobsLastTimeStampFile,
BatchScheduler: schedulers.NewBatchScheduler,
Expand Down
2 changes: 1 addition & 1 deletion pkg/jobstats/cli/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func queryServer(address string) error {
func TestBatchStatsServerMain(t *testing.T) {
tmpDir := t.TempDir()
// Remove test related args
os.Args = append([]string{os.Args[0]}, fmt.Sprintf("--data.path=%s", tmpDir))
os.Args = append([]string{os.Args[0]}, fmt.Sprintf("--storage.data.path=%s", tmpDir))
os.Args = append(os.Args, "--batch.scheduler.slurm")
os.Args = append(os.Args, "--slurm.sacct.path=../fixtures/sacct")
a, _ := NewBatchJobStatsServer()
Expand Down
Loading

0 comments on commit d0e85e6

Please sign in to comment.