From 9bed51b5a30eebfc3a7f3caea80d2b917baaffc7 Mon Sep 17 00:00:00 2001 From: Mahendra Paipuri Date: Thu, 4 Apr 2024 11:58:24 +0200 Subject: [PATCH] refactor: Use named parameters for DB insertions * Add more methods to DB tables to get struct tags * Move JSONFloat to types * Update avg values in DB only when new value is non zero * Add DB tests to ensure we update avg values correctly * Check if job is in pending correctly in slurm manager Signed-off-by: Mahendra Paipuri --- .../package/ceems_api_server/tsdb-config.yml | 22 +- internal/structset/structset.go | 3 + pkg/api/base/base.go | 13 +- pkg/api/db/db.go | 197 +++++++----------- pkg/api/db/db_test.go | 59 +++++- pkg/api/models/models.go | 55 ++--- pkg/api/models/types.go | 38 ++++ pkg/api/resource/slurm.go | 2 +- 8 files changed, 202 insertions(+), 187 deletions(-) diff --git a/build/package/ceems_api_server/tsdb-config.yml b/build/package/ceems_api_server/tsdb-config.yml index c8c1deca..0b79cd8e 100644 --- a/build/package/ceems_api_server/tsdb-config.yml +++ b/build/package/ceems_api_server/tsdb-config.yml @@ -28,12 +28,12 @@ queries: {} # avg_over_time( # avg by (uuid) ( # ( - # rate(ceems_slurm_job_cpu_user_seconds{uuid=~"{{.UUIDs}}"}[{{.RateInterval}}]) + # rate(ceems_compute_unit_cpu_user_seconds{uuid=~"{{.UUIDs}}"}[{{.RateInterval}}]) # + - # rate(ceems_slurm_job_cpu_system_seconds{uuid=~"{{.UUIDs}}"}[{{.RateInterval}}]) + # rate(ceems_compute_unit_cpu_system_seconds{uuid=~"{{.UUIDs}}"}[{{.RateInterval}}]) # ) # / - # ceems_slurm_job_cpus{uuid=~"{{.UUIDs}}"} + # ceems_compute_unit_cpus{uuid=~"{{.UUIDs}}"} # )[{{.Range}}:] # ) * 100 @@ -41,9 +41,9 @@ queries: {} # avg_cpu_mem_usage: | # avg_over_time( # avg by (uuid) ( - # ceems_slurm_job_memory_used_bytes{uuid=~"{{.UUIDs}}"} + # ceems_compute_unit_memory_used_bytes{uuid=~"{{.UUIDs}}"} # / - # ceems_slurm_job_memory_total_bytes{uuid=~"{{.UUIDs}}"} + # ceems_compute_unit_memory_total_bytes{uuid=~"{{.UUIDs}}"} # )[{{.Range}}:] # ) * 100 @@ -51,7 +51,7 @@ queries: {} # total_cpu_energy_usage_kwh: | # sum_over_time( # sum by (uuid) ( - # unit:ceems_slurm_job_cpu_energy_usage:sum{uuid=~"{{.UUIDs}}"} * {{.ScrapeIntervalMilli}} / 3.6e9 + # unit:ceems_compute_unit_cpu_energy_usage:sum{uuid=~"{{.UUIDs}}"} * {{.ScrapeIntervalMilli}} / 3.6e9 # )[{{.Range}}:{{.ScrapeInterval}}] # ) @@ -60,7 +60,7 @@ queries: {} # sum_over_time( # sum by (uuid) ( # label_replace( - # unit:ceems_slurm_job_cpu_energy_usage:sum{uuid=~"{{.UUIDs}}"} * {{.ScrapeIntervalMilli}} / 3.6e9, + # unit:ceems_compute_unit_cpu_energy_usage:sum{uuid=~"{{.UUIDs}}"} * {{.ScrapeIntervalMilli}} / 3.6e9, # "common_label", # "mock", # "hostname", @@ -83,7 +83,7 @@ queries: {} # avg by (uuid) ( # DCGM_FI_DEV_GPU_UTIL # * on (gpuuuid) group_right () - # ceems_slurm_job_gpu_index_flag{uuid=~"{{.UUIDs}}"} + # ceems_compute_unit_gpu_index_flag{uuid=~"{{.UUIDs}}"} # )[{{.Range}}:{{.ScrapeInterval}}] # ) @@ -93,7 +93,7 @@ queries: {} # avg by (uuid) ( # DCGM_FI_DEV_MEM_COPY_UTIL # * on (gpuuuid) group_right () - # ceems_slurm_job_gpu_index_flag{uuid=~"{{.UUIDs}}"} + # ceems_compute_unit_gpu_index_flag{uuid=~"{{.UUIDs}}"} # )[{{.Range}}:{{.ScrapeInterval}}] # ) @@ -103,7 +103,7 @@ queries: {} # sum by (uuid) ( # instance:DCGM_FI_DEV_POWER_USAGE:pue_avg * {{.ScrapeIntervalMilli}} / 3.6e9 # * on (gpuuuid) group_right() - # ceems_slurm_job_gpu_index_flag{uuid=~"{{.UUIDs}}"} + # ceems_compute_unit_gpu_index_flag{uuid=~"{{.UUIDs}}"} # )[{{.Range}}:{{.ScrapeInterval}}] # ) @@ -114,7 +114,7 @@ queries: {} # label_replace( # instance:DCGM_FI_DEV_POWER_USAGE:pue_avg * {{.ScrapeIntervalMilli}} / 3.6e+09 # * on (gpuuuid) group_right () - # ceems_slurm_job_gpu_index_flag{uuid=~"{{.UUIDs}}"}, + # ceems_compute_unit_gpu_index_flag{uuid=~"{{.UUIDs}}"}, # "common_label", # "mock", # "instance", diff --git a/internal/structset/structset.go b/internal/structset/structset.go index dcc303a2..3ed7d826 100644 --- a/internal/structset/structset.go +++ b/internal/structset/structset.go @@ -40,9 +40,12 @@ func GetStructFieldNames(Struct interface{}) []string { // } // Get tag value of field. If tag value is "-", return lower case value of field name +// If tag is empty, return name of field in map key func getTagValue(field reflect.StructField, tag string) string { if field.Tag.Get(tag) == "-" { return strings.ToLower(field.Name) + } else if field.Tag.Get(tag) == "" { + return field.Name } else { return field.Tag.Get(tag) } diff --git a/pkg/api/base/base.go b/pkg/api/base/base.go index 5f8b7031..eee48b57 100644 --- a/pkg/api/base/base.go +++ b/pkg/api/base/base.go @@ -7,7 +7,6 @@ import ( "time" "github.com/alecthomas/kingpin/v2" - "github.com/mahendrapaipuri/ceems/internal/structset" "github.com/mahendrapaipuri/ceems/pkg/api/models" ) @@ -32,10 +31,16 @@ var ( UsageDBTableColNames = models.Usage{}.TagNames("sql") ) -// Map of field names to DB column type +// Map of struct field name to DB column name var ( - UnitsDBTableColTypeMap = structset.GetStructFieldTagMap(models.Unit{}, "sql", "sqlitetype") - UsageDBTableColTypeMap = structset.GetStructFieldTagMap(models.Usage{}, "sql", "sqlitetype") + UnitsDBTableStructFieldColNameMap = models.Unit{}.TagMap("", "sql") // structset.GetStructFieldTagMap(models.Unit{}, "", "sql") + UsageDBTableStructFieldColNameMap = models.Usage{}.TagMap("", "sql") // structset.GetStructFieldTagMap(models.Usage{}, "", "sql") +) + +// Map of DB column names to DB column type +var ( + UnitsDBTableColTypeMap = models.Unit{}.TagMap("sql", "sqlitetype") // structset.GetStructFieldTagMap(models.Unit{}, "sql", "sqlitetype") + UsageDBTableColTypeMap = models.Usage{}.TagMap("sql", "sqlitetype") // structset.GetStructFieldTagMap(models.Usage{}, "sql", "sqlitetype") ) // DatetimeLayout to be used in the package diff --git a/pkg/api/db/db.go b/pkg/api/db/db.go index 56152393..6d3fb186 100644 --- a/pkg/api/db/db.go +++ b/pkg/api/db/db.go @@ -47,7 +47,6 @@ type storageConfig struct { dbPath string dbBackupPath string retentionPeriod time.Duration - cutoffPeriod time.Duration lastUpdateTime time.Time lastUpdateTimeFile string skipDeleteOldUnits bool @@ -56,10 +55,9 @@ type storageConfig struct { // Stringer receiver for storageConfig func (s *storageConfig) String() string { return fmt.Sprintf( - "storageConfig{dbPath: %s, retentionPeriod: %s, cutoffPeriod: %s, "+ + "storageConfig{dbPath: %s, retentionPeriod: %s, "+ "lastUpdateTime: %s, lastUpdateTimeFile: %s}", - s.dbPath, s.retentionPeriod, s.cutoffPeriod, s.lastUpdateTime, - s.lastUpdateTimeFile, + s.dbPath, s.retentionPeriod, s.lastUpdateTime, s.lastUpdateTimeFile, ) } @@ -91,14 +89,15 @@ func init() { var unitTablePlaceholders []string for _, col := range base.UnitsDBTableColNames { if strings.HasPrefix(col, "num") { - unitTablePlaceholders = append(unitTablePlaceholders, fmt.Sprintf(" %[1]s = %[1]s + ?", col)) + unitTablePlaceholders = append(unitTablePlaceholders, fmt.Sprintf(" %[1]s = %[1]s + :%[1]s", col)) } else if strings.HasPrefix(col, "avg") { - unitTablePlaceholders = append(unitTablePlaceholders, fmt.Sprintf(" %[1]s = (%[1]s * num_intervals + ?) / (num_intervals + 1)", col)) + // Update average values only when the new value is non zero + unitTablePlaceholders = append(unitTablePlaceholders, fmt.Sprintf(" %[1]s = CASE WHEN :%[1]s > 0 THEN (%[1]s * num_updates + :%[1]s) / (num_updates + 1) ELSE %[1]s END", col)) } else if strings.HasPrefix(col, "total") { - unitTablePlaceholders = append(unitTablePlaceholders, fmt.Sprintf(" %[1]s = (%[1]s + ?)", col)) + unitTablePlaceholders = append(unitTablePlaceholders, fmt.Sprintf(" %[1]s = (%[1]s + :%[1]s)", col)) // We will need to update end time, elapsed time and state as they change with time } else if slices.Contains([]string{"ended_at", "ended_at_ts", "elapsed", "elapsed_raw", "state", "tags"}, col) { - unitTablePlaceholders = append(unitTablePlaceholders, fmt.Sprintf(" %[1]s = ?", col)) + unitTablePlaceholders = append(unitTablePlaceholders, fmt.Sprintf(" %[1]s = :%[1]s", col)) } else { continue } @@ -106,13 +105,10 @@ func init() { // Unit update statement unitStmt := fmt.Sprintf( - "INSERT INTO %s (%s) VALUES %s %s", + "INSERT INTO %s (%s) VALUES (:%s) %s", base.UnitsDBTableName, strings.Join(base.UnitsDBTableColNames, ","), - fmt.Sprintf( - "(%s)", - strings.Join(strings.Split(strings.Repeat("?", len(base.UnitsDBTableColNames)), ""), ","), - ), + strings.Join(base.UnitsDBTableColNames, ",:"), // Index is defined in 000001_create_unit_table.up.sql // Update: 20240327: Index updated in 000004_alter_units_usage_tables.up.sql "ON CONFLICT(resource_manager,uuid,started_at) DO UPDATE SET", @@ -130,13 +126,14 @@ func init() { var usageTablePlaceholders []string for _, col := range base.UsageDBTableColNames { if strings.HasPrefix(col, "num") { - usageTablePlaceholders = append(usageTablePlaceholders, fmt.Sprintf(" %[1]s = %[1]s + ?", col)) + usageTablePlaceholders = append(usageTablePlaceholders, fmt.Sprintf(" %[1]s = %[1]s + :%[1]s", col)) } else if strings.HasPrefix(col, "avg") { - usageTablePlaceholders = append(usageTablePlaceholders, fmt.Sprintf(" %[1]s = (%[1]s * num_units + ?) / (num_units + 1)", col)) + // Update average values only when the new value is non zero + usageTablePlaceholders = append(usageTablePlaceholders, fmt.Sprintf(" %[1]s = CASE WHEN :%[1]s > 0 THEN (%[1]s * num_updates + :%[1]s) / (num_updates + 1) ELSE %[1]s END", col)) } else if strings.HasPrefix(col, "total") { - usageTablePlaceholders = append(usageTablePlaceholders, fmt.Sprintf(" %[1]s = (%[1]s + ?)", col)) + usageTablePlaceholders = append(usageTablePlaceholders, fmt.Sprintf(" %[1]s = (%[1]s + :%[1]s)", col)) } else if col == "tags" { - usageTablePlaceholders = append(usageTablePlaceholders, fmt.Sprintf(" %[1]s = ?", col)) + usageTablePlaceholders = append(usageTablePlaceholders, fmt.Sprintf(" %[1]s = :%[1]s", col)) } else { continue } @@ -144,13 +141,10 @@ func init() { // Usage update statement usageStmt := fmt.Sprintf( - "INSERT INTO %s (%s) VALUES %s %s", + "INSERT INTO %s (%s) VALUES (:%s) %s", base.UsageDBTableName, strings.Join(base.UsageDBTableColNames, ","), - fmt.Sprintf( - "(%s)", - strings.Join(strings.Split(strings.Repeat("?", len(base.UsageDBTableColNames)), ""), ","), - ), + strings.Join(base.UsageDBTableColNames, ",:"), // Index is defined in 000002_create_usage_table.up.sql // Update: 20240327: Index updated in 000004_alter_units_usage_tables.up.sql "ON CONFLICT(resource_manager,usr,project) DO UPDATE SET", @@ -416,67 +410,44 @@ func (s *statsDB) execStatements(statements map[string]*sql.Stmt, units []models } // level.Debug(s.logger).Log("msg", "Inserting unit", "id", unit.Jobid) + // Use named parameters to not to repeat the values if _, err = statements[base.UnitsDBTableName].Exec( - unit.ResourceManager, - unit.UUID, - unit.Name, - unit.Project, - unit.Grp, - unit.Usr, - unit.CreatedAt, - unit.StartedAt, - unit.EndedAt, - unit.CreatedAtTS, - unit.StartedAtTS, - unit.EndedAtTS, - unit.Elapsed, - unit.ElapsedRaw, - unit.State, - unit.Allocation, - unit.TotalCPUBilling, - unit.TotalGPUBilling, - unit.TotalMiscBilling, - unit.AveCPUUsage, - unit.AveCPUMemUsage, - unit.TotalCPUEnergyUsage, - unit.TotalCPUEmissions, - unit.AveGPUUsage, - unit.AveGPUMemUsage, - unit.TotalGPUEnergyUsage, - unit.TotalGPUEmissions, - unit.TotalIOWriteHot, - unit.TotalIOReadHot, - unit.TotalIOWriteCold, - unit.TotalIOReadCold, - unit.TotalIngress, - unit.TotalOutgress, - unit.Tags, - ignore, - 1, // NumIntervals - unit.EndedAt, - unit.EndedAtTS, - unit.Elapsed, - unit.ElapsedRaw, - unit.State, - unit.TotalCPUBilling, - unit.TotalGPUBilling, - unit.TotalMiscBilling, - unit.AveCPUUsage, - unit.AveCPUMemUsage, - unit.TotalCPUEnergyUsage, - unit.TotalCPUEmissions, - unit.AveGPUUsage, - unit.AveGPUMemUsage, - unit.TotalGPUEnergyUsage, - unit.TotalGPUEmissions, - unit.TotalIOWriteHot, - unit.TotalIOReadHot, - unit.TotalIOWriteCold, - unit.TotalIOReadCold, - unit.TotalIngress, - unit.TotalOutgress, - unit.Tags, - 1, // NumIntervals + sql.Named(base.UnitsDBTableStructFieldColNameMap["ResourceManager"], unit.ResourceManager), + sql.Named(base.UnitsDBTableStructFieldColNameMap["UUID"], unit.UUID), + sql.Named(base.UnitsDBTableStructFieldColNameMap["Name"], unit.Name), + sql.Named(base.UnitsDBTableStructFieldColNameMap["Project"], unit.Project), + sql.Named(base.UnitsDBTableStructFieldColNameMap["Grp"], unit.Grp), + sql.Named(base.UnitsDBTableStructFieldColNameMap["Usr"], unit.Usr), + sql.Named(base.UnitsDBTableStructFieldColNameMap["CreatedAt"], unit.CreatedAt), + sql.Named(base.UnitsDBTableStructFieldColNameMap["StartedAt"], unit.StartedAt), + sql.Named(base.UnitsDBTableStructFieldColNameMap["EndedAt"], unit.EndedAt), + sql.Named(base.UnitsDBTableStructFieldColNameMap["CreatedAtTS"], unit.CreatedAtTS), + sql.Named(base.UnitsDBTableStructFieldColNameMap["StartedAtTS"], unit.StartedAtTS), + sql.Named(base.UnitsDBTableStructFieldColNameMap["EndedAtTS"], unit.EndedAtTS), + sql.Named(base.UnitsDBTableStructFieldColNameMap["Elapsed"], unit.Elapsed), + sql.Named(base.UnitsDBTableStructFieldColNameMap["ElapsedRaw"], unit.ElapsedRaw), + sql.Named(base.UnitsDBTableStructFieldColNameMap["State"], unit.State), + sql.Named(base.UnitsDBTableStructFieldColNameMap["Allocation"], unit.Allocation), + sql.Named(base.UnitsDBTableStructFieldColNameMap["TotalCPUBilling"], unit.TotalCPUBilling), + sql.Named(base.UnitsDBTableStructFieldColNameMap["TotalGPUBilling"], unit.TotalGPUBilling), + sql.Named(base.UnitsDBTableStructFieldColNameMap["TotalMiscBilling"], unit.TotalMiscBilling), + sql.Named(base.UnitsDBTableStructFieldColNameMap["AveCPUUsage"], unit.AveCPUUsage), + sql.Named(base.UnitsDBTableStructFieldColNameMap["AveCPUMemUsage"], unit.AveCPUMemUsage), + sql.Named(base.UnitsDBTableStructFieldColNameMap["TotalCPUEnergyUsage"], unit.TotalCPUEnergyUsage), + sql.Named(base.UnitsDBTableStructFieldColNameMap["TotalCPUEmissions"], unit.TotalCPUEmissions), + sql.Named(base.UnitsDBTableStructFieldColNameMap["AveGPUUsage"], unit.AveGPUUsage), + sql.Named(base.UnitsDBTableStructFieldColNameMap["AveGPUMemUsage"], unit.AveGPUMemUsage), + sql.Named(base.UnitsDBTableStructFieldColNameMap["TotalGPUEnergyUsage"], unit.TotalGPUEnergyUsage), + sql.Named(base.UnitsDBTableStructFieldColNameMap["TotalGPUEmissions"], unit.TotalGPUEmissions), + sql.Named(base.UnitsDBTableStructFieldColNameMap["TotalIOWriteHot"], unit.TotalIOWriteHot), + sql.Named(base.UnitsDBTableStructFieldColNameMap["TotalIOReadHot"], unit.TotalIOReadHot), + sql.Named(base.UnitsDBTableStructFieldColNameMap["TotalIOWriteCold"], unit.TotalIOWriteCold), + sql.Named(base.UnitsDBTableStructFieldColNameMap["TotalIOReadCold"], unit.TotalIOReadCold), + sql.Named(base.UnitsDBTableStructFieldColNameMap["TotalIngress"], unit.TotalIngress), + sql.Named(base.UnitsDBTableStructFieldColNameMap["TotalOutgress"], unit.TotalOutgress), + sql.Named(base.UnitsDBTableStructFieldColNameMap["Tags"], unit.Tags), + sql.Named(base.UnitsDBTableStructFieldColNameMap["ignore"], ignore), + sql.Named(base.UnitsDBTableStructFieldColNameMap["numupdates"], 1), ); err != nil { level.Error(s.logger). Log("msg", "Failed to insert unit in DB", "id", unit.UUID, "err", err) @@ -490,46 +461,30 @@ func (s *statsDB) execStatements(statements map[string]*sql.Stmt, units []models } // Update Usage table + // Use named parameters to not to repeat the values if _, err = statements[base.UsageDBTableName].Exec( - unit.ResourceManager, - unitIncr, - unit.Project, - unit.Usr, - unit.TotalCPUBilling, - unit.TotalGPUBilling, - unit.TotalMiscBilling, - unit.AveCPUUsage, - unit.AveCPUMemUsage, - unit.TotalCPUEnergyUsage, - unit.TotalCPUEmissions, - unit.AveGPUUsage, - unit.AveGPUMemUsage, - unit.TotalGPUEnergyUsage, - unit.TotalGPUEmissions, - unit.TotalIOWriteHot, - unit.TotalIOReadHot, - unit.TotalIOWriteCold, - unit.TotalIOReadCold, - unit.TotalIngress, - unit.TotalOutgress, - unitIncr, - unit.TotalCPUBilling, - unit.TotalGPUBilling, - unit.TotalMiscBilling, - unit.AveCPUUsage, - unit.AveCPUMemUsage, - unit.TotalCPUEnergyUsage, - unit.TotalCPUEmissions, - unit.AveGPUUsage, - unit.AveGPUMemUsage, - unit.TotalGPUEnergyUsage, - unit.TotalGPUEmissions, - unit.TotalIOWriteHot, - unit.TotalIOReadHot, - unit.TotalIOWriteCold, - unit.TotalIOReadCold, - unit.TotalIngress, - unit.TotalOutgress, + sql.Named(base.UsageDBTableStructFieldColNameMap["ResourceManager"], unit.ResourceManager), + sql.Named(base.UsageDBTableStructFieldColNameMap["NumUnits"], unitIncr), + sql.Named(base.UsageDBTableStructFieldColNameMap["Project"], unit.Project), + sql.Named(base.UsageDBTableStructFieldColNameMap["Usr"], unit.Usr), + sql.Named(base.UsageDBTableStructFieldColNameMap["TotalCPUBilling"], unit.TotalCPUBilling), + sql.Named(base.UsageDBTableStructFieldColNameMap["TotalGPUBilling"], unit.TotalGPUBilling), + sql.Named(base.UsageDBTableStructFieldColNameMap["TotalMiscBilling"], unit.TotalMiscBilling), + sql.Named(base.UsageDBTableStructFieldColNameMap["AveCPUUsage"], unit.AveCPUUsage), + sql.Named(base.UsageDBTableStructFieldColNameMap["AveCPUMemUsage"], unit.AveCPUMemUsage), + sql.Named(base.UsageDBTableStructFieldColNameMap["TotalCPUEnergyUsage"], unit.TotalCPUEnergyUsage), + sql.Named(base.UsageDBTableStructFieldColNameMap["TotalCPUEmissions"], unit.TotalCPUEmissions), + sql.Named(base.UsageDBTableStructFieldColNameMap["AveGPUUsage"], unit.AveGPUUsage), + sql.Named(base.UsageDBTableStructFieldColNameMap["AveGPUMemUsage"], unit.AveGPUMemUsage), + sql.Named(base.UsageDBTableStructFieldColNameMap["TotalGPUEnergyUsage"], unit.TotalGPUEnergyUsage), + sql.Named(base.UsageDBTableStructFieldColNameMap["TotalGPUEmissions"], unit.TotalGPUEmissions), + sql.Named(base.UsageDBTableStructFieldColNameMap["TotalIOWriteHot"], unit.TotalIOWriteHot), + sql.Named(base.UsageDBTableStructFieldColNameMap["TotalIOReadHot"], unit.TotalIOReadHot), + sql.Named(base.UsageDBTableStructFieldColNameMap["TotalIOWriteCold"], unit.TotalIOWriteCold), + sql.Named(base.UsageDBTableStructFieldColNameMap["TotalIOReadCold"], unit.TotalIOReadCold), + sql.Named(base.UsageDBTableStructFieldColNameMap["TotalIngress"], unit.TotalIngress), + sql.Named(base.UsageDBTableStructFieldColNameMap["TotalOutgress"], unit.TotalOutgress), + sql.Named(base.UsageDBTableStructFieldColNameMap["numupdates"], 1), ); err != nil { level.Error(s.logger). Log("msg", "Failed to update usage table in DB", "id", unit.UUID, "err", err) diff --git a/pkg/api/db/db_test.go b/pkg/api/db/db_test.go index e112cc53..b0638b0d 100644 --- a/pkg/api/db/db_test.go +++ b/pkg/api/db/db_test.go @@ -29,8 +29,17 @@ type mockFetcherThree struct { logger log.Logger } -var mockUnitsOne = []models.Unit{{UUID: "10000"}, {UUID: "10001"}, {UUID: "10002"}} -var mockUnitsTwo = []models.Unit{{UUID: "20000"}, {UUID: "20001"}, {UUID: "20002"}} +var mockUnitsOne = []models.Unit{ + {UUID: "10000", Usr: "foo1", Project: "fooprj"}, + {UUID: "10001", Usr: "foo1", Project: "fooprj"}, + {UUID: "10002", Usr: "foo1", Project: "fooprj"}, + {UUID: "10003", Usr: "foo2", Project: "fooprj"}, +} +var mockUnitsTwo = []models.Unit{ + {UUID: "20000", Usr: "bar1", Project: "barprj"}, + {UUID: "20001", Usr: "bar3", Project: "barprj"}, + {UUID: "20002", Usr: "bar3", Project: "barprj"}, +} var mockUnits = append(mockUnitsOne, mockUnitsTwo...) func newMockManager(logger log.Logger) (*resource.Manager, error) { @@ -62,7 +71,15 @@ type mockUpdater struct { logger log.Logger } -var mockUpdatedUnits = []models.Unit{{UUID: "10000", Usr: "foo"}, {UUID: "10001", Usr: "bar"}} +var mockUpdatedUnits = []models.Unit{ + {UUID: "10000", Usr: "foo1", Project: "fooprj", AveCPUUsage: 10, AveGPUUsage: 20}, + {UUID: "10001", Usr: "foo1", Project: "fooprj", AveCPUUsage: 15, TotalCPUEnergyUsage: 100}, + {UUID: "10002", Usr: "foo1", Project: "fooprj", TotalCPUEmissions: 20}, + {UUID: "10003", Usr: "foo2", Project: "fooprj", TotalCPUEmissions: 40}, + {UUID: "20000", Usr: "bar1", Project: "barprj", TotalGPUEnergyUsage: 200}, + {UUID: "20001", Usr: "bar3", Project: "barprj", AveCPUUsage: 20, AveGPUMemUsage: 40}, + {UUID: "20002", Usr: "bar3", Project: "barprj", TotalGPUEmissions: 40}, +} func newMockUpdater(logger log.Logger) (*updater.UnitUpdater, error) { return &updater.UnitUpdater{ @@ -229,14 +246,14 @@ func TestUnitStatsDBEntries(t *testing.T) { t.Errorf("expected one err from fetcher got none") } - // Try to insert data. It should fail + // Try to insert data err = s.Collect() if err != nil { t.Errorf("Failed to collect units data: %s", err) } - // Make query - rows, err := s.db.Query("SELECT uuid,usr FROM units ORDER BY uuid") + // Make units query + rows, err := s.db.Query("SELECT uuid,usr,project,avg_cpu_usage,avg_cpu_mem_usage,total_cpu_energy_usage_kwh,total_cpu_emissions_gms,avg_gpu_usage,avg_gpu_mem_usage,total_gpu_energy_usage_kwh,total_gpu_emissions_gms FROM units ORDER BY uuid") if err != nil { t.Errorf("Failed to make DB query") } @@ -246,7 +263,11 @@ func TestUnitStatsDBEntries(t *testing.T) { for rows.Next() { var unit models.Unit - if err = rows.Scan(&unit.UUID, &unit.Usr); err != nil { + if err = rows.Scan( + &unit.UUID, &unit.Usr, &unit.Project, &unit.AveCPUUsage, + &unit.AveCPUMemUsage, &unit.TotalCPUEnergyUsage, + &unit.TotalCPUEmissions, &unit.AveGPUUsage, &unit.AveGPUMemUsage, + &unit.TotalGPUEnergyUsage, &unit.TotalGPUEmissions); err != nil { t.Errorf("failed to scan row: %s", err) } units = append(units, unit) @@ -255,6 +276,26 @@ func TestUnitStatsDBEntries(t *testing.T) { if !reflect.DeepEqual(units, mockUpdatedUnits) { t.Errorf("expected %#v, \n got %#v", mockUpdatedUnits, units) } + + // Make usage query + rows, err = s.db.Query("SELECT avg_cpu_usage,num_updates FROM usage WHERE usr = 'foo1'") + if err != nil { + t.Errorf("Failed to make DB query") + } + defer rows.Close() + + var cpuUsage float64 + var numUpdates int64 + for rows.Next() { + if err = rows.Scan(&cpuUsage, &numUpdates); err != nil { + t.Errorf("failed to scan row: %s", err) + } + } + + if cpuUsage != 12.5 { + t.Errorf("expected 12.5, \n got %f", cpuUsage) + } + // Close DB s.Stop() } @@ -340,8 +381,8 @@ func TestUnitStatsDBBackup(t *testing.T) { for rows.Next() { numRows += 1 } - if numRows != 6 { - t.Errorf("Backup DB check failed. Expected rows 6 , Got %d.", numRows) + if numRows != 7 { + t.Errorf("Backup DB check failed. Expected rows 7 , Got %d.", numRows) } } diff --git a/pkg/api/models/models.go b/pkg/api/models/models.go index 19f257ba..dea212ac 100644 --- a/pkg/api/models/models.go +++ b/pkg/api/models/models.go @@ -2,9 +2,6 @@ package models import ( - "encoding/json" - "math" - "github.com/mahendrapaipuri/ceems/internal/structset" ) @@ -13,43 +10,6 @@ const ( usageTableName = "usage" ) -// JSONFloat is a custom float64 that can handle Inf and NaN during JSON (un)marshalling -type JSONFloat float64 - -// MarshalJSON marshals JSONFloat into byte array -func (j JSONFloat) MarshalJSON() ([]byte, error) { - v := float64(j) - if math.IsInf(v, 0) || math.IsNaN(v) { - // handle infinity, assign desired value to v - s := "0" - return []byte(s), nil - } - return json.Marshal(v) // marshal result as standard float64 -} - -// UnmarshalJSON unmarshals byte array into JSONFloat -func (j *JSONFloat) UnmarshalJSON(v []byte) error { - if s := string(v); s == "+Inf" || s == "-Inf" || s == "NaN" { - // if +Inf/-Inf indiciates infinity - if s == "+Inf" { - *j = JSONFloat(math.Inf(1)) - return nil - } else if s == "-Inf" { - *j = JSONFloat(math.Inf(-1)) - return nil - } - *j = JSONFloat(math.NaN()) - return nil - } - // just a regular float value - var fv float64 - if err := json.Unmarshal(v, &fv); err != nil { - return err - } - *j = JSONFloat(fv) - return nil -} - // Unit is an abstract compute unit that can mean Job (batchjobs), VM (cloud) or Pod (k8s) type Unit struct { ID int64 `json:"-" sql:"id" sqlitetype:"integer not null primary key"` @@ -88,7 +48,7 @@ type Unit struct { TotalOutgress JSONFloat `json:"total_outgress_in_gb,omitempty" sql:"total_outgress_in_gb" sqlitetype:"real"` // Total outgress traffic in GB of unit Tags Tag `json:"tags,omitempty" sql:"tags" sqlitetype:"text"` // A map to store generic info. String and int64 are valid value types of map Ignore int `json:"-" sql:"ignore" sqlitetype:"integer"` // Whether to ignore unit - NumIntervals int `json:"-" sql:"num_intervals" sqlitetype:"integer"` // Number of update intervals. This is used internally to update aggregate metrics + NumUpdates int64 `json:"-" sql:"num_updates" sqlitetype:"integer"` // Number of updates. This is used internally to update aggregate metrics } // TableName returns the table which units are stored into. @@ -101,6 +61,12 @@ func (u Unit) TagNames(tag string) []string { return structset.GetStructFieldTagValues(u, tag) } +// TagMap returns a map of tags based on keyTag and valueTag. If keyTag is empty, +// field names are used as map keys. +func (u Unit) TagMap(keyTag string, valueTag string) map[string]string { + return structset.GetStructFieldTagMap(u, keyTag, valueTag) +} + // Usage statistics of each project/tenant/namespace type Usage struct { ID int64 `json:"-" sql:"id" sqlitetype:"integer not null primary key"` @@ -125,6 +91,7 @@ type Usage struct { TotalIOReadCold JSONFloat `json:"total_io_read_cold_gb" sql:"total_io_read_cold_gb" sqlitetype:"real"` // Total IO read on cold storage in GB during lifetime of project TotalIngress JSONFloat `json:"total_ingress_in_gb" sql:"total_ingress_in_gb" sqlitetype:"real"` // Total ingress traffic in GB of project TotalOutgress JSONFloat `json:"total_outgress_in_gb" sql:"total_outgress_in_gb" sqlitetype:"real"` // Total outgress traffic in GB of project + NumUpdates int64 `json:"-" sql:"num_updates" sqlitetype:"integer"` // Number of updates. This is used internally to update aggregate metrics } // TableName returns the table which usage stats are stored into. @@ -137,6 +104,12 @@ func (u Usage) TagNames(tag string) []string { return structset.GetStructFieldTagValues(u, tag) } +// TagMap returns a map of tags based on keyTag and valueTag. If keyTag is empty, +// field names are used as map keys. +func (u Usage) TagMap(keyTag string, valueTag string) map[string]string { + return structset.GetStructFieldTagMap(u, keyTag, valueTag) +} + // Project struct type Project struct { Name string `json:"name,omitempty" sql:"project" sqlitetype:"text"` diff --git a/pkg/api/models/types.go b/pkg/api/models/types.go index e8553445..2b5135bf 100644 --- a/pkg/api/models/types.go +++ b/pkg/api/models/types.go @@ -5,6 +5,7 @@ import ( "database/sql/driver" "encoding/json" "fmt" + "math" ) // Generic is map to store any mixed data types. Only string and int are supported. @@ -67,3 +68,40 @@ type Tag = Generic // Allocation is a type alias to Generic that stores allocation data of compute units type Allocation = Generic + +// JSONFloat is a custom float64 that can handle Inf and NaN during JSON (un)marshalling +type JSONFloat float64 + +// MarshalJSON marshals JSONFloat into byte array +func (j JSONFloat) MarshalJSON() ([]byte, error) { + v := float64(j) + if math.IsInf(v, 0) || math.IsNaN(v) { + // handle infinity, assign desired value to v + s := "0" + return []byte(s), nil + } + return json.Marshal(v) // marshal result as standard float64 +} + +// UnmarshalJSON unmarshals byte array into JSONFloat +func (j *JSONFloat) UnmarshalJSON(v []byte) error { + if s := string(v); s == "+Inf" || s == "-Inf" || s == "NaN" { + // if +Inf/-Inf indiciates infinity + if s == "+Inf" { + *j = JSONFloat(math.Inf(1)) + return nil + } else if s == "-Inf" { + *j = JSONFloat(math.Inf(-1)) + return nil + } + *j = JSONFloat(math.NaN()) + return nil + } + // just a regular float value + var fv float64 + if err := json.Unmarshal(v, &fv); err != nil { + return err + } + *j = JSONFloat(fv) + return nil +} diff --git a/pkg/api/resource/slurm.go b/pkg/api/resource/slurm.go index 924f179f..ae187bea 100644 --- a/pkg/api/resource/slurm.go +++ b/pkg/api/resource/slurm.go @@ -255,7 +255,7 @@ func parseSacctCmdOutput(sacctOutput string, start time.Time, end time.Time) ([] // If job has not started between interval's start and end time, // elapsedTime should be zero. This can happen when job is in pending state // after submission - if jobStartTS > intEndTS { + if jobStartTS == 0 { endMark = startMark goto elapsed }