From 5309a729652ce65baadcf45c7df64bd5c47f66ce Mon Sep 17 00:00:00 2001 From: Derek Su Date: Sun, 15 Dec 2024 21:07:49 +0800 Subject: [PATCH] feat(v2 upgrade/controller): handle resources transition Longhorn 9104 Signed-off-by: Derek Su --- controller/controller_manager.go | 2 +- controller/engine_controller.go | 455 ++++++++++++++- controller/instance_handler.go | 652 +++++++++++++++++++--- controller/instance_handler_test.go | 32 +- controller/instance_manager_controller.go | 11 +- controller/node_controller.go | 102 +++- controller/replica_controller.go | 56 +- controller/utils.go | 4 - controller/volume_controller.go | 292 ++++++++-- controller/volume_controller_test.go | 3 + 10 files changed, 1452 insertions(+), 157 deletions(-) diff --git a/controller/controller_manager.go b/controller/controller_manager.go index 64a65fc526..fdcda5dc92 100644 --- a/controller/controller_manager.go +++ b/controller/controller_manager.go @@ -41,7 +41,7 @@ func StartControllers(logger logrus.FieldLogger, clients *client.Clients, stopCh := clients.StopCh // Longhorn controllers - replicaController, err := NewReplicaController(logger, ds, scheme, kubeClient, namespace, controllerID) + replicaController, err := NewReplicaController(logger, ds, scheme, kubeClient, namespace, controllerID, proxyConnCounter) if err != nil { return nil, err } diff --git a/controller/engine_controller.go b/controller/engine_controller.go index 92da3d22d2..cf30ed8fdf 100644 --- a/controller/engine_controller.go +++ b/controller/engine_controller.go @@ -3,6 +3,7 @@ package controller import ( "context" "fmt" + "net" "path/filepath" "reflect" "regexp" @@ -280,12 +281,7 @@ func (ec *EngineController) syncEngine(key string) (err error) { return errors.Wrapf(err, "failed to get engine") } - defaultEngineImage, err := ec.ds.GetSettingValueExisted(types.SettingNameDefaultEngineImage) - if err != nil { - return err - } - - isResponsible, err := ec.isResponsibleFor(engine, defaultEngineImage) + isResponsible, err := ec.isResponsibleFor(engine) if err != nil { return err } @@ -438,24 +434,67 @@ func (ec *EngineController) enqueueInstanceManagerChange(obj interface{}) { } } -func (ec *EngineController) CreateInstance(obj interface{}) (*longhorn.InstanceProcess, error) { +func (ec *EngineController) findInstanceManagerAndIPs(obj interface{}) (im *longhorn.InstanceManager, initiatorIP string, targetIP string, err error) { + e, ok := obj.(*longhorn.Engine) + if !ok { + return nil, "", "", fmt.Errorf("invalid object for engine: %v", obj) + } + + initiatorIM, err := ec.ds.GetInstanceManagerByInstanceRO(obj, false) + if err != nil { + return nil, "", "", err + } + + initiatorIP = initiatorIM.Status.IP + targetIP = initiatorIM.Status.IP + im = initiatorIM + + // Target, or called Target Instance, is on another node + if e.Spec.TargetNodeIDForLiveUpgrade != "" { + targetIM, err := ec.ds.GetInstanceManagerByInstanceRO(obj, true) + if err != nil { + return nil, "", "", err + } + + targetIP = targetIM.Status.IP + + if !e.Status.TargetInstanceReplacementCreated && e.Status.CurrentTargetNodeIDForLiveUpgrade == "" { + im = targetIM + } + } + + return im, initiatorIP, targetIP, nil +} + +func (ec *EngineController) CreateInstance(obj interface{}, isInstanceOnRemoteNode bool) (*longhorn.InstanceProcess, error) { e, ok := obj.(*longhorn.Engine) if !ok { return nil, fmt.Errorf("invalid object for engine process creation: %v", obj) } + + log := getLoggerForEngine(ec.logger, e) + if e.Spec.VolumeName == "" || e.Spec.NodeID == "" { return nil, 
fmt.Errorf("missing parameters for engine instance creation: %v", e) } + frontend := e.Spec.Frontend if e.Spec.DisableFrontend { frontend = longhorn.VolumeFrontendEmpty } - im, err := ec.ds.GetInstanceManagerByInstanceRO(obj) + im, initiatorIP, targetIP, err := ec.findInstanceManagerAndIPs(obj) if err != nil { return nil, err } + log.WithFields(logrus.Fields{ + "instanceManager": im.Name, + "instanceManagerImage": im.Spec.Image, + "initiatorIP": initiatorIP, + "targetIP": targetIP, + }).Info("Creating engine instance") + c, err := engineapi.NewInstanceManagerClient(im, false) if err != nil { return nil, err @@ -482,6 +521,10 @@ func (ec *EngineController) CreateInstance(obj interface{}) (*longhorn.InstanceP return nil, err } + // No need to care about the initiator and target ports if the engine is not being upgraded. + initiatorAddress := net.JoinHostPort(initiatorIP, strconv.Itoa(0)) + targetAddress := net.JoinHostPort(targetIP, strconv.Itoa(e.Status.Port)) + return c.EngineInstanceCreate(&engineapi.EngineInstanceCreateRequest{ Engine: e, VolumeFrontend: frontend, @@ -490,9 +533,8 @@ func (ec *EngineController) CreateInstance(obj interface{}) (*longhorn.InstanceP DataLocality: v.Spec.DataLocality, ImIP: im.Status.IP, EngineCLIAPIVersion: cliAPIVersion, - UpgradeRequired: false, - InitiatorAddress: im.Status.IP, - TargetAddress: im.Status.IP, + InitiatorAddress: initiatorAddress, + TargetAddress: targetAddress, }) } @@ -511,7 +553,7 @@ func (ec *EngineController) DeleteInstance(obj interface{}) (err error) { log.Warn("Engine does not set instance manager name and node ID, will skip the actual instance deletion") return nil } - im, err = ec.ds.GetInstanceManagerByInstance(obj) + im, err = ec.ds.GetInstanceManagerByInstance(obj, false) if err != nil { log.WithError(err).Warn("Failed to detect instance manager for engine, will skip the actual instance deletion") return nil @@ -589,29 +631,259 @@ func (ec *EngineController) DeleteInstance(obj interface{}) (err error) { return err } + if e.Status.CurrentTargetNodeIDForLiveUpgrade != "" { + err = c.EngineInstanceDeleteTarget(&engineapi.EngineInstanceDeleteTargetRequest{ + Engine: e, + }) + if err != nil { + if !types.ErrorIsNotFound(err) { + return errors.Wrapf(err, "failed to delete target for engine %v", e.Name) + } + log.WithError(err).Warnf("Failed to delete target for engine %v", e.Name) + } + } + return nil } -func (ec *EngineController) GetInstance(obj interface{}) (*longhorn.InstanceProcess, error) { +func (ec *EngineController) SuspendInstance(obj interface{}) error { e, ok := obj.(*longhorn.Engine) if !ok { - return nil, fmt.Errorf("invalid object for engine instance get: %v", obj) + return fmt.Errorf("invalid object for engine instance suspension: %v", obj) + } + + if !types.IsDataEngineV2(e.Spec.DataEngine) { + return fmt.Errorf("suspending engine instance is not supported for data engine %v", e.Spec.DataEngine) + } + + if e.Spec.VolumeName == "" || e.Spec.NodeID == "" { + return fmt.Errorf("missing parameters for engine instance suspension: %+v", e) + } + + im, err := ec.ds.GetInstanceManagerByInstanceRO(obj, false) + if err != nil { + return err + } + c, err := engineapi.NewInstanceManagerClient(im, false) + if err != nil { + return err + } + defer c.Close() + + return c.EngineInstanceSuspend(&engineapi.EngineInstanceSuspendRequest{ + Engine: e, + }) +} + +func (ec *EngineController) ResumeInstance(obj interface{}) error { + e, ok := obj.(*longhorn.Engine) + if !ok { + return fmt.Errorf("invalid object for engine instance 
resumption: %v", obj) + } + + if !types.IsDataEngineV2(e.Spec.DataEngine) { + return fmt.Errorf("resuming engine instance is not supported for data engine %v", e.Spec.DataEngine) + } + + if e.Spec.VolumeName == "" || e.Spec.NodeID == "" { + return fmt.Errorf("missing parameters for engine instance resumption: %+v", e) + } + + im, err := ec.ds.GetInstanceManagerByInstanceRO(obj, false) + if err != nil { + return err + } + c, err := engineapi.NewInstanceManagerClient(im, false) + if err != nil { + return err + } + defer c.Close() + + return c.EngineInstanceResume(&engineapi.EngineInstanceResumeRequest{ + Engine: e, + }) +} + +func (ec *EngineController) SwitchOverTarget(obj interface{}) error { + e, ok := obj.(*longhorn.Engine) + if !ok { + return fmt.Errorf("invalid object for target switchover: %v", obj) + } + + if !types.IsDataEngineV2(e.Spec.DataEngine) { + return fmt.Errorf("target switchover is not supported for data engine %v", e.Spec.DataEngine) + } + + if e.Spec.VolumeName == "" || e.Spec.NodeID == "" { + return fmt.Errorf("missing parameters for target switchover: %+v", e) + } + + initiatorInstance, err := ec.GetInstance(obj, false) + if err != nil { + return errors.Wrapf(err, "failed to get initiator instance %v for switchover", e.Name) + } + + log := getLoggerForEngine(ec.logger, e) + + log.Infof("Preparing to switch over target to node %v. Initiator instance port details: port=%v, targetPort=%v, standbyTargetPort=%v", + e.Spec.TargetNodeIDForLiveUpgrade, initiatorInstance.Status.PortStart, initiatorInstance.Status.TargetPortStart, initiatorInstance.Status.StandbyTargetPortStart) + + targetInstance, err := ec.GetInstance(obj, true) + if err != nil { + return errors.Wrapf(err, "failed to get target instance %v for switchover", e.Name) } + log.Infof("Preparing to switch over target to node %v. 
Target instance port details: port=%v, targetPort=%v, standbyTargetPort=%v", + e.Spec.TargetNodeIDForLiveUpgrade, targetInstance.Status.PortStart, targetInstance.Status.TargetPortStart, targetInstance.Status.StandbyTargetPortStart) + targetIM, err := ec.getTargetInstanceManagerForSwitchOver(e) + if err != nil { + return err + } + + initiatorIM, err := ec.ds.GetInstanceManagerByInstanceRO(obj, false) + if err != nil { + return err + } + c, err := engineapi.NewInstanceManagerClient(initiatorIM, false) + if err != nil { + return err + } + defer c.Close() + + port := targetInstance.Status.TargetPortStart + if targetInstance.Status.StandbyTargetPortStart != 0 { + port = targetInstance.Status.StandbyTargetPortStart + } + + log.Infof("Switching over target to %v:%v", targetIM.Status.IP, port) + return c.EngineInstanceSwitchOverTarget(&engineapi.EngineInstanceSwitchOverTargetRequest{ + Engine: e, + TargetAddress: net.JoinHostPort(targetIM.Status.IP, fmt.Sprint(port)), + }) +} + +func (ec *EngineController) getTargetInstanceManagerForSwitchOver(e *longhorn.Engine) (targetIM *longhorn.InstanceManager, err error) { + if e.Spec.TargetNodeIDForLiveUpgrade != "" { + targetIM, err = ec.ds.GetRunningInstanceManagerByNodeRO(e.Spec.TargetNodeIDForLiveUpgrade, e.Spec.DataEngine) + if err != nil { + return nil, errors.Wrapf(err, "failed to get instance manager on node %v for switching over target", e.Spec.TargetNodeIDForLiveUpgrade) + } + + return targetIM, nil + } + + if e.Spec.NodeID == "" { + return nil, fmt.Errorf("nodeID is required for switching over target") + } + + targetIM, err = ec.ds.GetDefaultInstanceManagerByNodeRO(e.Spec.NodeID, e.Spec.DataEngine) + if err != nil { + if !datastore.ErrorIsNotFound(err) { + return nil, err + } + } + + return targetIM, nil +} + +func (ec *EngineController) DeleteTarget(obj interface{}) error { + e, ok := obj.(*longhorn.Engine) + if !ok { + return fmt.Errorf("invalid object for engine target deletion: %v", obj) + } + + if !types.IsDataEngineV2(e.Spec.DataEngine) { + return fmt.Errorf("deleting target for engine instance is not supported for data engine %v", e.Spec.DataEngine) + } + + ec.logger.WithField("engine", e.Name).Info("Deleting target instance") + + if e.Spec.VolumeName == "" || e.Spec.NodeID == "" { + return fmt.Errorf("missing parameters for engine target deletion: %+v", e) + } + + targetNodeID := e.Spec.NodeID + if e.Spec.NodeID == e.Spec.TargetNodeIDForLiveUpgrade { + targetNodeID = e.Status.CurrentTargetNodeIDForLiveUpgrade + } + if targetNodeID == "" { + return fmt.Errorf("missing target node ID for target deletion: %+v", e) + } + + im, err := ec.ds.GetRunningInstanceManagerByNodeRO(targetNodeID, e.Spec.DataEngine) + if err != nil { + return err + } + c, err := engineapi.NewInstanceManagerClient(im, false) + if err != nil { + return err + } + defer c.Close() + + return c.EngineInstanceDeleteTarget(&engineapi.EngineInstanceDeleteTargetRequest{ + Engine: e, + }) +} + +func (ec *EngineController) RequireRemoteTargetInstance(obj interface{}) (bool, error) { + e, ok := obj.(*longhorn.Engine) + if !ok { + return false, fmt.Errorf("invalid object for checking engine instance remote target requirement: %v", obj) + } + + return e.Spec.TargetNodeIDForLiveUpgrade != "" && e.Spec.TargetNodeIDForLiveUpgrade != e.Spec.NodeID, nil +} + +func (ec *EngineController) IsEngine(obj interface{}) bool { + _, ok := obj.(*longhorn.Engine) + return ok +} + +func (ec *EngineController) GetInstance(obj interface{}, isInstanceOnRemoteNode bool) (*longhorn.InstanceProcess, error) 
{ + e, ok := obj.(*longhorn.Engine) + if !ok { + return nil, fmt.Errorf("invalid object for engine instance get: %v", obj) + } var ( im *longhorn.InstanceManager err error ) - if e.Status.InstanceManagerName == "" { - im, err = ec.ds.GetInstanceManagerByInstanceRO(obj) + + nodeID := e.Spec.NodeID + instanceManagerName := e.Status.InstanceManagerName + if isInstanceOnRemoteNode { + nodeID = e.Spec.TargetNodeIDForLiveUpgrade + im, err = ec.ds.GetRunningInstanceManagerByNodeRO(nodeID, e.Spec.DataEngine) if err != nil { - return nil, err + return nil, errors.Wrapf(err, "failed to get running instance manager on target node %v for engine %v", nodeID, e.Name) + } + instanceManagerName = im.Name + } + + if instanceManagerName == "" { + if e.Spec.DesireState == longhorn.InstanceStateRunning && e.Status.CurrentState == longhorn.InstanceStateSuspended { + im, err = ec.ds.GetRunningInstanceManagerByNodeRO(nodeID, e.Spec.DataEngine) + } else { + im, err = ec.ds.GetInstanceManagerByInstanceRO(obj, false) } - } else { - im, err = ec.ds.GetInstanceManagerRO(e.Status.InstanceManagerName) if err != nil { return nil, err } + } else if im == nil { + im, err = ec.ds.GetInstanceManagerRO(instanceManagerName) + if err != nil { + if !apierrors.IsNotFound(err) { + return nil, err + } + if types.IsDataEngineV2(e.Spec.DataEngine) { + im, err = ec.ds.GetRunningInstanceManagerByNodeRO(nodeID, e.Spec.DataEngine) + if err != nil { + return nil, errors.Wrapf(err, "failed to get running instance manager for engine %v", e.Name) + } + } else { + return nil, err + } + } } c, err := engineapi.NewInstanceManagerClient(im, false) if err != nil { @@ -769,16 +1041,58 @@ func (m *EngineMonitor) sync() bool { return false } - if err := m.refresh(engine); err == nil || !apierrors.IsConflict(errors.Cause(err)) { - utilruntime.HandleError(errors.Wrapf(err, "failed to update status for engine %v", m.Name)) - break + err = m.refresh(engine) + if err != nil && apierrors.IsConflict(errors.Cause(err)) { + // Retry if the error is due to conflict + continue } - // Retry if the error is due to conflict + if types.IsDataEngineV2(engine.Spec.DataEngine) && err != nil && apierrors.IsNotFound(errors.Cause(err)) { + upgrading, upgradingCheckErr := m.ds.IsNodeDataEngineUpgradeRequested(engine.Spec.NodeID) + if upgrading { + updated, updatedCheckErr := m.isInstanceManagerUpdated(engine) + if updated { + for replicaName := range engine.Status.ReplicaModeMap { + replica, replicaErr := m.ds.GetReplicaRO(replicaName) + if replicaErr != nil { + m.logger.WithError(replicaErr).Errorf("Failed to get replica %v", replicaName) + continue + } + if replica.Spec.NodeID == engine.Spec.NodeID && + replica.Status.CurrentState != longhorn.InstanceStateError && + replica.Status.CurrentState != longhorn.InstanceStateStopped { + m.logger.Warnf("Replica %v in state %v is still on the node %v, will retry updating status later", + replicaName, replica.Status.CurrentState, replica.Spec.NodeID) + return false + } + } + } else { + m.logger.Warnf("v2 data engine %v is being upgraded, will retry updating status later", engine.Name) + return false + } + if updatedCheckErr != nil { + upgradingCheckErr = errors.Wrapf(updatedCheckErr, "failed to check if the instance manager is updated") + } + } + if upgradingCheckErr != nil { + err = errors.Wrapf(upgradingCheckErr, "failed to check if the engine %v is being upgraded", engine.Name) + } + } + utilruntime.HandleError(errors.Wrapf(err, "failed to update status for engine %v", m.Name)) + break } return false } +func (m 
*EngineMonitor) isInstanceManagerUpdated(engine *longhorn.Engine) (bool, error) { + defaultIM, err := m.ds.GetDefaultInstanceManagerByNodeRO(engine.Spec.NodeID, engine.Spec.DataEngine) + if err != nil { + return false, err + } + + return defaultIM.Name == engine.Status.InstanceManagerName, nil +} + func (m *EngineMonitor) refresh(engine *longhorn.Engine) error { existingEngine := engine.DeepCopy() @@ -1595,6 +1909,24 @@ func (ec *EngineController) ReconcileEngineState(e *longhorn.Engine) error { return err } + if types.IsDataEngineV2(e.Spec.DataEngine) { + for replicaName := range e.Status.CurrentReplicaAddressMap { + r, err := ec.ds.GetReplicaRO(replicaName) + if err != nil && !apierrors.IsNotFound(err) { + return err + } + + requested, err := ec.ds.IsNodeDataEngineUpgradeRequested(r.Spec.NodeID) + if err != nil && !apierrors.IsNotFound(err) { + return err + } + + if requested { + return nil + } + } + } + if err := ec.rebuildNewReplica(e); err != nil { return err } @@ -1634,6 +1966,28 @@ func GetBinaryClientForEngine(e *longhorn.Engine, engines engineapi.EngineClient return client, nil } +func (ec *EngineController) removeFailedReplica(e *longhorn.Engine) error { + replicas, err := ec.ds.ListVolumeReplicasRO(e.Spec.VolumeName) + if err != nil { + return err + } + + engineClientProxy, err := ec.getEngineClientProxy(e, e.Status.CurrentImage) + if err != nil { + return errors.Wrapf(err, "failed to get the engine client %v for removing failed replica from engine", e.Name) + } + defer engineClientProxy.Close() + + for _, r := range replicas { + if r.Spec.LastFailedAt != "" { + if err := engineClientProxy.ReplicaRemove(e, "", r.Name); err != nil && !apierrors.IsNotFound(err) { + return errors.Wrapf(err, "failed to remove failed replica %v from engine", r.Name) + } + } + } + return nil +} + func (ec *EngineController) removeUnknownReplica(e *longhorn.Engine) error { unknownReplicaMap := map[string]longhorn.ReplicaMode{} for replica, mode := range e.Status.ReplicaModeMap { @@ -1656,7 +2010,7 @@ func (ec *EngineController) removeUnknownReplica(e *longhorn.Engine) error { defer engineClientProxy.Close() ec.eventRecorder.Eventf(e, corev1.EventTypeNormal, constant.EventReasonDelete, "Removing unknown replica %v in mode %v from engine", url, unknownReplicaMap[url]) - if err := engineClientProxy.ReplicaRemove(e, url); err != nil { + if err := engineClientProxy.ReplicaRemove(e, url, ""); err != nil { ec.eventRecorder.Eventf(e, corev1.EventTypeWarning, constant.EventReasonFailedDeleting, "Failed to remove unknown replica %v in mode %v from engine: %v", url, unknownReplicaMap[url], err) } else { ec.eventRecorder.Eventf(e, corev1.EventTypeNormal, constant.EventReasonDelete, "Removed unknown replica %v in mode %v from engine", url, unknownReplicaMap[url]) @@ -1859,7 +2213,7 @@ func (ec *EngineController) startRebuilding(e *longhorn.Engine, replicaName, add // the replica to failed. 
// user can decide to delete it then we will try again log.Infof("Removing failed rebuilding replica %v", addr) - if err := engineClientProxy.ReplicaRemove(e, replicaURL); err != nil { + if err := engineClientProxy.ReplicaRemove(e, replicaURL, ""); err != nil { log.WithError(err).Warnf("Failed to remove rebuilding replica %v", addr) ec.eventRecorder.Eventf(e, corev1.EventTypeWarning, constant.EventReasonFailedDeleting, "Failed to remove rebuilding replica %v with address %v for engine %v and volume %v due to rebuilding failure: %v", @@ -2105,7 +2459,53 @@ func (ec *EngineController) Upgrade(e *longhorn.Engine, log *logrus.Entry) (err } } } else { - return errors.Wrapf(err, "upgrading engine %v with data engine %v is not supported", e.Name, e.Spec.DataEngine) + // Check if the initiator instance is running + im, err := ec.ds.GetRunningInstanceManagerByNodeRO(e.Spec.NodeID, longhorn.DataEngineTypeV2) + if err != nil { + return err + } + if im.Status.CurrentState != longhorn.InstanceManagerStateRunning { + return fmt.Errorf("instance manager %v for initiating instance %v is not running", im.Name, e.Name) + } + + initiatorIMClient, err := engineapi.NewInstanceManagerClient(im, false) + if err != nil { + return err + } + defer initiatorIMClient.Close() + + if _, err := initiatorIMClient.InstanceGet(e.Spec.DataEngine, e.Name, string(longhorn.InstanceManagerTypeEngine)); err != nil { + return err + } + + _, ok := im.Status.InstanceEngines[e.Name] + if !ok { + return fmt.Errorf("initiator instance %v is not found in engine list", e.Name) + } + + // Check whether the target instance is existing + im, err = ec.ds.GetRunningInstanceManagerByNodeRO(e.Spec.TargetNodeIDForLiveUpgrade, longhorn.DataEngineTypeV2) + if err != nil { + return err + } + if im.Status.CurrentState != longhorn.InstanceManagerStateRunning { + return fmt.Errorf("instance manager %v for target instance %v is not running", im.Name, e.Name) + } + + targetIMClient, err := engineapi.NewInstanceManagerClient(im, false) + if err != nil { + return err + } + defer targetIMClient.Close() + + if _, err := targetIMClient.InstanceGet(e.Spec.DataEngine, e.Name, string(longhorn.InstanceManagerTypeEngine)); err != nil { + return err + } + + _, ok = im.Status.InstanceEngines[e.Name] + if !ok { + return fmt.Errorf("target instance %v is not found in engine list", e.Name) + } } log.Infof("Engine has been upgraded from %v to %v", e.Status.CurrentImage, e.Spec.Image) @@ -2116,6 +2516,7 @@ func (ec *EngineController) Upgrade(e *longhorn.Engine, log *logrus.Entry) (err e.Status.ReplicaTransitionTimeMap = nil e.Status.RestoreStatus = nil e.Status.RebuildStatus = nil + e.Status.TargetInstanceReplacementCreated = false return nil } @@ -2184,7 +2585,7 @@ func (ec *EngineController) UpgradeEngineInstance(e *longhorn.Engine, log *logru // isResponsibleFor picks a running node that has e.Status.CurrentImage deployed. // We need e.Status.CurrentImage deployed on the node to make request to the corresponding engine instance. // Prefer picking the node e.Spec.NodeID if it meet the above requirement. 
-func (ec *EngineController) isResponsibleFor(e *longhorn.Engine, defaultEngineImage string) (bool, error) { +func (ec *EngineController) isResponsibleFor(e *longhorn.Engine) (bool, error) { var err error defer func() { err = errors.Wrap(err, "error while checking isResponsibleFor") diff --git a/controller/instance_handler.go b/controller/instance_handler.go index a943d28d03..134c459680 100644 --- a/controller/instance_handler.go +++ b/controller/instance_handler.go @@ -35,10 +35,16 @@ type InstanceHandler struct { } type InstanceManagerHandler interface { - GetInstance(obj interface{}) (*longhorn.InstanceProcess, error) - CreateInstance(obj interface{}) (*longhorn.InstanceProcess, error) + GetInstance(obj interface{}, isInstanceOnRemoteNode bool) (*longhorn.InstanceProcess, error) + CreateInstance(obj interface{}, isInstanceOnRemoteNode bool) (*longhorn.InstanceProcess, error) DeleteInstance(obj interface{}) error LogInstance(ctx context.Context, obj interface{}) (*engineapi.InstanceManagerClient, *imapi.LogStream, error) + SuspendInstance(obj interface{}) error + ResumeInstance(obj interface{}) error + SwitchOverTarget(obj interface{}) error + DeleteTarget(obj interface{}) error + RequireRemoteTargetInstance(obj interface{}) (bool, error) + IsEngine(obj interface{}) bool } func NewInstanceHandler(ds *datastore.DataStore, instanceManagerHandler InstanceManagerHandler, eventRecorder record.EventRecorder) *InstanceHandler { @@ -49,10 +55,130 @@ func NewInstanceHandler(ds *datastore.DataStore, instanceManagerHandler Instance } } -func (h *InstanceHandler) syncStatusWithInstanceManager(im *longhorn.InstanceManager, instanceName string, spec *longhorn.InstanceSpec, status *longhorn.InstanceStatus, instances map[string]longhorn.InstanceProcess) { +func (h *InstanceHandler) syncStatusIPsAndPorts(im *longhorn.InstanceManager, obj interface{}, spec *longhorn.InstanceSpec, status *longhorn.InstanceStatus, instanceName string, instance longhorn.InstanceProcess) { + imPod, err := h.ds.GetPodRO(im.Namespace, im.Name) + if err != nil { + logrus.WithError(err).Errorf("Failed to get instance manager pod from %v", im.Name) + return + } + if imPod == nil { + logrus.Warnf("Instance manager pod from %v not exist in datastore", im.Name) + return + } + + storageIP := h.ds.GetStorageIPFromPod(imPod) + if status.StorageIP != storageIP { + status.StorageIP = storageIP + logrus.Infof("Instance %v starts running, Storage IP %v", instanceName, status.StorageIP) + } + + if status.IP != im.Status.IP { + status.IP = im.Status.IP + logrus.Infof("Instance %v starts running, IP %v", instanceName, status.IP) + } + + if status.Port != int(instance.Status.PortStart) { + status.Port = int(instance.Status.PortStart) + logrus.Infof("Instance %v starts running, Port %d", instanceName, status.Port) + } + + if !h.instanceManagerHandler.IsEngine(obj) { + return + } + + if types.IsDataEngineV2(spec.DataEngine) && spec.TargetNodeIDForLiveUpgrade != "" { + targetIM, err := h.ds.GetRunningInstanceManagerByNodeRO(spec.TargetNodeIDForLiveUpgrade, spec.DataEngine) + if err != nil { + logrus.WithError(err).Errorf("Failed to get running instance manager for node %s", spec.TargetNodeIDForLiveUpgrade) + return + } + targetClient, err := engineapi.NewInstanceManagerClient(targetIM, false) + if err != nil { + logrus.WithError(err).Errorf("Failed to get instance manager client for target instance manager %v", targetIM.Name) + return + } + defer targetClient.Close() + + targetInstance, err := targetClient.InstanceGet(spec.DataEngine, 
instanceName, string(longhorn.InstanceManagerTypeEngine)) + if err != nil { + logrus.WithError(err).Errorf("Failed to get target instance %s from instance manager %s", instanceName, targetIM.Name) + } else { + if status.TargetIP != targetIM.Status.IP { + status.TargetIP = targetIM.Status.IP + logrus.Infof("Instance %v starts running, Target IP %v", instanceName, status.TargetIP) + } + + if status.TargetPort != int(targetInstance.Status.TargetPortStart) { + status.TargetPort = int(targetInstance.Status.TargetPortStart) + logrus.Infof("Instance %v starts running, Target Port %v", instanceName, status.TargetPort) + } + + // Get storage target IP from target instance manager + targetIMPod, err := h.ds.GetPodRO(targetIM.Namespace, targetIM.Name) + if err != nil { + logrus.WithError(err).Errorf("Failed to get instance manager pod from %v", targetIM.Name) + return + } + if targetIMPod == nil { + logrus.Warnf("Instance manager pod from %v not exist in datastore", targetIM.Name) + return + } + + storageTargetIP := h.ds.GetStorageIPFromPod(targetIMPod) + if status.StorageTargetIP != storageTargetIP { + status.StorageTargetIP = storageTargetIP + logrus.Infof("Instance %v starts running, Storage Target IP %v", instanceName, status.StorageTargetIP) + } + } + + // Check if the target instance replacement is running on the target node + if status.CurrentTargetNodeIDForLiveUpgrade != spec.TargetNodeIDForLiveUpgrade && !status.TargetInstanceReplacementCreated { + running, err := h.isTargetInstanceReplacementRunning(instanceName, spec, status) + if err != nil { + logrus.WithError(err).Errorf("Failed to check if target instance %v is running", instanceName) + return + } + if running { + logrus.Infof("Target instance replacement %v is running on target node %v", instanceName, spec.TargetNodeIDForLiveUpgrade) + status.TargetInstanceReplacementCreated = true + } + } + } else { + if status.StorageTargetIP != storageIP { + status.StorageTargetIP = storageIP + logrus.Infof("Instance %v starts running, Storage Target IP %v", instanceName, status.StorageTargetIP) + } + + if status.TargetIP != im.Status.IP { + status.TargetIP = im.Status.IP + logrus.Infof("Instance %v starts running, Target IP %v", instanceName, status.TargetIP) + } + + if status.TargetPort != int(instance.Status.TargetPortStart) { + status.TargetPort = int(instance.Status.TargetPortStart) + logrus.Infof("Instance %v starts running, Target Port %v", instanceName, status.TargetPort) + } + + status.CurrentTargetNodeIDForLiveUpgrade = "" + status.TargetInstanceReplacementCreated = false + } +} + +func (h *InstanceHandler) syncStatusWithInstanceManager(im *longhorn.InstanceManager, obj interface{}, spec *longhorn.InstanceSpec, status *longhorn.InstanceStatus, instances map[string]longhorn.InstanceProcess) { + runtimeObj, ok := obj.(runtime.Object) + if !ok { + return + } + instanceName, err := h.getNameFromObj(runtimeObj) + if err != nil { + return + } + defer func() { if status.CurrentState == longhorn.InstanceStateStopped { status.InstanceManagerName = "" + status.CurrentTargetNodeIDForLiveUpgrade = "" + status.TargetInstanceReplacementCreated = false } }() @@ -72,8 +198,11 @@ func (h *InstanceHandler) syncStatusWithInstanceManager(im *longhorn.InstanceMan status.CurrentImage = "" } status.IP = "" + status.TargetIP = "" status.StorageIP = "" + status.StorageTargetIP = "" status.Port = 0 + status.TargetPort = 0 h.resetInstanceErrorCondition(status) return } @@ -82,32 +211,71 @@ func (h *InstanceHandler) syncStatusWithInstanceManager(im 
*longhorn.InstanceMan im.Status.CurrentState == longhorn.InstanceManagerStateError || im.DeletionTimestamp != nil { if status.Started { - if status.CurrentState != longhorn.InstanceStateError { - logrus.Warnf("Marking the instance as state ERROR since failed to find the instance manager for the running instance %v", instanceName) + if h.isEngineOfV2DataEngine(obj, spec.DataEngine) { + if h.isV2DataEngineBeingUpgraded(spec, status) { + logrus.Warnf("Skipping the instance %v since the instance manager %v is %v", instanceName, im.Name, im.Status.CurrentState) + return + } + + if spec.Image == status.CurrentImage { + if status.CurrentState != longhorn.InstanceStateError { + upgradeRequested, err := h.ds.IsNodeDataEngineUpgradeRequested(spec.NodeID) + if err != nil { + // TODO: should we return here or mark the instance as error? + logrus.WithError(err).Errorf("Failed to check if node %v is being upgrade requested", spec.NodeID) + return + } + if upgradeRequested { + logrus.Warnf("Skipping the instance %v since the instance manager %v is %v since the node %v is being upgrade requested", + instanceName, im.Name, im.Status.CurrentState, spec.NodeID) + return + } + logrus.Warnf("Marking the instance as state ERROR since failed to find the instance manager for the running instance %v", instanceName) + } + status.CurrentState = longhorn.InstanceStateError + } else { + logrus.Warnf("Skipping the instance %v since the instance manager %v is %v and spec image %v is different from the current image %v", + instanceName, im.Name, im.Status.CurrentState, spec.Image, status.CurrentImage) + return + } + } else { + if status.CurrentState != longhorn.InstanceStateError { + logrus.Warnf("Marking the instance as state ERROR since failed to find the instance manager for the running instance %v", instanceName) + } + status.CurrentState = longhorn.InstanceStateError } - status.CurrentState = longhorn.InstanceStateError } else { status.CurrentState = longhorn.InstanceStateStopped } status.CurrentImage = "" status.IP = "" + status.TargetIP = "" status.StorageIP = "" + status.StorageTargetIP = "" status.Port = 0 + status.TargetPort = 0 h.resetInstanceErrorCondition(status) return } if im.Status.CurrentState == longhorn.InstanceManagerStateStarting { if status.Started { - if status.CurrentState != longhorn.InstanceStateError { - logrus.Warnf("Marking the instance as state ERROR since the starting instance manager %v shouldn't contain the running instance %v", im.Name, instanceName) + if h.isEngineOfV2DataEngine(obj, spec.DataEngine) { + if spec.Image == status.CurrentImage { + upgradeRequested, err := h.ds.IsNodeDataEngineUpgradeRequested(spec.NodeID) + if err != nil { + logrus.WithError(err).Errorf("Failed to get node %v", spec.NodeID) + return + } + if upgradeRequested { + logrus.Warnf("Skipping the instance %v since the instance manager %v is %v", instanceName, im.Name, im.Status.CurrentState) + return + } + h.handleIfInstanceManagerStarting(im.Name, instanceName, status) + } + } else { + h.handleIfInstanceManagerStarting(im.Name, instanceName, status) } - status.CurrentState = longhorn.InstanceStateError - status.CurrentImage = "" - status.IP = "" - status.StorageIP = "" - status.Port = 0 - h.resetInstanceErrorCondition(status) } return } @@ -115,23 +283,56 @@ func (h *InstanceHandler) syncStatusWithInstanceManager(im *longhorn.InstanceMan instance, exists := instances[instanceName] if !exists { if status.Started { - if status.CurrentState != longhorn.InstanceStateError { - logrus.Warnf("Marking the instance as 
state ERROR since failed to find the instance status in instance manager %v for the running instance %v", im.Name, instanceName) + if h.isEngineOfV2DataEngine(obj, spec.DataEngine) { + if h.isV2DataEngineBeingUpgraded(spec, status) { + logrus.Warnf("Skipping checking the instance %v since the instance manager %v is in %v state", + instanceName, im.Name, im.Status.CurrentState) + return + } + if status.CurrentState != longhorn.InstanceStateError { + // Because the async nature, directly check instance here + if spec.TargetNodeIDForLiveUpgrade != "" { + logrus.Infof("Recreated initiator instance %v is not found in instance manager %v, directly checking it in instance manager %v", + instanceName, im.Name, spec.NodeID) + + exist, err := h.isInstanceExist(instanceName, spec) + if exist { + logrus.Infof("Initiator instance %v is found in instance manager %v", instanceName, spec.NodeID) + return + } + if err != nil { + logrus.WithError(err).Errorf("Failed to check if recreated initiator instance %v exists in instance manager %v", + instanceName, spec.NodeID) + return + } + } + logrus.Warnf("Marking the instance as state ERROR since failed to find the instance status in instance manager %v for the running instance %v", + im.Name, instanceName) + } + status.CurrentState = longhorn.InstanceStateError + } else { + if status.CurrentState != longhorn.InstanceStateError { + logrus.Warnf("Marking the instance as state ERROR since failed to find the instance status in instance manager %v for the running instance %v", im.Name, instanceName) + } + status.CurrentState = longhorn.InstanceStateError } - status.CurrentState = longhorn.InstanceStateError } else { status.CurrentState = longhorn.InstanceStateStopped } + status.CurrentImage = "" status.IP = "" + status.TargetIP = "" status.StorageIP = "" + status.StorageTargetIP = "" status.Port = 0 + status.TargetPort = 0 h.resetInstanceErrorCondition(status) return } if status.InstanceManagerName != "" && status.InstanceManagerName != im.Name { - logrus.Errorf("The related process of instance %v is found in the instance manager %v, but the instance manager name in the instance status is %v. "+ + logrus.Warnf("The related process of instance %v is found in the instance manager %v, but the instance manager name in the instance status is %v. 
"+ "The instance manager name shouldn't change except for cleanup", instanceName, im.Name, status.InstanceManagerName) } @@ -146,44 +347,35 @@ func (h *InstanceHandler) syncStatusWithInstanceManager(im *longhorn.InstanceMan status.CurrentState = longhorn.InstanceStateStarting status.CurrentImage = "" status.IP = "" + status.TargetIP = "" status.StorageIP = "" + status.StorageTargetIP = "" status.Port = 0 + status.TargetPort = 0 h.resetInstanceErrorCondition(status) case longhorn.InstanceStateRunning: status.CurrentState = longhorn.InstanceStateRunning - imPod, err := h.ds.GetPodRO(im.Namespace, im.Name) - if err != nil { - logrus.WithError(err).Errorf("Failed to get instance manager pod from %v", im.Name) - return - } - - if imPod == nil { - logrus.Warnf("Instance manager pod from %v not exist in datastore", im.Name) - return - } - - storageIP := h.ds.GetStorageIPFromPod(imPod) - if status.StorageIP != storageIP { - status.StorageIP = storageIP - logrus.Warnf("Instance %v starts running, Storage IP %v", instanceName, status.StorageIP) - } + h.syncStatusIPsAndPorts(im, obj, spec, status, instanceName, instance) - if status.IP != im.Status.IP { - status.IP = im.Status.IP - logrus.Warnf("Instance %v starts running, IP %v", instanceName, status.IP) - } - if status.Port != int(instance.Status.PortStart) { - status.Port = int(instance.Status.PortStart) - logrus.Warnf("Instance %v starts running, Port %d", instanceName, status.Port) - } // only set CurrentImage when first started, since later we may specify // different spec.Image for upgrade if status.CurrentImage == "" { - status.CurrentImage = spec.Image + if types.IsDataEngineV1(spec.DataEngine) { + status.CurrentImage = spec.Image + } else { + if h.instanceManagerHandler.IsEngine(obj) { + status.CurrentImage = spec.Image + } else { + status.CurrentImage = im.Spec.Image + } + } } - h.syncInstanceCondition(instance, status) + h.syncInstanceCondition(instance, status) + case longhorn.InstanceStateSuspended: + status.CurrentState = longhorn.InstanceStateSuspended + status.CurrentTargetNodeIDForLiveUpgrade = spec.TargetNodeIDForLiveUpgrade case longhorn.InstanceStateStopping: if status.Started { status.CurrentState = longhorn.InstanceStateError @@ -192,8 +384,11 @@ func (h *InstanceHandler) syncStatusWithInstanceManager(im *longhorn.InstanceMan } status.CurrentImage = "" status.IP = "" + status.TargetIP = "" status.StorageIP = "" + status.StorageTargetIP = "" status.Port = 0 + status.TargetPort = 0 h.resetInstanceErrorCondition(status) case longhorn.InstanceStateStopped: if status.Started { @@ -203,8 +398,11 @@ func (h *InstanceHandler) syncStatusWithInstanceManager(im *longhorn.InstanceMan } status.CurrentImage = "" status.IP = "" + status.TargetIP = "" status.StorageIP = "" + status.StorageTargetIP = "" status.Port = 0 + status.TargetPort = 0 h.resetInstanceErrorCondition(status) default: if status.CurrentState != longhorn.InstanceStateError { @@ -213,8 +411,11 @@ func (h *InstanceHandler) syncStatusWithInstanceManager(im *longhorn.InstanceMan status.CurrentState = longhorn.InstanceStateError status.CurrentImage = "" status.IP = "" + status.TargetIP = "" status.StorageIP = "" + status.StorageTargetIP = "" status.Port = 0 + status.TargetPort = 0 h.resetInstanceErrorCondition(status) } } @@ -266,7 +467,7 @@ func (h *InstanceHandler) ReconcileInstanceState(obj interface{}, spec *longhorn } } // There should be an available instance manager for a scheduled instance when its related engine image is compatible - if im == nil && spec.Image != "" && 
spec.NodeID != "" { + if im == nil && spec.NodeID != "" && h.isSpecImageReady(obj, spec) { dataEngineEnabled, err := h.ds.IsDataEngineEnabled(spec.DataEngine) if err != nil { return err @@ -280,7 +481,7 @@ func (h *InstanceHandler) ReconcileInstanceState(obj interface{}, spec *longhorn return err } if !isNodeDownOrDeleted { - im, err = h.ds.GetInstanceManagerByInstanceRO(obj) + im, err = h.getInstanceManagerRO(obj, spec, status) if err != nil { return errors.Wrapf(err, "failed to get instance manager for instance %v", instanceName) } @@ -328,16 +529,45 @@ func (h *InstanceHandler) ReconcileInstanceState(obj interface{}, spec *longhorn if i, exists := instances[instanceName]; exists && i.Status.State == longhorn.InstanceStateRunning { status.Started = true - break + if h.isEngineOfV2DataEngine(obj, spec.DataEngine) { + if isDataEngineNotBeingLiveUpgraded(spec, status) { + // Not in data engine live upgrade + break + } + } else { + break + } } // there is a delay between createInstance() invocation and InstanceManager update, // createInstance() may be called multiple times. - if status.CurrentState != longhorn.InstanceStateStopped { - break + if h.isEngineOfV2DataEngine(obj, spec.DataEngine) { + if status.CurrentState != longhorn.InstanceStateStopped && status.CurrentState != longhorn.InstanceStateSuspended { + if status.CurrentState != longhorn.InstanceStateRunning || isDataEngineNotBeingLiveUpgraded(spec, status) { + break + } + } + } else { + if status.CurrentState != longhorn.InstanceStateStopped { + break + } } - err = h.createInstance(instanceName, spec.DataEngine, runtimeObj) + if h.isEngineOfV2DataEngine(obj, spec.DataEngine) { + if spec.TargetNodeIDForLiveUpgrade != "" { + if h.isEngineResumeRequired(spec, status) { + // Resume the suspended initiator instance + err = h.resumeInstance(instanceName, spec.DataEngine, runtimeObj) + } else { + // Create target instance if it's not created yet + err = h.createInstance(instanceName, spec.DataEngine, runtimeObj) + } + } else { + err = h.createInstance(instanceName, spec.DataEngine, runtimeObj) + } + } else { + err = h.createInstance(instanceName, spec.DataEngine, runtimeObj) + } if err != nil { return err } @@ -359,12 +589,52 @@ func (h *InstanceHandler) ReconcileInstanceState(obj interface{}, spec *longhorn } } } + if h.isEngineOfV2DataEngine(obj, spec.DataEngine) { + if status.CurrentTargetNodeIDForLiveUpgrade != "" { + targetIM, err := h.ds.GetRunningInstanceManagerByNodeRO(status.CurrentTargetNodeIDForLiveUpgrade, spec.DataEngine) + if err != nil { + return err + } + if targetIM != nil { + if err := h.deleteTarget(instanceName, runtimeObj); err != nil { + if !types.ErrorIsNotFound(err) { + return err + } + } + } + } + } status.Started = false + case longhorn.InstanceStateSuspended: + if !h.isEngineOfV2DataEngine(obj, spec.DataEngine) { + return fmt.Errorf("instance %v is not an engine of v2 volume", instanceName) + } + + if err := h.suspendInstance(instanceName, runtimeObj); err != nil { + return err + } + + if err := h.switchOverTarget(instanceName, runtimeObj); err != nil { + logrus.Infof("Resuming instance %v after failing to switch over target", instanceName) + errResume := h.resumeInstance(instanceName, spec.DataEngine, runtimeObj) + if errResume != nil { + logrus.WithError(errResume).Errorf("Failed to resume instance %v after failing to switch over target", instanceName) + } + return err + } + + if spec.TargetNodeIDForLiveUpgrade != status.CurrentTargetNodeIDForLiveUpgrade { + if err := h.deleteTarget(instanceName, runtimeObj); 
err != nil { + if !types.ErrorIsNotFound(err) { + return err + } + } + } default: return fmt.Errorf("unknown instance desire state: desire %v", spec.DesireState) } - h.syncStatusWithInstanceManager(im, instanceName, spec, status, instances) + h.syncStatusWithInstanceManager(im, obj, spec, status, instances) switch status.CurrentState { case longhorn.InstanceStateRunning: @@ -373,7 +643,9 @@ func (h *InstanceHandler) ReconcileInstanceState(obj interface{}, spec *longhorn if spec.NodeID != im.Spec.NodeID { status.CurrentState = longhorn.InstanceStateError status.IP = "" + status.TargetIP = "" status.StorageIP = "" + status.StorageTargetIP = "" err := fmt.Errorf("instance %v NodeID %v is not the same as the instance manager %v NodeID %v", instanceName, spec.NodeID, im.Name, im.Spec.NodeID) return err @@ -447,25 +719,83 @@ func (h *InstanceHandler) printInstanceLogs(instanceName string, obj runtime.Obj return nil } -func (h *InstanceHandler) createInstance(instanceName string, dataEngine longhorn.DataEngineType, obj runtime.Object) error { - _, err := h.instanceManagerHandler.GetInstance(obj) - if err == nil { - return nil - } - if !types.ErrorIsNotFound(err) && !(types.IsDataEngineV2(dataEngine) && types.ErrorIsStopped(err)) { - return errors.Wrapf(err, "Failed to get instance process %v", instanceName) - } +func (h *InstanceHandler) createInstance(instanceName string, dataEngine longhorn.DataEngineType, obj runtime.Object) (err error) { + if h.isEngineOfV2DataEngine(obj, dataEngine) { + instanceExists := false + targetInstanceRequired := false - logrus.Infof("Creating instance %v", instanceName) - if _, err := h.instanceManagerHandler.CreateInstance(obj); err != nil { - if !types.ErrorAlreadyExists(err) { - h.eventRecorder.Eventf(obj, corev1.EventTypeWarning, constant.EventReasonFailedStarting, "Error starting %v: %v", instanceName, err) - return err + instance, err := h.instanceManagerHandler.GetInstance(obj, false) + if err == nil { + instanceExists = true + + targetInstanceRequired, err = h.instanceManagerHandler.RequireRemoteTargetInstance(obj) + if err != nil { + return errors.Wrapf(err, "failed to check if remote target instance for %v is required", instanceName) + } + if targetInstanceRequired { + _, err = h.instanceManagerHandler.GetInstance(obj, true) + if err == nil { + return nil + } + } else { + if instance.Status.StandbyTargetPortStart != 0 || instance.Status.TargetPortStart != 0 { + return nil + } + + targetInstanceRequired = true + err = fmt.Errorf("cannot find local target instance for %v", instanceName) + } } - // Already exists, lost track may due to previous datastore conflict - return nil + + if !types.ErrorIsNotFound(err) && !(types.IsDataEngineV2(dataEngine) && types.ErrorIsStopped(err)) { + return errors.Wrapf(err, "failed to get instance %v before creating", instanceName) + } + + if !instanceExists { + logrus.Infof("Creating instance %v", instanceName) + if _, err := h.instanceManagerHandler.CreateInstance(obj, false); err != nil { + if !types.ErrorAlreadyExists(err) { + h.eventRecorder.Eventf(obj, corev1.EventTypeWarning, constant.EventReasonFailedStarting, "Error starting %v: %v", instanceName, err) + return err + } + // Already exists, lost track may due to previous datastore conflict + return nil + } + h.eventRecorder.Eventf(obj, corev1.EventTypeNormal, constant.EventReasonStart, "Starts instance %v", instanceName) + } + + if targetInstanceRequired { + logrus.Infof("Creating target instance %v", instanceName) + if _, err := 
h.instanceManagerHandler.CreateInstance(obj, true); err != nil { + if !types.ErrorAlreadyExists(err) { + h.eventRecorder.Eventf(obj, corev1.EventTypeWarning, constant.EventReasonFailedStarting, "Error starting %v: %v", instanceName, err) + return err + } + // Already exists, lost track may due to previous datastore conflict + return nil + } + h.eventRecorder.Eventf(obj, corev1.EventTypeNormal, constant.EventReasonStart, "Starts target instance %v", instanceName) + } + } else { + _, err := h.instanceManagerHandler.GetInstance(obj, false) + if err == nil { + return nil + } + if !types.ErrorIsNotFound(err) && !(types.IsDataEngineV2(dataEngine) && types.ErrorIsStopped(err)) { + return errors.Wrapf(err, "Failed to get instance process %v", instanceName) + } + + logrus.Infof("Creating instance %v", instanceName) + if _, err := h.instanceManagerHandler.CreateInstance(obj, false); err != nil { + if !types.ErrorAlreadyExists(err) { + h.eventRecorder.Eventf(obj, corev1.EventTypeWarning, constant.EventReasonFailedStarting, "Error starting %v: %v", instanceName, err) + return err + } + // Already exists, lost track may due to previous datastore conflict + return nil + } + h.eventRecorder.Eventf(obj, corev1.EventTypeNormal, constant.EventReasonStart, "Starts %v", instanceName) } - h.eventRecorder.Eventf(obj, corev1.EventTypeNormal, constant.EventReasonStart, "Starts %v", instanceName) return nil } @@ -481,3 +811,191 @@ func (h *InstanceHandler) deleteInstance(instanceName string, obj runtime.Object return nil } + +func (h *InstanceHandler) suspendInstance(instanceName string, obj runtime.Object) error { + logrus.Infof("Suspending instance %v", instanceName) + + if _, err := h.instanceManagerHandler.GetInstance(obj, false); err != nil { + return errors.Wrapf(err, "failed to get instance %v for suspension", instanceName) + } + + if err := h.instanceManagerHandler.SuspendInstance(obj); err != nil { + return errors.Wrapf(err, "failed to suspend instance %v", instanceName) + } + + return nil +} + +func (h *InstanceHandler) resumeInstance(instanceName string, dataEngine longhorn.DataEngineType, obj runtime.Object) error { + logrus.Infof("Resuming instance %v", instanceName) + + if types.IsDataEngineV1(dataEngine) { + return fmt.Errorf("resuming instance is not supported for data engine %v", dataEngine) + } + + if _, err := h.instanceManagerHandler.GetInstance(obj, false); err != nil { + return errors.Wrapf(err, "failed to get instance %v for resumption", instanceName) + } + + if err := h.instanceManagerHandler.ResumeInstance(obj); err != nil { + return errors.Wrapf(err, "failed to resume instance %v", instanceName) + } + + return nil +} + +func (h *InstanceHandler) switchOverTarget(instanceName string, obj runtime.Object) error { + logrus.Infof("Switching over target for instance %v", instanceName) + + if err := h.instanceManagerHandler.SwitchOverTarget(obj); err != nil { + return errors.Wrapf(err, "failed to switch over target for instance %s", instanceName) + } + + return nil +} + +func (h *InstanceHandler) deleteTarget(instanceName string, obj runtime.Object) error { + logrus.Infof("Deleting target for instance %s", instanceName) + + if err := h.instanceManagerHandler.DeleteTarget(obj); err != nil { + return errors.Wrapf(err, "failed to delete target for instance %s", instanceName) + } + + return nil +} + +func (h *InstanceHandler) isV2DataEngineBeingUpgraded(spec *longhorn.InstanceSpec, status *longhorn.InstanceStatus) bool { + if !types.IsDataEngineV2(spec.DataEngine) { + return false + } + + 
upgradeRequested, err := h.ds.IsNodeDataEngineUpgradeRequested(spec.NodeID)
+	if err != nil {
+		logrus.WithError(err).Errorf("Failed to check if node %v has requested a data engine upgrade", spec.NodeID)
+		return false
+	}
+
+	if !upgradeRequested {
+		return false
+	}
+
+	if spec.TargetNodeIDForLiveUpgrade == "" {
+		return false
+	}
+
+	return spec.NodeID != spec.TargetNodeIDForLiveUpgrade && spec.TargetNodeIDForLiveUpgrade == status.CurrentTargetNodeIDForLiveUpgrade
+}
+
+func isVolumeBeingSwitchedBack(spec *longhorn.InstanceSpec, status *longhorn.InstanceStatus) bool {
+	return spec.NodeID == spec.TargetNodeIDForLiveUpgrade && spec.TargetNodeIDForLiveUpgrade != status.CurrentTargetNodeIDForLiveUpgrade
+}
+
+func isTargetInstanceReplacementCreated(instance *longhorn.InstanceProcess) bool {
+	return instance.Status.StandbyTargetPortStart != 0
+}
+
+func isTargetInstanceRemote(instance *longhorn.InstanceProcess) bool {
+	return instance.Status.PortStart != 0 && instance.Status.TargetPortStart == 0
+}
+
+func (h *InstanceHandler) getInstanceManagerRO(obj interface{}, spec *longhorn.InstanceSpec, status *longhorn.InstanceStatus) (*longhorn.InstanceManager, error) {
+	// This only happens while the instance-manager image is being upgraded
+	if spec.DesireState == longhorn.InstanceStateRunning && status.CurrentState == longhorn.InstanceStateSuspended {
+		return h.ds.GetRunningInstanceManagerByNodeRO(spec.NodeID, spec.DataEngine)
+	}
+
+	return h.ds.GetInstanceManagerByInstanceRO(obj, false)
+}
+
+func (h *InstanceHandler) handleIfInstanceManagerStarting(imName, instanceName string, status *longhorn.InstanceStatus) {
+	if status.CurrentState != longhorn.InstanceStateError {
+		logrus.Warnf("Marking the instance as state ERROR since the starting instance manager %v shouldn't contain the running instance %v", imName, instanceName)
+	}
+	status.CurrentState = longhorn.InstanceStateError
+	status.CurrentImage = ""
+	status.IP = ""
+	status.TargetIP = ""
+	status.StorageIP = ""
+	status.StorageTargetIP = ""
+	status.Port = 0
+	status.TargetPort = 0
+	h.resetInstanceErrorCondition(status)
+}
+
+func (h *InstanceHandler) isInstanceExist(instanceName string, spec *longhorn.InstanceSpec) (bool, error) {
+	var err error
+
+	im, err := h.ds.GetRunningInstanceManagerByNodeRO(spec.NodeID, spec.DataEngine)
+	if err != nil {
+		return false, err
+	}
+
+	client, err := engineapi.NewInstanceManagerClient(im, false)
+	if err != nil {
+		return false, err
+	}
+	defer client.Close()
+
+	_, err = client.InstanceGet(spec.DataEngine, instanceName, string(longhorn.InstanceManagerTypeEngine))
+	if err != nil {
+		return false, err
+	}
+
+	return true, nil
+}
+
+func (h *InstanceHandler) isSpecImageReady(obj interface{}, spec *longhorn.InstanceSpec) bool {
+	if types.IsDataEngineV1(spec.DataEngine) {
+		return spec.Image != ""
+	}
+
+	if h.instanceManagerHandler.IsEngine(obj) {
+		return spec.Image != ""
+	}
+
+	// spec.Image is not required for a replica because the image of a v2 replica
+	// is the same as that of the running instance manager.
+ return true +} + +func (h *InstanceHandler) isTargetInstanceReplacementRunning(instanceName string, spec *longhorn.InstanceSpec, status *longhorn.InstanceStatus) (bool, error) { + if spec.TargetNodeIDForLiveUpgrade == "" { + return false, nil + } + + logrus.Infof("Checking whether instance %v is running on target node %v", instanceName, spec.TargetNodeIDForLiveUpgrade) + + im, err := h.ds.GetRunningInstanceManagerByNodeRO(spec.TargetNodeIDForLiveUpgrade, spec.DataEngine) + if err != nil { + return false, errors.Wrapf(err, "failed to get instance manager on node %v for checking if target instance is running", spec.TargetNodeIDForLiveUpgrade) + } + + c, err := engineapi.NewInstanceManagerClient(im, false) + if err != nil { + return false, errors.Wrapf(err, "failed to create instance manager client for target instance") + } + defer c.Close() + + instance, err := c.InstanceGet(spec.DataEngine, instanceName, string(longhorn.InstanceManagerTypeEngine)) + if err != nil { + return false, errors.Wrapf(err, "failed to get target instance %v on node %v", instanceName, spec.TargetNodeIDForLiveUpgrade) + } + + if isVolumeBeingSwitchedBack(spec, status) { + return isTargetInstanceRemote(instance) && isTargetInstanceReplacementCreated(instance), nil + } + + return true, nil +} + +func (h *InstanceHandler) isEngineOfV2DataEngine(obj interface{}, dataEngine longhorn.DataEngineType) bool { + return types.IsDataEngineV2(dataEngine) && h.instanceManagerHandler.IsEngine(obj) +} + +func (h *InstanceHandler) isEngineResumeRequired(spec *longhorn.InstanceSpec, status *longhorn.InstanceStatus) bool { + return spec.TargetNodeIDForLiveUpgrade == status.CurrentTargetNodeIDForLiveUpgrade && status.CurrentState == longhorn.InstanceStateSuspended +} + +func isDataEngineNotBeingLiveUpgraded(spec *longhorn.InstanceSpec, status *longhorn.InstanceStatus) bool { + return spec.TargetNodeIDForLiveUpgrade == "" && !status.TargetInstanceReplacementCreated +} diff --git a/controller/instance_handler_test.go b/controller/instance_handler_test.go index 68fe69d6a4..60c533c792 100644 --- a/controller/instance_handler_test.go +++ b/controller/instance_handler_test.go @@ -36,7 +36,7 @@ const ( type MockInstanceManagerHandler struct{} -func (imh *MockInstanceManagerHandler) GetInstance(obj interface{}) (*longhorn.InstanceProcess, error) { +func (imh *MockInstanceManagerHandler) GetInstance(obj interface{}, isInstanceOnRemoteNode bool) (*longhorn.InstanceProcess, error) { metadata, err := meta.Accessor(obj) if err != nil { return nil, err @@ -48,7 +48,7 @@ func (imh *MockInstanceManagerHandler) GetInstance(obj interface{}) (*longhorn.I return &longhorn.InstanceProcess{}, nil } -func (imh *MockInstanceManagerHandler) CreateInstance(obj interface{}) (*longhorn.InstanceProcess, error) { +func (imh *MockInstanceManagerHandler) CreateInstance(obj interface{}, isInstanceOnRemoteNode bool) (*longhorn.InstanceProcess, error) { metadata, err := meta.Accessor(obj) if err != nil { return nil, err @@ -72,10 +72,35 @@ func (imh *MockInstanceManagerHandler) DeleteInstance(obj interface{}) error { return nil } +func (imh *MockInstanceManagerHandler) SuspendInstance(obj interface{}) error { + return fmt.Errorf("SuspendInstance is not mocked") +} + +func (imh *MockInstanceManagerHandler) ResumeInstance(obj interface{}) error { + return fmt.Errorf("ResumeInstance is not mocked") +} + +func (imh *MockInstanceManagerHandler) SwitchOverTarget(obj interface{}) error { + return fmt.Errorf("SwitchOverTarget is not mocked") +} + +func (imh 
*MockInstanceManagerHandler) DeleteTarget(obj interface{}) error { + return fmt.Errorf("DeleteTarget is not mocked") +} + +func (imh *MockInstanceManagerHandler) IsEngine(obj interface{}) bool { + _, ok := obj.(*longhorn.Engine) + return ok +} + func (imh *MockInstanceManagerHandler) LogInstance(ctx context.Context, obj interface{}) (*engineapi.InstanceManagerClient, *imapi.LogStream, error) { return nil, nil, fmt.Errorf("LogInstance is not mocked") } +func (imh *MockInstanceManagerHandler) RequireRemoteTargetInstance(obj interface{}) (bool, error) { + return false, nil +} + func newEngine(name, currentImage, imName, nodeName, ip string, port int, started bool, currentState, desireState longhorn.InstanceState) *longhorn.Engine { var conditions []longhorn.Condition conditions = types.SetCondition(conditions, @@ -108,8 +133,11 @@ func newEngine(name, currentImage, imName, nodeName, ip string, port int, starte CurrentImage: currentImage, InstanceManagerName: imName, IP: ip, + TargetIP: ip, StorageIP: ip, + StorageTargetIP: ip, Port: port, + TargetPort: 0, // v1 volume doesn't set target port Started: started, Conditions: conditions, }, diff --git a/controller/instance_manager_controller.go b/controller/instance_manager_controller.go index ffa60c578d..eec239013e 100644 --- a/controller/instance_manager_controller.go +++ b/controller/instance_manager_controller.go @@ -596,7 +596,16 @@ func (imc *InstanceManagerController) handlePod(im *longhorn.InstanceManager) er return err } - isPodDeletionNotRequired := (isSettingSynced && dataEngineCPUMaskIsApplied) || areInstancesRunningInPod || isPodDeletedOrNotRunning + dataEngineUpgradeRequested := false + if types.IsDataEngineV2(im.Spec.DataEngine) { + requested, err := imc.ds.IsNodeDataEngineUpgradeRequested(im.Spec.NodeID) + if err != nil { + return err + } + dataEngineUpgradeRequested = requested + } + + isPodDeletionNotRequired := (isSettingSynced && dataEngineCPUMaskIsApplied) || areInstancesRunningInPod || isPodDeletedOrNotRunning || dataEngineUpgradeRequested if im.Status.CurrentState != longhorn.InstanceManagerStateError && im.Status.CurrentState != longhorn.InstanceManagerStateStopped && isPodDeletionNotRequired { diff --git a/controller/node_controller.go b/controller/node_controller.go index e6e3a2a038..db8dae6c7f 100644 --- a/controller/node_controller.go +++ b/controller/node_controller.go @@ -191,6 +191,19 @@ func NewNodeController( } nc.cacheSyncs = append(nc.cacheSyncs, ds.KubeNodeInformer.HasSynced) + if _, err = ds.NodeDataEngineUpgradeInformer.AddEventHandlerWithResyncPeriod( + cache.FilteringResourceEventHandler{ + FilterFunc: nc.isResponsibleForNodeDataEngineUpgrade, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: nc.enqueueManagerNodeDataEngineUpgrade, + UpdateFunc: func(old, cur interface{}) { nc.enqueueManagerNodeDataEngineUpgrade(cur) }, + DeleteFunc: nc.enqueueManagerNodeDataEngineUpgrade, + }, + }, 0); err != nil { + return nil, err + } + nc.cacheSyncs = append(nc.cacheSyncs, ds.NodeDataEngineUpgradeInformer.HasSynced) + return nc, nil } @@ -284,6 +297,43 @@ func (nc *NodeController) snapshotHashRequired(volume *longhorn.Volume) bool { return true } +func (nc *NodeController) isResponsibleForNodeDataEngineUpgrade(obj interface{}) bool { + upgrade, ok := obj.(*longhorn.NodeDataEngineUpgrade) + if !ok { + return false + } + + return upgrade.Status.OwnerID == nc.controllerID +} + +func (nc *NodeController) enqueueManagerNodeDataEngineUpgrade(obj interface{}) { + upgrade, ok := obj.(*longhorn.NodeDataEngineUpgrade) + if 
!ok {
+ deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
+ if !ok {
+ utilruntime.HandleError(fmt.Errorf("received unexpected obj: %#v", obj))
+ return
+ }
+
+ // use the last known state to enqueue the dependent node
+ upgrade, ok = deletedState.Obj.(*longhorn.NodeDataEngineUpgrade)
+ if !ok {
+ utilruntime.HandleError(fmt.Errorf("DeletedFinalStateUnknown contained invalid object: %#v", deletedState.Obj))
+ return
+ }
+ }
+
+ node, err := nc.ds.GetNodeRO(upgrade.Spec.NodeID)
+ if err != nil {
+ if !apierrors.IsNotFound(err) {
+ utilruntime.HandleError(fmt.Errorf("failed to get node %v for upgrade %v: %v",
+ upgrade.Spec.NodeID, upgrade.Name, err))
+ }
+ return
+ }
+ nc.enqueueNode(node)
+}
+
 func isManagerPod(obj interface{}) bool {
 pod, ok := obj.(*corev1.Pod)
 if !ok {
@@ -434,6 +484,10 @@ func (nc *NodeController) syncNode(key string) (err error) {
 return err
 }
 
+ if err = nc.setDataEngineUpgradeRequestedCondition(node); err != nil {
+ return err
+ }
+
 // Set any RWX leases to non-delinquent if owned by not-ready node.
 // Usefulness of delinquent state has passed.
 if err = nc.clearDelinquentLeasesIfNodeNotReady(node); err != nil {
@@ -1446,6 +1500,13 @@ func (nc *NodeController) syncInstanceManagers(node *longhorn.Node) error {
 cleanupRequired = false
 log.Debugf("Skipping cleaning up non-default unknown instance manager %s", im.Name)
 }
+
+ if types.IsDataEngineV2(dataEngine) {
+ dataEngineUpgradeRequestedCondition := types.GetCondition(node.Status.Conditions, longhorn.NodeConditionTypeDataEngineUpgradeRequested)
+ if dataEngineUpgradeRequestedCondition.Status == longhorn.ConditionStatusTrue {
+ cleanupRequired = false
+ }
+ }
 }
 if cleanupRequired {
 log.Infof("Cleaning up the redundant instance manager %v when there is no running/starting instance", im.Name)
@@ -2129,6 +2190,43 @@ func (nc *NodeController) setReadyAndSchedulableConditions(node *longhorn.Node,
 return nil
 }
 
+func (nc *NodeController) setDataEngineUpgradeRequestedCondition(node *longhorn.Node) error {
+ nodeUpgrades, err := nc.ds.ListNodeDataEngineUpgradesByNodeRO(node.Name)
+ if err != nil {
+ return err
+ }
+
+ nodeUpgradeRequested := false
+ nodeUpgradeName := ""
+ for _, upgrade := range nodeUpgrades {
+ if upgrade.Status.State != longhorn.UpgradeStateRebuildingReplica &&
+ upgrade.Status.State != longhorn.UpgradeStateFinalizing &&
+ upgrade.Status.State != longhorn.UpgradeStateError &&
+ upgrade.Status.State != longhorn.UpgradeStateCompleted {
+ nodeUpgradeRequested = true
+ nodeUpgradeName = upgrade.Name
+ break
+ }
+ }
+
+ if nodeUpgradeRequested {
+ node.Status.Conditions = types.SetConditionAndRecord(node.Status.Conditions,
+ longhorn.NodeConditionTypeDataEngineUpgradeRequested,
+ longhorn.ConditionStatusTrue,
+ string(longhorn.NodeConditionReasonNodeDataEngineUpgradeRequested),
+ fmt.Sprintf("nodeDataEngineUpgrade %v is in progress", nodeUpgradeName),
+ nc.eventRecorder, node, corev1.EventTypeNormal)
+ } else {
+ node.Status.Conditions = types.SetConditionAndRecord(node.Status.Conditions,
+ longhorn.NodeConditionTypeDataEngineUpgradeRequested,
+ longhorn.ConditionStatusFalse,
+ "",
+ "",
+ nc.eventRecorder, node, corev1.EventTypeNormal)
+ }
+ return nil
+}
+
 func (nc *NodeController) setReadyConditionForManagerPod(node *longhorn.Node, managerPods []*corev1.Pod, nodeReady bool) bool {
 nodeManagerFound := false
 for _, pod := range managerPods {
@@ -2208,11 +2306,13 @@ func (nc *NodeController) SetSchedulableCondition(node *longhorn.Node, kubeNode
 message := ""
 disableScheduling := false
 
+ 
dataEngineUpgradeRequestedCondition := types.GetCondition(node.Status.Conditions, longhorn.NodeConditionTypeDataEngineUpgradeRequested) + if disableSchedulingOnCordonedNode && kubeSpec.Unschedulable { disableScheduling = true reason = string(longhorn.NodeConditionReasonKubernetesNodeCordoned) message = fmt.Sprintf("Node %v is cordoned", node.Name) - } else if node.Spec.DataEngineUpgradeRequested { + } else if dataEngineUpgradeRequestedCondition.Status == longhorn.ConditionStatusTrue { disableScheduling = true reason = string(longhorn.NodeConditionReasonNodeDataEngineUpgradeRequested) message = fmt.Sprintf("Data engine of node %v is being upgraded", node.Name) diff --git a/controller/replica_controller.go b/controller/replica_controller.go index abb8dea5e9..fb8055f0cf 100644 --- a/controller/replica_controller.go +++ b/controller/replica_controller.go @@ -25,13 +25,13 @@ import ( clientset "k8s.io/client-go/kubernetes" v1core "k8s.io/client-go/kubernetes/typed/core/v1" - imapi "github.com/longhorn/longhorn-instance-manager/pkg/api" - lhns "github.com/longhorn/go-common-libs/ns" + imapi "github.com/longhorn/longhorn-instance-manager/pkg/api" "github.com/longhorn/longhorn-manager/datastore" "github.com/longhorn/longhorn-manager/engineapi" "github.com/longhorn/longhorn-manager/types" + "github.com/longhorn/longhorn-manager/util" longhorn "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2" ) @@ -55,6 +55,8 @@ type ReplicaController struct { rebuildingLock *sync.Mutex inProgressRebuildingMap map[string]struct{} + + proxyConnCounter util.Counter } func NewReplicaController( @@ -62,7 +64,8 @@ func NewReplicaController( ds *datastore.DataStore, scheme *runtime.Scheme, kubeClient clientset.Interface, - namespace string, controllerID string) (*ReplicaController, error) { + namespace string, controllerID string, + proxyConnCounter util.Counter) (*ReplicaController, error) { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(logrus.Infof) @@ -80,6 +83,8 @@ func NewReplicaController( ds: ds, + proxyConnCounter: proxyConnCounter, + rebuildingLock: &sync.Mutex{}, inProgressRebuildingMap: map[string]struct{}{}, } @@ -576,6 +581,21 @@ func (rc *ReplicaController) DeleteInstance(obj interface{}) (err error) { cleanupRequired = true } + if types.IsDataEngineV2(r.Spec.DataEngine) && r.Spec.FailedAt != "" { + upgradeRequested, err := rc.ds.IsNodeDataEngineUpgradeRequested(r.Spec.NodeID) + if err != nil { + return err + } + if upgradeRequested { + log.Infof("Deleting failed replica instance %v from its engine instance without cleanup since the node %v is requested to upgrade data engine", + r.Name, r.Spec.NodeID) + err = rc.removeFailedReplicaInstanceFromEngineInstance(r) + if err != nil { + return errors.Wrapf(err, "failed to remove failed replica instance %v from engine instance", r.Name) + } + } + } + log.WithField("cleanupRequired", cleanupRequired).Infof("Deleting replica instance on disk %v", r.Spec.DiskPath) err = c.InstanceDelete(r.Spec.DataEngine, r.Name, string(longhorn.InstanceManagerTypeReplica), r.Spec.DiskID, cleanupRequired) @@ -599,6 +619,36 @@ func (rc *ReplicaController) DeleteInstance(obj interface{}) (err error) { return nil } +func (rc *ReplicaController) removeFailedReplicaInstanceFromEngineInstance(r *longhorn.Replica) error { + e, err := rc.ds.GetVolumeCurrentEngine(r.Spec.VolumeName) + if err != nil { + if apierrors.IsNotFound(err) { + return nil + } + return err + } + + engineCliClient, err := GetBinaryClientForEngine(e, 
&engineapi.EngineCollection{}, e.Status.CurrentImage) + if err != nil { + return err + } + + engineClientProxy, err := engineapi.GetCompatibleClient(e, engineCliClient, rc.ds, rc.logger, rc.proxyConnCounter) + if err != nil { + return err + } + defer engineClientProxy.Close() + + if err := engineClientProxy.ReplicaRemove(e, "", r.Name); err != nil { + if strings.Contains(err.Error(), "cannot find replica") { + return nil + } + return errors.Wrapf(err, "failed to remove failed replica %v from engine instance", r.Name) + } + + return nil +} + func canDeleteInstance(r *longhorn.Replica) bool { return types.IsDataEngineV1(r.Spec.DataEngine) || (types.IsDataEngineV2(r.Spec.DataEngine) && r.DeletionTimestamp != nil) diff --git a/controller/utils.go b/controller/utils.go index 1c27ce1f64..cfecd2855e 100644 --- a/controller/utils.go +++ b/controller/utils.go @@ -23,10 +23,6 @@ func hasReplicaEvictionRequested(rs map[string]*longhorn.Replica) bool { return false } -func isVolumeUpgrading(v *longhorn.Volume) bool { - return v.Status.CurrentImage != v.Spec.Image -} - // isTargetVolumeOfAnActiveCloning checks if the input volume is the target volume of an on-going cloning process func isTargetVolumeOfAnActiveCloning(v *longhorn.Volume) bool { isCloningDesired := types.IsDataFromVolume(v.Spec.DataSource) diff --git a/controller/volume_controller.go b/controller/volume_controller.go index b514e6227c..7a3a799bf0 100644 --- a/controller/volume_controller.go +++ b/controller/volume_controller.go @@ -1004,9 +1004,12 @@ func (c *VolumeController) cleanupCorruptedOrStaleReplicas(v *longhorn.Volume, r } continue } else if r.Spec.Image != v.Spec.Image { - // r.Spec.Active shouldn't be set for the leftover replicas, something must wrong - log.WithField("replica", r.Name).Warnf("Replica engine image %v is different from volume engine image %v, "+ - "but replica spec.Active has been set", r.Spec.Image, v.Spec.Image) + // For a v2 volume, the instance manager image of a replica can be different from the one of its volume + if types.IsDataEngineV1(v.Spec.DataEngine) { + // r.Spec.Active shouldn't be set for the leftover replicas, something must wrong + log.WithField("replica", r.Name).Warnf("Replica engine image %v is different from volume engine image %v, "+ + "but replica spec.Active has been set", r.Spec.Image, v.Spec.Image) + } } } @@ -1613,7 +1616,7 @@ func (c *VolumeController) reconcileAttachDetachStateMachine(v *longhorn.Volume, return err } if !c.areVolumeDependentResourcesOpened(e, rs) { - log.Warnf("Volume is attached but dependent resources are not opened") + log.WithField("e.Status.CurrentState", e.Status.CurrentState).Warn("Volume is attached but dependent resources are not opened") } } return nil @@ -1821,9 +1824,16 @@ func (c *VolumeController) openVolumeDependentResources(v *longhorn.Volume, e *l return err } if canIMLaunchReplica { - if r.Spec.FailedAt == "" && r.Spec.Image == v.Status.CurrentImage { + if r.Spec.FailedAt == "" { if r.Status.CurrentState == longhorn.InstanceStateStopped { - r.Spec.DesireState = longhorn.InstanceStateRunning + if types.IsDataEngineV1(e.Spec.DataEngine) { + if r.Spec.Image == v.Status.CurrentImage { + r.Spec.DesireState = longhorn.InstanceStateRunning + } + } else { + // For v2 volume, the image of replica is no need to be the same as the volume image + r.Spec.DesireState = longhorn.InstanceStateRunning + } } } } else { @@ -1860,8 +1870,11 @@ func (c *VolumeController) openVolumeDependentResources(v *longhorn.Volume, e *l if r.Spec.NodeID == "" { continue } - if 
r.Spec.Image != v.Status.CurrentImage { - continue + // For v2 volume, the image of replica is no need to be the same as the volume image + if types.IsDataEngineV1(v.Spec.DataEngine) { + if r.Spec.Image != v.Status.CurrentImage { + continue + } } if r.Spec.EngineName != e.Name { continue @@ -1907,7 +1920,15 @@ func (c *VolumeController) openVolumeDependentResources(v *longhorn.Volume, e *l } e.Spec.NodeID = v.Spec.NodeID e.Spec.ReplicaAddressMap = replicaAddressMap - e.Spec.DesireState = longhorn.InstanceStateRunning + + if types.IsDataEngineV1(v.Spec.DataEngine) { + e.Spec.DesireState = longhorn.InstanceStateRunning + } else { + if v.Spec.Image == v.Status.CurrentImage && v.Spec.TargetNodeIDForLiveUpgrade == v.Status.CurrentTargetNodeIDForLiveUpgrade { + e.Spec.DesireState = longhorn.InstanceStateRunning + } + } + // The volume may be activated e.Spec.DisableFrontend = v.Status.FrontendDisabled e.Spec.Frontend = v.Spec.Frontend @@ -1950,6 +1971,7 @@ func (c *VolumeController) closeVolumeDependentResources(v *longhorn.Volume, e * } e.Spec.RequestedBackupRestore = "" e.Spec.NodeID = "" + e.Spec.TargetNodeIDForLiveUpgrade = "" e.Spec.DesireState = longhorn.InstanceStateStopped } // must make sure engine stopped first before stopping replicas @@ -2061,7 +2083,7 @@ func (c *VolumeController) canInstanceManagerLaunchReplica(r *longhorn.Replica) if r.Status.InstanceManagerName != "" { return true, nil } - defaultIM, err := c.ds.GetInstanceManagerByInstanceRO(r) + defaultIM, err := c.ds.GetInstanceManagerByInstanceRO(r, false) if err != nil { return false, errors.Wrapf(err, "failed to find instance manager for replica %v", r.Name) } @@ -2211,6 +2233,25 @@ func hasLocalReplicaOnSameNodeAsEngine(e *longhorn.Engine, rs map[string]*longho // It will count all the potentially usable replicas, since some replicas maybe // blank or in rebuilding state func (c *VolumeController) replenishReplicas(v *longhorn.Volume, e *longhorn.Engine, rs map[string]*longhorn.Replica, hardNodeAffinity string) error { + log := getLoggerForVolume(c.logger, v) + + // Skip the replenishReplicas if the volume is being upgraded + if types.IsDataEngineV2(v.Spec.DataEngine) { + if v.Status.OwnerID == "" { + log.Warn("spec.ownerID is empty, skip replenishing replica") + return nil + } + node, err := c.ds.GetNode(v.Status.OwnerID) + if err != nil { + return errors.Wrapf(err, "failed to get node %v for checking volume upgrade status", v.Status.OwnerID) + } + dataEngineUpgradeRequestedCondition := types.GetCondition(node.Status.Conditions, longhorn.NodeConditionTypeDataEngineUpgradeRequested) + if dataEngineUpgradeRequestedCondition.Status == longhorn.ConditionStatusTrue { + log.Info("Node is being upgraded, skip replenishing replica") + return nil + } + } + concurrentRebuildingLimit, err := c.ds.GetSettingAsInt(types.SettingNameConcurrentReplicaRebuildPerNodeLimit) if err != nil { return err @@ -2248,8 +2289,6 @@ func (c *VolumeController) replenishReplicas(v *longhorn.Volume, e *longhorn.Eng return nil } - log := getLoggerForVolume(c.logger, v) - replenishCount, updateNodeAffinity := c.getReplenishReplicasCount(v, rs, e) if hardNodeAffinity == "" && updateNodeAffinity != "" { hardNodeAffinity = updateNodeAffinity @@ -3027,8 +3066,8 @@ func (c *VolumeController) upgradeEngineForVolume(v *longhorn.Volume, es map[str } log := getLoggerForVolume(c.logger, v).WithFields(logrus.Fields{ - "engine": e.Name, - "volumeDesiredEngineImage": v.Spec.Image, + "engine": e.Name, + "volumeDesiredImage": v.Spec.Image, }) if !isVolumeUpgrading(v) { 
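The replenishReplicas hunk above pauses replica replenishment for a v2 volume while its owner node carries the DataEngineUpgradeRequested condition, so rebuilds cannot race an in-progress node data engine upgrade. Below is a minimal, self-contained Go sketch of that gate; the simplified structs and the shouldSkipReplenish helper are illustrative stand-ins, not the real longhorn.* types or datastore calls:

package main

import "fmt"

// Simplified stand-ins for the Longhorn types involved; the controller reads
// longhorn.Volume, longhorn.Node, and types.GetCondition from the datastore.
type condition struct {
	Type   string
	Status string
}

type node struct {
	Name       string
	Conditions []condition
}

type volume struct {
	DataEngine string // "v1" or "v2"
	OwnerID    string // node that currently owns the volume
}

// getCondition returns the condition with the given type, or a zero value
// when the node does not carry that condition yet.
func getCondition(conds []condition, condType string) condition {
	for _, c := range conds {
		if c.Type == condType {
			return c
		}
	}
	return condition{}
}

// shouldSkipReplenish mirrors the gate added to replenishReplicas: for a v2
// volume, replica replenishment is skipped while the owning node reports
// DataEngineUpgradeRequested=True, so rebuilds do not race the node upgrade.
func shouldSkipReplenish(v volume, owner node) bool {
	if v.DataEngine != "v2" {
		return false
	}
	c := getCondition(owner.Conditions, "DataEngineUpgradeRequested")
	return c.Status == "True"
}

func main() {
	owner := node{
		Name:       "node-1",
		Conditions: []condition{{Type: "DataEngineUpgradeRequested", Status: "True"}},
	}
	v := volume{DataEngine: "v2", OwnerID: owner.Name}
	fmt.Println(shouldSkipReplenish(v, owner)) // true: wait for the upgrade to finish
}

In the controller, the same condition, set by setDataEngineUpgradeRequestedCondition in node_controller.go, also disables scheduling in SetSchedulableCondition and keeps syncInstanceManagers from cleaning up instance managers while the upgrade runs.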
@@ -3049,30 +3088,13 @@ func (c *VolumeController) upgradeEngineForVolume(v *longhorn.Volume, es map[str // If volume is detached accidentally during the live upgrade, // the live upgrade info and the inactive replicas are meaningless. if v.Status.State == longhorn.VolumeStateDetached { - if e.Spec.Image != v.Spec.Image { - e.Spec.Image = v.Spec.Image - e.Spec.UpgradedReplicaAddressMap = map[string]string{} - } - for _, r := range rs { - if r.Spec.Image != v.Spec.Image { - r.Spec.Image = v.Spec.Image - rs[r.Name] = r - } - if !r.Spec.Active { - log.Infof("Removing inactive replica %v when the volume is detached accidentally during the live upgrade", r.Name) - if err := c.deleteReplica(r, rs); err != nil { - return err - } - } - } - // TODO current replicas should be calculated by checking if there is - // any other image exists except for the v.Spec.Image - v.Status.CurrentImage = v.Spec.Image - return nil + return c.handleDetachedVolumeUpgrade(v, e, rs, log) } - // Only start live upgrade if volume is healthy - if v.Status.State != longhorn.VolumeStateAttached || v.Status.Robustness != longhorn.VolumeRobustnessHealthy { + // Only start live upgrade if + // - v1 volume is healthy + // - v2 volume is healthy or degraded + if isVolumeNotEligibleForLiveUpgrade(v) { if v.Status.State != longhorn.VolumeStateAttached || v.Status.Robustness != longhorn.VolumeRobustnessDegraded { return nil } @@ -3159,18 +3181,160 @@ func (c *VolumeController) upgradeEngineForVolume(v *longhorn.Volume, es map[str e.Spec.Image = v.Spec.Image } } - c.finishLiveEngineUpgrade(v, e, rs, log) } else { - // TODO: Implement the logic for data engine v2 + if isV2DataEngineLiveUpgradeCompleted(v) { + return nil + } + + log = log.WithField("engine", e.Name) + + if v.Spec.TargetNodeIDForLiveUpgrade != "" { + if e.Spec.TargetNodeIDForLiveUpgrade != v.Spec.TargetNodeIDForLiveUpgrade { + if e.Spec.Image != v.Spec.Image { + log.Infof("Updating image from %s to %s for v2 data engine live upgrade", e.Spec.Image, v.Spec.Image) + e.Spec.Image = v.Spec.Image + } + + log.Infof("Updating target node from %s to %s for v2 data engine live upgrade", e.Spec.TargetNodeIDForLiveUpgrade, v.Spec.TargetNodeIDForLiveUpgrade) + e.Spec.TargetNodeIDForLiveUpgrade = v.Spec.TargetNodeIDForLiveUpgrade + return nil + } + + if !e.Status.TargetInstanceReplacementCreated && e.Status.CurrentTargetNodeIDForLiveUpgrade != v.Spec.TargetNodeIDForLiveUpgrade { + log.Debug("Waiting for target instance replacement to be created") + return nil + } + + if e.Status.CurrentTargetNodeIDForLiveUpgrade != v.Spec.TargetNodeIDForLiveUpgrade { + if e.Status.CurrentState == longhorn.InstanceStateRunning { + log.Infof("Suspending engine for v2 data engine live upgrade") + e.Spec.DesireState = longhorn.InstanceStateSuspended + return nil + } else { + // TODO: what if e.Status.CurrentState != longhorn.InstanceStateRunning + } + } + + // At this moment: + // 1. volume is running and healthy + // 2. engine is suspended + // 3. initiator is correcting to new target + // 4. 
old target is still existing
+
+ if replicaAddressMap, err := c.constructReplicaAddressMap(v, e, rs); err != nil {
+ return nil
+ } else {
+ if !reflect.DeepEqual(e.Spec.UpgradedReplicaAddressMap, replicaAddressMap) {
+ e.Spec.UpgradedReplicaAddressMap = replicaAddressMap
+ return nil
+ }
+ }
+
+ if e.Status.CurrentState == longhorn.InstanceStateSuspended {
+ log.Infof("Resuming engine for live upgrade")
+ e.Spec.DesireState = longhorn.InstanceStateRunning
+ return nil
+ }
+
+ if e.Status.CurrentState != longhorn.InstanceStateRunning {
+ log.Debugf("Engine is in %v, waiting for engine to be running", e.Status.CurrentState)
+ return nil
+ }
+
+ // At this point:
+ // 1. volume is running and healthy
+ // 2. engine is running
+ // 3. initiator is connected to the new target
+ // 4. old target is being deleted
+
+ if v.Status.CurrentTargetNodeIDForLiveUpgrade != v.Spec.TargetNodeIDForLiveUpgrade {
+ v.Status.CurrentTargetNodeIDForLiveUpgrade = v.Spec.TargetNodeIDForLiveUpgrade
+ return nil
+ }
+ }
+ }
+
+ c.finishLiveEngineUpgrade(v, e, rs, log)
+
+ return nil
+}
+
+func (c *VolumeController) handleDetachedVolumeUpgrade(v *longhorn.Volume, e *longhorn.Engine, rs map[string]*longhorn.Replica, log logrus.FieldLogger) error {
+ if e.Spec.Image != v.Spec.Image {
+ e.Spec.Image = v.Spec.Image
+ e.Spec.UpgradedReplicaAddressMap = map[string]string{}
+ e.Spec.TargetNodeIDForLiveUpgrade = ""
+ }
+
+ for name, r := range rs {
+ if err := c.updateDetachedReplica(v, r); err != nil {
+ return err
+ }
+ if !r.Spec.Active {
+ log.Infof("Removing inactive replica %v when the volume is detached during live upgrade", r.Name)
+ if err := c.deleteReplica(r, rs); err != nil {
+ return err
+ }
+ continue
+ }
+ rs[name] = r
+ }
+ // TODO current replicas should be calculated by checking if there is
+ // any other image exists except for the v.Spec.Image
+ v.Status.CurrentImage = v.Spec.Image
+ return nil
+}
+
+func (c *VolumeController) updateDetachedReplica(v *longhorn.Volume, r *longhorn.Replica) error {
+ if r.Spec.Image == v.Spec.Image {
+ return nil
+ }
+
+ if types.IsDataEngineV1(v.Spec.DataEngine) {
+ r.Spec.Image = v.Spec.Image
 return nil
 }
+
+ im, err := c.ds.GetRunningInstanceManagerByNodeRO(r.Spec.NodeID, longhorn.DataEngineTypeV2)
+ if err != nil {
+ return err
+ }
+ r.Spec.Image = im.Spec.Image
 return nil
 }
 
+func isVolumeNotEligibleForLiveUpgrade(v *longhorn.Volume) bool {
+ if v.Status.State != longhorn.VolumeStateAttached {
+ return true
+ }
+
+ if types.IsDataEngineV1(v.Spec.DataEngine) {
+ if v.Status.Robustness != longhorn.VolumeRobustnessHealthy {
+ return true
+ }
+ }
+
+ if types.IsDataEngineV2(v.Spec.DataEngine) {
+ if v.Status.Robustness != longhorn.VolumeRobustnessHealthy &&
+ v.Status.Robustness != longhorn.VolumeRobustnessDegraded {
+ return true
+ }
+ }
+
+ return false
+}
+
+func isV2DataEngineLiveUpgradeCompleted(v *longhorn.Volume) bool {
+ return v.Spec.TargetNodeIDForLiveUpgrade != "" &&
+ v.Spec.Image == v.Status.CurrentImage &&
+ v.Spec.TargetNodeIDForLiveUpgrade == v.Status.CurrentTargetNodeIDForLiveUpgrade &&
+ v.Spec.NodeID == v.Status.CurrentNodeID
+}
+
 func (c *VolumeController) constructReplicaAddressMap(v *longhorn.Volume, e *longhorn.Engine, dataPathToNewReplica map[string]*longhorn.Replica) (map[string]string, error) {
 log := getLoggerForVolume(c.logger, v).WithFields(logrus.Fields{
- "engine": e.Name,
- "volumeDesiredEngineImage": v.Spec.Image,
+ "engine": e.Name,
+ "volumeDesiredImage": v.Spec.Image,
 })
 
 replicaAddressMap := map[string]string{}
@@ -3200,8 +3364,8 @@ func (c 
*VolumeController) constructReplicaAddressMap(v *longhorn.Volume, e *lon func (c *VolumeController) groupReplicasByImageAndState(v *longhorn.Volume, e *longhorn.Engine, rs map[string]*longhorn.Replica) (unknownReplicas, dataPathToOldRunningReplica, dataPathToNewReplica map[string]*longhorn.Replica) { log := getLoggerForVolume(c.logger, v).WithFields(logrus.Fields{ - "engine": e.Name, - "volumeDesiredEngineImage": v.Spec.Image, + "engine": e.Name, + "volumeDesiredImage": v.Spec.Image, }) unknownReplicas = map[string]*longhorn.Replica{} @@ -3256,20 +3420,29 @@ func (c *VolumeController) checkOldAndNewEngineImagesForLiveUpgrade(v *longhorn. } func (c *VolumeController) finishLiveEngineUpgrade(v *longhorn.Volume, e *longhorn.Engine, rs map[string]*longhorn.Replica, log *logrus.Entry) { - if e.Status.CurrentImage != v.Spec.Image || - e.Status.CurrentState != longhorn.InstanceStateRunning { + if e.Status.CurrentImage != v.Spec.Image { + return + } + if e.Status.CurrentState != longhorn.InstanceStateRunning { return } - c.switchActiveReplicas(rs, func(r *longhorn.Replica, image string) bool { - return r.Spec.Image == image && r.DeletionTimestamp.IsZero() - }, v.Spec.Image) + if types.IsDataEngineV1(v.Spec.DataEngine) { + c.switchActiveReplicas(rs, func(r *longhorn.Replica, image string) bool { + return r.Spec.Image == image && r.DeletionTimestamp.IsZero() + }, v.Spec.Image) + } e.Spec.ReplicaAddressMap = e.Spec.UpgradedReplicaAddressMap e.Spec.UpgradedReplicaAddressMap = map[string]string{} // cleanupCorruptedOrStaleReplicas() will take care of old replicas log.Infof("Engine %v has been upgraded from %v to %v", e.Name, v.Status.CurrentImage, v.Spec.Image) v.Status.CurrentImage = v.Spec.Image + + if v.Spec.TargetNodeIDForLiveUpgrade == "" { + v.Status.CurrentTargetNodeIDForLiveUpgrade = "" + e.Spec.TargetNodeIDForLiveUpgrade = "" + } } func (c *VolumeController) updateRequestedBackupForVolumeRestore(v *longhorn.Volume, e *longhorn.Engine) (err error) { @@ -3646,6 +3819,13 @@ func (c *VolumeController) createEngine(v *longhorn.Volume, currentEngineName st } func (c *VolumeController) newReplica(v *longhorn.Volume, e *longhorn.Engine, hardNodeAffinity string) *longhorn.Replica { + image := v.Status.CurrentImage + if types.IsDataEngineV2(v.Spec.DataEngine) { + // spec.image of v2 replica can be empty and different from the volume image, + // because the image of a v2 replica is the same as the running instance manager. + image = "" + } + return &longhorn.Replica{ ObjectMeta: metav1.ObjectMeta{ Name: types.GenerateReplicaNameForVolume(v.Name), @@ -3655,7 +3835,7 @@ func (c *VolumeController) newReplica(v *longhorn.Volume, e *longhorn.Engine, ha InstanceSpec: longhorn.InstanceSpec{ VolumeName: v.Name, VolumeSize: v.Spec.Size, - Image: v.Status.CurrentImage, + Image: image, DataEngine: v.Spec.DataEngine, DesireState: longhorn.InstanceStateStopped, }, @@ -4926,9 +5106,9 @@ func (c *VolumeController) shouldCleanUpFailedReplica(v *longhorn.Volume, r *lon return true } // TODO: Remove it once we can reuse failed replicas during v2 rebuilding - if types.IsDataEngineV2(v.Spec.DataEngine) { - return true - } + // if types.IsDataEngineV2(v.Spec.DataEngine) { + // return true + // } // Failed too long ago to be useful during a rebuild. 
if v.Spec.StaleReplicaTimeout > 0 && util.TimestampAfterTimeout(r.Spec.FailedAt, time.Duration(v.Spec.StaleReplicaTimeout)*time.Minute) { @@ -4942,3 +5122,13 @@ func (c *VolumeController) shouldCleanUpFailedReplica(v *longhorn.Volume, r *lon } return false } + +func isVolumeUpgrading(v *longhorn.Volume) bool { + imageNotUpdated := v.Status.CurrentImage != v.Spec.Image + + if types.IsDataEngineV1(v.Spec.DataEngine) { + return imageNotUpdated + } + + return imageNotUpdated || v.Spec.TargetNodeIDForLiveUpgrade != v.Status.CurrentTargetNodeIDForLiveUpgrade +} diff --git a/controller/volume_controller_test.go b/controller/volume_controller_test.go index 6e1eab29f0..f85d321f74 100644 --- a/controller/volume_controller_test.go +++ b/controller/volume_controller_test.go @@ -503,8 +503,11 @@ func (s *TestSuite) TestVolumeLifeCycle(c *C) { e.Status.OwnerID = TestNode1 e.Status.CurrentState = longhorn.InstanceStateStopped e.Status.IP = "" + e.Status.TargetIP = "" e.Status.StorageIP = "" + e.Status.StorageTargetIP = "" e.Status.Port = 0 + e.Status.TargetPort = 0 e.Status.Endpoint = "" e.Status.CurrentImage = "" e.Status.ReplicaModeMap = map[string]longhorn.ReplicaMode{}