Skip to content

Commit

Permalink
feat: Estimate agg stats based on weighted mean
Browse files Browse the repository at this point in the history
* Use cpu hours as weight to get agg cpu usage

* Use cpu mem hours as weight to get agg cpu mem usage

* Use gpu hours as weight for gpu ang gpu mem usage

* Use weighted mean when returning current usage

Signed-off-by: Mahendra Paipuri <[email protected]>
  • Loading branch information
mahendrapaipuri committed May 2, 2024
1 parent a776496 commit 748522a
Show file tree
Hide file tree
Showing 13 changed files with 559 additions and 256 deletions.
71 changes: 54 additions & 17 deletions pkg/api/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,18 @@ const (

var (
prepareStatements = make(map[string]string, 3)

// For estimating average values, we do weighted average method using following
// values as weight for each DB column
// For CPU and GPU, we use CPU time and GPU time as weights
// For memory usage, we use Walltime * Mem for CPU as weight and just walltime
// for GPU
Weights = map[string]string{
"avg_cpu_usage": "total_cputime_seconds",
"avg_gpu_usage": "total_gputime_seconds",
"avg_cpu_mem_usage": "total_cpumemtime_seconds",
"avg_gpu_mem_usage": "total_gpumemtime_seconds",
}
)

// Init func to set prepareStatements
Expand All @@ -92,7 +104,10 @@ func init() {
unitTablePlaceholders = append(unitTablePlaceholders, fmt.Sprintf(" %[1]s = %[1]s + :%[1]s", col))
} else if strings.HasPrefix(col, "avg") {
// Update average values only when the new value is non zero
unitTablePlaceholders = append(unitTablePlaceholders, fmt.Sprintf(" %[1]s = CASE WHEN :%[1]s > 0 THEN (%[1]s * num_updates + :%[1]s) / (num_updates + 1) ELSE %[1]s END", col))
unitTablePlaceholders = append(
unitTablePlaceholders,
fmt.Sprintf(" %[1]s = CASE WHEN :%[1]s > 0 THEN (%[1]s * %[2]s + :%[1]s * :%[2]s) / (%[2]s + :%[2]s) ELSE %[1]s END", col, Weights[col]),
)
} else if strings.HasPrefix(col, "total") {
unitTablePlaceholders = append(unitTablePlaceholders, fmt.Sprintf(" %[1]s = (%[1]s + :%[1]s)", col))
// We will need to update end time, elapsed time and state as they change with time
Expand Down Expand Up @@ -129,7 +144,10 @@ func init() {
usageTablePlaceholders = append(usageTablePlaceholders, fmt.Sprintf(" %[1]s = %[1]s + :%[1]s", col))
} else if strings.HasPrefix(col, "avg") {
// Update average values only when the new value is non zero
usageTablePlaceholders = append(usageTablePlaceholders, fmt.Sprintf(" %[1]s = CASE WHEN :%[1]s > 0 THEN (%[1]s * num_updates + :%[1]s) / (num_updates + 1) ELSE %[1]s END", col))
usageTablePlaceholders = append(
usageTablePlaceholders,
fmt.Sprintf(" %[1]s = CASE WHEN :%[1]s > 0 THEN (%[1]s * %[2]s + :%[1]s * :%[2]s) / (%[2]s + :%[2]s) ELSE %[1]s END", col, Weights[col]),
)
} else if strings.HasPrefix(col, "total") {
usageTablePlaceholders = append(usageTablePlaceholders, fmt.Sprintf(" %[1]s = (%[1]s + :%[1]s)", col))
} else if col == "tags" {
Expand Down Expand Up @@ -332,11 +350,11 @@ func (s *statsDB) getUnitStats(startTime, endTime time.Time) error {
// Delete older entries and free up DB pages
// In testing we want to skip this
if !s.storage.skipDeleteOldUnits {
level.Debug(s.logger).Log("msg", "Cleaning up old units")
level.Debug(s.logger).Log("msg", "Cleaning up old entries in DB")
if err = s.purgeExpiredUnits(tx); err != nil {
level.Error(s.logger).Log("msg", "Failed to clean up old unit entries", "err", err)
level.Error(s.logger).Log("msg", "Failed to clean up old entries", "err", err)
} else {
level.Debug(s.logger).Log("msg", "Cleaned up old units in DB")
level.Debug(s.logger).Log("msg", "Cleaned up old entries in DB")
}
}

Expand Down Expand Up @@ -369,19 +387,35 @@ func (s *statsDB) getUnitStats(startTime, endTime time.Time) error {

// Delete old entries in DB
func (s *statsDB) purgeExpiredUnits(tx *sql.Tx) error {
deleteRowQuery := fmt.Sprintf(
// Purge expired units
deleteUnitsQuery := fmt.Sprintf(
"DELETE FROM %s WHERE started_at <= date('now', '-%d day')",
base.UnitsDBTableName,
int(s.storage.retentionPeriod.Hours()/24),
)
if _, err := tx.Exec(deleteRowQuery); err != nil {
if _, err := tx.Exec(deleteUnitsQuery); err != nil {
return err
}

// Get changes
var unitsDeleted int
_ = tx.QueryRow("SELECT changes();").Scan(&unitsDeleted)
level.Debug(s.logger).Log("units_deleted", unitsDeleted)

// Purge stale usage data
deleteUsageQuery := fmt.Sprintf(
"DELETE FROM %s WHERE last_updated_at <= date('now', '-%d day')",
base.UsageDBTableName,
int(s.storage.retentionPeriod.Hours()/24),
)
if _, err := tx.Exec(deleteUsageQuery); err != nil {
return err
}

// Get changes
var rowsDeleted int
_ = tx.QueryRow("SELECT changes();").Scan(&rowsDeleted)
level.Debug(s.logger).Log("units_deleted", rowsDeleted)
var usageDeleted int
_ = tx.QueryRow("SELECT changes();").Scan(&usageDeleted)
level.Debug(s.logger).Log("usage_deleted", usageDeleted)
return nil
}

Expand Down Expand Up @@ -425,12 +459,13 @@ func (s *statsDB) execStatements(statements map[string]*sql.Stmt, units []models
sql.Named(base.UnitsDBTableStructFieldColNameMap["StartedAtTS"], unit.StartedAtTS),
sql.Named(base.UnitsDBTableStructFieldColNameMap["EndedAtTS"], unit.EndedAtTS),
sql.Named(base.UnitsDBTableStructFieldColNameMap["Elapsed"], unit.Elapsed),
sql.Named(base.UnitsDBTableStructFieldColNameMap["ElapsedRaw"], unit.ElapsedRaw),
sql.Named(base.UnitsDBTableStructFieldColNameMap["State"], unit.State),
sql.Named(base.UnitsDBTableStructFieldColNameMap["Allocation"], unit.Allocation),
sql.Named(base.UnitsDBTableStructFieldColNameMap["TotalCPUBilling"], unit.TotalCPUBilling),
sql.Named(base.UnitsDBTableStructFieldColNameMap["TotalGPUBilling"], unit.TotalGPUBilling),
sql.Named(base.UnitsDBTableStructFieldColNameMap["TotalMiscBilling"], unit.TotalMiscBilling),
sql.Named(base.UnitsDBTableStructFieldColNameMap["TotalWallTime"], unit.TotalWallTime),
sql.Named(base.UnitsDBTableStructFieldColNameMap["TotalCPUTime"], unit.TotalCPUTime),
sql.Named(base.UnitsDBTableStructFieldColNameMap["TotalGPUTime"], unit.TotalGPUTime),
sql.Named(base.UnitsDBTableStructFieldColNameMap["TotalCPUMemTime"], unit.TotalCPUMemTime),
sql.Named(base.UnitsDBTableStructFieldColNameMap["TotalGPUMemTime"], unit.TotalGPUMemTime),
sql.Named(base.UnitsDBTableStructFieldColNameMap["AveCPUUsage"], unit.AveCPUUsage),
sql.Named(base.UnitsDBTableStructFieldColNameMap["AveCPUMemUsage"], unit.AveCPUMemUsage),
sql.Named(base.UnitsDBTableStructFieldColNameMap["TotalCPUEnergyUsage"], unit.TotalCPUEnergyUsage),
Expand Down Expand Up @@ -468,9 +503,11 @@ func (s *statsDB) execStatements(statements map[string]*sql.Stmt, units []models
sql.Named(base.UsageDBTableStructFieldColNameMap["Project"], unit.Project),
sql.Named(base.UsageDBTableStructFieldColNameMap["Usr"], unit.Usr),
sql.Named(base.UsageDBTableStructFieldColNameMap["LastUpdatedAt"], time.Now().Format(base.DatetimeLayout)),
sql.Named(base.UsageDBTableStructFieldColNameMap["TotalCPUBilling"], unit.TotalCPUBilling),
sql.Named(base.UsageDBTableStructFieldColNameMap["TotalGPUBilling"], unit.TotalGPUBilling),
sql.Named(base.UsageDBTableStructFieldColNameMap["TotalMiscBilling"], unit.TotalMiscBilling),
sql.Named(base.UnitsDBTableStructFieldColNameMap["TotalWallTime"], unit.TotalWallTime),
sql.Named(base.UnitsDBTableStructFieldColNameMap["TotalCPUTime"], unit.TotalCPUTime),
sql.Named(base.UnitsDBTableStructFieldColNameMap["TotalGPUTime"], unit.TotalGPUTime),
sql.Named(base.UnitsDBTableStructFieldColNameMap["TotalCPUMemTime"], unit.TotalCPUMemTime),
sql.Named(base.UnitsDBTableStructFieldColNameMap["TotalGPUMemTime"], unit.TotalGPUMemTime),
sql.Named(base.UsageDBTableStructFieldColNameMap["AveCPUUsage"], unit.AveCPUUsage),
sql.Named(base.UsageDBTableStructFieldColNameMap["AveCPUMemUsage"], unit.AveCPUMemUsage),
sql.Named(base.UsageDBTableStructFieldColNameMap["TotalCPUEnergyUsage"], unit.TotalCPUEnergyUsage),
Expand Down
176 changes: 156 additions & 20 deletions pkg/api/db/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package db

import (
"fmt"
"io"
"os"
"path/filepath"
"reflect"
Expand Down Expand Up @@ -30,15 +31,72 @@ type mockFetcherThree struct {
}

var mockUnitsOne = []models.Unit{
{UUID: "10000", Usr: "foo1", Project: "fooprj"},
{UUID: "10001", Usr: "foo1", Project: "fooprj"},
{UUID: "10002", Usr: "foo1", Project: "fooprj"},
{UUID: "10003", Usr: "foo2", Project: "fooprj"},
{
UUID: "10000",
Usr: "foo1",
Project: "fooprj",
TotalCPUTime: int64(1800),
TotalGPUTime: int64(900),
TotalWallTime: int64(900),
TotalCPUMemTime: int64(9000),
TotalGPUMemTime: int64(900),
},
{
UUID: "10001",
Usr: "foo1",
Project: "fooprj",
TotalCPUTime: int64(900),
TotalWallTime: int64(900),
TotalCPUMemTime: int64(4500),
},
{
UUID: "10002",
Usr: "foo1",
Project: "fooprj",
TotalCPUTime: int64(2700),
TotalWallTime: int64(900),
TotalCPUMemTime: int64(9000),
},
{
UUID: "10003",
Usr: "foo2",
Project: "fooprj",
TotalCPUTime: int64(3600),
TotalWallTime: int64(1800),
TotalCPUMemTime: int64(90000),
},
}
var mockUnitsTwo = []models.Unit{
{UUID: "20000", Usr: "bar1", Project: "barprj"},
{UUID: "20001", Usr: "bar3", Project: "barprj"},
{UUID: "20002", Usr: "bar3", Project: "barprj"},
{
UUID: "20000",
Usr: "bar1",
Project: "barprj",
TotalCPUTime: int64(900),
TotalGPUTime: int64(900),
TotalWallTime: int64(900),
TotalCPUMemTime: int64(9000),
TotalGPUMemTime: int64(900),
},
{
UUID: "20001",
Usr: "bar3",
Project: "barprj",
TotalCPUTime: int64(1800),
TotalGPUTime: int64(1800),
TotalWallTime: int64(900),
TotalCPUMemTime: int64(90000),
TotalGPUMemTime: int64(900),
},
{
UUID: "20002",
Usr: "bar3",
Project: "barprj",
TotalCPUTime: int64(2700),
TotalGPUTime: int64(900),
TotalWallTime: int64(900),
TotalCPUMemTime: int64(90000),
TotalGPUMemTime: int64(900),
},
}
var mockUnits = append(mockUnitsOne, mockUnitsTwo...)

Expand Down Expand Up @@ -72,13 +130,80 @@ type mockUpdater struct {
}

var mockUpdatedUnits = []models.Unit{
{UUID: "10000", Usr: "foo1", Project: "fooprj", AveCPUUsage: 10, AveGPUUsage: 20},
{UUID: "10001", Usr: "foo1", Project: "fooprj", AveCPUUsage: 15, TotalCPUEnergyUsage: 100},
{UUID: "10002", Usr: "foo1", Project: "fooprj", TotalCPUEmissions: 20},
{UUID: "10003", Usr: "foo2", Project: "fooprj", TotalCPUEmissions: 40},
{UUID: "20000", Usr: "bar1", Project: "barprj", TotalGPUEnergyUsage: 200},
{UUID: "20001", Usr: "bar3", Project: "barprj", AveCPUUsage: 20, AveGPUMemUsage: 40},
{UUID: "20002", Usr: "bar3", Project: "barprj", TotalGPUEmissions: 40},
{
UUID: "10000",
Usr: "foo1",
Project: "fooprj",
TotalCPUTime: int64(1800),
TotalGPUTime: int64(900),
TotalWallTime: int64(900),
TotalCPUMemTime: int64(9000),
TotalGPUMemTime: int64(900),
AveCPUUsage: 10,
AveGPUUsage: 20,
},
{
UUID: "10001",
Usr: "foo1",
Project: "fooprj",
TotalCPUTime: int64(900),
TotalWallTime: int64(900),
TotalCPUMemTime: int64(4500),
AveCPUUsage: 25,
TotalCPUEnergyUsage: 100,
},
{
UUID: "10002",
Usr: "foo1",
Project: "fooprj",
TotalCPUTime: int64(2700),
TotalWallTime: int64(900),
TotalCPUMemTime: int64(9000),
TotalCPUEmissions: 20,
},
{
UUID: "10003",
Usr: "foo2",
Project: "fooprj",
TotalCPUTime: int64(3600),
TotalWallTime: int64(1800),
TotalCPUMemTime: int64(90000),
TotalCPUEmissions: 40,
},
{
UUID: "20000",
Usr: "bar1",
Project: "barprj",
TotalCPUTime: int64(900),
TotalGPUTime: int64(900),
TotalWallTime: int64(900),
TotalCPUMemTime: int64(9000),
TotalGPUMemTime: int64(900),
TotalGPUEnergyUsage: 200,
},
{
UUID: "20001",
Usr: "bar3",
Project: "barprj",
TotalCPUTime: int64(1800),
TotalGPUTime: int64(1800),
TotalWallTime: int64(900),
TotalCPUMemTime: int64(90000),
TotalGPUMemTime: int64(900),
AveCPUUsage: 20,
AveGPUMemUsage: 40,
},
{
UUID: "20002",
Usr: "bar3",
Project: "barprj",
TotalCPUTime: int64(2700),
TotalGPUTime: int64(900),
TotalWallTime: int64(900),
TotalCPUMemTime: int64(90000),
TotalGPUMemTime: int64(900),
TotalGPUEmissions: 40,
},
}

func newMockUpdater(logger log.Logger) (*updater.UnitUpdater, error) {
Expand Down Expand Up @@ -254,7 +379,7 @@ func TestUnitStatsDBEntries(t *testing.T) {

// Make units query
rows, err := s.db.Query(
"SELECT uuid,usr,project,avg_cpu_usage,avg_cpu_mem_usage,total_cpu_energy_usage_kwh,total_cpu_emissions_gms,avg_gpu_usage,avg_gpu_mem_usage,total_gpu_energy_usage_kwh,total_gpu_emissions_gms FROM units ORDER BY uuid",
"SELECT uuid,usr,project,total_cputime_seconds,total_gputime_seconds,total_walltime_seconds,total_cpumemtime_seconds,total_gpumemtime_seconds,avg_cpu_usage,avg_cpu_mem_usage,total_cpu_energy_usage_kwh,total_cpu_emissions_gms,avg_gpu_usage,avg_gpu_mem_usage,total_gpu_energy_usage_kwh,total_gpu_emissions_gms FROM units ORDER BY uuid",
)
if err != nil {
t.Errorf("Failed to make DB query")
Expand All @@ -266,7 +391,10 @@ func TestUnitStatsDBEntries(t *testing.T) {
var unit models.Unit

if err = rows.Scan(
&unit.UUID, &unit.Usr, &unit.Project, &unit.AveCPUUsage,
&unit.UUID, &unit.Usr, &unit.Project, &unit.TotalCPUTime,
&unit.TotalGPUTime, &unit.TotalWallTime, &unit.TotalCPUMemTime,
&unit.TotalGPUMemTime,
&unit.AveCPUUsage,
&unit.AveCPUMemUsage, &unit.TotalCPUEnergyUsage,
&unit.TotalCPUEmissions, &unit.AveGPUUsage, &unit.AveGPUMemUsage,
&unit.TotalGPUEnergyUsage, &unit.TotalGPUEmissions); err != nil {
Expand All @@ -276,16 +404,24 @@ func TestUnitStatsDBEntries(t *testing.T) {
}

if !reflect.DeepEqual(units, mockUpdatedUnits) {
t.Errorf("expected %#v, \n got %#v", mockUpdatedUnits, units)
t.Errorf("expected %#v, \n\n\n got %#v", mockUpdatedUnits, units)
}

// Make usage query
rows, err = s.db.Query("SELECT avg_cpu_usage,num_updates FROM usage WHERE usr = 'foo1'")
if err != nil {
t.Errorf("Failed to make DB query")
t.Errorf("Failed to make DB query: %s", err)
}
defer rows.Close()

source, _ := os.Open(filepath.Join(tmpDir, "data", "ceems_api_server.db"))
defer source.Close()

destination, _ := os.Create("test.db")
defer destination.Close()
nBytes, _ := io.Copy(destination, source)
fmt.Println(nBytes)

var cpuUsage float64
var numUpdates int64
for rows.Next() {
Expand All @@ -294,8 +430,8 @@ func TestUnitStatsDBEntries(t *testing.T) {
}
}

if cpuUsage != 12.5 {
t.Errorf("expected 12.5, \n got %f", cpuUsage)
if cpuUsage < 15 {
t.Errorf("expected 15, \n got %f", cpuUsage)
}

// Close DB
Expand Down
12 changes: 12 additions & 0 deletions pkg/api/db/migrations/000006_alter_unit_usage_tables.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
ALTER TABLE units RENAME COLUMN total_cpu_billing TO total_cputime_seconds;
ALTER TABLE units RENAME COLUMN total_gpu_billing TO total_gputime_seconds;
ALTER TABLE units RENAME COLUMN total_misc_billing TO total_misctime_seconds;
ALTER TABLE usage RENAME COLUMN total_cpu_billing TO total_cputime_seconds;
ALTER TABLE usage RENAME COLUMN total_gpu_billing TO total_gputime_seconds;
ALTER TABLE usage RENAME COLUMN total_misc_billing TO total_misctime_seconds;
ALTER TABLE units RENAME COLUMN elapsed_raw TO total_walltime_seconds;
ALTER TABLE usage ADD COLUMN "total_walltime_seconds" integer;
ALTER TABLE units ADD COLUMN "total_cpumemtime_seconds" integer;
ALTER TABLE units ADD COLUMN "total_gpumemtime_seconds" integer;
ALTER TABLE usage ADD COLUMN "total_cpumemtime_seconds" integer;
ALTER TABLE usage ADD COLUMN "total_gpumemtime_seconds" integer;
12 changes: 12 additions & 0 deletions pkg/api/db/migrations/000006_alter_units_usage_tables.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
ALTER TABLE usage DROP COLUMN total_gpumemtim_seconds;
ALTER TABLE usage DROP COLUMN total_cpumemtim_seconds;
ALTER TABLE units DROP COLUMN total_gpumemtim_seconds;
ALTER TABLE units DROP COLUMN total_cpumemtim_seconds;
ALTER TABLE usage DROP COLUMN total_walltime_seconds;
ALTER TABLE units RENAME COLUMN total_walltime_seconds TO elapsed_raw;
ALTER TABLE usage RENAME COLUMN total_misctime_seconds TO total_misc_billing;
ALTER TABLE usage RENAME COLUMN total_gputime_seconds TO total_gpu_billing;
ALTER TABLE usage RENAME COLUMN total_cputime_seconds TO total_cpu_billing;
ALTER TABLE units RENAME COLUMN total_misctime_seconds TO total_misc_billing;
ALTER TABLE units RENAME COLUMN total_gputime_seconds TO total_gpu_billing;
ALTER TABLE units RENAME COLUMN total_cputime_seconds TO total_cpu_billing;
Loading

0 comments on commit 748522a

Please sign in to comment.