From 393807e40574515298012fa21602e0a1b354df71 Mon Sep 17 00:00:00 2001
From: Derek Su
Date: Mon, 25 Nov 2024 20:08:00 +0800
Subject: [PATCH] feat(v2 upgrade/webhook): handle the state transition of engines and replicas

Longhorn 9104

Signed-off-by: Derek Su
---
 controller/engine_controller.go      | 408 +++++++++++++++--
 controller/instance_handler.go       | 646 ++++++++++++++++++++++++---
 controller/instance_handler_test.go  |  33 +-
 controller/utils.go                  |   4 -
 controller/volume_controller.go      | 281 ++++++++++--
 controller/volume_controller_test.go |   3 +
 6 files changed, 1229 insertions(+), 146 deletions(-)

diff --git a/controller/engine_controller.go b/controller/engine_controller.go
index 92da3d22d2..b5162f584f 100644
--- a/controller/engine_controller.go
+++ b/controller/engine_controller.go
@@ -3,6 +3,7 @@ package controller import ( "context" "fmt" + "net" "path/filepath" "reflect" "regexp" @@ -280,12 +281,7 @@ func (ec *EngineController) syncEngine(key string) (err error) { return errors.Wrapf(err, "failed to get engine") } - defaultEngineImage, err := ec.ds.GetSettingValueExisted(types.SettingNameDefaultEngineImage) - if err != nil { - return err - } - - isResponsible, err := ec.isResponsibleFor(engine, defaultEngineImage) + isResponsible, err := ec.isResponsibleFor(engine) if err != nil { return err } @@ -438,24 +434,67 @@ func (ec *EngineController) enqueueInstanceManagerChange(obj interface{}) { } } -func (ec *EngineController) CreateInstance(obj interface{}) (*longhorn.InstanceProcess, error) { +func (ec *EngineController) findInstanceManagerAndIPs(obj interface{}) (im *longhorn.InstanceManager, initiatorIP string, targetIP string, err error) { + e, ok := obj.(*longhorn.Engine) + if !ok { + return nil, "", "", fmt.Errorf("invalid object for engine: %v", obj) + } + + initiatorIM, err := ec.ds.GetInstanceManagerByInstanceRO(obj, false) + if err != nil { + return nil, "", "", err + } + + initiatorIP = initiatorIM.Status.IP + targetIP = initiatorIM.Status.IP + im = initiatorIM + + // Target, or called Target Instance, is on another node + if e.Spec.TargetNodeID != "" { + targetIM, err := ec.ds.GetInstanceManagerByInstanceRO(obj, true) + if err != nil { + return nil, "", "", err + } + + targetIP = targetIM.Status.IP + + if !e.Status.TargetInstanceReplacementCreated && e.Status.CurrentTargetNodeID == "" { + im = targetIM + } + } + + return im, initiatorIP, targetIP, nil +} + +func (ec *EngineController) CreateInstance(obj interface{}, isInstanceOnRemoteNode bool) (*longhorn.InstanceProcess, error) { e, ok := obj.(*longhorn.Engine) if !ok { return nil, fmt.Errorf("invalid object for engine process creation: %v", obj) } + + log := getLoggerForEngine(ec.logger, e) + if e.Spec.VolumeName == "" || e.Spec.NodeID == "" { return nil, fmt.Errorf("missing parameters for engine instance creation: %v", e) } + frontend := e.Spec.Frontend if e.Spec.DisableFrontend { frontend = longhorn.VolumeFrontendEmpty } - im, err := ec.ds.GetInstanceManagerByInstanceRO(obj) + im, initiatorIP, targetIP, err := ec.findInstanceManagerAndIPs(obj) if err != nil { return nil, err } + log.WithFields(logrus.Fields{ + "instanceManager": im.Name, + "instanceManagerImage": im.Spec.Image, + "initiatorIP": initiatorIP, + "targetIP": targetIP, + }).Info("Creating engine instance") + c, err := engineapi.NewInstanceManagerClient(im, false) if err != nil { return nil, err } @@ -482,6 +521,10 @@ func (ec *EngineController) CreateInstance(obj interface{}) (*longhorn.InstanceP return nil, err } + // No need to care about the initiator and target
ports if the engine is not being upgraded. + initiatorAddress := net.JoinHostPort(initiatorIP, strconv.Itoa(0)) + targetAddress := net.JoinHostPort(targetIP, strconv.Itoa(e.Status.Port)) + return c.EngineInstanceCreate(&engineapi.EngineInstanceCreateRequest{ Engine: e, VolumeFrontend: frontend, @@ -490,9 +533,8 @@ func (ec *EngineController) CreateInstance(obj interface{}) (*longhorn.InstanceP DataLocality: v.Spec.DataLocality, ImIP: im.Status.IP, EngineCLIAPIVersion: cliAPIVersion, - UpgradeRequired: false, - InitiatorAddress: im.Status.IP, - TargetAddress: im.Status.IP, + InitiatorAddress: initiatorAddress, + TargetAddress: targetAddress, }) } @@ -511,7 +553,7 @@ func (ec *EngineController) DeleteInstance(obj interface{}) (err error) { log.Warn("Engine does not set instance manager name and node ID, will skip the actual instance deletion") return nil } - im, err = ec.ds.GetInstanceManagerByInstance(obj) + im, err = ec.ds.GetInstanceManagerByInstance(obj, false) if err != nil { log.WithError(err).Warn("Failed to detect instance manager for engine, will skip the actual instance deletion") return nil @@ -589,29 +631,256 @@ func (ec *EngineController) DeleteInstance(obj interface{}) (err error) { return err } + if e.Status.CurrentTargetNodeID != "" { + err = c.EngineInstanceDeleteTarget(&engineapi.EngineInstanceDeleteTargetRequest{ + Engine: e, + }) + } + if err != nil && !types.ErrorIsNotFound(err) { + return err + } + return nil } -func (ec *EngineController) GetInstance(obj interface{}) (*longhorn.InstanceProcess, error) { +func (ec *EngineController) SuspendInstance(obj interface{}) error { e, ok := obj.(*longhorn.Engine) if !ok { - return nil, fmt.Errorf("invalid object for engine instance get: %v", obj) + return fmt.Errorf("invalid object for engine instance suspension: %v", obj) + } + + if !types.IsDataEngineV2(e.Spec.DataEngine) { + return fmt.Errorf("suspending engine instance is not supported for data engine %v", e.Spec.DataEngine) + } + + if e.Spec.VolumeName == "" || e.Spec.NodeID == "" { + return fmt.Errorf("missing parameters for engine instance suspension: %+v", e) + } + + im, err := ec.ds.GetInstanceManagerByInstanceRO(obj, false) + if err != nil { + return err + } + c, err := engineapi.NewInstanceManagerClient(im, false) + if err != nil { + return err + } + defer c.Close() + + return c.EngineInstanceSuspend(&engineapi.EngineInstanceSuspendRequest{ + Engine: e, + }) +} + +func (ec *EngineController) ResumeInstance(obj interface{}) error { + e, ok := obj.(*longhorn.Engine) + if !ok { + return fmt.Errorf("invalid object for engine instance resumption: %v", obj) + } + + if !types.IsDataEngineV2(e.Spec.DataEngine) { + return fmt.Errorf("resuming engine instance is not supported for data engine %v", e.Spec.DataEngine) + } + + if e.Spec.VolumeName == "" || e.Spec.NodeID == "" { + return fmt.Errorf("missing parameters for engine instance resumption: %+v", e) + } + + im, err := ec.ds.GetInstanceManagerByInstanceRO(obj, false) + if err != nil { + return err + } + c, err := engineapi.NewInstanceManagerClient(im, false) + if err != nil { + return err + } + defer c.Close() + + return c.EngineInstanceResume(&engineapi.EngineInstanceResumeRequest{ + Engine: e, + }) +} + +func (ec *EngineController) SwitchOverTarget(obj interface{}) error { + e, ok := obj.(*longhorn.Engine) + if !ok { + return fmt.Errorf("invalid object for target switchover: %v", obj) + } + + if !types.IsDataEngineV2(e.Spec.DataEngine) { + return fmt.Errorf("target switchover is not supported for data engine %v", 
e.Spec.DataEngine) + } + + if e.Spec.VolumeName == "" || e.Spec.NodeID == "" { + return fmt.Errorf("missing parameters for target switchover: %+v", e) + } + + initiatorInstance, err := ec.GetInstance(obj, false) + if err != nil { + return errors.Wrapf(err, "failed to get initiator instance %v for switchover", e.Name) + } + + log := getLoggerForEngine(ec.logger, e) + + log.Infof("Preparing to switch over target to node %v. Initiator instance port details: port=%v, targetPort=%v, standbyTargetPort=%v", + e.Spec.TargetNodeID, initiatorInstance.Status.PortStart, initiatorInstance.Status.TargetPortStart, initiatorInstance.Status.StandbyTargetPortStart) + + targetInstance, err := ec.GetInstance(obj, true) + if err != nil { + return errors.Wrapf(err, "failed to get target instance %v for switchover", e.Name) + } + log.Infof("Preparing to switch over target to node %v. Target instance port details: port=%v, targetPort=%v, standbyTargetPort=%v", + e.Spec.TargetNodeID, targetInstance.Status.PortStart, targetInstance.Status.TargetPortStart, targetInstance.Status.StandbyTargetPortStart) + + targetIM, err := ec.getTargetInstanceManagerForSwitchOver(e) + if err != nil { + return err + } + + initiatorIM, err := ec.ds.GetInstanceManagerByInstanceRO(obj, false) + if err != nil { + return err + } + c, err := engineapi.NewInstanceManagerClient(initiatorIM, false) + if err != nil { + return err + } + defer c.Close() + + port := targetInstance.Status.TargetPortStart + if targetInstance.Status.StandbyTargetPortStart != 0 { + port = targetInstance.Status.StandbyTargetPortStart } + log.Infof("Switching over target to %v:%v", targetIM.Status.IP, port) + return c.EngineInstanceSwitchOverTarget(&engineapi.EngineInstanceSwitchOverTargetRequest{ + Engine: e, + TargetAddress: net.JoinHostPort(targetIM.Status.IP, fmt.Sprint(port)), + }) +} + +func (ec *EngineController) getTargetInstanceManagerForSwitchOver(e *longhorn.Engine) (targetIM *longhorn.InstanceManager, err error) { + if e.Spec.TargetNodeID != "" { + targetIM, err = ec.ds.GetRunningInstanceManagerByNodeRO(e.Spec.TargetNodeID, e.Spec.DataEngine) + if err != nil { + return nil, errors.Wrapf(err, "failed to get instance manager on node %v for switching over target", e.Spec.TargetNodeID) + } + + return targetIM, nil + } + + if e.Spec.NodeID == "" { + return nil, fmt.Errorf("nodeID is required for switching over target") + } + + targetIM, err = ec.ds.GetDefaultInstanceManagerByNodeRO(e.Spec.NodeID, e.Spec.DataEngine) + if err != nil { + if !datastore.ErrorIsNotFound(err) { + return nil, err + } + } + + return targetIM, nil +} + +func (ec *EngineController) DeleteTarget(obj interface{}) error { + e, ok := obj.(*longhorn.Engine) + if !ok { + return fmt.Errorf("invalid object for engine target deletion: %v", obj) + } + + if !types.IsDataEngineV2(e.Spec.DataEngine) { + return fmt.Errorf("deleting target for engine instance is not supported for data engine %v", e.Spec.DataEngine) + } + + ec.logger.WithField("engine", e.Name).Info("Deleting target instance") + + if e.Spec.VolumeName == "" || e.Spec.NodeID == "" { + return fmt.Errorf("missing parameters for engine target deletion: %+v", e) + } + + targetNodeID := e.Spec.NodeID + if e.Spec.NodeID == e.Spec.TargetNodeID { + targetNodeID = e.Status.CurrentTargetNodeID + } + if targetNodeID == "" { + return fmt.Errorf("missing target node ID for target deletion: %+v", e) + } + + im, err := ec.ds.GetRunningInstanceManagerByNodeRO(targetNodeID, e.Spec.DataEngine) + if err != nil { + return err + } + c, err := 
engineapi.NewInstanceManagerClient(im, false) + if err != nil { + return err + } + defer c.Close() + + return c.EngineInstanceDeleteTarget(&engineapi.EngineInstanceDeleteTargetRequest{ + Engine: e, + }) +} + +func (ec *EngineController) RequireRemoteTargetInstance(obj interface{}) (bool, error) { + e, ok := obj.(*longhorn.Engine) + if !ok { + return false, fmt.Errorf("invalid object for checking engine instance remote target requirement: %v", obj) + } + + return e.Spec.TargetNodeID != "" && e.Spec.TargetNodeID != e.Spec.NodeID, nil +} + +func (ec *EngineController) IsEngine(obj interface{}) bool { + _, ok := obj.(*longhorn.Engine) + return ok +} + +func (ec *EngineController) GetInstance(obj interface{}, isInstanceOnRemoteNode bool) (*longhorn.InstanceProcess, error) { + e, ok := obj.(*longhorn.Engine) + if !ok { + return nil, fmt.Errorf("invalid object for engine instance get: %v", obj) + } var ( im *longhorn.InstanceManager err error ) - if e.Status.InstanceManagerName == "" { - im, err = ec.ds.GetInstanceManagerByInstanceRO(obj) + + nodeID := e.Spec.NodeID + instanceManagerName := e.Status.InstanceManagerName + if isInstanceOnRemoteNode { + nodeID = e.Spec.TargetNodeID + im, err := ec.ds.GetRunningInstanceManagerByNodeRO(nodeID, e.Spec.DataEngine) if err != nil { - return nil, err + return nil, errors.Wrapf(err, "failed to get running instance manager for engine %v", e.Name) + } + instanceManagerName = im.Name + } + + if instanceManagerName == "" { + if e.Spec.DesireState == longhorn.InstanceStateRunning && e.Status.CurrentState == longhorn.InstanceStateSuspended { + im, err = ec.ds.GetRunningInstanceManagerByNodeRO(nodeID, e.Spec.DataEngine) + } else { + im, err = ec.ds.GetInstanceManagerByInstanceRO(obj, false) } - } else { - im, err = ec.ds.GetInstanceManagerRO(e.Status.InstanceManagerName) if err != nil { return nil, err } + } else if im == nil { + im, err = ec.ds.GetInstanceManagerRO(instanceManagerName) + if err != nil { + if !apierrors.IsNotFound(err) { + return nil, err + } + if types.IsDataEngineV2(e.Spec.DataEngine) { + im, err = ec.ds.GetRunningInstanceManagerByNodeRO(nodeID, e.Spec.DataEngine) + if err != nil { + return nil, errors.Wrapf(err, "failed to get running instance manager for engine %v", e.Name) + } + } else { + return nil, err + } + } } c, err := engineapi.NewInstanceManagerClient(im, false) if err != nil { @@ -769,16 +1038,58 @@ func (m *EngineMonitor) sync() bool { return false } - if err := m.refresh(engine); err == nil || !apierrors.IsConflict(errors.Cause(err)) { - utilruntime.HandleError(errors.Wrapf(err, "failed to update status for engine %v", m.Name)) - break + err = m.refresh(engine) + if err != nil && apierrors.IsConflict(errors.Cause(err)) { + // Retry if the error is due to conflict + continue + } + if types.IsDataEngineV2(engine.Spec.DataEngine) && err != nil && apierrors.IsNotFound(errors.Cause(err)) { + upgrading, upgradingCheckErr := m.ds.IsNodeDataEngineUpgradeRequested(engine.Spec.NodeID) + if upgrading { + updated, updatedCheckErr := m.isInstanceManagerUpdated(engine) + if updated { + for replicaName := range engine.Status.ReplicaModeMap { + replica, replicaErr := m.ds.GetReplicaRO(replicaName) + if replicaErr != nil { + m.logger.WithError(replicaErr).Errorf("Failed to get replica %v", replicaName) + continue + } + if replica.Spec.NodeID == engine.Spec.NodeID && + replica.Status.CurrentState != longhorn.InstanceStateError && + replica.Status.CurrentState != longhorn.InstanceStateStopped { + m.logger.Warnf("Replica %v in state %v is still on the 
node %v, will retry updating status later", + replicaName, replica.Status.CurrentState, replica.Spec.NodeID) + return false + } + } + } else { + m.logger.Warnf("v2 data engine %v is being upgraded, will retry updating status later", engine.Name) + return false + } + if updatedCheckErr != nil { + upgradingCheckErr = errors.Wrapf(updatedCheckErr, "failed to check if the instance manager is updated") + } + } + if upgradingCheckErr != nil { + err = errors.Wrapf(upgradingCheckErr, "failed to check if the engine %v is being upgraded", engine.Name) + } } - // Retry if the error is due to conflict + utilruntime.HandleError(errors.Wrapf(err, "failed to update status for engine %v", m.Name)) + break } return false } +func (m *EngineMonitor) isInstanceManagerUpdated(engine *longhorn.Engine) (bool, error) { + defaultIM, err := m.ds.GetDefaultInstanceManagerByNodeRO(engine.Spec.NodeID, engine.Spec.DataEngine) + if err != nil { + return false, err + } + + return defaultIM.Name == engine.Status.InstanceManagerName, nil +} + func (m *EngineMonitor) refresh(engine *longhorn.Engine) error { existingEngine := engine.DeepCopy() @@ -2105,7 +2416,53 @@ func (ec *EngineController) Upgrade(e *longhorn.Engine, log *logrus.Entry) (err } } } else { - return errors.Wrapf(err, "upgrading engine %v with data engine %v is not supported", e.Name, e.Spec.DataEngine) + // Check if the initiator instance is running + im, err := ec.ds.GetRunningInstanceManagerByNodeRO(e.Spec.NodeID, longhorn.DataEngineTypeV2) + if err != nil { + return err + } + if im.Status.CurrentState != longhorn.InstanceManagerStateRunning { + return fmt.Errorf("instance manager %v for initiating instance %v is not running", im.Name, e.Name) + } + + initiatorIMClient, err := engineapi.NewInstanceManagerClient(im, false) + if err != nil { + return err + } + defer initiatorIMClient.Close() + + if _, err := initiatorIMClient.InstanceGet(e.Spec.DataEngine, e.Name, string(longhorn.InstanceManagerTypeEngine)); err != nil { + return err + } + + _, ok := im.Status.InstanceEngines[e.Name] + if !ok { + return fmt.Errorf("initiator instance %v is not found in engine list", e.Name) + } + + // Check whether the target instance is existing + im, err = ec.ds.GetRunningInstanceManagerByNodeRO(e.Spec.TargetNodeID, longhorn.DataEngineTypeV2) + if err != nil { + return err + } + if im.Status.CurrentState != longhorn.InstanceManagerStateRunning { + return fmt.Errorf("instance manager %v for target instance %v is not running", im.Name, e.Name) + } + + targetIMClient, err := engineapi.NewInstanceManagerClient(im, false) + if err != nil { + return err + } + defer targetIMClient.Close() + + if _, err := targetIMClient.InstanceGet(e.Spec.DataEngine, e.Name, string(longhorn.InstanceManagerTypeEngine)); err != nil { + return err + } + + _, ok = im.Status.InstanceEngines[e.Name] + if !ok { + return fmt.Errorf("target instance %v is not found in engine list", e.Name) + } } log.Infof("Engine has been upgraded from %v to %v", e.Status.CurrentImage, e.Spec.Image) @@ -2116,6 +2473,7 @@ func (ec *EngineController) Upgrade(e *longhorn.Engine, log *logrus.Entry) (err e.Status.ReplicaTransitionTimeMap = nil e.Status.RestoreStatus = nil e.Status.RebuildStatus = nil + e.Status.TargetInstanceReplacementCreated = false return nil } @@ -2184,7 +2542,7 @@ func (ec *EngineController) UpgradeEngineInstance(e *longhorn.Engine, log *logru // isResponsibleFor picks a running node that has e.Status.CurrentImage deployed. 
// We need e.Status.CurrentImage deployed on the node to make request to the corresponding engine instance. // Prefer picking the node e.Spec.NodeID if it meet the above requirement. -func (ec *EngineController) isResponsibleFor(e *longhorn.Engine, defaultEngineImage string) (bool, error) { +func (ec *EngineController) isResponsibleFor(e *longhorn.Engine) (bool, error) { var err error defer func() { err = errors.Wrap(err, "error while checking isResponsibleFor") diff --git a/controller/instance_handler.go b/controller/instance_handler.go index a943d28d03..fc5d6b53b0 100644 --- a/controller/instance_handler.go +++ b/controller/instance_handler.go @@ -35,10 +35,16 @@ type InstanceHandler struct { } type InstanceManagerHandler interface { - GetInstance(obj interface{}) (*longhorn.InstanceProcess, error) - CreateInstance(obj interface{}) (*longhorn.InstanceProcess, error) + GetInstance(obj interface{}, isInstanceOnRemoteNode bool) (*longhorn.InstanceProcess, error) + CreateInstance(obj interface{}, isInstanceOnRemoteNode bool) (*longhorn.InstanceProcess, error) DeleteInstance(obj interface{}) error LogInstance(ctx context.Context, obj interface{}) (*engineapi.InstanceManagerClient, *imapi.LogStream, error) + SuspendInstance(obj interface{}) error + ResumeInstance(obj interface{}) error + SwitchOverTarget(obj interface{}) error + DeleteTarget(obj interface{}) error + RequireRemoteTargetInstance(obj interface{}) (bool, error) + IsEngine(obj interface{}) bool } func NewInstanceHandler(ds *datastore.DataStore, instanceManagerHandler InstanceManagerHandler, eventRecorder record.EventRecorder) *InstanceHandler { @@ -49,10 +55,130 @@ func NewInstanceHandler(ds *datastore.DataStore, instanceManagerHandler Instance } } -func (h *InstanceHandler) syncStatusWithInstanceManager(im *longhorn.InstanceManager, instanceName string, spec *longhorn.InstanceSpec, status *longhorn.InstanceStatus, instances map[string]longhorn.InstanceProcess) { +func (h *InstanceHandler) syncStatusIPsAndPorts(im *longhorn.InstanceManager, obj interface{}, spec *longhorn.InstanceSpec, status *longhorn.InstanceStatus, instanceName string, instance longhorn.InstanceProcess) { + imPod, err := h.ds.GetPodRO(im.Namespace, im.Name) + if err != nil { + logrus.WithError(err).Errorf("Failed to get instance manager pod from %v", im.Name) + return + } + if imPod == nil { + logrus.Warnf("Instance manager pod from %v not exist in datastore", im.Name) + return + } + + storageIP := h.ds.GetStorageIPFromPod(imPod) + if status.StorageIP != storageIP { + status.StorageIP = storageIP + logrus.Infof("Instance %v starts running, Storage IP %v", instanceName, status.StorageIP) + } + + if status.IP != im.Status.IP { + status.IP = im.Status.IP + logrus.Infof("Instance %v starts running, IP %v", instanceName, status.IP) + } + + if status.Port != int(instance.Status.PortStart) { + status.Port = int(instance.Status.PortStart) + logrus.Infof("Instance %v starts running, Port %d", instanceName, status.Port) + } + + if !h.instanceManagerHandler.IsEngine(obj) { + return + } + + if types.IsDataEngineV2(spec.DataEngine) && spec.TargetNodeID != "" { + targetIM, err := h.ds.GetRunningInstanceManagerByNodeRO(spec.TargetNodeID, spec.DataEngine) + if err != nil { + logrus.WithError(err).Errorf("Failed to get running instance manager for node %s", spec.TargetNodeID) + return + } + targetClient, err := engineapi.NewInstanceManagerClient(targetIM, false) + if err != nil { + logrus.WithError(err).Errorf("Failed to get instance manager client for target instance manager 
%v", targetIM.Name) + return + } + defer targetClient.Close() + + targetInstance, err := targetClient.InstanceGet(spec.DataEngine, instanceName, string(longhorn.InstanceManagerTypeEngine)) + if err != nil { + logrus.WithError(err).Errorf("Failed to get target instance %s from instance manager %s", instanceName, targetIM.Name) + } else { + if status.TargetIP != targetIM.Status.IP { + status.TargetIP = targetIM.Status.IP + logrus.Infof("Instance %v starts running, Target IP %v", instanceName, status.TargetIP) + } + + if status.TargetPort != int(targetInstance.Status.TargetPortStart) { + status.TargetPort = int(targetInstance.Status.TargetPortStart) + logrus.Infof("Instance %v starts running, Target IP %v", instanceName, status.TargetIP) + } + + // Get storage target IP from target instance manager + targetIMPod, err := h.ds.GetPodRO(targetIM.Namespace, targetIM.Name) + if err != nil { + logrus.WithError(err).Errorf("Failed to get instance manager pod from %v", targetIM.Name) + return + } + if targetIMPod == nil { + logrus.Warnf("Instance manager pod from %v not exist in datastore", targetIM.Name) + return + } + + storageTargetIP := h.ds.GetStorageIPFromPod(targetIMPod) + if status.StorageTargetIP != storageTargetIP { + status.StorageTargetIP = storageTargetIP + logrus.Infof("Instance %v starts running, Storage Target IP %v", instanceName, status.StorageTargetIP) + } + } + + // Check if the target instance replacement is running on the target node + if status.CurrentTargetNodeID != spec.TargetNodeID && !status.TargetInstanceReplacementCreated { + running, err := h.isTargetInstanceReplacementRunning(instanceName, spec, status) + if err != nil { + logrus.WithError(err).Errorf("Failed to check if target instance %v is running", instanceName) + return + } + if running { + logrus.Infof("Target instance replacement %v is running on target node %v", instanceName, spec.TargetNodeID) + status.TargetInstanceReplacementCreated = true + } + } + } else { + if status.StorageTargetIP != storageIP { + status.StorageTargetIP = storageIP + logrus.Infof("Instance %v starts running, Storage Target IP %v", instanceName, status.StorageTargetIP) + } + + if status.TargetIP != im.Status.IP { + status.TargetIP = im.Status.IP + logrus.Infof("Instance %v starts running, Target IP %v", instanceName, status.TargetPort) + } + + if status.TargetPort != int(instance.Status.TargetPortStart) { + status.TargetPort = int(instance.Status.TargetPortStart) + logrus.Infof("Instance %v starts running, Target Port %v", instanceName, status.TargetPort) + } + + status.CurrentTargetNodeID = "" + status.TargetInstanceReplacementCreated = false + } +} + +func (h *InstanceHandler) syncStatusWithInstanceManager(im *longhorn.InstanceManager, obj interface{}, spec *longhorn.InstanceSpec, status *longhorn.InstanceStatus, instances map[string]longhorn.InstanceProcess) { + runtimeObj, ok := obj.(runtime.Object) + if !ok { + return + } + instanceName, err := h.getNameFromObj(runtimeObj) + if err != nil { + return + } + defer func() { if status.CurrentState == longhorn.InstanceStateStopped { status.InstanceManagerName = "" + status.CurrentTargetNodeID = "" + status.TargetInstanceReplacementCreated = false } }() @@ -72,8 +198,11 @@ func (h *InstanceHandler) syncStatusWithInstanceManager(im *longhorn.InstanceMan status.CurrentImage = "" } status.IP = "" + status.TargetIP = "" status.StorageIP = "" + status.StorageTargetIP = "" status.Port = 0 + status.TargetPort = 0 h.resetInstanceErrorCondition(status) return } @@ -82,32 +211,65 @@ func (h 
*InstanceHandler) syncStatusWithInstanceManager(im *longhorn.InstanceMan im.Status.CurrentState == longhorn.InstanceManagerStateError || im.DeletionTimestamp != nil { if status.Started { - if status.CurrentState != longhorn.InstanceStateError { - logrus.Warnf("Marking the instance as state ERROR since failed to find the instance manager for the running instance %v", instanceName) + if h.isEngineOfV2DataEngine(obj, spec.DataEngine) { + if h.isV2DataEngineBeingUpgraded(spec, status) { + logrus.Warnf("Skipping the instance %v since the instance manager %v is %v", instanceName, im.Name, im.Status.CurrentState) + return + } + + if spec.Image == status.CurrentImage { + if status.CurrentState != longhorn.InstanceStateError { + upgradeRequested, err := h.ds.IsNodeDataEngineUpgradeRequested(spec.NodeID) + if err != nil { + // TODO: should we return here or mark the instance as error? + logrus.WithError(err).Errorf("Failed to check if node %v is being upgrade requested", spec.NodeID) + return + } + if upgradeRequested { + logrus.Warnf("Skipping the instance %v since the instance manager %v is %v since the node %v is being upgrade requested", + instanceName, im.Name, im.Status.CurrentState, spec.NodeID) + return + } + logrus.Warnf("Marking the instance as state ERROR since failed to find the instance manager for the running instance %v", instanceName) + } + status.CurrentState = longhorn.InstanceStateError + } else { + logrus.Warnf("Skipping the instance %v since the instance manager %v is %v and spec image %v is different from the current image %v", + instanceName, im.Name, im.Status.CurrentState, spec.Image, status.CurrentImage) + return + } + } else { + if status.CurrentState != longhorn.InstanceStateError { + logrus.Warnf("Marking the instance as state ERROR since failed to find the instance manager for the running instance %v", instanceName) + } + status.CurrentState = longhorn.InstanceStateError } - status.CurrentState = longhorn.InstanceStateError } else { status.CurrentState = longhorn.InstanceStateStopped } status.CurrentImage = "" status.IP = "" + status.TargetIP = "" status.StorageIP = "" + status.StorageTargetIP = "" status.Port = 0 + status.TargetPort = 0 h.resetInstanceErrorCondition(status) return } if im.Status.CurrentState == longhorn.InstanceManagerStateStarting { if status.Started { - if status.CurrentState != longhorn.InstanceStateError { - logrus.Warnf("Marking the instance as state ERROR since the starting instance manager %v shouldn't contain the running instance %v", im.Name, instanceName) + if h.isEngineOfV2DataEngine(obj, spec.DataEngine) { + if spec.Image == status.CurrentImage { + if h.isV2DataEngineBeingUpgraded(spec, status) { + return + } + h.handleIfInstanceManagerStarting(im.Name, instanceName, status) + } + } else { + h.handleIfInstanceManagerStarting(im.Name, instanceName, status) } - status.CurrentState = longhorn.InstanceStateError - status.CurrentImage = "" - status.IP = "" - status.StorageIP = "" - status.Port = 0 - h.resetInstanceErrorCondition(status) } return } @@ -115,23 +277,56 @@ func (h *InstanceHandler) syncStatusWithInstanceManager(im *longhorn.InstanceMan instance, exists := instances[instanceName] if !exists { if status.Started { - if status.CurrentState != longhorn.InstanceStateError { - logrus.Warnf("Marking the instance as state ERROR since failed to find the instance status in instance manager %v for the running instance %v", im.Name, instanceName) + if h.isEngineOfV2DataEngine(obj, spec.DataEngine) { + if h.isV2DataEngineBeingUpgraded(spec, 
status) { + logrus.Warnf("Skipping checking the instance %v since the instance manager %v is in %v state", + instanceName, im.Name, im.Status.CurrentState) + return + } + if status.CurrentState != longhorn.InstanceStateError { + // Because the async nature, directly check instance here + if spec.TargetNodeID != "" { + logrus.Infof("Recreated initiator instance %v is not found in instance manager %v, directly checking it in instance manager %v", + instanceName, im.Name, spec.NodeID) + + exist, err := h.isInstanceExist(instanceName, spec) + if exist { + logrus.Infof("Initiator instance %v is found in instance manager %v", instanceName, spec.NodeID) + return + } + if err != nil { + logrus.WithError(err).Errorf("Failed to check if recreated initiator instance %v exists in instance manager %v", + instanceName, spec.NodeID) + return + } + } + logrus.Warnf("Marking the instance as state ERROR since failed to find the instance status in instance manager %v for the running instance %v", + im.Name, instanceName) + } + status.CurrentState = longhorn.InstanceStateError + } else { + if status.CurrentState != longhorn.InstanceStateError { + logrus.Warnf("Marking the instance as state ERROR since failed to find the instance status in instance manager %v for the running instance %v", im.Name, instanceName) + } + status.CurrentState = longhorn.InstanceStateError } - status.CurrentState = longhorn.InstanceStateError } else { status.CurrentState = longhorn.InstanceStateStopped } + status.CurrentImage = "" status.IP = "" + status.TargetIP = "" status.StorageIP = "" + status.StorageTargetIP = "" status.Port = 0 + status.TargetPort = 0 h.resetInstanceErrorCondition(status) return } if status.InstanceManagerName != "" && status.InstanceManagerName != im.Name { - logrus.Errorf("The related process of instance %v is found in the instance manager %v, but the instance manager name in the instance status is %v. "+ + logrus.Warnf("The related process of instance %v is found in the instance manager %v, but the instance manager name in the instance status is %v. 
"+ "The instance manager name shouldn't change except for cleanup", instanceName, im.Name, status.InstanceManagerName) } @@ -146,44 +341,35 @@ func (h *InstanceHandler) syncStatusWithInstanceManager(im *longhorn.InstanceMan status.CurrentState = longhorn.InstanceStateStarting status.CurrentImage = "" status.IP = "" + status.TargetIP = "" status.StorageIP = "" + status.StorageTargetIP = "" status.Port = 0 + status.TargetPort = 0 h.resetInstanceErrorCondition(status) case longhorn.InstanceStateRunning: status.CurrentState = longhorn.InstanceStateRunning - imPod, err := h.ds.GetPodRO(im.Namespace, im.Name) - if err != nil { - logrus.WithError(err).Errorf("Failed to get instance manager pod from %v", im.Name) - return - } - - if imPod == nil { - logrus.Warnf("Instance manager pod from %v not exist in datastore", im.Name) - return - } - - storageIP := h.ds.GetStorageIPFromPod(imPod) - if status.StorageIP != storageIP { - status.StorageIP = storageIP - logrus.Warnf("Instance %v starts running, Storage IP %v", instanceName, status.StorageIP) - } + h.syncStatusIPsAndPorts(im, obj, spec, status, instanceName, instance) - if status.IP != im.Status.IP { - status.IP = im.Status.IP - logrus.Warnf("Instance %v starts running, IP %v", instanceName, status.IP) - } - if status.Port != int(instance.Status.PortStart) { - status.Port = int(instance.Status.PortStart) - logrus.Warnf("Instance %v starts running, Port %d", instanceName, status.Port) - } // only set CurrentImage when first started, since later we may specify // different spec.Image for upgrade if status.CurrentImage == "" { - status.CurrentImage = spec.Image + if types.IsDataEngineV1(spec.DataEngine) { + status.CurrentImage = spec.Image + } else { + if h.instanceManagerHandler.IsEngine(obj) { + status.CurrentImage = spec.Image + } else { + status.CurrentImage = im.Spec.Image + } + } } - h.syncInstanceCondition(instance, status) + h.syncInstanceCondition(instance, status) + case longhorn.InstanceStateSuspended: + status.CurrentState = longhorn.InstanceStateSuspended + status.CurrentTargetNodeID = spec.TargetNodeID case longhorn.InstanceStateStopping: if status.Started { status.CurrentState = longhorn.InstanceStateError @@ -192,8 +378,11 @@ func (h *InstanceHandler) syncStatusWithInstanceManager(im *longhorn.InstanceMan } status.CurrentImage = "" status.IP = "" + status.TargetIP = "" status.StorageIP = "" + status.StorageTargetIP = "" status.Port = 0 + status.TargetPort = 0 h.resetInstanceErrorCondition(status) case longhorn.InstanceStateStopped: if status.Started { @@ -203,8 +392,11 @@ func (h *InstanceHandler) syncStatusWithInstanceManager(im *longhorn.InstanceMan } status.CurrentImage = "" status.IP = "" + status.TargetIP = "" status.StorageIP = "" + status.StorageTargetIP = "" status.Port = 0 + status.TargetPort = 0 h.resetInstanceErrorCondition(status) default: if status.CurrentState != longhorn.InstanceStateError { @@ -213,8 +405,11 @@ func (h *InstanceHandler) syncStatusWithInstanceManager(im *longhorn.InstanceMan status.CurrentState = longhorn.InstanceStateError status.CurrentImage = "" status.IP = "" + status.TargetIP = "" status.StorageIP = "" + status.StorageTargetIP = "" status.Port = 0 + status.TargetPort = 0 h.resetInstanceErrorCondition(status) } } @@ -266,7 +461,7 @@ func (h *InstanceHandler) ReconcileInstanceState(obj interface{}, spec *longhorn } } // There should be an available instance manager for a scheduled instance when its related engine image is compatible - if im == nil && spec.Image != "" && spec.NodeID != "" { + if im == 
nil && spec.NodeID != "" && h.isSpecImageReady(obj, spec) { dataEngineEnabled, err := h.ds.IsDataEngineEnabled(spec.DataEngine) if err != nil { return err @@ -280,7 +475,7 @@ func (h *InstanceHandler) ReconcileInstanceState(obj interface{}, spec *longhorn return err } if !isNodeDownOrDeleted { - im, err = h.ds.GetInstanceManagerByInstanceRO(obj) + im, err = h.getInstanceManagerRO(obj, spec, status) if err != nil { return errors.Wrapf(err, "failed to get instance manager for instance %v", instanceName) } @@ -328,16 +523,45 @@ func (h *InstanceHandler) ReconcileInstanceState(obj interface{}, spec *longhorn if i, exists := instances[instanceName]; exists && i.Status.State == longhorn.InstanceStateRunning { status.Started = true - break + if h.isEngineOfV2DataEngine(obj, spec.DataEngine) { + if isDataEngineNotBeingLiveUpgraded(spec, status) { + // Not in data engine live upgrade + break + } + } else { + break + } } // there is a delay between createInstance() invocation and InstanceManager update, // createInstance() may be called multiple times. - if status.CurrentState != longhorn.InstanceStateStopped { - break + if h.isEngineOfV2DataEngine(obj, spec.DataEngine) { + if status.CurrentState != longhorn.InstanceStateStopped && status.CurrentState != longhorn.InstanceStateSuspended { + if status.CurrentState != longhorn.InstanceStateRunning || isDataEngineNotBeingLiveUpgraded(spec, status) { + break + } + } + } else { + if status.CurrentState != longhorn.InstanceStateStopped { + break + } } - err = h.createInstance(instanceName, spec.DataEngine, runtimeObj) + if h.isEngineOfV2DataEngine(obj, spec.DataEngine) { + if spec.TargetNodeID != "" { + if h.isEngineResumeRequired(spec, status) { + // Resume the suspended initiator instance + err = h.resumeInstance(instanceName, spec.DataEngine, runtimeObj) + } else { + // Create target instance if it's not created yet + err = h.createInstance(instanceName, spec.DataEngine, runtimeObj) + } + } else { + err = h.createInstance(instanceName, spec.DataEngine, runtimeObj) + } + } else { + err = h.createInstance(instanceName, spec.DataEngine, runtimeObj) + } if err != nil { return err } @@ -359,12 +583,52 @@ func (h *InstanceHandler) ReconcileInstanceState(obj interface{}, spec *longhorn } } } + if h.isEngineOfV2DataEngine(obj, spec.DataEngine) { + if status.CurrentTargetNodeID != "" { + targetIM, err := h.ds.GetRunningInstanceManagerByNodeRO(status.CurrentTargetNodeID, spec.DataEngine) + if err != nil { + return err + } + if targetIM != nil { + if err := h.deleteTarget(instanceName, runtimeObj); err != nil { + if !types.ErrorIsNotFound(err) { + return err + } + } + } + } + } status.Started = false + case longhorn.InstanceStateSuspended: + if !h.isEngineOfV2DataEngine(obj, spec.DataEngine) { + return fmt.Errorf("instance %v is not an engine of v2 volume", instanceName) + } + + if err := h.suspendInstance(instanceName, runtimeObj); err != nil { + return err + } + + if err := h.switchOverTarget(instanceName, runtimeObj); err != nil { + logrus.Infof("Resuming instance %v after failing to switch over target", instanceName) + errResume := h.resumeInstance(instanceName, spec.DataEngine, runtimeObj) + if errResume != nil { + logrus.WithError(errResume).Errorf("Failed to resume instance %v after failing to switch over target", instanceName) + } + return err + } + + if spec.TargetNodeID != status.CurrentTargetNodeID { + if err := h.deleteTarget(instanceName, runtimeObj); err != nil { + if !types.ErrorIsNotFound(err) { + return err + } + } + } default: return 
fmt.Errorf("unknown instance desire state: desire %v", spec.DesireState) } - h.syncStatusWithInstanceManager(im, instanceName, spec, status, instances) + h.syncStatusWithInstanceManager(im, obj, spec, status, instances) switch status.CurrentState { case longhorn.InstanceStateRunning: @@ -373,7 +637,9 @@ func (h *InstanceHandler) ReconcileInstanceState(obj interface{}, spec *longhorn if spec.NodeID != im.Spec.NodeID { status.CurrentState = longhorn.InstanceStateError status.IP = "" + status.TargetIP = "" status.StorageIP = "" + status.StorageTargetIP = "" err := fmt.Errorf("instance %v NodeID %v is not the same as the instance manager %v NodeID %v", instanceName, spec.NodeID, im.Name, im.Spec.NodeID) return err @@ -447,25 +713,83 @@ func (h *InstanceHandler) printInstanceLogs(instanceName string, obj runtime.Obj return nil } -func (h *InstanceHandler) createInstance(instanceName string, dataEngine longhorn.DataEngineType, obj runtime.Object) error { - _, err := h.instanceManagerHandler.GetInstance(obj) - if err == nil { - return nil - } - if !types.ErrorIsNotFound(err) && !(types.IsDataEngineV2(dataEngine) && types.ErrorIsStopped(err)) { - return errors.Wrapf(err, "Failed to get instance process %v", instanceName) - } +func (h *InstanceHandler) createInstance(instanceName string, dataEngine longhorn.DataEngineType, obj runtime.Object) (err error) { + if h.isEngineOfV2DataEngine(obj, dataEngine) { + instanceExists := false + targetInstanceRequired := false - logrus.Infof("Creating instance %v", instanceName) - if _, err := h.instanceManagerHandler.CreateInstance(obj); err != nil { - if !types.ErrorAlreadyExists(err) { - h.eventRecorder.Eventf(obj, corev1.EventTypeWarning, constant.EventReasonFailedStarting, "Error starting %v: %v", instanceName, err) - return err + instance, err := h.instanceManagerHandler.GetInstance(obj, false) + if err == nil { + instanceExists = true + + targetInstanceRequired, err = h.instanceManagerHandler.RequireRemoteTargetInstance(obj) + if err != nil { + return errors.Wrapf(err, "failed to check if remote target instance for %v is required", instanceName) + } + if targetInstanceRequired { + _, err = h.instanceManagerHandler.GetInstance(obj, true) + if err == nil { + return nil + } + } else { + if instance.Status.StandbyTargetPortStart != 0 || instance.Status.TargetPortStart != 0 { + return nil + } + + targetInstanceRequired = true + err = fmt.Errorf("cannot find local target instance for %v", instanceName) + } } - // Already exists, lost track may due to previous datastore conflict - return nil + + if !types.ErrorIsNotFound(err) && !(types.IsDataEngineV2(dataEngine) && types.ErrorIsStopped(err)) { + return errors.Wrapf(err, "failed to get instance %v", instanceName) + } + + if !instanceExists { + logrus.Infof("Creating instance %v", instanceName) + if _, err := h.instanceManagerHandler.CreateInstance(obj, false); err != nil { + if !types.ErrorAlreadyExists(err) { + h.eventRecorder.Eventf(obj, corev1.EventTypeWarning, constant.EventReasonFailedStarting, "Error starting %v: %v", instanceName, err) + return err + } + // Already exists, lost track may due to previous datastore conflict + return nil + } + h.eventRecorder.Eventf(obj, corev1.EventTypeNormal, constant.EventReasonStart, "Starts instance %v", instanceName) + } + + if targetInstanceRequired { + logrus.Infof("Creating target instance %v", instanceName) + if _, err := h.instanceManagerHandler.CreateInstance(obj, true); err != nil { + if !types.ErrorAlreadyExists(err) { + h.eventRecorder.Eventf(obj, 
corev1.EventTypeWarning, constant.EventReasonFailedStarting, "Error starting %v: %v", instanceName, err) + return err + } + // Already exists, lost track may due to previous datastore conflict + return nil + } + h.eventRecorder.Eventf(obj, corev1.EventTypeNormal, constant.EventReasonStart, "Starts target instance %v", instanceName) + } + } else { + _, err := h.instanceManagerHandler.GetInstance(obj, false) + if err == nil { + return nil + } + if !types.ErrorIsNotFound(err) && !(types.IsDataEngineV2(dataEngine) && types.ErrorIsStopped(err)) { + return errors.Wrapf(err, "Failed to get instance process %v", instanceName) + } + + logrus.Infof("Creating instance %v", instanceName) + if _, err := h.instanceManagerHandler.CreateInstance(obj, false); err != nil { + if !types.ErrorAlreadyExists(err) { + h.eventRecorder.Eventf(obj, corev1.EventTypeWarning, constant.EventReasonFailedStarting, "Error starting %v: %v", instanceName, err) + return err + } + // Already exists, lost track may due to previous datastore conflict + return nil + } + h.eventRecorder.Eventf(obj, corev1.EventTypeNormal, constant.EventReasonStart, "Starts %v", instanceName) } - h.eventRecorder.Eventf(obj, corev1.EventTypeNormal, constant.EventReasonStart, "Starts %v", instanceName) return nil } @@ -481,3 +805,191 @@ func (h *InstanceHandler) deleteInstance(instanceName string, obj runtime.Object return nil } + +func (h *InstanceHandler) suspendInstance(instanceName string, obj runtime.Object) error { + logrus.Infof("Suspending instance %v", instanceName) + + if _, err := h.instanceManagerHandler.GetInstance(obj, false); err != nil { + return errors.Wrapf(err, "failed to get instance %v for suspension", instanceName) + } + + if err := h.instanceManagerHandler.SuspendInstance(obj); err != nil { + return errors.Wrapf(err, "failed to suspend instance %v", instanceName) + } + + return nil +} + +func (h *InstanceHandler) resumeInstance(instanceName string, dataEngine longhorn.DataEngineType, obj runtime.Object) error { + logrus.Infof("Resuming instance %v", instanceName) + + if types.IsDataEngineV1(dataEngine) { + return fmt.Errorf("resuming instance is not supported for data engine %v", dataEngine) + } + + if _, err := h.instanceManagerHandler.GetInstance(obj, false); err != nil { + return errors.Wrapf(err, "failed to get instance %v for resumption", instanceName) + } + + if err := h.instanceManagerHandler.ResumeInstance(obj); err != nil { + return errors.Wrapf(err, "failed to resume instance %v", instanceName) + } + + return nil +} + +func (h *InstanceHandler) switchOverTarget(instanceName string, obj runtime.Object) error { + logrus.Infof("Switching over target for instance %v", instanceName) + + if err := h.instanceManagerHandler.SwitchOverTarget(obj); err != nil { + return errors.Wrapf(err, "failed to switch over target for instance %s", instanceName) + } + + return nil +} + +func (h *InstanceHandler) deleteTarget(instanceName string, obj runtime.Object) error { + logrus.Infof("Deleting target for instance %s", instanceName) + + if err := h.instanceManagerHandler.DeleteTarget(obj); err != nil { + return errors.Wrapf(err, "failed to delete target for instance %s", instanceName) + } + + return nil +} + +func (h *InstanceHandler) isV2DataEngineBeingUpgraded(spec *longhorn.InstanceSpec, status *longhorn.InstanceStatus) bool { + if !types.IsDataEngineV2(spec.DataEngine) { + return false + } + + upgradeRequested, err := h.ds.IsNodeDataEngineUpgradeRequested(spec.NodeID) + if err != nil { + logrus.WithError(err).Errorf("Failed to get 
node %v", spec.NodeID) + return false + } + + if !upgradeRequested { + return false + } + + if spec.TargetNodeID == "" { + return false + } + + return spec.NodeID != spec.TargetNodeID && spec.TargetNodeID == status.CurrentTargetNodeID +} + +func isVolumeBeingSwitchedBack(spec *longhorn.InstanceSpec, status *longhorn.InstanceStatus) bool { + return spec.NodeID == spec.TargetNodeID && spec.TargetNodeID != status.CurrentTargetNodeID +} + +func isTargetInstanceReplacementCreated(instance *longhorn.InstanceProcess) bool { + return instance.Status.StandbyTargetPortStart != 0 +} + +func isTargetInstanceRemote(instance *longhorn.InstanceProcess) bool { + return instance.Status.PortStart != 0 && instance.Status.TargetPortStart == 0 +} + +func (h *InstanceHandler) getInstanceManagerRO(obj interface{}, spec *longhorn.InstanceSpec, status *longhorn.InstanceStatus) (*longhorn.InstanceManager, error) { + // Only happen when upgrading instance-manager image + if spec.DesireState == longhorn.InstanceStateRunning && status.CurrentState == longhorn.InstanceStateSuspended { + return h.ds.GetRunningInstanceManagerByNodeRO(spec.NodeID, spec.DataEngine) + } + + return h.ds.GetInstanceManagerByInstanceRO(obj, false) +} + +func (h *InstanceHandler) handleIfInstanceManagerStarting(imName, instanceName string, status *longhorn.InstanceStatus) { + if status.CurrentState != longhorn.InstanceStateError { + logrus.Warnf("Marking the instance as state ERROR since the starting instance manager %v shouldn't contain the running instance %v", imName, instanceName) + } + status.CurrentState = longhorn.InstanceStateError + status.CurrentImage = "" + status.IP = "" + status.TargetIP = "" + status.StorageIP = "" + status.StorageTargetIP = "" + status.Port = 0 + status.TargetPort = 0 + h.resetInstanceErrorCondition(status) +} + +func (h *InstanceHandler) isInstanceExist(instanceName string, spec *longhorn.InstanceSpec) (bool, error) { + var err error + + im, err := h.ds.GetRunningInstanceManagerByNodeRO(spec.NodeID, spec.DataEngine) + if err != nil { + return false, err + } + + client, err := engineapi.NewInstanceManagerClient(im, false) + if err != nil { + return false, err + } + defer client.Close() + + _, err = client.InstanceGet(spec.DataEngine, instanceName, string(longhorn.InstanceManagerTypeEngine)) + if err != nil { + return false, err + } + + return true, nil +} + +func (h *InstanceHandler) isSpecImageReady(obj interface{}, spec *longhorn.InstanceSpec) bool { + if types.IsDataEngineV1(spec.DataEngine) { + return spec.Image != "" + } + + if h.instanceManagerHandler.IsEngine(obj) { + return spec.Image != "" + } + + // spec.image is not required for replica because the image of a v2 replica + // is the same as the running instance manager. 
+ return true +} + +func (h *InstanceHandler) isTargetInstanceReplacementRunning(instanceName string, spec *longhorn.InstanceSpec, status *longhorn.InstanceStatus) (bool, error) { + if spec.TargetNodeID == "" { + return false, nil + } + + logrus.Infof("Checking whether instance %v is running on target node %v", instanceName, spec.TargetNodeID) + + im, err := h.ds.GetRunningInstanceManagerByNodeRO(spec.TargetNodeID, spec.DataEngine) + if err != nil { + return false, errors.Wrapf(err, "failed to get instance manager on node %v for checking if target instance is running", spec.TargetNodeID) + } + + c, err := engineapi.NewInstanceManagerClient(im, false) + if err != nil { + return false, errors.Wrapf(err, "failed to create instance manager client for target instance") + } + defer c.Close() + + instance, err := c.InstanceGet(spec.DataEngine, instanceName, string(longhorn.InstanceManagerTypeEngine)) + if err != nil { + return false, errors.Wrapf(err, "failed to get target instance %v on node %v", instanceName, spec.TargetNodeID) + } + + if isVolumeBeingSwitchedBack(spec, status) { + return isTargetInstanceRemote(instance) && isTargetInstanceReplacementCreated(instance), nil + } + + return true, nil +} + +func (h *InstanceHandler) isEngineOfV2DataEngine(obj interface{}, dataEngine longhorn.DataEngineType) bool { + return types.IsDataEngineV2(dataEngine) && h.instanceManagerHandler.IsEngine(obj) +} + +func (h *InstanceHandler) isEngineResumeRequired(spec *longhorn.InstanceSpec, status *longhorn.InstanceStatus) bool { + return spec.TargetNodeID == status.CurrentTargetNodeID && status.CurrentState == longhorn.InstanceStateSuspended +} + +func isDataEngineNotBeingLiveUpgraded(spec *longhorn.InstanceSpec, status *longhorn.InstanceStatus) bool { + return spec.TargetNodeID == "" && !status.TargetInstanceReplacementCreated +} diff --git a/controller/instance_handler_test.go b/controller/instance_handler_test.go index 68fe69d6a4..2602b4421f 100644 --- a/controller/instance_handler_test.go +++ b/controller/instance_handler_test.go @@ -36,7 +36,7 @@ const ( type MockInstanceManagerHandler struct{} -func (imh *MockInstanceManagerHandler) GetInstance(obj interface{}) (*longhorn.InstanceProcess, error) { +func (imh *MockInstanceManagerHandler) GetInstance(obj interface{}, isInstanceOnRemoteNode bool) (*longhorn.InstanceProcess, error) { metadata, err := meta.Accessor(obj) if err != nil { return nil, err @@ -48,7 +48,7 @@ func (imh *MockInstanceManagerHandler) GetInstance(obj interface{}) (*longhorn.I return &longhorn.InstanceProcess{}, nil } -func (imh *MockInstanceManagerHandler) CreateInstance(obj interface{}) (*longhorn.InstanceProcess, error) { +func (imh *MockInstanceManagerHandler) CreateInstance(obj interface{}, isInstanceOnRemoteNode bool) (*longhorn.InstanceProcess, error) { metadata, err := meta.Accessor(obj) if err != nil { return nil, err @@ -72,10 +72,36 @@ func (imh *MockInstanceManagerHandler) DeleteInstance(obj interface{}) error { return nil } +func (imh *MockInstanceManagerHandler) SuspendInstance(obj interface{}) error { + return fmt.Errorf("SuspendInstance is not mocked") +} + +func (imh *MockInstanceManagerHandler) ResumeInstance(obj interface{}) error { + return fmt.Errorf("ResumeInstance is not mocked") +} + +func (imh *MockInstanceManagerHandler) SwitchOverTarget(obj interface{}) error { + return fmt.Errorf("SwitchOverTarget is not mocked") +} + +func (imh *MockInstanceManagerHandler) DeleteTarget(obj interface{}) error { + // DeleteTarget is not mocked + return nil +} + +func (imh 
*MockInstanceManagerHandler) IsEngine(obj interface{}) bool { + _, ok := obj.(*longhorn.Engine) + return ok +} + func (imh *MockInstanceManagerHandler) LogInstance(ctx context.Context, obj interface{}) (*engineapi.InstanceManagerClient, *imapi.LogStream, error) { return nil, nil, fmt.Errorf("LogInstance is not mocked") } +func (imh *MockInstanceManagerHandler) RequireRemoteTargetInstance(obj interface{}) (bool, error) { + return false, nil +} + func newEngine(name, currentImage, imName, nodeName, ip string, port int, started bool, currentState, desireState longhorn.InstanceState) *longhorn.Engine { var conditions []longhorn.Condition conditions = types.SetCondition(conditions, @@ -108,8 +134,11 @@ func newEngine(name, currentImage, imName, nodeName, ip string, port int, starte CurrentImage: currentImage, InstanceManagerName: imName, IP: ip, + TargetIP: ip, StorageIP: ip, + StorageTargetIP: ip, Port: port, + TargetPort: 0, // v1 volume doesn't set target port Started: started, Conditions: conditions, }, diff --git a/controller/utils.go b/controller/utils.go index 1c27ce1f64..cfecd2855e 100644 --- a/controller/utils.go +++ b/controller/utils.go @@ -23,10 +23,6 @@ func hasReplicaEvictionRequested(rs map[string]*longhorn.Replica) bool { return false } -func isVolumeUpgrading(v *longhorn.Volume) bool { - return v.Status.CurrentImage != v.Spec.Image -} - // isTargetVolumeOfAnActiveCloning checks if the input volume is the target volume of an on-going cloning process func isTargetVolumeOfAnActiveCloning(v *longhorn.Volume) bool { isCloningDesired := types.IsDataFromVolume(v.Spec.DataSource) diff --git a/controller/volume_controller.go b/controller/volume_controller.go index 5e32ae1d31..5c64d3a93e 100644 --- a/controller/volume_controller.go +++ b/controller/volume_controller.go @@ -1004,9 +1004,12 @@ func (c *VolumeController) cleanupCorruptedOrStaleReplicas(v *longhorn.Volume, r } continue } else if r.Spec.Image != v.Spec.Image { - // r.Spec.Active shouldn't be set for the leftover replicas, something must wrong - log.WithField("replica", r.Name).Warnf("Replica engine image %v is different from volume engine image %v, "+ - "but replica spec.Active has been set", r.Spec.Image, v.Spec.Image) + // For a v2 volume, the instance manager image of a replica can be different from the one of its volume + if types.IsDataEngineV1(v.Spec.DataEngine) { + // r.Spec.Active shouldn't be set for the leftover replicas, something must wrong + log.WithField("replica", r.Name).Warnf("Replica engine image %v is different from volume engine image %v, "+ + "but replica spec.Active has been set", r.Spec.Image, v.Spec.Image) + } } } @@ -1613,7 +1616,7 @@ func (c *VolumeController) reconcileAttachDetachStateMachine(v *longhorn.Volume, return err } if !c.areVolumeDependentResourcesOpened(e, rs) { - log.Warnf("Volume is attached but dependent resources are not opened") + log.WithField("e.Status.CurrentState", e.Status.CurrentState).Warn("Volume is attached but dependent resources are not opened") } } return nil @@ -1821,9 +1824,16 @@ func (c *VolumeController) openVolumeDependentResources(v *longhorn.Volume, e *l return err } if canIMLaunchReplica { - if r.Spec.FailedAt == "" && r.Spec.Image == v.Status.CurrentImage { + if r.Spec.FailedAt == "" { if r.Status.CurrentState == longhorn.InstanceStateStopped { - r.Spec.DesireState = longhorn.InstanceStateRunning + if types.IsDataEngineV1(e.Spec.DataEngine) { + if r.Spec.Image == v.Status.CurrentImage { + r.Spec.DesireState = longhorn.InstanceStateRunning + } + } else { + // For 
v2 volume, the image of replica is no need to be the same as the volume image + r.Spec.DesireState = longhorn.InstanceStateRunning + } } } } else { @@ -1860,8 +1870,11 @@ func (c *VolumeController) openVolumeDependentResources(v *longhorn.Volume, e *l if r.Spec.NodeID == "" { continue } - if r.Spec.Image != v.Status.CurrentImage { - continue + // For v2 volume, the image of replica is no need to be the same as the volume image + if types.IsDataEngineV1(v.Spec.DataEngine) { + if r.Spec.Image != v.Status.CurrentImage { + continue + } } if r.Spec.EngineName != e.Name { continue @@ -1907,7 +1920,15 @@ func (c *VolumeController) openVolumeDependentResources(v *longhorn.Volume, e *l } e.Spec.NodeID = v.Spec.NodeID e.Spec.ReplicaAddressMap = replicaAddressMap - e.Spec.DesireState = longhorn.InstanceStateRunning + + if types.IsDataEngineV1(v.Spec.DataEngine) { + e.Spec.DesireState = longhorn.InstanceStateRunning + } else { + if v.Spec.Image == v.Status.CurrentImage && v.Spec.TargetNodeID == v.Status.CurrentTargetNodeID { + e.Spec.DesireState = longhorn.InstanceStateRunning + } + } + // The volume may be activated e.Spec.DisableFrontend = v.Status.FrontendDisabled e.Spec.Frontend = v.Spec.Frontend @@ -1950,6 +1971,7 @@ func (c *VolumeController) closeVolumeDependentResources(v *longhorn.Volume, e * } e.Spec.RequestedBackupRestore = "" e.Spec.NodeID = "" + e.Spec.TargetNodeID = "" e.Spec.DesireState = longhorn.InstanceStateStopped } // must make sure engine stopped first before stopping replicas @@ -2061,7 +2083,7 @@ func (c *VolumeController) canInstanceManagerLaunchReplica(r *longhorn.Replica) if r.Status.InstanceManagerName != "" { return true, nil } - defaultIM, err := c.ds.GetInstanceManagerByInstanceRO(r) + defaultIM, err := c.ds.GetInstanceManagerByInstanceRO(r, false) if err != nil { return false, errors.Wrapf(err, "failed to find instance manager for replica %v", r.Name) } @@ -2211,6 +2233,20 @@ func hasLocalReplicaOnSameNodeAsEngine(e *longhorn.Engine, rs map[string]*longho // It will count all the potentially usable replicas, since some replicas maybe // blank or in rebuilding state func (c *VolumeController) replenishReplicas(v *longhorn.Volume, e *longhorn.Engine, rs map[string]*longhorn.Replica, hardNodeAffinity string) error { + log := getLoggerForVolume(c.logger, v) + + // Skip the replenishReplicas if the volume is being upgraded + if types.IsDataEngineV2(v.Spec.DataEngine) { + node, err := c.ds.GetNode(v.Status.OwnerID) + if err != nil { + return errors.Wrapf(err, "failed to get node %v for checking volume upgrade status", v.Status.OwnerID) + } + if node.Spec.DataEngineUpgradeRequested { + log.Info("Node is being upgraded, skip replenishing replica") + return nil + } + } + concurrentRebuildingLimit, err := c.ds.GetSettingAsInt(types.SettingNameConcurrentReplicaRebuildPerNodeLimit) if err != nil { return err @@ -2248,8 +2284,6 @@ func (c *VolumeController) replenishReplicas(v *longhorn.Volume, e *longhorn.Eng return nil } - log := getLoggerForVolume(c.logger, v) - replenishCount, updateNodeAffinity := c.getReplenishReplicasCount(v, rs, e) if hardNodeAffinity == "" && updateNodeAffinity != "" { hardNodeAffinity = updateNodeAffinity @@ -3027,8 +3061,8 @@ func (c *VolumeController) upgradeEngineForVolume(v *longhorn.Volume, es map[str } log := getLoggerForVolume(c.logger, v).WithFields(logrus.Fields{ - "engine": e.Name, - "volumeDesiredEngineImage": v.Spec.Image, + "engine": e.Name, + "volumeDesiredImage": v.Spec.Image, }) if !isVolumeUpgrading(v) { @@ -3049,30 +3083,13 @@ func (c 
*VolumeController) upgradeEngineForVolume(v *longhorn.Volume, es map[str
 	// If volume is detached accidentally during the live upgrade,
 	// the live upgrade info and the inactive replicas are meaningless.
 	if v.Status.State == longhorn.VolumeStateDetached {
-		if e.Spec.Image != v.Spec.Image {
-			e.Spec.Image = v.Spec.Image
-			e.Spec.UpgradedReplicaAddressMap = map[string]string{}
-		}
-		for _, r := range rs {
-			if r.Spec.Image != v.Spec.Image {
-				r.Spec.Image = v.Spec.Image
-				rs[r.Name] = r
-			}
-			if !r.Spec.Active {
-				log.Infof("Removing inactive replica %v when the volume is detached accidentally during the live upgrade", r.Name)
-				if err := c.deleteReplica(r, rs); err != nil {
-					return err
-				}
-			}
-		}
-		// TODO current replicas should be calculated by checking if there is
-		// any other image exists except for the v.Spec.Image
-		v.Status.CurrentImage = v.Spec.Image
-		return nil
+		return c.handleDetachedVolumeUpgrade(v, e, rs, log)
 	}
 
-	// Only start live upgrade if volume is healthy
-	if v.Status.State != longhorn.VolumeStateAttached || v.Status.Robustness != longhorn.VolumeRobustnessHealthy {
+	// Only start live upgrade if
+	// - v1 volume is healthy
+	// - v2 volume is healthy or degraded
+	if isVolumeNotEligibleForLiveUpgrade(v) {
 		if v.Status.State != longhorn.VolumeStateAttached || v.Status.Robustness != longhorn.VolumeRobustnessDegraded {
 			return nil
 		}
@@ -3159,18 +3176,160 @@ func (c *VolumeController) upgradeEngineForVolume(v *longhorn.Volume, es map[str
 				e.Spec.Image = v.Spec.Image
 			}
 		}
-		c.finishLiveEngineUpgrade(v, e, rs, log)
 	} else {
-		// TODO: Implement the logic for data engine v2
+		if isV2DataEngineLiveUpgradeCompleted(v) {
+			return nil
+		}
+
+		log = log.WithField("engine", e.Name)
+
+		if v.Spec.TargetNodeID != "" {
+			if e.Spec.TargetNodeID != v.Spec.TargetNodeID {
+				if e.Spec.Image != v.Spec.Image {
+					log.Infof("Updating image from %s to %s for v2 data engine live upgrade", e.Spec.Image, v.Spec.Image)
+					e.Spec.Image = v.Spec.Image
+				}
+
+				log.Infof("Updating target node from %s to %s for v2 data engine live upgrade", e.Spec.TargetNodeID, v.Spec.TargetNodeID)
+				e.Spec.TargetNodeID = v.Spec.TargetNodeID
+				return nil
+			}
+
+			if !e.Status.TargetInstanceReplacementCreated && e.Status.CurrentTargetNodeID != v.Spec.TargetNodeID {
+				log.Debug("Waiting for target instance replacement to be created")
+				return nil
+			}
+
+			if e.Status.CurrentTargetNodeID != v.Spec.TargetNodeID {
+				if e.Status.CurrentState == longhorn.InstanceStateRunning {
+					log.Infof("Suspending engine for v2 data engine live upgrade")
+					e.Spec.DesireState = longhorn.InstanceStateSuspended
+					return nil
+				} else {
+					// TODO: what if e.Status.CurrentState != longhorn.InstanceStateRunning
+				}
+			}
+
+			// At this moment:
+			// 1. volume is running and healthy
+			// 2. engine is suspended
+			// 3. initiator is connecting to the new target
+			// 4. old target still exists
+
+			if replicaAddressMap, err := c.constructReplicaAddressMap(v, e, rs); err != nil {
+				return nil
+			} else {
+				if !reflect.DeepEqual(e.Spec.UpgradedReplicaAddressMap, replicaAddressMap) {
+					e.Spec.UpgradedReplicaAddressMap = replicaAddressMap
+					return nil
+				}
+			}
+
+			if e.Status.CurrentState == longhorn.InstanceStateSuspended {
+				log.Infof("Resuming engine for live upgrade")
+				e.Spec.DesireState = longhorn.InstanceStateRunning
+				return nil
+			}
+
+			if e.Status.CurrentState != longhorn.InstanceStateRunning {
+				log.Debugf("Engine is in %v, waiting for engine to be running", e.Status.CurrentState)
+				return nil
+			}
+
+			// At this point:
+			// 1. 
volume is running and healthy
+			// 2. engine is running
+			// 3. initiator is connected to the new target
+			// 4. old target has been deleted
+
+			if v.Status.CurrentTargetNodeID != v.Spec.TargetNodeID {
+				v.Status.CurrentTargetNodeID = v.Spec.TargetNodeID
+				return nil
+			}
+		}
+	}
+
+	c.finishLiveEngineUpgrade(v, e, rs, log)
+
+	return nil
+}
+
+func (c *VolumeController) handleDetachedVolumeUpgrade(v *longhorn.Volume, e *longhorn.Engine, rs map[string]*longhorn.Replica, log logrus.FieldLogger) error {
+	if e.Spec.Image != v.Spec.Image {
+		e.Spec.Image = v.Spec.Image
+		e.Spec.UpgradedReplicaAddressMap = map[string]string{}
+		e.Spec.TargetNodeID = ""
+	}
+
+	for name, r := range rs {
+		if err := c.updateDetachedReplica(v, r); err != nil {
+			return err
+		}
+		if !r.Spec.Active {
+			log.Infof("Removing inactive replica %v when the volume is detached during live upgrade", r.Name)
+			if err := c.deleteReplica(r, rs); err != nil {
+				return err
+			}
+			continue
+		}
+		rs[name] = r
+	}
+	// TODO current replicas should be calculated by checking if there is
+	// any other image exists except for the v.Spec.Image
+	v.Status.CurrentImage = v.Spec.Image
+	return nil
+}
+
+func (c *VolumeController) updateDetachedReplica(v *longhorn.Volume, r *longhorn.Replica) error {
+	if r.Spec.Image == v.Spec.Image {
 		return nil
 	}
+
+	if types.IsDataEngineV1(v.Spec.DataEngine) {
+		r.Spec.Image = v.Spec.Image
+		return nil
+	}
+
+	im, err := c.ds.GetRunningInstanceManagerByNodeRO(r.Spec.NodeID, longhorn.DataEngineTypeV2)
+	if err != nil {
+		return err
+	}
+	r.Spec.Image = im.Spec.Image
 	return nil
 }
 
+func isVolumeNotEligibleForLiveUpgrade(v *longhorn.Volume) bool {
+	if v.Status.State != longhorn.VolumeStateAttached {
+		return true
+	}
+
+	if types.IsDataEngineV1(v.Spec.DataEngine) {
+		if v.Status.Robustness != longhorn.VolumeRobustnessHealthy {
+			return true
+		}
+	}
+
+	if types.IsDataEngineV2(v.Spec.DataEngine) {
+		if v.Status.Robustness != longhorn.VolumeRobustnessHealthy &&
+			v.Status.Robustness != longhorn.VolumeRobustnessDegraded {
+			return true
+		}
+	}
+
+	return false
+}
+
+func isV2DataEngineLiveUpgradeCompleted(v *longhorn.Volume) bool {
+	return v.Spec.TargetNodeID != "" &&
+		v.Spec.Image == v.Status.CurrentImage &&
+		v.Spec.TargetNodeID == v.Status.CurrentTargetNodeID &&
+		v.Spec.NodeID == v.Status.CurrentNodeID
+}
+
 func (c *VolumeController) constructReplicaAddressMap(v *longhorn.Volume, e *longhorn.Engine, dataPathToNewReplica map[string]*longhorn.Replica) (map[string]string, error) {
 	log := getLoggerForVolume(c.logger, v).WithFields(logrus.Fields{
-		"engine":                   e.Name,
-		"volumeDesiredEngineImage": v.Spec.Image,
+		"engine":             e.Name,
+		"volumeDesiredImage": v.Spec.Image,
 	})
 
 	replicaAddressMap := map[string]string{}
@@ -3200,8 +3359,8 @@ func (c *VolumeController) constructReplicaAddressMap(v *longhorn.Volume, e *lon
 
 func (c *VolumeController) groupReplicasByImageAndState(v *longhorn.Volume, e *longhorn.Engine, rs map[string]*longhorn.Replica) (unknownReplicas, dataPathToOldRunningReplica, dataPathToNewReplica map[string]*longhorn.Replica) {
 	log := getLoggerForVolume(c.logger, v).WithFields(logrus.Fields{
-		"engine":                   e.Name,
-		"volumeDesiredEngineImage": v.Spec.Image,
+		"engine":             e.Name,
+		"volumeDesiredImage": v.Spec.Image,
 	})
 
 	unknownReplicas = map[string]*longhorn.Replica{}
@@ -3256,20 +3415,29 @@ func (c *VolumeController) checkOldAndNewEngineImagesForLiveUpgrade(v *longhorn. 
} func (c *VolumeController) finishLiveEngineUpgrade(v *longhorn.Volume, e *longhorn.Engine, rs map[string]*longhorn.Replica, log *logrus.Entry) { - if e.Status.CurrentImage != v.Spec.Image || - e.Status.CurrentState != longhorn.InstanceStateRunning { + if e.Status.CurrentImage != v.Spec.Image { + return + } + if e.Status.CurrentState != longhorn.InstanceStateRunning { return } - c.switchActiveReplicas(rs, func(r *longhorn.Replica, image string) bool { - return r.Spec.Image == image && r.DeletionTimestamp.IsZero() - }, v.Spec.Image) + if types.IsDataEngineV1(v.Spec.DataEngine) { + c.switchActiveReplicas(rs, func(r *longhorn.Replica, image string) bool { + return r.Spec.Image == image && r.DeletionTimestamp.IsZero() + }, v.Spec.Image) + } e.Spec.ReplicaAddressMap = e.Spec.UpgradedReplicaAddressMap e.Spec.UpgradedReplicaAddressMap = map[string]string{} // cleanupCorruptedOrStaleReplicas() will take care of old replicas log.Infof("Engine %v has been upgraded from %v to %v", e.Name, v.Status.CurrentImage, v.Spec.Image) v.Status.CurrentImage = v.Spec.Image + + if v.Spec.TargetNodeID == "" { + v.Status.CurrentTargetNodeID = "" + e.Spec.TargetNodeID = "" + } } func (c *VolumeController) updateRequestedBackupForVolumeRestore(v *longhorn.Volume, e *longhorn.Engine) (err error) { @@ -3646,6 +3814,13 @@ func (c *VolumeController) createEngine(v *longhorn.Volume, currentEngineName st } func (c *VolumeController) newReplica(v *longhorn.Volume, e *longhorn.Engine, hardNodeAffinity string) *longhorn.Replica { + image := v.Status.CurrentImage + if types.IsDataEngineV2(v.Spec.DataEngine) { + // spec.image of v2 replica can be empty and different from the volume image, + // because the image of a v2 replica is the same as the running instance manager. + image = "" + } + return &longhorn.Replica{ ObjectMeta: metav1.ObjectMeta{ Name: types.GenerateReplicaNameForVolume(v.Name), @@ -3655,7 +3830,7 @@ func (c *VolumeController) newReplica(v *longhorn.Volume, e *longhorn.Engine, ha InstanceSpec: longhorn.InstanceSpec{ VolumeName: v.Name, VolumeSize: v.Spec.Size, - Image: v.Status.CurrentImage, + Image: image, DataEngine: v.Spec.DataEngine, DesireState: longhorn.InstanceStateStopped, }, @@ -4907,3 +5082,13 @@ func (c *VolumeController) shouldCleanUpFailedReplica(v *longhorn.Volume, r *lon } return false } + +func isVolumeUpgrading(v *longhorn.Volume) bool { + imageNotUpdated := v.Status.CurrentImage != v.Spec.Image + + if types.IsDataEngineV1(v.Spec.DataEngine) { + return imageNotUpdated + } + + return imageNotUpdated || v.Spec.TargetNodeID != v.Status.CurrentTargetNodeID +} diff --git a/controller/volume_controller_test.go b/controller/volume_controller_test.go index 6e1eab29f0..f85d321f74 100644 --- a/controller/volume_controller_test.go +++ b/controller/volume_controller_test.go @@ -503,8 +503,11 @@ func (s *TestSuite) TestVolumeLifeCycle(c *C) { e.Status.OwnerID = TestNode1 e.Status.CurrentState = longhorn.InstanceStateStopped e.Status.IP = "" + e.Status.TargetIP = "" e.Status.StorageIP = "" + e.Status.StorageTargetIP = "" e.Status.Port = 0 + e.Status.TargetPort = 0 e.Status.Endpoint = "" e.Status.CurrentImage = "" e.Status.ReplicaModeMap = map[string]longhorn.ReplicaMode{}