Skip to content

Commit

Permalink
Feat: Add worker version to worker list command (#634)
Browse files Browse the repository at this point in the history
- Add worker build version to worker list command
- Sort worker list by pool, status, machine id (new), and then id
- Use mapstructure over custom relection version of CopyStruct
(significantly faster)
- Fix throttle print statement

Resolve BE-1939
  • Loading branch information
nickpetrovic authored Oct 18, 2024
1 parent 8bb3cde commit 74c6027
Show file tree
Hide file tree
Showing 13 changed files with 231 additions and 199 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ require (
github.com/go-openapi/jsonpointer v0.20.2 // indirect
github.com/go-openapi/jsonreference v0.20.4 // indirect
github.com/go-openapi/swag v0.22.8 // indirect
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
github.com/godbus/dbus/v5 v5.1.1-0.20230522191255-76236955d466 // indirect
github.com/gofrs/flock v0.8.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1v
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
github.com/go-test/deep v1.0.8 h1:TDsG77qcSprGbC6vTN8OuXp5g+J+b5Pcguhf7Zt61VM=
github.com/go-test/deep v1.0.8/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncVwcsE=
github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss=
github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/godbus/dbus/v5 v5.1.1-0.20230522191255-76236955d466 h1:sQspH8M4niEijh3PFscJRLDnkL547IeP7kpPe3uUhEg=
github.com/godbus/dbus/v5 v5.1.1-0.20230522191255-76236955d466/go.mod h1:ZiQxhyQ+bbbfxUKVvjfO498oPYvtYhZzycal3G/NHmU=
Expand Down
2 changes: 1 addition & 1 deletion pkg/abstractions/common/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (i *AutoscaledInstance) Monitor() error {
if err := i.HandleScalingEvent(desiredContainers); err != nil {
if _, ok := err.(*types.ThrottledByConcurrencyLimitError); ok {
if time.Now().After(ignoreScalingEventWindow) {
log.Printf("<%s> throttled by concurrency limit", i.Name)
log.Printf("<%s> throttled by concurrency limit\n", i.Name)
ignoreScalingEventWindow = time.Now().Add(IgnoreScalingEventInterval)
}
}
Expand Down
24 changes: 14 additions & 10 deletions pkg/common/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/beam-cloud/beta9/pkg/types"
"github.com/bsm/redislock"
"github.com/go-viper/mapstructure/v2"
"github.com/redis/go-redis/v9"
)

Expand Down Expand Up @@ -311,19 +312,22 @@ func (l *RedisLock) Release(key string) error {
return redislock.ErrLockNotHeld
}

// Attempts to copy field values of the same name from the src to the dst struct.
func CopyStruct(src, dst interface{}) {
srcVal := reflect.ValueOf(src).Elem()
dstVal := reflect.ValueOf(dst).Elem()
func CopyStruct(src, dst any) error {
config := mapstructure.DecoderConfig{
WeaklyTypedInput: true,
Result: dst,
}

for i := 0; i < srcVal.NumField(); i++ {
srcField := srcVal.Type().Field(i).Name
dstField := dstVal.FieldByName(srcField)
decoder, err := mapstructure.NewDecoder(&config)
if err != nil {
return err
}

if dstField.IsValid() && dstField.CanSet() {
dstField.Set(srcVal.Field(i))
}
if err := decoder.Decode(src); err != nil {
return err
}

return nil
}

// Copies the result of HGetAll to a provided struct.
Expand Down
1 change: 1 addition & 0 deletions pkg/gateway/gateway.proto
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ message Worker {
int64 free_memory = 11;
uint32 free_gpu_count = 12;
repeated Container active_containers = 13;
string build_version = 14;
}

message ListWorkersRequest {}
Expand Down
7 changes: 7 additions & 0 deletions pkg/gateway/services/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func (gws *GatewayService) ListWorkers(ctx context.Context, in *pb.ListWorkersRe
FreeCpu: w.FreeCpu,
FreeMemory: w.FreeMemory,
FreeGpuCount: w.FreeGpuCount,
BuildVersion: w.BuildVersion,
}

containers, err := gws.containerRepo.GetActiveContainersByWorkerId(w.Id)
Expand Down Expand Up @@ -84,6 +85,12 @@ func sortWorkers(w []*types.Worker) {
if i.Status > j.Status {
return 1
}
if i.MachineId < j.MachineId {
return -1
}
if i.MachineId > j.MachineId {
return 1
}
if i.Id < j.Id {
return -1
}
Expand Down
15 changes: 8 additions & 7 deletions pkg/repository/worker_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,18 +310,19 @@ func (r *WorkerRedisRepository) UpdateWorkerCapacity(worker *types.Worker, reque
key := common.RedisKeys.SchedulerWorkerState(worker.Id)

// Retrieve current worker capacity
w, err := r.getWorkerFromKey(key)
currentWorker, err := r.getWorkerFromKey(key)
if err != nil {
return fmt.Errorf("failed to get worker state <%v>: %v", key, err)
}

sourceWorker := currentWorker // worker from the Redis store
if sourceWorker == nil {
sourceWorker = worker // worker from the argument
}

updatedWorker := &types.Worker{}
if w != nil {
// Populate updated worker with values from database
common.CopyStruct(w, updatedWorker)
} else {
// Populate updated worker with values from function parameter
common.CopyStruct(worker, updatedWorker)
if err := common.CopyStruct(sourceWorker, updatedWorker); err != nil {
return fmt.Errorf("failed to copy worker struct: %v", err)
}

if updatedWorker.ResourceVersion != worker.ResourceVersion {
Expand Down
1 change: 1 addition & 0 deletions pkg/scheduler/pool_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ func (wpc *LocalKubernetesWorkerPoolController) createWorkerJob(workerId string,
Gpu: workerGpuType,
Status: types.WorkerStatusPending,
Priority: wpc.workerPool.Priority,
BuildVersion: wpc.config.Worker.ImageTag,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/types/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Worker struct {
ResourceVersion int64 `json:"resource_version" redis:"resource_version"`
RequiresPoolSelector bool `json:"requires_pool_selector" redis:"requires_pool_selector"`
Priority int32 `json:"priority" redis:"priority"`
BuildVersion string `json:"build_version" redis:"build_version"`
}

type CapacityUpdateType int
Expand Down
Loading

0 comments on commit 74c6027

Please sign in to comment.