diff --git a/service/controller.go b/service/controller.go index 355951d2..8175d204 100644 --- a/service/controller.go +++ b/service/controller.go @@ -136,6 +136,8 @@ const ( sioReplicationPairExists = "A Replication Pair for the specified local volume already exists" DriverConfigParamsYaml = "driver-config-params.yaml" + + DefaultAPITimeout = 5 * time.Second ) // Extra metadata field names for propagating to goscaleio and beyond. @@ -155,8 +157,6 @@ const ( var interestingParameters = [...]string{0: "FsType", 1: KeyMkfsFormatOption, 2: KeyBandwidthLimitInKbps, 3: KeyIopsLimit} -// var lock = sync.RWMutex{} - type ZoneContent struct { systemID string protectionDomain ProtectionDomainName @@ -2616,8 +2616,6 @@ func (s *service) getZoneFromZoneLabelKey(ctx context.Context, zoneLabelKey stri func (s *service) systemProbeAll(ctx context.Context) error { // probe all arrays Log.Infoln("Probing all associated arrays") - // allArrayFail := true - // errMap := make(map[string]error) errMap := new(sync.Map) zoneName := "" usingZones := s.opts.zoneLabelKey != "" && s.isNodeMode() @@ -2634,18 +2632,9 @@ func (s *service) systemProbeAll(ctx context.Context) error { var wg sync.WaitGroup errChan := make(chan error, len(s.opts.arrays)) - deadline, ok := ctx.Deadline() - if !ok { - deadline = time.Now().Add(10 * time.Second) - } - - // Calculate the new deadline by subtracting the desired duration - newDeadline := deadline.Add(-(100 * time.Millisecond)) - newCtx, cancel := context.WithDeadline(ctx, newDeadline) + newCtx, cancel := createProbeContextWithDeadline(ctx) defer cancel() - Log.Printf("[SystemProbeAll - FERNANDO] context deadline: %v, new deadline: %v", deadline, newDeadline) - for _, array := range s.opts.arrays { // If zone information is available, use it to probe the array if usingZones && !array.isInZone(zoneName) { @@ -2661,18 +2650,14 @@ func (s *service) systemProbeAll(ctx context.Context) error { go func(array *ArrayConnectionData) { defer wg.Done() - Log.Infof("[SystemProbeAll - FERNANDO] probing array %s", array.SystemID) - err := s.systemProbe(newCtx, array) systemID := array.SystemID if err != nil { - // errMap[systemID] = err errMap.Store(systemID, err) Log.Errorf("array %s probe failed: %v", array.SystemID, err) errChan <- err } else { Log.Infof("array %s probed successfully", systemID) - // allArrayFail = false } }(array) @@ -2700,8 +2685,8 @@ func (s *service) systemProbeAll(ctx context.Context) error { // systemProbe will probe the given array func (s *service) systemProbe(ctx context.Context, array *ArrayConnectionData) error { - s.mutex.Lock() - defer s.mutex.Unlock() + s.probeMutex.Lock() + defer s.probeMutex.Unlock() // Check that we have the details needed to login to the Gateway if array.Endpoint == "" { @@ -2730,26 +2715,23 @@ func (s *service) systemProbe(ctx context.Context, array *ArrayConnectionData) e // Create ScaleIO API client if needed if s.adminClients[systemID] == nil { skipCertificateValidation := array.SkipCertificateValidation || array.Insecure - c, err := goscaleio.NewClientWithArgs(array.Endpoint, "", math.MaxInt64, skipCertificateValidation, !s.opts.DisableCerts) + client, err := goscaleio.NewClientWithArgs(array.Endpoint, "", math.MaxInt64, skipCertificateValidation, !s.opts.DisableCerts) if err != nil { return status.Errorf(codes.FailedPrecondition, "unable to create ScaleIO client: %s", err.Error()) } - s.adminClients[systemID] = c - // s.setMuxAdminClient(systemID, c) + + s.adminClients[systemID] = client for _, name := range altSystemNames { - s.adminClients[name] = c - // s.setMuxAdminClient(name, c) + s.adminClients[name] = client } } Log.Printf("Login to PowerFlex Gateway, system=%s, endpoint=%s, user=%s\n", systemID, array.Endpoint, array.Username) - // client := s.getMuxAdminClient(systemID) - // client := s.adminClients[systemID] - - if s.adminClients[systemID].GetToken() == "" { - _, err := s.adminClients[systemID].WithContext(ctx).Authenticate(&goscaleio.ConfigConnect{ + client := s.adminClients[systemID] + if client.GetToken() == "" { + _, err := client.WithContext(ctx).Authenticate(&goscaleio.ConfigConnect{ Endpoint: array.Endpoint, Username: array.Username, Password: array.Password, @@ -2762,34 +2744,25 @@ func (s *service) systemProbe(ctx context.Context, array *ArrayConnectionData) e // initialize system if needed if s.systems[systemID] == nil { - system, err := s.adminClients[systemID].WithContext(ctx).FindSystem( - array.SystemID, array.SystemID, "") + system, err := client.WithContext(ctx).FindSystem(array.SystemID, array.SystemID, "") if err != nil { return status.Errorf(codes.FailedPrecondition, "unable to find matching PowerFlex system name: %s", err.Error()) } + s.systems[systemID] = system - // s.setMuxSystem(systemID, system) if system.System != nil && system.System.Name != "" { Log.Printf("Found Name for system=%s with ID=%s", system.System.Name, system.System.ID) s.connectedSystemNameToID[system.System.Name] = system.System.ID s.systems[system.System.ID] = system - s.adminClients[system.System.ID] = s.adminClients[systemID] - - // s.setMuxConnectedSystemNameToID(system.System.Name, system.System.ID) - // s.setMuxSystem(system.System.ID, system) - // s.setMuxAdminClient(system.System.ID, client) + s.adminClients[system.System.ID] = client } // associate alternate system name to systemID for _, name := range altSystemNames { s.systems[name] = system - s.adminClients[name] = s.adminClients[systemID] + s.adminClients[name] = client s.connectedSystemNameToID[name] = system.System.ID - - // s.setMuxSystem(name, system) - // s.setMuxAdminClient(name, client) - // s.setMuxConnectedSystemNameToID(name, system.System.ID) } } @@ -3809,38 +3782,20 @@ func (s *service) verifySystem(systemID string) (*goscaleio.Client, error) { return adminClient, nil } -func (s *service) setMuxAdminClient(systemID string, client *goscaleio.Client) { - s.mutex.Lock() - defer s.mutex.Unlock() - s.adminClients[systemID] = client -} - -func (s *service) getMuxAdminClient(systemID string) *goscaleio.Client { - s.mutex.Lock() - defer s.mutex.Unlock() - return s.adminClients[systemID] -} - -func (s *service) setMuxSystem(name string, system *goscaleio.System) { - s.mutex.Lock() - defer s.mutex.Unlock() - s.systems[name] = system -} - -func (s *service) getMuxSystem(name string) *goscaleio.System { - s.mutex.Lock() - defer s.mutex.Unlock() - return s.systems[name] -} +func createProbeContextWithDeadline(ctx context.Context) (context.Context, context.CancelFunc) { + defaultProbeDeadline := time.Now().Add(DefaultAPITimeout) + probeDeadline, ok := ctx.Deadline() + if !ok { + Log.Println("Probe deadline not in context, using default") + probeDeadline = time.Now().Add(DefaultAPITimeout) + } -func (s *service) setMuxConnectedSystemNameToID(name, id string) { - s.mutex.Lock() - defer s.mutex.Unlock() - s.connectedSystemNameToID[name] = id -} + // Set the deadline to be the lowest of the two times. + if probeDeadline.After(defaultProbeDeadline) { + Log.Printf("Original Probe Deadline %s is greater than defaultProbeDeadline %s, setting to default", probeDeadline, defaultProbeDeadline) + probeDeadline = defaultProbeDeadline + } -func (s *service) getMuxConnectedSystemNameToID(name string) string { - s.mutex.Lock() - defer s.mutex.Unlock() - return s.connectedSystemNameToID[name] + // Calculate the new deadline by subtracting the desired duration + return context.WithDeadline(ctx, probeDeadline) } diff --git a/service/service.go b/service/service.go index 62ec8982..b10a7830 100644 --- a/service/service.go +++ b/service/service.go @@ -209,7 +209,7 @@ type service struct { volumePrefixToSystems map[string][]string connectedSystemNameToID map[string]string - mutex sync.Mutex + probeMutex sync.Mutex } type Config struct { @@ -545,18 +545,18 @@ func (s *service) BeforeServe( // Update the ConfigMap with the Interface IPs s.updateConfigMap(s.getIPAddressByInterface, ConfigMapFilePath) - Log.Printf("[BeforeServe] Context: %+v", ctx) - - newContext, cancel := context.WithDeadline(ctx, time.Now().Add(1*time.Second)) - defer cancel() - if _, ok := csictx.LookupEnv(ctx, "X_CSI_VXFLEXOS_NO_PROBE_ON_START"); !ok { + Log.Printf("BeforeServe probing starting %s", time.Now().Format("15:04:05.000000000")) + // probe before the server starts, to avoid errors in the controller, we must return before 2 seconds. + beforeServeMaxTimeout := 1 * time.Second + newContext, cancel := context.WithDeadline(ctx, time.Now().Add(beforeServeMaxTimeout)) + defer cancel() + err := s.doProbe(newContext) - Log.Printf("BeforeServe - doProbe: Completed at %s", time.Now().Format(time.RFC3339)) + Log.Printf("BeforeServe probing complete %s", time.Now().Format("15:04:05.000000000")) return err } - Log.Printf("BeforeServe: Completed at %s", time.Now().Format(time.RFC3339)) return nil }