Skip to content

Commit

Permalink
FS-1249; Improving error reporting, removing CPU quota, removing "con…
Browse files Browse the repository at this point in the history
…currency" config and replacing it with command line args.
  • Loading branch information
jakeschuurmans committed Mar 1, 2024
1 parent 6a4c911 commit d10328d
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 17 deletions.
9 changes: 8 additions & 1 deletion cmd/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ import (
"golang.org/x/net/context"
)

var (
pageSize int
inFlightPages int
)

var cmdInventory = &cobra.Command{
Use: "inventory",
Short: "gather all servers and create invetory for them",
Expand All @@ -25,6 +30,8 @@ var cmdInventory = &cobra.Command{
}

func init() {
rootCmd.PersistentFlags().IntVar(&pageSize, "page-size", 4, "Define how many servers to query per request")
rootCmd.PersistentFlags().IntVar(&inFlightPages, "inflight-pages", 1, "Define how many server pages to queue up before waiting for the previous to finish creating the condition")
rootCmd.AddCommand(cmdInventory)
}

Expand Down Expand Up @@ -59,7 +66,7 @@ func inventory(ctx context.Context) error {
return err
}

err = newClient.CreateConditionInventoryForAllServers()
err = newClient.CreateConditionInventoryForAllServers(pageSize, inFlightPages)
if err != nil {
return err
}
Expand Down
9 changes: 0 additions & 9 deletions internal/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ const (
defaultFleetDBClientID = "fleetscheduler-serverservice-api"
defaultConditionOrcClientID = "fleetscheduler-condition-api"

defaultConcurrencyCount = 4

configEnvVariableName = "FLEET_SCHEDULER_CONFIG"
)

Expand All @@ -29,9 +27,6 @@ type Configuration struct {
// FacilityCode limits this fleet scheduler to events in a facility.
FacilityCode string `mapstructure:"facility_code"`

// Max threads allowed for communicating with other resources.
Concurrency int `mapstructure:"concurrency"`

// Defines the fleetdb (serverservice) client configuration parameters
FdbCfg *ConfigOIDC `mapstructure:"fleetdb_api"`
// Defines the condition orchestrator client configuration parameters
Expand Down Expand Up @@ -102,10 +97,6 @@ func validateClientParams(cfg *Configuration) error {
cfg.LogLevel = "debug"
}

if cfg.Concurrency <= 0 {
cfg.Concurrency = defaultConcurrencyCount
}

// FleetDB (serverservice) Configuration
if cfg.FdbCfg == nil {
return errors.Wrap(ErrInvalidConfig, "fleetdb_api entry doesnt exist")
Expand Down
10 changes: 7 additions & 3 deletions internal/client/fleetdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (c *Client) gatherServers(pageSize int, serverCh chan *fleetdbapi.Server, c
c.log.WithFields(logrus.Fields{
"pageSize": pageSize,
"pageIndex": 1,
}).Logger.Error("Failed to get list of servers")
}).Logger.Errorf("Failed to get list of servers: %s", err.Error())
return
}
totalPages := response.TotalPages
Expand All @@ -48,7 +48,7 @@ func (c *Client) gatherServers(pageSize int, serverCh chan *fleetdbapi.Server, c
c.log.WithFields(logrus.Fields{
"pageSize": pageSize,
"pageIndex": i,
}).Logger.Error("Failed to get page of servers")
}).Logger.Errorf("Failed to get page of servers, attempting to continue: %s", err.Error())

continue
}
Expand All @@ -61,11 +61,15 @@ func (c *Client) gatherServers(pageSize int, serverCh chan *fleetdbapi.Server, c

// throttle this loop
// Doing a spinlock to prevent a permanent lock if the ctx gets canceled
// TODO; Kill thread if context is canceled?
for !concLimiter.TryAcquire(int64(response.PageSize)) && c.ctx.Err() == nil {
time.Sleep(time.Second)
}

if c.ctx.Err() != nil {
c.log.Warn("Context canceled, stopping server gathering")
return
}

for i := range servers {
serverCh <- &servers[i]
}
Expand Down
8 changes: 4 additions & 4 deletions internal/client/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (
fleetdbapi "github.com/metal-toolbox/fleetdb/pkg/api/v1"
)

func (c *Client) CreateConditionInventoryForAllServers() error {
func (c *Client) CreateConditionInventoryForAllServers(pageSize, inFlightPages int) error {
// Start thread to start collecting servers
serverCh, concLimiter, err := c.GatherServersNonBlocking(c.cfg.Concurrency)
serverCh, concLimiter, err := c.GatherServersNonBlocking(pageSize, inFlightPages)
if err != nil {
return err
}
Expand All @@ -30,9 +30,9 @@ func (c *Client) CreateConditionInventoryForAllServers() error {
return nil
}

func (c *Client) GatherServersNonBlocking(pageSize int) (chan *fleetdbapi.Server, *semaphore.Weighted, error) {
func (c *Client) GatherServersNonBlocking(pageSize, inFlightPages int) (chan *fleetdbapi.Server, *semaphore.Weighted, error) {
serverCh := make(chan *fleetdbapi.Server)
concLimiter := semaphore.NewWeighted(int64(pageSize * pageSize))
concLimiter := semaphore.NewWeighted(int64(inFlightPages * pageSize))

go func() {
c.gatherServers(pageSize, serverCh, concLimiter)
Expand Down

0 comments on commit d10328d

Please sign in to comment.