Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Context Deadline During Probe Calls #386

Merged
merged 5 commits into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 45 additions & 15 deletions service/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ const (
sioReplicationPairExists = "A Replication Pair for the specified local volume already exists"

DriverConfigParamsYaml = "driver-config-params.yaml"

DefaultAPITimeout = 5 * time.Second
)

// Extra metadata field names for propagating to goscaleio and beyond.
Expand Down Expand Up @@ -2628,26 +2630,32 @@ func (s *service) systemProbeAll(ctx context.Context) error {
Log.Infof("probing zoneLabel '%s', zone value: '%s'", s.opts.zoneLabelKey, zoneName)
}

newCtx, cancel := createProbeContextWithDeadline(ctx)
defer cancel()

for _, array := range s.opts.arrays {
// If zone information is available, use it to probe the array
if usingZones && !array.isInZone(zoneName) {
// Driver node containers should not probe arrays that exist outside their assigned zone
// Driver controller container should probe all arrays
Log.Infof("array %s zone %s does not match %s, not pinging this array\n", array.SystemID, array.AvailabilityZone.Name, zoneName)
errMap[array.SystemID] = fmt.Errorf("array %s zone %s does not match %s, not pinging this array", array.SystemID, array.AvailabilityZone.Name, zoneName)
continue
}

err := s.systemProbe(ctx, array)
err := s.systemProbe(newCtx, array)
systemID := array.SystemID
if err == nil {
Log.Infof("array %s probed successfully", systemID)
allArrayFail = false
} else {
if err != nil {
errMap[systemID] = err
Log.Errorf("array %s probe failed: %v", array.SystemID, err)
} else {
allArrayFail = false
Log.Infof("array %s probed successfully", systemID)
}
}

Log.Printf("[SystemProbeAll] Number of failed probes: %d", len(errMap))

if allArrayFail {
return status.Error(codes.FailedPrecondition,
fmt.Sprintf("All arrays are not working. Could not proceed further: %v", errMap))
Expand Down Expand Up @@ -2685,21 +2693,23 @@ func (s *service) systemProbe(ctx context.Context, array *ArrayConnectionData) e
// Create ScaleIO API client if needed
if s.adminClients[systemID] == nil {
skipCertificateValidation := array.SkipCertificateValidation || array.Insecure
c, err := goscaleio.NewClientWithArgs(array.Endpoint, "", math.MaxInt64, skipCertificateValidation, !s.opts.DisableCerts)
client, err := goscaleio.NewClientWithArgs(array.Endpoint, "", math.MaxInt64, skipCertificateValidation, !s.opts.DisableCerts)
if err != nil {
return status.Errorf(codes.FailedPrecondition,
"unable to create ScaleIO client: %s", err.Error())
}
s.adminClients[systemID] = c

s.adminClients[systemID] = client
for _, name := range altSystemNames {
s.adminClients[name] = c
s.adminClients[name] = client
}
}

Log.Printf("Login to PowerFlex Gateway, system=%s, endpoint=%s, user=%s\n", systemID, array.Endpoint, array.Username)

if s.adminClients[systemID].GetToken() == "" {
_, err := s.adminClients[systemID].WithContext(ctx).Authenticate(&goscaleio.ConfigConnect{
client := s.adminClients[systemID]
if client.GetToken() == "" {
_, err := client.WithContext(ctx).Authenticate(&goscaleio.ConfigConnect{
Endpoint: array.Endpoint,
Username: array.Username,
Password: array.Password,
Expand All @@ -2712,24 +2722,24 @@ func (s *service) systemProbe(ctx context.Context, array *ArrayConnectionData) e

// initialize system if needed
if s.systems[systemID] == nil {
system, err := s.adminClients[systemID].WithContext(ctx).FindSystem(
array.SystemID, array.SystemID, "")
system, err := client.WithContext(ctx).FindSystem(array.SystemID, array.SystemID, "")
if err != nil {
return status.Errorf(codes.FailedPrecondition,
"unable to find matching PowerFlex system name: %s",
err.Error())
}

s.systems[systemID] = system
if system.System != nil && system.System.Name != "" {
Log.Printf("Found Name for system=%s with ID=%s", system.System.Name, system.System.ID)
s.connectedSystemNameToID[system.System.Name] = system.System.ID
s.systems[system.System.ID] = system
s.adminClients[system.System.ID] = s.adminClients[systemID]
s.adminClients[system.System.ID] = client
}
// associate alternate system name to systemID
for _, name := range altSystemNames {
s.systems[name] = system
s.adminClients[name] = s.adminClients[systemID]
s.adminClients[name] = client
s.connectedSystemNameToID[name] = system.System.ID
}
}
Expand All @@ -2740,7 +2750,8 @@ func (s *service) systemProbe(ctx context.Context, array *ArrayConnectionData) e
sysID = id
s.opts.arrays[sysID] = array
}
if array.IsDefault == true {

if array.IsDefault {
Log.Infof("default array is set to array ID: %s", sysID)
s.opts.defaultSystemID = sysID
Log.Printf("%s is the default array, skipping VolumePrefixToSystems map update. \n", sysID)
Expand All @@ -2750,6 +2761,7 @@ func (s *service) systemProbe(ctx context.Context, array *ArrayConnectionData) e
return err
}
}

return nil
}

Expand Down Expand Up @@ -3747,3 +3759,21 @@ func (s *service) verifySystem(systemID string) (*goscaleio.Client, error) {

return adminClient, nil
}

// createProbeContextWithDeadline returns a child of ctx whose deadline is the
// earlier of ctx's own deadline (if it has one) and now + DefaultAPITimeout.
//
//   - If ctx carries no deadline, the default deadline is used.
//   - If ctx's deadline is later than the default, it is capped to the default.
//   - If ctx's deadline is earlier than the default, it is kept as-is.
//
// The returned CancelFunc should be called by the caller once the probe work
// is finished so the resources held by the derived context are released.
func createProbeContextWithDeadline(ctx context.Context) (context.Context, context.CancelFunc) {
	// Compute the default once so that every comparison and assignment below
	// refers to the same instant. Calling time.Now() a second time would yield
	// a slightly later deadline and make the "greater than default" branch fire
	// spuriously for contexts that had no deadline at all.
	defaultProbeDeadline := time.Now().Add(DefaultAPITimeout)

	probeDeadline, ok := ctx.Deadline()
	switch {
	case !ok:
		Log.Println("Probe deadline not in context, using default")
		probeDeadline = defaultProbeDeadline
	case probeDeadline.After(defaultProbeDeadline):
		// Cap the caller's deadline so a probe never waits longer than the default.
		Log.Printf("Original Probe Deadline %s is greater than defaultProbeDeadline %s, setting to default", probeDeadline, defaultProbeDeadline)
		probeDeadline = defaultProbeDeadline
	}

	// Derive the probe context using the lower of the two deadlines.
	return context.WithDeadline(ctx, probeDeadline)
}
11 changes: 10 additions & 1 deletion service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,8 +544,17 @@ func (s *service) BeforeServe(
s.updateConfigMap(s.getIPAddressByInterface, ConfigMapFilePath)

if _, ok := csictx.LookupEnv(ctx, "X_CSI_VXFLEXOS_NO_PROBE_ON_START"); !ok {
return s.doProbe(ctx)
Log.Printf("BeforeServe probing starting %s", time.Now().Format("15:04:05.000000000"))
// Probe before the server starts. To avoid errors in the controller, the probe must return within 2 seconds.
beforeServeMaxTimeout := 1 * time.Second
newContext, cancel := context.WithDeadline(ctx, time.Now().Add(beforeServeMaxTimeout))
defer cancel()

err := s.doProbe(newContext)
Log.Printf("BeforeServe probing complete %s", time.Now().Format("15:04:05.000000000"))
return err
}

return nil
}

Expand Down
Loading