Skip to content

Commit

Permalink
refactor: Use named parameters for DB insertions
Browse files Browse the repository at this point in the history
* Add more methods to DB tables to get struct tags

* Move JSONFloat to types

* Update avg values in DB only when new value is non zero

* Add DB tests to ensure we update avg values correctly

* Check if job is in pending correctly in slurm manager

Signed-off-by: Mahendra Paipuri <[email protected]>
  • Loading branch information
mahendrapaipuri committed Apr 4, 2024
1 parent 7d6db6e commit eecfb1c
Show file tree
Hide file tree
Showing 8 changed files with 202 additions and 187 deletions.
22 changes: 11 additions & 11 deletions build/package/ceems_api_server/tsdb-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,30 +28,30 @@ queries: {}
# avg_over_time(
# avg by (uuid) (
# (
# rate(ceems_slurm_job_cpu_user_seconds{uuid=~"{{.UUIDs}}"}[{{.RateInterval}}])
# rate(ceems_compute_unit_cpu_user_seconds{uuid=~"{{.UUIDs}}"}[{{.RateInterval}}])
# +
# rate(ceems_slurm_job_cpu_system_seconds{uuid=~"{{.UUIDs}}"}[{{.RateInterval}}])
# rate(ceems_compute_unit_cpu_system_seconds{uuid=~"{{.UUIDs}}"}[{{.RateInterval}}])
# )
# /
# ceems_slurm_job_cpus{uuid=~"{{.UUIDs}}"}
# ceems_compute_unit_cpus{uuid=~"{{.UUIDs}}"}
# )[{{.Range}}:]
# ) * 100

# # Avgerage CPU Memory utilisation
# avg_cpu_mem_usage: |
# avg_over_time(
# avg by (uuid) (
# ceems_slurm_job_memory_used_bytes{uuid=~"{{.UUIDs}}"}
# ceems_compute_unit_memory_used_bytes{uuid=~"{{.UUIDs}}"}
# /
# ceems_slurm_job_memory_total_bytes{uuid=~"{{.UUIDs}}"}
# ceems_compute_unit_memory_total_bytes{uuid=~"{{.UUIDs}}"}
# )[{{.Range}}:]
# ) * 100

# # Total CPU energy usage in kWh
# total_cpu_energy_usage_kwh: |
# sum_over_time(
# sum by (uuid) (
# unit:ceems_slurm_job_cpu_energy_usage:sum{uuid=~"{{.UUIDs}}"} * {{.ScrapeIntervalMilli}} / 3.6e9
# unit:ceems_compute_unit_cpu_energy_usage:sum{uuid=~"{{.UUIDs}}"} * {{.ScrapeIntervalMilli}} / 3.6e9
# )[{{.Range}}:{{.ScrapeInterval}}]
# )

Expand All @@ -60,7 +60,7 @@ queries: {}
# sum_over_time(
# sum by (uuid) (
# label_replace(
# unit:ceems_slurm_job_cpu_energy_usage:sum{uuid=~"{{.UUIDs}}"} * {{.ScrapeIntervalMilli}} / 3.6e9,
# unit:ceems_compute_unit_cpu_energy_usage:sum{uuid=~"{{.UUIDs}}"} * {{.ScrapeIntervalMilli}} / 3.6e9,
# "common_label",
# "mock",
# "hostname",
Expand All @@ -83,7 +83,7 @@ queries: {}
# avg by (uuid) (
# DCGM_FI_DEV_GPU_UTIL
# * on (gpuuuid) group_right ()
# ceems_slurm_job_gpu_index_flag{uuid=~"{{.UUIDs}}"}
# ceems_compute_unit_gpu_index_flag{uuid=~"{{.UUIDs}}"}
# )[{{.Range}}:{{.ScrapeInterval}}]
# )

Expand All @@ -93,7 +93,7 @@ queries: {}
# avg by (uuid) (
# DCGM_FI_DEV_MEM_COPY_UTIL
# * on (gpuuuid) group_right ()
# ceems_slurm_job_gpu_index_flag{uuid=~"{{.UUIDs}}"}
# ceems_compute_unit_gpu_index_flag{uuid=~"{{.UUIDs}}"}
# )[{{.Range}}:{{.ScrapeInterval}}]
# )

Expand All @@ -103,7 +103,7 @@ queries: {}
# sum by (uuid) (
# instance:DCGM_FI_DEV_POWER_USAGE:pue_avg * {{.ScrapeIntervalMilli}} / 3.6e9
# * on (gpuuuid) group_right()
# ceems_slurm_job_gpu_index_flag{uuid=~"{{.UUIDs}}"}
# ceems_compute_unit_gpu_index_flag{uuid=~"{{.UUIDs}}"}
# )[{{.Range}}:{{.ScrapeInterval}}]
# )

Expand All @@ -114,7 +114,7 @@ queries: {}
# label_replace(
# instance:DCGM_FI_DEV_POWER_USAGE:pue_avg * {{.ScrapeIntervalMilli}} / 3.6e+09
# * on (gpuuuid) group_right ()
# ceems_slurm_job_gpu_index_flag{uuid=~"{{.UUIDs}}"},
# ceems_compute_unit_gpu_index_flag{uuid=~"{{.UUIDs}}"},
# "common_label",
# "mock",
# "instance",
Expand Down
3 changes: 3 additions & 0 deletions internal/structset/structset.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,12 @@ func GetStructFieldNames(Struct interface{}) []string {
// }

// Get tag value of field. If tag value is "-", return lower case value of field name
// If tag is empty, return name of field in map key
func getTagValue(field reflect.StructField, tag string) string {
if field.Tag.Get(tag) == "-" {
return strings.ToLower(field.Name)
} else if field.Tag.Get(tag) == "" {
return field.Name
} else {
return field.Tag.Get(tag)
}
Expand Down
13 changes: 9 additions & 4 deletions pkg/api/base/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

"github.com/alecthomas/kingpin/v2"
"github.com/mahendrapaipuri/ceems/internal/structset"
"github.com/mahendrapaipuri/ceems/pkg/api/models"
)

Expand All @@ -32,10 +31,16 @@ var (
UsageDBTableColNames = models.Usage{}.TagNames("sql")
)

// Map of field names to DB column type
// Map of struct field name to DB column name
var (
UnitsDBTableColTypeMap = structset.GetStructFieldTagMap(models.Unit{}, "sql", "sqlitetype")
UsageDBTableColTypeMap = structset.GetStructFieldTagMap(models.Usage{}, "sql", "sqlitetype")
UnitsDBTableStructFieldColNameMap = models.Unit{}.TagMap("", "sql") // structset.GetStructFieldTagMap(models.Unit{}, "", "sql")
UsageDBTableStructFieldColNameMap = models.Usage{}.TagMap("", "sql") // structset.GetStructFieldTagMap(models.Usage{}, "", "sql")
)

// Map of DB column names to DB column type
var (
UnitsDBTableColTypeMap = models.Unit{}.TagMap("sql", "sqlitetype") // structset.GetStructFieldTagMap(models.Unit{}, "sql", "sqlitetype")
UsageDBTableColTypeMap = models.Usage{}.TagMap("sql", "sqlitetype") // structset.GetStructFieldTagMap(models.Usage{}, "sql", "sqlitetype")
)

// DatetimeLayout to be used in the package
Expand Down
197 changes: 76 additions & 121 deletions pkg/api/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ type storageConfig struct {
dbPath string
dbBackupPath string
retentionPeriod time.Duration
cutoffPeriod time.Duration
lastUpdateTime time.Time
lastUpdateTimeFile string
skipDeleteOldUnits bool
Expand All @@ -56,10 +55,9 @@ type storageConfig struct {
// Stringer receiver for storageConfig
func (s *storageConfig) String() string {
return fmt.Sprintf(
"storageConfig{dbPath: %s, retentionPeriod: %s, cutoffPeriod: %s, "+
"storageConfig{dbPath: %s, retentionPeriod: %s, "+
"lastUpdateTime: %s, lastUpdateTimeFile: %s}",
s.dbPath, s.retentionPeriod, s.cutoffPeriod, s.lastUpdateTime,
s.lastUpdateTimeFile,
s.dbPath, s.retentionPeriod, s.lastUpdateTime, s.lastUpdateTimeFile,
)
}

Expand Down Expand Up @@ -91,28 +89,26 @@ func init() {
var unitTablePlaceholders []string
for _, col := range base.UnitsDBTableColNames {
if strings.HasPrefix(col, "num") {
unitTablePlaceholders = append(unitTablePlaceholders, fmt.Sprintf(" %[1]s = %[1]s + ?", col))
unitTablePlaceholders = append(unitTablePlaceholders, fmt.Sprintf(" %[1]s = %[1]s + :%[1]s", col))
} else if strings.HasPrefix(col, "avg") {
unitTablePlaceholders = append(unitTablePlaceholders, fmt.Sprintf(" %[1]s = (%[1]s * num_intervals + ?) / (num_intervals + 1)", col))
// Update average values only when the new value is non zero
unitTablePlaceholders = append(unitTablePlaceholders, fmt.Sprintf(" %[1]s = CASE WHEN :%[1]s > 0 THEN (%[1]s * num_updates + :%[1]s) / (num_updates + 1) ELSE %[1]s END", col))
} else if strings.HasPrefix(col, "total") {
unitTablePlaceholders = append(unitTablePlaceholders, fmt.Sprintf(" %[1]s = (%[1]s + ?)", col))
unitTablePlaceholders = append(unitTablePlaceholders, fmt.Sprintf(" %[1]s = (%[1]s + :%[1]s)", col))
// We will need to update end time, elapsed time and state as they change with time
} else if slices.Contains([]string{"ended_at", "ended_at_ts", "elapsed", "elapsed_raw", "state", "tags"}, col) {
unitTablePlaceholders = append(unitTablePlaceholders, fmt.Sprintf(" %[1]s = ?", col))
unitTablePlaceholders = append(unitTablePlaceholders, fmt.Sprintf(" %[1]s = :%[1]s", col))
} else {
continue
}
}

// Unit update statement
unitStmt := fmt.Sprintf(
"INSERT INTO %s (%s) VALUES %s %s",
"INSERT INTO %s (%s) VALUES (:%s) %s",
base.UnitsDBTableName,
strings.Join(base.UnitsDBTableColNames, ","),
fmt.Sprintf(
"(%s)",
strings.Join(strings.Split(strings.Repeat("?", len(base.UnitsDBTableColNames)), ""), ","),
),
strings.Join(base.UnitsDBTableColNames, ",:"),
// Index is defined in 000001_create_unit_table.up.sql
// Update: 20240327: Index updated in 000004_alter_units_usage_tables.up.sql
"ON CONFLICT(resource_manager,uuid,started_at) DO UPDATE SET",
Expand All @@ -130,27 +126,25 @@ func init() {
var usageTablePlaceholders []string
for _, col := range base.UsageDBTableColNames {
if strings.HasPrefix(col, "num") {
usageTablePlaceholders = append(usageTablePlaceholders, fmt.Sprintf(" %[1]s = %[1]s + ?", col))
usageTablePlaceholders = append(usageTablePlaceholders, fmt.Sprintf(" %[1]s = %[1]s + :%[1]s", col))
} else if strings.HasPrefix(col, "avg") {
usageTablePlaceholders = append(usageTablePlaceholders, fmt.Sprintf(" %[1]s = (%[1]s * num_units + ?) / (num_units + 1)", col))
// Update average values only when the new value is non zero
usageTablePlaceholders = append(usageTablePlaceholders, fmt.Sprintf(" %[1]s = CASE WHEN :%[1]s > 0 THEN (%[1]s * num_updates + :%[1]s) / (num_updates + 1) ELSE %[1]s END", col))
} else if strings.HasPrefix(col, "total") {
usageTablePlaceholders = append(usageTablePlaceholders, fmt.Sprintf(" %[1]s = (%[1]s + ?)", col))
usageTablePlaceholders = append(usageTablePlaceholders, fmt.Sprintf(" %[1]s = (%[1]s + :%[1]s)", col))
} else if col == "tags" {
usageTablePlaceholders = append(usageTablePlaceholders, fmt.Sprintf(" %[1]s = ?", col))
usageTablePlaceholders = append(usageTablePlaceholders, fmt.Sprintf(" %[1]s = :%[1]s", col))
} else {
continue
}
}

// Usage update statement
usageStmt := fmt.Sprintf(
"INSERT INTO %s (%s) VALUES %s %s",
"INSERT INTO %s (%s) VALUES (:%s) %s",
base.UsageDBTableName,
strings.Join(base.UsageDBTableColNames, ","),
fmt.Sprintf(
"(%s)",
strings.Join(strings.Split(strings.Repeat("?", len(base.UsageDBTableColNames)), ""), ","),
),
strings.Join(base.UsageDBTableColNames, ",:"),
// Index is defined in 000002_create_usage_table.up.sql
// Update: 20240327: Index updated in 000004_alter_units_usage_tables.up.sql
"ON CONFLICT(resource_manager,usr,project) DO UPDATE SET",
Expand Down Expand Up @@ -416,67 +410,44 @@ func (s *statsDB) execStatements(statements map[string]*sql.Stmt, units []models
}

// level.Debug(s.logger).Log("msg", "Inserting unit", "id", unit.Jobid)
// Use named parameters to not to repeat the values
if _, err = statements[base.UnitsDBTableName].Exec(
unit.ResourceManager,
unit.UUID,
unit.Name,
unit.Project,
unit.Grp,
unit.Usr,
unit.CreatedAt,
unit.StartedAt,
unit.EndedAt,
unit.CreatedAtTS,
unit.StartedAtTS,
unit.EndedAtTS,
unit.Elapsed,
unit.ElapsedRaw,
unit.State,
unit.Allocation,
unit.TotalCPUBilling,
unit.TotalGPUBilling,
unit.TotalMiscBilling,
unit.AveCPUUsage,
unit.AveCPUMemUsage,
unit.TotalCPUEnergyUsage,
unit.TotalCPUEmissions,
unit.AveGPUUsage,
unit.AveGPUMemUsage,
unit.TotalGPUEnergyUsage,
unit.TotalGPUEmissions,
unit.TotalIOWriteHot,
unit.TotalIOReadHot,
unit.TotalIOWriteCold,
unit.TotalIOReadCold,
unit.TotalIngress,
unit.TotalOutgress,
unit.Tags,
ignore,
1, // NumIntervals
unit.EndedAt,
unit.EndedAtTS,
unit.Elapsed,
unit.ElapsedRaw,
unit.State,
unit.TotalCPUBilling,
unit.TotalGPUBilling,
unit.TotalMiscBilling,
unit.AveCPUUsage,
unit.AveCPUMemUsage,
unit.TotalCPUEnergyUsage,
unit.TotalCPUEmissions,
unit.AveGPUUsage,
unit.AveGPUMemUsage,
unit.TotalGPUEnergyUsage,
unit.TotalGPUEmissions,
unit.TotalIOWriteHot,
unit.TotalIOReadHot,
unit.TotalIOWriteCold,
unit.TotalIOReadCold,
unit.TotalIngress,
unit.TotalOutgress,
unit.Tags,
1, // NumIntervals
sql.Named(base.UnitsDBTableStructFieldColNameMap["ResourceManager"], unit.ResourceManager),
sql.Named(base.UnitsDBTableStructFieldColNameMap["UUID"], unit.UUID),
sql.Named(base.UnitsDBTableStructFieldColNameMap["Name"], unit.Name),
sql.Named(base.UnitsDBTableStructFieldColNameMap["Project"], unit.Project),
sql.Named(base.UnitsDBTableStructFieldColNameMap["Grp"], unit.Grp),
sql.Named(base.UnitsDBTableStructFieldColNameMap["Usr"], unit.Usr),
sql.Named(base.UnitsDBTableStructFieldColNameMap["CreatedAt"], unit.CreatedAt),
sql.Named(base.UnitsDBTableStructFieldColNameMap["StartedAt"], unit.StartedAt),
sql.Named(base.UnitsDBTableStructFieldColNameMap["EndedAt"], unit.EndedAt),
sql.Named(base.UnitsDBTableStructFieldColNameMap["CreatedAtTS"], unit.CreatedAtTS),
sql.Named(base.UnitsDBTableStructFieldColNameMap["StartedAtTS"], unit.StartedAtTS),
sql.Named(base.UnitsDBTableStructFieldColNameMap["EndedAtTS"], unit.EndedAtTS),
sql.Named(base.UnitsDBTableStructFieldColNameMap["Elapsed"], unit.Elapsed),
sql.Named(base.UnitsDBTableStructFieldColNameMap["ElapsedRaw"], unit.ElapsedRaw),
sql.Named(base.UnitsDBTableStructFieldColNameMap["State"], unit.State),
sql.Named(base.UnitsDBTableStructFieldColNameMap["Allocation"], unit.Allocation),
sql.Named(base.UnitsDBTableStructFieldColNameMap["TotalCPUBilling"], unit.TotalCPUBilling),
sql.Named(base.UnitsDBTableStructFieldColNameMap["TotalGPUBilling"], unit.TotalGPUBilling),
sql.Named(base.UnitsDBTableStructFieldColNameMap["TotalMiscBilling"], unit.TotalMiscBilling),
sql.Named(base.UnitsDBTableStructFieldColNameMap["AveCPUUsage"], unit.AveCPUUsage),
sql.Named(base.UnitsDBTableStructFieldColNameMap["AveCPUMemUsage"], unit.AveCPUMemUsage),
sql.Named(base.UnitsDBTableStructFieldColNameMap["TotalCPUEnergyUsage"], unit.TotalCPUEnergyUsage),
sql.Named(base.UnitsDBTableStructFieldColNameMap["TotalCPUEmissions"], unit.TotalCPUEmissions),
sql.Named(base.UnitsDBTableStructFieldColNameMap["AveGPUUsage"], unit.AveGPUUsage),
sql.Named(base.UnitsDBTableStructFieldColNameMap["AveGPUMemUsage"], unit.AveGPUMemUsage),
sql.Named(base.UnitsDBTableStructFieldColNameMap["TotalGPUEnergyUsage"], unit.TotalGPUEnergyUsage),
sql.Named(base.UnitsDBTableStructFieldColNameMap["TotalGPUEmissions"], unit.TotalGPUEmissions),
sql.Named(base.UnitsDBTableStructFieldColNameMap["TotalIOWriteHot"], unit.TotalIOWriteHot),
sql.Named(base.UnitsDBTableStructFieldColNameMap["TotalIOReadHot"], unit.TotalIOReadHot),
sql.Named(base.UnitsDBTableStructFieldColNameMap["TotalIOWriteCold"], unit.TotalIOWriteCold),
sql.Named(base.UnitsDBTableStructFieldColNameMap["TotalIOReadCold"], unit.TotalIOReadCold),
sql.Named(base.UnitsDBTableStructFieldColNameMap["TotalIngress"], unit.TotalIngress),
sql.Named(base.UnitsDBTableStructFieldColNameMap["TotalOutgress"], unit.TotalOutgress),
sql.Named(base.UnitsDBTableStructFieldColNameMap["Tags"], unit.Tags),
sql.Named(base.UnitsDBTableStructFieldColNameMap["ignore"], ignore),
sql.Named(base.UnitsDBTableStructFieldColNameMap["numupdates"], 1),
); err != nil {
level.Error(s.logger).
Log("msg", "Failed to insert unit in DB", "id", unit.UUID, "err", err)
Expand All @@ -490,46 +461,30 @@ func (s *statsDB) execStatements(statements map[string]*sql.Stmt, units []models
}

// Update Usage table
// Use named parameters to not to repeat the values
if _, err = statements[base.UsageDBTableName].Exec(
unit.ResourceManager,
unitIncr,
unit.Project,
unit.Usr,
unit.TotalCPUBilling,
unit.TotalGPUBilling,
unit.TotalMiscBilling,
unit.AveCPUUsage,
unit.AveCPUMemUsage,
unit.TotalCPUEnergyUsage,
unit.TotalCPUEmissions,
unit.AveGPUUsage,
unit.AveGPUMemUsage,
unit.TotalGPUEnergyUsage,
unit.TotalGPUEmissions,
unit.TotalIOWriteHot,
unit.TotalIOReadHot,
unit.TotalIOWriteCold,
unit.TotalIOReadCold,
unit.TotalIngress,
unit.TotalOutgress,
unitIncr,
unit.TotalCPUBilling,
unit.TotalGPUBilling,
unit.TotalMiscBilling,
unit.AveCPUUsage,
unit.AveCPUMemUsage,
unit.TotalCPUEnergyUsage,
unit.TotalCPUEmissions,
unit.AveGPUUsage,
unit.AveGPUMemUsage,
unit.TotalGPUEnergyUsage,
unit.TotalGPUEmissions,
unit.TotalIOWriteHot,
unit.TotalIOReadHot,
unit.TotalIOWriteCold,
unit.TotalIOReadCold,
unit.TotalIngress,
unit.TotalOutgress,
sql.Named(base.UsageDBTableStructFieldColNameMap["ResourceManager"], unit.ResourceManager),
sql.Named(base.UsageDBTableStructFieldColNameMap["NumUnits"], unitIncr),
sql.Named(base.UsageDBTableStructFieldColNameMap["Project"], unit.Project),
sql.Named(base.UsageDBTableStructFieldColNameMap["Usr"], unit.Usr),
sql.Named(base.UsageDBTableStructFieldColNameMap["TotalCPUBilling"], unit.TotalCPUBilling),
sql.Named(base.UsageDBTableStructFieldColNameMap["TotalGPUBilling"], unit.TotalGPUBilling),
sql.Named(base.UsageDBTableStructFieldColNameMap["TotalMiscBilling"], unit.TotalMiscBilling),
sql.Named(base.UsageDBTableStructFieldColNameMap["AveCPUUsage"], unit.AveCPUUsage),
sql.Named(base.UsageDBTableStructFieldColNameMap["AveCPUMemUsage"], unit.AveCPUMemUsage),
sql.Named(base.UsageDBTableStructFieldColNameMap["TotalCPUEnergyUsage"], unit.TotalCPUEnergyUsage),
sql.Named(base.UsageDBTableStructFieldColNameMap["TotalCPUEmissions"], unit.TotalCPUEmissions),
sql.Named(base.UsageDBTableStructFieldColNameMap["AveGPUUsage"], unit.AveGPUUsage),
sql.Named(base.UsageDBTableStructFieldColNameMap["AveGPUMemUsage"], unit.AveGPUMemUsage),
sql.Named(base.UsageDBTableStructFieldColNameMap["TotalGPUEnergyUsage"], unit.TotalGPUEnergyUsage),
sql.Named(base.UsageDBTableStructFieldColNameMap["TotalGPUEmissions"], unit.TotalGPUEmissions),
sql.Named(base.UsageDBTableStructFieldColNameMap["TotalIOWriteHot"], unit.TotalIOWriteHot),
sql.Named(base.UsageDBTableStructFieldColNameMap["TotalIOReadHot"], unit.TotalIOReadHot),
sql.Named(base.UsageDBTableStructFieldColNameMap["TotalIOWriteCold"], unit.TotalIOWriteCold),
sql.Named(base.UsageDBTableStructFieldColNameMap["TotalIOReadCold"], unit.TotalIOReadCold),
sql.Named(base.UsageDBTableStructFieldColNameMap["TotalIngress"], unit.TotalIngress),
sql.Named(base.UsageDBTableStructFieldColNameMap["TotalOutgress"], unit.TotalOutgress),
sql.Named(base.UsageDBTableStructFieldColNameMap["numupdates"], 1),
); err != nil {
level.Error(s.logger).
Log("msg", "Failed to update usage table in DB", "id", unit.UUID, "err", err)
Expand Down
Loading

0 comments on commit eecfb1c

Please sign in to comment.