Skip to content

Commit

Permalink
Resolves #357 - Fix race conditions in the code (#358)
Browse files Browse the repository at this point in the history
  • Loading branch information
steve-r-west authored Jun 18, 2023
1 parent 38bf518 commit 4d95095
Show file tree
Hide file tree
Showing 11 changed files with 122 additions and 49 deletions.
4 changes: 3 additions & 1 deletion cmd/commercemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ var cmCommand = &cobra.Command{
Use: "commerce-manager",
Short: "Open commerce manager",
RunE: func(cmd *cobra.Command, args []string) error {
u, err := url.Parse(config.Envs.EPCC_API_BASE_URL)

env := config.GetEnv()
u, err := url.Parse(env.EPCC_API_BASE_URL)
if err != nil {
fmt.Println(err)
return err
Expand Down
2 changes: 1 addition & 1 deletion cmd/configure.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ var configure = &cobra.Command{
log.Errorf("error writing to file %s, error: %v", configPath, err)
os.Exit(1)
}
config.Envs = &newProfile
config.SetEnv(&newProfile)
},
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func createInternal(ctx context.Context, overrides *httpclient.HttpParameterOver
if err != nil {
return "", fmt.Errorf("got error %s", err.Error())
} else if resp == nil {
return "", fmt.Errorf("got nil response")
return "", fmt.Errorf("got nil response with request: %s", resourceURL)
}

if resp.Body != nil {
Expand Down
17 changes: 11 additions & 6 deletions cmd/login.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@ var loginInfo = &cobra.Command{

apiTokenResponse := authentication.GetApiToken()

if config.Envs.EPCC_BETA_API_FEATURES == "" {
env := config.GetEnv()

if env.EPCC_BETA_API_FEATURES == "" {
log.Infof("We have no configured API endpoint, will use default endpoint")
} else {
log.Infof("We are currently using API endpoint: %s", config.Envs.EPCC_API_BASE_URL)
log.Infof("We are currently using API endpoint: %s", env.EPCC_API_BASE_URL)
}

if apiTokenResponse != nil {
Expand Down Expand Up @@ -88,7 +90,7 @@ var loginInfo = &cobra.Command{
}

if authentication.IsAutoLoginEnabled() {
if config.Envs.EPCC_CLIENT_SECRET != "" {
if env.EPCC_CLIENT_SECRET != "" {
log.Infof("Auto login is enabled and we will (attempt to) login with client_credentials")
} else {
log.Infof("Auto login is enabled and we will (attempt to) login with implicit, as no client_secret is available")
Expand Down Expand Up @@ -171,10 +173,12 @@ var loginClientCredentials = &cobra.Command{
values := url.Values{}
values.Set("grant_type", "client_credentials")

env := config.GetEnv()

if len(args) == 0 {
log.Debug("Arguments have been passed, not using profile EPCC_CLIENT_ID and EPCC_CLIENT_SECRET")
values.Set("client_id", config.Envs.EPCC_CLIENT_ID)
values.Set("client_secret", config.Envs.EPCC_CLIENT_SECRET)
values.Set("client_id", env.EPCC_CLIENT_ID)
values.Set("client_secret", env.EPCC_CLIENT_SECRET)
}

if len(args)%2 != 0 {
Expand Down Expand Up @@ -240,9 +244,10 @@ var loginImplicit = &cobra.Command{
values := url.Values{}
values.Set("grant_type", "implicit")

env := config.GetEnv()
if len(args) == 0 {
log.Debug("Arguments have been passed, not using profile EPCC_CLIENT_ID")
values.Set("client_id", config.Envs.EPCC_CLIENT_ID)
values.Set("client_id", env.EPCC_CLIENT_ID)
}

if len(args)%2 != 0 {
Expand Down
28 changes: 20 additions & 8 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,15 @@ var jqCompletionFunc = func(cmd *cobra.Command, args []string, toComplete string
}, cobra.ShellCompDirectiveNoSpace
}

var profileNameFromCommandLine = "default"

func InitializeCmd() {
cobra.OnInitialize(initConfig)
initConfig()

if err := env.Parse(config.Envs); err != nil {
panic("Could not parse environment variables")
e := &config.Env{}
if err := env.Parse(e); err != nil {
log.Fatalf("Could not parse environment variables %v", err)
}

applyLogLevelEarlyDetectionHack()
Expand Down Expand Up @@ -92,7 +95,7 @@ func InitializeCmd() {

RootCmd.PersistentFlags().BoolVarP(&json.MonochromeOutput, "monochrome-output", "M", false, "By default, epcc will output using colors if the terminal supports this. Use this option to disable it.")
RootCmd.PersistentFlags().StringSliceVarP(&httpclient.RawHeaders, "header", "H", []string{}, "Extra headers and values to include in the request when sending HTTP to a server. You may specify any number of extra headers.")
RootCmd.PersistentFlags().StringVarP(&profiles.ProfileName, "profile", "P", profiles.ProfileName, "overrides the current EPCC_PROFILE var to run the command with the chosen profile.")
RootCmd.PersistentFlags().StringVarP(&profileNameFromCommandLine, "profile", "P", "", "overrides the current EPCC_PROFILE var to run the command with the chosen profile.")
RootCmd.PersistentFlags().Uint16VarP(&rateLimit, "rate-limit", "", 10, "Request limit per second")
RootCmd.PersistentFlags().BoolVarP(&httpclient.Retry5xx, "retry-5xx", "", false, "Whether we should retry requests with HTTP 5xx response code")
RootCmd.PersistentFlags().BoolVarP(&httpclient.Retry429, "retry-429", "", false, "Whether we should retry requests with HTTP 429 response code")
Expand Down Expand Up @@ -178,8 +181,9 @@ Environment Variables
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
log.SetLevel(logger.Loglevel)

if config.Envs.EPCC_RATE_LIMIT != 0 {
rateLimit = config.Envs.EPCC_RATE_LIMIT
env := config.GetEnv()
if env.EPCC_RATE_LIMIT != 0 {
rateLimit = env.EPCC_RATE_LIMIT
}
log.Debugf("Rate limit set to %d request per second ", rateLimit)
httpclient.Initialize(rateLimit, requestTimeout)
Expand Down Expand Up @@ -251,14 +255,22 @@ func Execute() {
}

func initConfig() {

envProfileName, ok := os.LookupEnv("EPCC_PROFILE")
if ok {
profiles.ProfileName = envProfileName
profiles.SetProfileName(envProfileName)
}
config.Envs = profiles.GetProfile(profiles.ProfileName)

if profileNameFromCommandLine != "" {
profiles.SetProfileName(profileNameFromCommandLine)
}

e := profiles.GetProfile(profiles.GetProfileName())

// Override profile configuration with environment variables
if err := env.Parse(config.Envs); err != nil {
if err := env.Parse(e); err != nil {
panic("Could not parse environment variables")
}

config.SetEnv(e)
}
17 changes: 14 additions & 3 deletions cmd/runbooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ func initRunbookRunCommands() *cobra.Command {

execTimeoutInSeconds := runbookRunCommand.PersistentFlags().Int64("execution-timeout", 900, "How long should the script take to execute before timing out")
maxConcurrency := runbookRunCommand.PersistentFlags().Int64("max-concurrency", 2048, "Maximum number of commands at once")
semaphore := semaphore.NewWeighted(*maxConcurrency)

for _, runbook := range runbooks.GetRunbooks() {
// Create a copy of runbook scoped to the loop
Expand Down Expand Up @@ -147,6 +146,8 @@ func initRunbookRunCommands() *cobra.Command {

ctx, cancelFunc := context.WithCancel(parentCtx)

concurrentRunSemaphore := semaphore.NewWeighted(*maxConcurrency)

for stepIdx, rawCmd := range runbookAction.RawCommands {

// Create a copy of loop variables
Expand All @@ -166,6 +167,7 @@ func initRunbookRunCommands() *cobra.Command {

for commandIdx, rawCmdLine := range rawCmdLines {

commandIdx := commandIdx
rawCmdLine := strings.Trim(rawCmdLine, " \n")

if rawCmdLine == "" {
Expand All @@ -187,9 +189,13 @@ func initRunbookRunCommands() *cobra.Command {

funcs = append(funcs, func() {

log.Tracef("(Step %d/%d Command %d/%d) Building Commmand", stepIdx+1, numSteps, commandIdx+1, len(funcs))

stepCmd := generateRunbookCmd()
stepCmd.SetArgs(rawCmdArguments[1:])
log.Tracef("(Step %d/%d Command %d/%d) Starting Command", stepIdx+1, numSteps, commandIdx+1, len(funcs))
err := stepCmd.ExecuteContext(ctx)
log.Tracef("(Step %d/%d Command %d/%d) Complete Command", stepIdx+1, numSteps, commandIdx+1, len(funcs))
commandResult := &commandResult{
stepIdx: stepIdx,
commandIdx: commandIdx,
Expand All @@ -210,18 +216,23 @@ func initRunbookRunCommands() *cobra.Command {
// Start processing all the functions
go func() {
for idx, fn := range funcs {
idx := idx
if shutdown.ShutdownFlag.Load() {
log.Infof("Aborting runbook execution, after %d scheduled executions", idx)
cancelFunc()
break
}

fn := fn
if err := semaphore.Acquire(ctx, 1); err == nil {
log.Tracef("Run %d is waiting on semaphore", idx)
if err := concurrentRunSemaphore.Acquire(ctx, 1); err == nil {
go func() {
defer semaphore.Release(1)
log.Tracef("Run %d is starting", idx)
defer concurrentRunSemaphore.Release(1)
fn()
}()
} else {
log.Warnf("Run %d failed to get semaphore %v", idx, err)
}
}
}()
Expand Down
19 changes: 18 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package config

import "sync/atomic"

type Env struct {
EPCC_API_BASE_URL string `env:"EPCC_API_BASE_URL"`
EPCC_CLIENT_ID string `env:"EPCC_CLIENT_ID"`
Expand All @@ -10,6 +12,21 @@ type Env struct {
EPCC_RUNBOOK_DIRECTORY string `env:"EPCC_RUNBOOK_DIRECTORY"`
}

var Envs = &Env{}
var env = atomic.Pointer[Env]{}

func init() {
SetEnv(&Env{})
}

func SetEnv(v *Env) {
// Store a copy
copyEnv := *v
env.Store(&copyEnv)
}

func GetEnv() *Env {
v := *env.Load()
return &v
}

const DefaultUrl = "https://api.moltin.com"
44 changes: 25 additions & 19 deletions external/authentication/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"net/url"
"strings"
"sync"
"sync/atomic"
"time"
)

Expand All @@ -28,7 +29,7 @@ var HttpClient = &http.Client{
Timeout: time.Second * 60,
}

var bearerToken *ApiTokenResponse = nil
var bearerToken atomic.Pointer[ApiTokenResponse]

var noTokenWarningMutex = sync.RWMutex{}

Expand All @@ -39,39 +40,42 @@ var getTokenMutex = sync.Mutex{}
func GetAuthenticationToken(useTokenFromProfileDir bool, valuesOverride *url.Values) (*ApiTokenResponse, error) {

if useTokenFromProfileDir {
bearerToken = GetApiToken()
bearerToken.Store(GetApiToken())
}

if bearerToken != nil {
if time.Now().Unix()+60 < bearerToken.Expires {
bearerTokenVal := bearerToken.Load()

if bearerTokenVal != nil {
if time.Now().Unix()+60 < bearerTokenVal.Expires {
// Use cached authentication (but clone first)
bearerCopy := *bearerToken
bearerCopy := *bearerTokenVal
return &bearerCopy, nil
}
}

getTokenMutex.Lock()
defer getTokenMutex.Unlock()

if bearerToken != nil {
if time.Now().Unix()+60 < bearerToken.Expires {
if bearerTokenVal != nil {
if time.Now().Unix()+60 < bearerTokenVal.Expires {
// Use cached authentication (but clone first)
bearerCopy := *bearerToken
bearerCopy := *bearerTokenVal
return &bearerCopy, nil
} else {
// TODO This will also happen a bunch of times in concurrent goroutines
log.Infof("Existing token has expired (or will very soon), refreshing. Token expiry is at %s", time.Unix(bearerToken.Expires, 0).Format(time.RFC1123Z))
log.Infof("Existing token has expired (or will very soon), refreshing. Token expiry is at %s", time.Unix(bearerTokenVal.Expires, 0).Format(time.RFC1123Z))
}
}

env := config.GetEnv()
requestValues := valuesOverride
if requestValues == nil {
if IsAutoLoginEnabled() {
values := url.Values{}
var grantType string

// Autologin using env vars
if config.Envs.EPCC_CLIENT_ID == "" {
if env.EPCC_CLIENT_ID == "" {
noTokenWarningMutex.RLock()
// Double check lock, read once with read lock, then once again with write lock
if noTokenWarningMessageLogged == false {
Expand All @@ -80,7 +84,7 @@ func GetAuthenticationToken(useTokenFromProfileDir bool, valuesOverride *url.Val
defer noTokenWarningMutex.Unlock()
if noTokenWarningMessageLogged == false {
noTokenWarningMessageLogged = true
if !config.Envs.EPCC_CLI_SUPPRESS_NO_AUTH_MESSAGES {
if !env.EPCC_CLI_SUPPRESS_NO_AUTH_MESSAGES {
log.Warn("No client id set in profile or env var, no authentication will be used for API request. To get started, set the EPCC_CLIENT_ID and (optionally) EPCC_CLIENT_SECRET environment variables")
}

Expand All @@ -92,11 +96,12 @@ func GetAuthenticationToken(useTokenFromProfileDir bool, valuesOverride *url.Val
return nil, nil
}

values.Set("client_id", config.Envs.EPCC_CLIENT_ID)
values.Set("client_id", env.EPCC_CLIENT_ID)
grantType = "implicit"

if config.Envs.EPCC_CLIENT_SECRET != "" {
values.Set("client_secret", config.Envs.EPCC_CLIENT_SECRET)
clientSecret := env.EPCC_CLIENT_SECRET
if clientSecret != "" {
values.Set("client_secret", clientSecret)
grantType = "client_credentials"
}

Expand All @@ -113,7 +118,7 @@ func GetAuthenticationToken(useTokenFromProfileDir bool, valuesOverride *url.Val
defer noTokenWarningMutex.Unlock()
if noTokenWarningMessageLogged == false {
noTokenWarningMessageLogged = true
if !config.Envs.EPCC_CLI_SUPPRESS_NO_AUTH_MESSAGES {
if !config.GetEnv().EPCC_CLI_SUPPRESS_NO_AUTH_MESSAGES {
log.Infof("Automatic login is disabled, re-enable by using `epcc login client_credentials`")
}
}
Expand All @@ -140,16 +145,17 @@ func GetAuthenticationToken(useTokenFromProfileDir bool, valuesOverride *url.Val
return nil, err
}

bearerToken = token
bearerToken.Store(token)

SaveApiToken(token)

SaveApiToken(bearerToken)
return bearerToken, nil
return token, nil
}

// fetchNewAuthenticationToken returns an AccessToken or an Error
func fetchNewAuthenticationToken(values url.Values) (*ApiTokenResponse, error) {

reqURL, err := url.Parse(config.Envs.EPCC_API_BASE_URL)
reqURL, err := url.Parse(config.GetEnv().EPCC_API_BASE_URL)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 4d95095

Please sign in to comment.