Skip to content

Commit

Permalink
refactor: Include time zone in slurm job times
Browse files Browse the repository at this point in the history
* We need time zone information to convert times easily between Unix timestamps and formatted date strings

* Set SLURM_TIME_FORMAT env var for sacct command to include timezone in output

* Include time stamps in BatchJob struct as we need them to generate Grafana URLs

* Add env arg to the OS execute helpers so that env vars can be set for subprocesses

* Update functions using Execute calls to include env vars arg

Signed-off-by: Mahendra Paipuri <[email protected]>
  • Loading branch information
mahendrapaipuri committed Jan 4, 2024
1 parent 575a007 commit b574eb9
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 39 deletions.
44 changes: 40 additions & 4 deletions internal/helpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package helpers

import (
"context"
"os"
"os/exec"
"strings"
"syscall"
Expand All @@ -22,13 +23,24 @@ func GetUuidFromString(stringSlice []string) (string, error) {
}

// Execute command and return stdout/stderr
func Execute(cmd string, args []string, logger log.Logger) ([]byte, error) {
func Execute(cmd string, args []string, env []string, logger log.Logger) ([]byte, error) {
level.Debug(logger).Log("msg", "Executing", "command", cmd, "args", strings.Join(args, " "))

execCmd := exec.Command(cmd, args...)

// If env is non-nil, run the subprocess with the current environment plus the entries in env
if env != nil {
execCmd.Env = append(os.Environ(), env...)
}

// Attach a separate terminal less session to the subprocess
// This is to avoid prompting for password when we run command with sudo
// Ref: https://stackoverflow.com/questions/13432947/exec-external-program-script-and-detect-if-it-requests-user-input
if cmd == "sudo" {
execCmd.SysProcAttr = &syscall.SysProcAttr{Setsid: true}
}

// Execute command
out, err := execCmd.CombinedOutput()
if err != nil {
level.Error(logger).
Expand All @@ -38,12 +50,21 @@ func Execute(cmd string, args []string, logger log.Logger) ([]byte, error) {
}

// Execute command as a given UID and GID and return stdout/stderr
func ExecuteAs(cmd string, args []string, uid int, gid int, logger log.Logger) ([]byte, error) {
func ExecuteAs(cmd string, args []string, uid int, gid int, env []string, logger log.Logger) ([]byte, error) {
level.Debug(logger).
Log("msg", "Executing as user", "command", cmd, "args", strings.Join(args, " "), "uid", uid, "gid", gid)
execCmd := exec.Command(cmd, args...)

// Set uid and gid for process
execCmd.SysProcAttr = &syscall.SysProcAttr{}
execCmd.SysProcAttr.Credential = &syscall.Credential{Uid: uint32(uid), Gid: uint32(gid)}

// If env is non-nil, run the subprocess with the current environment plus the entries in env
if env != nil {
execCmd.Env = append(os.Environ(), env...)
}

// Execute command
out, err := execCmd.CombinedOutput()
if err != nil {
level.Error(logger).
Expand All @@ -53,7 +74,7 @@ func ExecuteAs(cmd string, args []string, uid int, gid int, logger log.Logger) (
}

// Execute command with timeout and return stdout/stderr
func ExecuteWithTimeout(cmd string, args []string, timeout int, logger log.Logger) ([]byte, error) {
func ExecuteWithTimeout(cmd string, args []string, timeout int, env []string, logger log.Logger) ([]byte, error) {
level.Debug(logger).
Log("msg", "Executing with timeout", "command", cmd, "args", strings.Join(args, " "), "timeout", timeout)

Expand All @@ -65,6 +86,12 @@ func ExecuteWithTimeout(cmd string, args []string, timeout int, logger log.Logge
}

execCmd := exec.CommandContext(ctx, cmd, args...)

// If env is non-nil, run the subprocess with the current environment plus the entries in env
if env != nil {
execCmd.Env = append(os.Environ(), env...)
}

// Attach a separate terminal less session to the subprocess
// This is to avoid prompting for password when we run command with sudo
// Ref: https://stackoverflow.com/questions/13432947/exec-external-program-script-and-detect-if-it-requests-user-input
Expand All @@ -75,6 +102,7 @@ func ExecuteWithTimeout(cmd string, args []string, timeout int, logger log.Logge
// The signal to send to the children when parent receives a kill signal
// execCmd.SysProcAttr = &syscall.SysProcAttr{Pdeathsig: syscall.SIGTERM}

// Execute command
out, err := execCmd.CombinedOutput()
if err != nil {
level.Error(logger).
Expand All @@ -84,7 +112,7 @@ func ExecuteWithTimeout(cmd string, args []string, timeout int, logger log.Logge
}

// Execute command with timeout as a given UID and GID and return stdout/stderr
func ExecuteAsWithTimeout(cmd string, args []string, uid int, gid int, timeout int, logger log.Logger) ([]byte, error) {
func ExecuteAsWithTimeout(cmd string, args []string, uid int, gid int, timeout int, env []string, logger log.Logger) ([]byte, error) {
level.Debug(logger).
Log("msg", "Executing with timeout as user", "command", cmd, "args", strings.Join(args, " "), "uid", uid, "gid", gid, "timout")

Expand All @@ -96,9 +124,17 @@ func ExecuteAsWithTimeout(cmd string, args []string, uid int, gid int, timeout i
}

execCmd := exec.CommandContext(ctx, cmd, args...)

// If env is non-nil, run the subprocess with the current environment plus the entries in env
if env != nil {
execCmd.Env = append(os.Environ(), env...)
}

// Set uid and gid for the process
execCmd.SysProcAttr = &syscall.SysProcAttr{}
execCmd.SysProcAttr.Credential = &syscall.Credential{Uid: uint32(uid), Gid: uint32(gid)}

// Execute command
out, err := execCmd.CombinedOutput()
if err != nil {
level.Error(logger).
Expand Down
10 changes: 5 additions & 5 deletions internal/helpers/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,24 @@ func TestGetUuid(t *testing.T) {

func TestExecute(t *testing.T) {
// Test successful command execution
out, err := Execute("echo", []string{"test"}, log.NewNopLogger())
out, err := Execute("bash", []string{"-c", "echo ${VAR1} ${VAR2}"}, []string{"VAR1=1", "VAR2=2"}, log.NewNopLogger())
if err != nil {
t.Errorf("Failed to execute command %s", err)
}
if strings.TrimSpace(string(out)) != "test" {
t.Errorf("Expected output \"test\". Got \"%s\"", string(out))
if strings.TrimSpace(string(out)) != "1 2" {
t.Errorf("Expected output \"1 2\". Got \"%s\"", string(out))
}

// Test failed command execution
out, err = Execute("exit", []string{"1"}, log.NewNopLogger())
out, err = Execute("exit", []string{"1"}, nil, log.NewNopLogger())
if err == nil {
t.Errorf("Expected to fail command execution. Got output %s", out)
}
}

func TestExecuteWithTimeout(t *testing.T) {
// Test that a command running longer than the timeout fails
_, err := ExecuteWithTimeout("sleep", []string{"5"}, 2, log.NewNopLogger())
_, err := ExecuteWithTimeout("sleep", []string{"5"}, 2, nil, log.NewNopLogger())
if err == nil {
t.Errorf("Expected command timeout")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/collector/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func GetNvidiaGPUDevices(nvidiaSmiPath string, logger log.Logger) (map[int]Devic

// Execute nvidia-smi command to get available GPUs
args := []string{"--query-gpu=index,name,uuid", "--format=csv"}
nvidiaSmiOutput, err := helpers.Execute(nvidiaSmiPath, args, logger)
nvidiaSmiOutput, err := helpers.Execute(nvidiaSmiPath, args, nil, logger)
if err != nil {
level.Error(logger).
Log("msg", "nvidia-smi command to get list of devices failed", "err", err)
Expand Down
12 changes: 6 additions & 6 deletions pkg/collector/ipmi.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,23 +96,23 @@ func NewIPMICollector(logger log.Logger) (Collector, error) {
cmdSlice := strings.Split(*ipmiDcmiCmd, " ")

// Verify if running ipmiDcmiCmd works
if _, err := helpers.Execute(cmdSlice[0], cmdSlice[1:], logger); err == nil {
if _, err := helpers.Execute(cmdSlice[0], cmdSlice[1:], nil, logger); err == nil {
execMode = "native"
goto outside
}

// If ipmiDcmiCmd failed to run and if sudo is not already present in command,
// add sudo to command and execute. If current user has sudo rights it will be a success
if cmdSlice[0] != "sudo" {
if _, err := helpers.ExecuteWithTimeout("sudo", cmdSlice, 2, logger); err == nil {
if _, err := helpers.ExecuteWithTimeout("sudo", cmdSlice, 2, nil, logger); err == nil {
execMode = "sudo"
goto outside
}
}

// As last attempt, run the command as root user by forking subprocess
// as root. If there is setuid cap on the process, it will be a success
if _, err := helpers.ExecuteAs(cmdSlice[0], cmdSlice[1:], 0, 0, logger); err == nil {
if _, err := helpers.ExecuteAs(cmdSlice[0], cmdSlice[1:], 0, 0, nil, logger); err == nil {
execMode = "cap"
goto outside
}
Expand Down Expand Up @@ -152,11 +152,11 @@ func (c *impiCollector) Update(ch chan<- prometheus.Metric) error {
// Execute ipmi-dcmi command
cmdSlice := strings.Split(*ipmiDcmiCmd, " ")
if c.execMode == "cap" {
stdOut, err = helpers.ExecuteAs(cmdSlice[0], cmdSlice[1:], 0, 0, c.logger)
stdOut, err = helpers.ExecuteAs(cmdSlice[0], cmdSlice[1:], 0, 0, nil, c.logger)
} else if c.execMode == "sudo" {
stdOut, err = helpers.ExecuteWithTimeout("sudo", cmdSlice, 1, c.logger)
stdOut, err = helpers.ExecuteWithTimeout("sudo", cmdSlice, 1, nil, c.logger)
} else if c.execMode == "native" {
stdOut, err = helpers.Execute(cmdSlice[0], cmdSlice[1:], c.logger)
stdOut, err = helpers.Execute(cmdSlice[0], cmdSlice[1:], nil, c.logger)
} else {
err = fmt.Errorf("Current process do not have permissions to execute %s", *ipmiDcmiCmd)
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/jobstats/base/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ type BatchJob struct {
Submit string `json:"submit"`
Start string `json:"start"`
End string `json:"end"`
SubmitTS string `json:"submitts"`
StartTS string `json:"startts"`
EndTS string `json:"endts"`
Elapsed string `json:"elapsed"`
Exitcode string `json:"exitcode"`
State string `json:"state"`
Expand Down
9 changes: 9 additions & 0 deletions pkg/jobstats/helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"regexp"
"strconv"
"strings"
"time"
)

var (
Expand Down Expand Up @@ -122,3 +123,11 @@ func expandNodelist(nodelistExp string) []string {
func NodelistParser(nodelistExp string) []string {
return expandNodelist(replaceNodelistDelimiter(nodelistExp))
}

// TimeToTimestamp parses date according to layout and returns the matching
// Unix timestamp in seconds. When date cannot be parsed with layout, it
// returns 0 instead of reporting an error.
func TimeToTimestamp(layout string, date string) int64 {
	parsed, err := time.Parse(layout, date)
	if err != nil {
		return 0
	}
	// Unix seconds do not depend on the location attached to the time value.
	return parsed.Unix()
}
36 changes: 22 additions & 14 deletions pkg/jobstats/schedulers/slurm.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package schedulers

import (
"fmt"
"os"
"os/user"
"strconv"
Expand All @@ -12,24 +13,26 @@ import (
"github.com/go-kit/log/level"
"github.com/mahendrapaipuri/batchjob_monitoring/internal/helpers"
"github.com/mahendrapaipuri/batchjob_monitoring/pkg/jobstats/base"
"github.com/mahendrapaipuri/batchjob_monitoring/pkg/jobstats/helper"
jobstats_helper "github.com/mahendrapaipuri/batchjob_monitoring/pkg/jobstats/helper"
"github.com/prometheus/common/model"
)

type slurmScheduler struct {
logger log.Logger
execMode string
slurmDateFormat string
slurmWalltimeCutoff time.Duration
}

const slurmBatchScheduler = "slurm"

var (
slurmUserUid int
slurmUserGid int
jobLock = sync.RWMutex{}
sacctPath = base.BatchJobStatsServerApp.Flag(
slurmUserUid int
slurmUserGid int
defaultLayout = fmt.Sprintf("%sT%s", time.DateOnly, time.TimeOnly)
slurmTimeFormat = fmt.Sprintf("%s-0700", defaultLayout)
jobLock = sync.RWMutex{}
sacctPath = base.BatchJobStatsServerApp.Flag(
"slurm.sacct.path",
"Absolute path to sacct executable.",
).Default("/usr/local/bin/sacct").String()
Expand Down Expand Up @@ -88,15 +91,15 @@ func preflightChecks(logger log.Logger) (string, error) {
goto sudomode
}

if _, err := helpers.ExecuteAs(*sacctPath, []string{"--help"}, slurmUserUid, slurmUserGid, logger); err == nil {
if _, err := helpers.ExecuteAs(*sacctPath, []string{"--help"}, slurmUserUid, slurmUserGid, nil, logger); err == nil {
execMode = "cap"
level.Debug(logger).Log("msg", "Linux capabilities will be used to execute sacct as slurm user")
return execMode, nil
}

sudomode:
// Last attempt to run sacct with sudo
if _, err := helpers.ExecuteWithTimeout("sudo", []string{*sacctPath, "--help"}, 5, logger); err == nil {
if _, err := helpers.ExecuteWithTimeout("sudo", []string{*sacctPath, "--help"}, 5, nil, logger); err == nil {
execMode = "sudo"
level.Debug(logger).Log("msg", "sudo will be used to execute sacct command")
return execMode, nil
Expand All @@ -120,15 +123,14 @@ func NewSlurmScheduler(logger log.Logger) (BatchJobFetcher, error) {
return &slurmScheduler{
logger: logger,
execMode: execMode,
slurmDateFormat: "2006-01-02T15:04:05",
slurmWalltimeCutoff: time.Duration(slurmWalltimeCutoffString),
}, nil
}

// Get jobs from slurm
func (s *slurmScheduler) Fetch(start time.Time, end time.Time) ([]base.BatchJob, error) {
startTime := start.Format(s.slurmDateFormat)
endTime := end.Format(s.slurmDateFormat)
startTime := start.Format(defaultLayout)
endTime := end.Format(defaultLayout)

// Execute sacct command between start and end times
sacctOutput, err := runSacctCmd(s.execMode, startTime, endTime, s.logger)
Expand All @@ -147,21 +149,24 @@ func (s *slurmScheduler) Fetch(start time.Time, end time.Time) ([]base.BatchJob,
func runSacctCmd(execMode string, startTime string, endTime string, logger log.Logger) ([]byte, error) {
	// Runs sacct for jobs between startTime and endTime and returns whatever
	// the selected execute helper returns. execMode picks how sacct is
	// launched: "cap" runs it with the slurm user's uid/gid, "sudo" runs it
	// through sudo, and any other value runs it directly.

	// Use jobIDRaw that outputs the array jobs as regular job IDs instead of id_array format
	args := []string{
		"-D", "-X", "--allusers", "--parsable2",
		"--format", "jobidraw,partition,qos,account,group,gid,user,uid,submit,start,end,elapsed,elapsedraw,exitcode,state,allocnodes,alloccpus,nodelist,jobname,workdir",
		"--state", "CANCELLED,COMPLETED,FAILED,NODE_FAIL,PREEMPTED,TIMEOUT",
		"--starttime", startTime,
		"--endtime", endTime,
	}

	// Use SLURM_TIME_FORMAT env var to get timezone offset (%z) in the
	// submit/start/end columns. The value is handed to the subprocess verbatim
	// (no shell is involved), so it must NOT be wrapped in quotes: literal
	// quote characters would end up in sacct's output and the times would no
	// longer match slurmTimeFormat when parsed.
	env := []string{"SLURM_TIME_FORMAT=%Y-%m-%dT%H:%M:%S%z"}

	switch execMode {
	case "cap":
		// Run command as slurm user
		return helpers.ExecuteAs(*sacctPath, args, slurmUserUid, slurmUserGid, env, logger)
	case "sudo":
		// NOTE(review): sudo may be configured to reset the environment, in
		// which case SLURM_TIME_FORMAT would not reach sacct — verify on the
		// target systems.
		sudoArgs := append([]string{*sacctPath}, args...)
		return helpers.Execute("sudo", sudoArgs, env, logger)
	default:
		return helpers.Execute(*sacctPath, args, env, logger)
	}
}

// Parse sacct command output and return batchjob slice
Expand Down Expand Up @@ -239,6 +244,9 @@ func parseSacctCmdOutput(sacctOutput string, elapsedCutoff time.Duration, logger
Submit: components[8],
Start: components[9],
End: components[10],
SubmitTS: strconv.FormatInt(helper.TimeToTimestamp(slurmTimeFormat, components[8]), 10),
StartTS: strconv.FormatInt(helper.TimeToTimestamp(slurmTimeFormat, components[9]), 10),
EndTS: strconv.FormatInt(helper.TimeToTimestamp(slurmTimeFormat, components[10]), 10),
Elapsed: components[11],
Exitcode: components[13],
State: components[14],
Expand Down
24 changes: 15 additions & 9 deletions pkg/jobstats/schedulers/slurm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (

var (
sacctCmdOutput = `JobID|Partition|QoS|Account|Group|GID|User|UID|Submit|Start|End|Elapsed|ElapsedRaw|ExitCode|State|NNodes|Ncpus|NodeList|JobName|WorkDir
1479763|part1|qos1|acc1|grp|1000|usr|1000|2023-02-21T14:37:02|2023-02-21T14:37:07|2023-02-21T15:26:29|00:49:22|3000|0:0|CANCELLED by 302137|1|8|compute-0|test_script1|/home/usr
1481508|part1|qos1|acc1|grp|1000|usr|1000|2023-02-21T15:48:20|2023-02-21T15:49:06|2023-02-21T15:57:23|00:08:17|4920|0:0|CANCELLED by 302137|2|16|compute-[0-2]|test_script2|/home/usr
1481510|part1|qos1|acc1|grp|1000|usr|1000|2023-02-21T15:48:20|2023-02-21T15:49:06|2023-02-21T15:57:23|00:00:17|17|0:0|CANCELLED by 302137|2|16|compute-[0-2]|test_script2|/home/usr`
1479763|part1|qos1|acc1|grp|1000|usr|1000|2023-02-21T14:37:02+0100|2023-02-21T14:37:07+0100|2023-02-21T15:26:29+0100|00:49:22|3000|0:0|CANCELLED by 302137|1|8|compute-0|test_script1|/home/usr
1481508|part1|qos1|acc1|grp|1000|usr|1000|2023-02-21T15:48:20+0100|2023-02-21T15:49:06+0100|2023-02-21T15:57:23+0100|00:08:17|4920|0:0|CANCELLED by 302137|2|16|compute-[0-2]|test_script2|/home/usr
1481510|part1|qos1|acc1|grp|1000|usr|1000|2023-02-21T15:48:20+0100|2023-02-21T15:49:06+0100|2023-02-21T15:57:23+0100|00:00:17|17|0:0|CANCELLED by 302137|2|16|compute-[0-2]|test_script2|/home/usr`
logger = log.NewNopLogger()
expectedBatchJobs = []base.BatchJob{
{
Expand All @@ -26,9 +26,12 @@ var (
Gid: "1000",
Usr: "usr",
Uid: "1000",
Submit: "2023-02-21T14:37:02",
Start: "2023-02-21T14:37:07",
End: "2023-02-21T15:26:29",
Submit: "2023-02-21T14:37:02+0100",
Start: "2023-02-21T14:37:07+0100",
End: "2023-02-21T15:26:29+0100",
SubmitTS: "1676986622",
StartTS: "1676986627",
EndTS: "1676989589",
Elapsed: "00:49:22",
Exitcode: "0:0",
State: "CANCELLED by 302137",
Expand All @@ -49,9 +52,12 @@ var (
Gid: "1000",
Usr: "usr",
Uid: "1000",
Submit: "2023-02-21T15:48:20",
Start: "2023-02-21T15:49:06",
End: "2023-02-21T15:57:23",
Submit: "2023-02-21T15:48:20+0100",
Start: "2023-02-21T15:49:06+0100",
End: "2023-02-21T15:57:23+0100",
SubmitTS: "1676990900",
StartTS: "1676990946",
EndTS: "1676991443",
Elapsed: "00:08:17",
Exitcode: "0:0",
State: "CANCELLED by 302137",
Expand Down

0 comments on commit b574eb9

Please sign in to comment.