diff --git a/controller/engine_controller.go b/controller/engine_controller.go
index 92da3d22d2..4a3cbaba2d 100644
--- a/controller/engine_controller.go
+++ b/controller/engine_controller.go
@@ -3,6 +3,7 @@ package controller
 import (
 	"context"
 	"fmt"
+	"net"
 	"path/filepath"
 	"reflect"
 	"regexp"
@@ -280,12 +281,7 @@ func (ec *EngineController) syncEngine(key string) (err error) {
 		return errors.Wrapf(err, "failed to get engine")
 	}
 
-	defaultEngineImage, err := ec.ds.GetSettingValueExisted(types.SettingNameDefaultEngineImage)
-	if err != nil {
-		return err
-	}
-
-	isResponsible, err := ec.isResponsibleFor(engine, defaultEngineImage)
+	isResponsible, err := ec.isResponsibleFor(engine)
 	if err != nil {
 		return err
 	}
@@ -438,24 +434,67 @@ func (ec *EngineController) enqueueInstanceManagerChange(obj interface{}) {
 	}
 }
 
-func (ec *EngineController) CreateInstance(obj interface{}) (*longhorn.InstanceProcess, error) {
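+// findInstanceManagerAndIPs returns the instance manager that should own the engine instance together with the
+// initiator and target instance manager IPs. If a target node is set and no target instance replacement has been
+// created yet (and no current target node is recorded), the target node's instance manager is returned instead.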
+func (ec *EngineController) findInstanceManagerAndIPs(obj interface{}) (im *longhorn.InstanceManager, initiatorIP string, targetIP string, err error) {
+	e, ok := obj.(*longhorn.Engine)
+	if !ok {
+		return nil, "", "", fmt.Errorf("invalid object for engine: %v", obj)
+	}
+
+	initiatorIM, err := ec.ds.GetInstanceManagerByInstanceRO(obj, false)
+	if err != nil {
+		return nil, "", "", err
+	}
+
+	initiatorIP = initiatorIM.Status.IP
+	targetIP = initiatorIM.Status.IP
+	im = initiatorIM
+
+	// The target instance is on a node other than the initiator's
+	if e.Spec.TargetNodeID != "" {
+		targetIM, err := ec.ds.GetInstanceManagerByInstanceRO(obj, true)
+		if err != nil {
+			return nil, "", "", err
+		}
+
+		targetIP = targetIM.Status.IP
+
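+		// No target instance replacement has been created and no target is currently in use,
+		// so the engine instance should be created by the target node's instance manager.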
+		if !e.Status.TargetInstanceReplacementCreated && e.Status.CurrentTargetNodeID == "" {
+			im = targetIM
+		}
+	}
+
+	return im, initiatorIP, targetIP, nil
+}
+
+func (ec *EngineController) CreateInstance(obj interface{}, isInstanceOnRemoteNode bool) (*longhorn.InstanceProcess, error) {
 	e, ok := obj.(*longhorn.Engine)
 	if !ok {
 		return nil, fmt.Errorf("invalid object for engine process creation: %v", obj)
 	}
+
+	log := getLoggerForEngine(ec.logger, e)
+
 	if e.Spec.VolumeName == "" || e.Spec.NodeID == "" {
 		return nil, fmt.Errorf("missing parameters for engine instance creation: %v", e)
 	}
+
 	frontend := e.Spec.Frontend
 	if e.Spec.DisableFrontend {
 		frontend = longhorn.VolumeFrontendEmpty
 	}
 
-	im, err := ec.ds.GetInstanceManagerByInstanceRO(obj)
+	im, initiatorIP, targetIP, err := ec.findInstanceManagerAndIPs(obj)
 	if err != nil {
 		return nil, err
 	}
 
+	log.WithFields(logrus.Fields{
+		"instanceManager":      im.Name,
+		"instanceManagerImage": im.Spec.Image,
+		"initiatorIP":          initiatorIP,
+		"targetIP":             targetIP,
+	}).Info("Creating engine instance")
+
 	c, err := engineapi.NewInstanceManagerClient(im, false)
 	if err != nil {
 		return nil, err
@@ -482,6 +521,10 @@ func (ec *EngineController) CreateInstance(obj interface{}) (*longhorn.InstanceP
 		return nil, err
 	}
 
+	// The initiator and target ports only matter during a live upgrade, so the initiator port can stay 0 here.
+	initiatorAddress := net.JoinHostPort(initiatorIP, strconv.Itoa(0))
+	targetAddress := net.JoinHostPort(targetIP, strconv.Itoa(e.Status.Port))
+
 	return c.EngineInstanceCreate(&engineapi.EngineInstanceCreateRequest{
 		Engine:                           e,
 		VolumeFrontend:                   frontend,
@@ -490,9 +533,8 @@ func (ec *EngineController) CreateInstance(obj interface{}) (*longhorn.InstanceP
 		DataLocality:                     v.Spec.DataLocality,
 		ImIP:                             im.Status.IP,
 		EngineCLIAPIVersion:              cliAPIVersion,
-		UpgradeRequired:                  false,
-		InitiatorAddress:                 im.Status.IP,
-		TargetAddress:                    im.Status.IP,
+		InitiatorAddress:                 initiatorAddress,
+		TargetAddress:                    targetAddress,
 	})
 }
 
@@ -511,7 +553,7 @@ func (ec *EngineController) DeleteInstance(obj interface{}) (err error) {
 			log.Warn("Engine does not set instance manager name and node ID, will skip the actual instance deletion")
 			return nil
 		}
-		im, err = ec.ds.GetInstanceManagerByInstance(obj)
+		im, err = ec.ds.GetInstanceManagerByInstance(obj, false)
 		if err != nil {
 			log.WithError(err).Warn("Failed to detect instance manager for engine, will skip the actual instance deletion")
 			return nil
@@ -589,29 +631,256 @@ func (ec *EngineController) DeleteInstance(obj interface{}) (err error) {
 		return err
 	}
 
+	if e.Status.CurrentTargetNodeID != "" {
+		err = c.EngineInstanceDeleteTarget(&engineapi.EngineInstanceDeleteTargetRequest{
+			Engine: e,
+		})
+		if err != nil && !types.ErrorIsNotFound(err) {
+			return err
+		}
+	}
+
 	return nil
 }
 
-func (ec *EngineController) GetInstance(obj interface{}) (*longhorn.InstanceProcess, error) {
+func (ec *EngineController) SuspendInstance(obj interface{}) error {
 	e, ok := obj.(*longhorn.Engine)
 	if !ok {
-		return nil, fmt.Errorf("invalid object for engine instance get: %v", obj)
+		return fmt.Errorf("invalid object for engine instance suspension: %v", obj)
+	}
+
+	if !types.IsDataEngineV2(e.Spec.DataEngine) {
+		return fmt.Errorf("suspending engine instance is not supported for data engine %v", e.Spec.DataEngine)
+	}
+
+	if e.Spec.VolumeName == "" || e.Spec.NodeID == "" {
+		return fmt.Errorf("missing parameters for engine instance suspension: %+v", e)
+	}
+
+	im, err := ec.ds.GetInstanceManagerByInstanceRO(obj, false)
+	if err != nil {
+		return err
+	}
+	c, err := engineapi.NewInstanceManagerClient(im, false)
+	if err != nil {
+		return err
+	}
+	defer c.Close()
+
+	return c.EngineInstanceSuspend(&engineapi.EngineInstanceSuspendRequest{
+		Engine: e,
+	})
+}
+
+func (ec *EngineController) ResumeInstance(obj interface{}) error {
+	e, ok := obj.(*longhorn.Engine)
+	if !ok {
+		return fmt.Errorf("invalid object for engine instance resumption: %v", obj)
+	}
+
+	if !types.IsDataEngineV2(e.Spec.DataEngine) {
+		return fmt.Errorf("resuming engine instance is not supported for data engine %v", e.Spec.DataEngine)
+	}
+
+	if e.Spec.VolumeName == "" || e.Spec.NodeID == "" {
+		return fmt.Errorf("missing parameters for engine instance resumption: %+v", e)
+	}
+
+	im, err := ec.ds.GetInstanceManagerByInstanceRO(obj, false)
+	if err != nil {
+		return err
+	}
+	c, err := engineapi.NewInstanceManagerClient(im, false)
+	if err != nil {
+		return err
+	}
+	defer c.Close()
+
+	return c.EngineInstanceResume(&engineapi.EngineInstanceResumeRequest{
+		Engine: e,
+	})
+}
+
+func (ec *EngineController) SwitchOverTarget(obj interface{}) error {
+	e, ok := obj.(*longhorn.Engine)
+	if !ok {
+		return fmt.Errorf("invalid object for target switchover: %v", obj)
+	}
+
+	if !types.IsDataEngineV2(e.Spec.DataEngine) {
+		return fmt.Errorf("target switchover is not supported for data engine %v", e.Spec.DataEngine)
+	}
+
+	if e.Spec.VolumeName == "" || e.Spec.NodeID == "" {
+		return fmt.Errorf("missing parameters for target switchover: %+v", e)
+	}
+
+	initiatorInstance, err := ec.GetInstance(obj, false)
+	if err != nil {
+		return errors.Wrapf(err, "failed to get initiator instance %v for switchover", e.Name)
+	}
+
+	log := getLoggerForEngine(ec.logger, e)
+
+	log.Infof("Preparing to switch over target to node %v. Initiator instance port details: port=%v, targetPort=%v, standbyTargetPort=%v",
+		e.Spec.TargetNodeID, initiatorInstance.Status.PortStart, initiatorInstance.Status.TargetPortStart, initiatorInstance.Status.StandbyTargetPortStart)
+
+	targetInstance, err := ec.GetInstance(obj, true)
+	if err != nil {
+		return errors.Wrapf(err, "failed to get target instance %v for switchover", e.Name)
+	}
+	log.Infof("Preparing to switch over target to node %v. Target instance port details: port=%v, targetPort=%v, standbyTargetPort=%v",
+		e.Spec.TargetNodeID, targetInstance.Status.PortStart, targetInstance.Status.TargetPortStart, targetInstance.Status.StandbyTargetPortStart)
+
+	targetIM, err := ec.getTargetInstanceManagerForSwitchOver(e)
+	if err != nil {
+		return err
+	}
+
+	initiatorIM, err := ec.ds.GetInstanceManagerByInstanceRO(obj, false)
+	if err != nil {
+		return err
+	}
+	c, err := engineapi.NewInstanceManagerClient(initiatorIM, false)
+	if err != nil {
+		return err
+	}
+	defer c.Close()
+
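+	// Prefer the standby target port: a nonzero value means a target instance replacement has been created for the switchover.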
+	port := targetInstance.Status.TargetPortStart
+	if targetInstance.Status.StandbyTargetPortStart != 0 {
+		port = targetInstance.Status.StandbyTargetPortStart
 	}
 
+	log.Infof("Switching over target to %v:%v", targetIM.Status.IP, port)
+	return c.EngineInstanceSwitchOverTarget(&engineapi.EngineInstanceSwitchOverTargetRequest{
+		Engine:        e,
+		TargetAddress: net.JoinHostPort(targetIM.Status.IP, fmt.Sprint(port)),
+	})
+}
+
+func (ec *EngineController) getTargetInstanceManagerForSwitchOver(e *longhorn.Engine) (targetIM *longhorn.InstanceManager, err error) {
+	if e.Spec.TargetNodeID != "" {
+		targetIM, err = ec.ds.GetRunningInstanceManagerByNodeRO(e.Spec.TargetNodeID, e.Spec.DataEngine)
+		if err != nil {
+			return nil, errors.Wrapf(err, "failed to get instance manager on node %v for switching over target", e.Spec.TargetNodeID)
+		}
+
+		return targetIM, nil
+	}
+
+	if e.Spec.NodeID == "" {
+		return nil, fmt.Errorf("nodeID is required for switching over target")
+	}
+
+	targetIM, err = ec.ds.GetDefaultInstanceManagerByNodeRO(e.Spec.NodeID, e.Spec.DataEngine)
+	if err != nil {
+		if !datastore.ErrorIsNotFound(err) {
+			return nil, err
+		}
+	}
+
+	return targetIM, nil
+}
+
+func (ec *EngineController) DeleteTarget(obj interface{}) error {
+	e, ok := obj.(*longhorn.Engine)
+	if !ok {
+		return fmt.Errorf("invalid object for engine target deletion: %v", obj)
+	}
+
+	if !types.IsDataEngineV2(e.Spec.DataEngine) {
+		return fmt.Errorf("deleting target for engine instance is not supported for data engine %v", e.Spec.DataEngine)
+	}
+
+	ec.logger.WithField("engine", e.Name).Info("Deleting target instance")
+
+	if e.Spec.VolumeName == "" || e.Spec.NodeID == "" {
+		return fmt.Errorf("missing parameters for engine target deletion: %+v", e)
+	}
+
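+	// By default the target lives on the engine's node; when switching back (NodeID == TargetNodeID),
+	// the leftover target to delete is on the previously recorded current target node.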
+	targetNodeID := e.Spec.NodeID
+	if e.Spec.NodeID == e.Spec.TargetNodeID {
+		targetNodeID = e.Status.CurrentTargetNodeID
+	}
+	if targetNodeID == "" {
+		return fmt.Errorf("missing target node ID for target deletion: %+v", e)
+	}
+
+	im, err := ec.ds.GetRunningInstanceManagerByNodeRO(targetNodeID, e.Spec.DataEngine)
+	if err != nil {
+		return err
+	}
+	c, err := engineapi.NewInstanceManagerClient(im, false)
+	if err != nil {
+		return err
+	}
+	defer c.Close()
+
+	return c.EngineInstanceDeleteTarget(&engineapi.EngineInstanceDeleteTargetRequest{
+		Engine: e,
+	})
+}
+
+func (ec *EngineController) RequireRemoteTargetInstance(obj interface{}) (bool, error) {
+	e, ok := obj.(*longhorn.Engine)
+	if !ok {
+		return false, fmt.Errorf("invalid object for checking engine instance remote target requirement: %v", obj)
+	}
+
+	return e.Spec.TargetNodeID != "" && e.Spec.TargetNodeID != e.Spec.NodeID, nil
+}
+
+func (ec *EngineController) IsEngine(obj interface{}) bool {
+	_, ok := obj.(*longhorn.Engine)
+	return ok
+}
+
+func (ec *EngineController) GetInstance(obj interface{}, isInstanceOnRemoteNode bool) (*longhorn.InstanceProcess, error) {
+	e, ok := obj.(*longhorn.Engine)
+	if !ok {
+		return nil, fmt.Errorf("invalid object for engine instance get: %v", obj)
+	}
 	var (
 		im  *longhorn.InstanceManager
 		err error
 	)
-	if e.Status.InstanceManagerName == "" {
-		im, err = ec.ds.GetInstanceManagerByInstanceRO(obj)
+
+	nodeID := e.Spec.NodeID
+	instanceManagerName := e.Status.InstanceManagerName
+	if isInstanceOnRemoteNode {
+		nodeID = e.Spec.TargetNodeID
+		im, err := ec.ds.GetRunningInstanceManagerByNodeRO(nodeID, e.Spec.DataEngine)
 		if err != nil {
-			return nil, err
+			return nil, errors.Wrapf(err, "failed to get running instance manager for engine %v", e.Name)
+		}
+		instanceManagerName = im.Name
+	}
+
+	if instanceManagerName == "" {
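+		// A suspended engine that is expected to be running is in the middle of a live upgrade;
+		// look up the node's running instance manager in that case.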
+		if e.Spec.DesireState == longhorn.InstanceStateRunning && e.Status.CurrentState == longhorn.InstanceStateSuspended {
+			im, err = ec.ds.GetRunningInstanceManagerByNodeRO(nodeID, e.Spec.DataEngine)
+		} else {
+			im, err = ec.ds.GetInstanceManagerByInstanceRO(obj, false)
 		}
-	} else {
-		im, err = ec.ds.GetInstanceManagerRO(e.Status.InstanceManagerName)
 		if err != nil {
 			return nil, err
 		}
+	} else if im == nil {
+		im, err = ec.ds.GetInstanceManagerRO(instanceManagerName)
+		if err != nil {
+			if !apierrors.IsNotFound(err) {
+				return nil, err
+			}
+			if types.IsDataEngineV2(e.Spec.DataEngine) {
+				im, err = ec.ds.GetRunningInstanceManagerByNodeRO(nodeID, e.Spec.DataEngine)
+				if err != nil {
+					return nil, errors.Wrapf(err, "failed to get running instance manager for engine %v", e.Name)
+				}
+			} else {
+				return nil, err
+			}
+		}
 	}
 	c, err := engineapi.NewInstanceManagerClient(im, false)
 	if err != nil {
@@ -769,16 +1038,58 @@ func (m *EngineMonitor) sync() bool {
 			return false
 		}
 
-		if err := m.refresh(engine); err == nil || !apierrors.IsConflict(errors.Cause(err)) {
-			utilruntime.HandleError(errors.Wrapf(err, "failed to update status for engine %v", m.Name))
-			break
+		err = m.refresh(engine)
+		if err != nil && apierrors.IsConflict(errors.Cause(err)) {
+			// Retry if the error is due to conflict
+			continue
+		}
+		if types.IsDataEngineV2(engine.Spec.DataEngine) && err != nil && apierrors.IsNotFound(errors.Cause(err)) {
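+			// For a v2 engine, a NotFound error may be transient while the node's data engine is being upgraded;
+			// hold off on reporting an error and retry later while the upgrade is still in progress.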
+			upgrading, upgradingCheckErr := m.ds.IsNodeDataEngineUpgradeRequested(engine.Spec.NodeID)
+			if upgrading {
+				updated, updatedCheckErr := m.isInstanceManagerUpdated(engine)
+				if updated {
+					for replicaName := range engine.Status.ReplicaModeMap {
+						replica, replicaErr := m.ds.GetReplicaRO(replicaName)
+						if replicaErr != nil {
+							m.logger.WithError(replicaErr).Errorf("Failed to get replica %v", replicaName)
+							continue
+						}
+						if replica.Spec.NodeID == engine.Spec.NodeID &&
+							replica.Status.CurrentState != longhorn.InstanceStateError &&
+							replica.Status.CurrentState != longhorn.InstanceStateStopped {
+							m.logger.Warnf("Replica %v in state %v is still on the node %v, will retry updating status later",
+								replicaName, replica.Status.CurrentState, replica.Spec.NodeID)
+							return false
+						}
+					}
+				} else {
+					m.logger.Warnf("v2 volume engine %v is being upgraded, will retry updating status later", engine.Name)
+					return false
+				}
+				if updatedCheckErr != nil {
+					upgradingCheckErr = errors.Wrapf(updatedCheckErr, "failed to check if the instance manager is updated")
+				}
+			}
+			if upgradingCheckErr != nil {
+				err = errors.Wrapf(upgradingCheckErr, "failed to check if the engine %v is being upgraded", engine.Name)
+			}
 		}
-		// Retry if the error is due to conflict
+		utilruntime.HandleError(errors.Wrapf(err, "failed to update status for engine %v", m.Name))
+		break
 	}
 
 	return false
 }
 
+func (m *EngineMonitor) isInstanceManagerUpdated(engine *longhorn.Engine) (bool, error) {
+	defaultIM, err := m.ds.GetDefaultInstanceManagerByNodeRO(engine.Spec.NodeID, engine.Spec.DataEngine)
+	if err != nil {
+		return false, err
+	}
+
+	return defaultIM.Name == engine.Status.InstanceManagerName, nil
+}
+
 func (m *EngineMonitor) refresh(engine *longhorn.Engine) error {
 	existingEngine := engine.DeepCopy()
 
@@ -2105,7 +2416,53 @@ func (ec *EngineController) Upgrade(e *longhorn.Engine, log *logrus.Entry) (err
 			}
 		}
 	} else {
-		return errors.Wrapf(err, "upgrading engine %v with data engine %v is not supported", e.Name, e.Spec.DataEngine)
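+		// For a v2 volume, verify that both the initiator and target engine instances exist in their
+		// running instance managers before treating the upgrade as complete.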
+		// Check if the initiator instance is running
+		im, err := ec.ds.GetRunningInstanceManagerByNodeRO(e.Spec.NodeID, longhorn.DataEngineTypeV2)
+		if err != nil {
+			return err
+		}
+		if im.Status.CurrentState != longhorn.InstanceManagerStateRunning {
+			return fmt.Errorf("instance manager %v for initiator instance %v is not running", im.Name, e.Name)
+		}
+
+		initiatorIMClient, err := engineapi.NewInstanceManagerClient(im, false)
+		if err != nil {
+			return err
+		}
+		defer initiatorIMClient.Close()
+
+		if _, err := initiatorIMClient.InstanceGet(e.Spec.DataEngine, e.Name, string(longhorn.InstanceManagerTypeEngine)); err != nil {
+			return err
+		}
+
+		_, ok := im.Status.InstanceEngines[e.Name]
+		if !ok {
+			return fmt.Errorf("initiator instance %v is not found in the engine list", e.Name)
+		}
+
+		// Check whether the target instance exists
+		im, err = ec.ds.GetRunningInstanceManagerByNodeRO(e.Spec.TargetNodeID, longhorn.DataEngineTypeV2)
+		if err != nil {
+			return err
+		}
+		if im.Status.CurrentState != longhorn.InstanceManagerStateRunning {
+			return fmt.Errorf("instance manager %v for target instance %v is not running", im.Name, e.Name)
+		}
+
+		targetIMClient, err := engineapi.NewInstanceManagerClient(im, false)
+		if err != nil {
+			return err
+		}
+		defer targetIMClient.Close()
+
+		if _, err := targetIMClient.InstanceGet(e.Spec.DataEngine, e.Name, string(longhorn.InstanceManagerTypeEngine)); err != nil {
+			return err
+		}
+
+		_, ok = im.Status.InstanceEngines[e.Name]
+		if !ok {
+			return fmt.Errorf("target instance %v is not found in the engine list", e.Name)
+		}
 	}
 
 	log.Infof("Engine has been upgraded from %v to %v", e.Status.CurrentImage, e.Spec.Image)
@@ -2116,6 +2473,7 @@ func (ec *EngineController) Upgrade(e *longhorn.Engine, log *logrus.Entry) (err
 	e.Status.ReplicaTransitionTimeMap = nil
 	e.Status.RestoreStatus = nil
 	e.Status.RebuildStatus = nil
+	e.Status.TargetInstanceReplacementCreated = false
 	return nil
 }
 
@@ -2184,7 +2542,7 @@ func (ec *EngineController) UpgradeEngineInstance(e *longhorn.Engine, log *logru
 // isResponsibleFor picks a running node that has e.Status.CurrentImage deployed.
 // We need e.Status.CurrentImage deployed on the node to make request to the corresponding engine instance.
 // Prefer picking the node e.Spec.NodeID if it meet the above requirement.
-func (ec *EngineController) isResponsibleFor(e *longhorn.Engine, defaultEngineImage string) (bool, error) {
+func (ec *EngineController) isResponsibleFor(e *longhorn.Engine) (bool, error) {
 	var err error
 	defer func() {
 		err = errors.Wrap(err, "error while checking isResponsibleFor")
diff --git a/controller/instance_handler.go b/controller/instance_handler.go
index a943d28d03..fc5d6b53b0 100644
--- a/controller/instance_handler.go
+++ b/controller/instance_handler.go
@@ -35,10 +35,16 @@ type InstanceHandler struct {
 }
 
 type InstanceManagerHandler interface {
-	GetInstance(obj interface{}) (*longhorn.InstanceProcess, error)
-	CreateInstance(obj interface{}) (*longhorn.InstanceProcess, error)
+	GetInstance(obj interface{}, isInstanceOnRemoteNode bool) (*longhorn.InstanceProcess, error)
+	CreateInstance(obj interface{}, isInstanceOnRemoteNode bool) (*longhorn.InstanceProcess, error)
 	DeleteInstance(obj interface{}) error
 	LogInstance(ctx context.Context, obj interface{}) (*engineapi.InstanceManagerClient, *imapi.LogStream, error)
+	SuspendInstance(obj interface{}) error
+	ResumeInstance(obj interface{}) error
+	SwitchOverTarget(obj interface{}) error
+	DeleteTarget(obj interface{}) error
+	RequireRemoteTargetInstance(obj interface{}) (bool, error)
+	IsEngine(obj interface{}) bool
 }
 
 func NewInstanceHandler(ds *datastore.DataStore, instanceManagerHandler InstanceManagerHandler, eventRecorder record.EventRecorder) *InstanceHandler {
@@ -49,10 +55,130 @@ func NewInstanceHandler(ds *datastore.DataStore, instanceManagerHandler Instance
 	}
 }
 
-func (h *InstanceHandler) syncStatusWithInstanceManager(im *longhorn.InstanceManager, instanceName string, spec *longhorn.InstanceSpec, status *longhorn.InstanceStatus, instances map[string]longhorn.InstanceProcess) {
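+// syncStatusIPsAndPorts updates the instance status IPs and ports from the instance manager. For a v2 engine with a
+// target node, it also queries the target node's instance manager for the target IP/port and records whether the
+// target instance replacement is running.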
+func (h *InstanceHandler) syncStatusIPsAndPorts(im *longhorn.InstanceManager, obj interface{}, spec *longhorn.InstanceSpec, status *longhorn.InstanceStatus, instanceName string, instance longhorn.InstanceProcess) {
+	imPod, err := h.ds.GetPodRO(im.Namespace, im.Name)
+	if err != nil {
+		logrus.WithError(err).Errorf("Failed to get instance manager pod from %v", im.Name)
+		return
+	}
+	if imPod == nil {
+		logrus.Warnf("Instance manager pod for %v does not exist in the datastore", im.Name)
+		return
+	}
+
+	storageIP := h.ds.GetStorageIPFromPod(imPod)
+	if status.StorageIP != storageIP {
+		status.StorageIP = storageIP
+		logrus.Infof("Instance %v starts running, Storage IP %v", instanceName, status.StorageIP)
+	}
+
+	if status.IP != im.Status.IP {
+		status.IP = im.Status.IP
+		logrus.Infof("Instance %v starts running, IP %v", instanceName, status.IP)
+	}
+
+	if status.Port != int(instance.Status.PortStart) {
+		status.Port = int(instance.Status.PortStart)
+		logrus.Infof("Instance %v starts running, Port %d", instanceName, status.Port)
+	}
+
+	if !h.instanceManagerHandler.IsEngine(obj) {
+		return
+	}
+
+	if types.IsDataEngineV2(spec.DataEngine) && spec.TargetNodeID != "" {
+		targetIM, err := h.ds.GetRunningInstanceManagerByNodeRO(spec.TargetNodeID, spec.DataEngine)
+		if err != nil {
+			logrus.WithError(err).Errorf("Failed to get running instance manager for node %s", spec.TargetNodeID)
+			return
+		}
+		targetClient, err := engineapi.NewInstanceManagerClient(targetIM, false)
+		if err != nil {
+			logrus.WithError(err).Errorf("Failed to get instance manager client for target instance manager %v", targetIM.Name)
+			return
+		}
+		defer targetClient.Close()
+
+		targetInstance, err := targetClient.InstanceGet(spec.DataEngine, instanceName, string(longhorn.InstanceManagerTypeEngine))
+		if err != nil {
+			logrus.WithError(err).Errorf("Failed to get target instance %s from instance manager %s", instanceName, targetIM.Name)
+		} else {
+			if status.TargetIP != targetIM.Status.IP {
+				status.TargetIP = targetIM.Status.IP
+				logrus.Infof("Instance %v starts running, Target IP %v", instanceName, status.TargetIP)
+			}
+
+			if status.TargetPort != int(targetInstance.Status.TargetPortStart) {
+				status.TargetPort = int(targetInstance.Status.TargetPortStart)
+				logrus.Infof("Instance %v starts running, Target Port %v", instanceName, status.TargetPort)
+			}
+
+			// Get storage target IP from target instance manager
+			targetIMPod, err := h.ds.GetPodRO(targetIM.Namespace, targetIM.Name)
+			if err != nil {
+				logrus.WithError(err).Errorf("Failed to get instance manager pod from %v", targetIM.Name)
+				return
+			}
+			if targetIMPod == nil {
+				logrus.Warnf("Instance manager pod for %v does not exist in the datastore", targetIM.Name)
+				return
+			}
+
+			storageTargetIP := h.ds.GetStorageIPFromPod(targetIMPod)
+			if status.StorageTargetIP != storageTargetIP {
+				status.StorageTargetIP = storageTargetIP
+				logrus.Infof("Instance %v starts running, Storage Target IP %v", instanceName, status.StorageTargetIP)
+			}
+		}
+
+		// Check if the target instance replacement is running on the target node
+		if status.CurrentTargetNodeID != spec.TargetNodeID && !status.TargetInstanceReplacementCreated {
+			running, err := h.isTargetInstanceReplacementRunning(instanceName, spec, status)
+			if err != nil {
+				logrus.WithError(err).Errorf("Failed to check if target instance %v is running", instanceName)
+				return
+			}
+			if running {
+				logrus.Infof("Target instance replacement %v is running on target node %v", instanceName, spec.TargetNodeID)
+				status.TargetInstanceReplacementCreated = true
+			}
+		}
+	} else {
+		if status.StorageTargetIP != storageIP {
+			status.StorageTargetIP = storageIP
+			logrus.Infof("Instance %v starts running, Storage Target IP %v", instanceName, status.StorageTargetIP)
+		}
+
+		if status.TargetIP != im.Status.IP {
+			status.TargetIP = im.Status.IP
+			logrus.Infof("Instance %v starts running, Target IP %v", instanceName, status.TargetIP)
+		}
+
+		if status.TargetPort != int(instance.Status.TargetPortStart) {
+			status.TargetPort = int(instance.Status.TargetPortStart)
+			logrus.Infof("Instance %v starts running, Target Port %v", instanceName, status.TargetPort)
+		}
+
+		status.CurrentTargetNodeID = ""
+		status.TargetInstanceReplacementCreated = false
+	}
+}
+
+func (h *InstanceHandler) syncStatusWithInstanceManager(im *longhorn.InstanceManager, obj interface{}, spec *longhorn.InstanceSpec, status *longhorn.InstanceStatus, instances map[string]longhorn.InstanceProcess) {
+	runtimeObj, ok := obj.(runtime.Object)
+	if !ok {
+		return
+	}
+	instanceName, err := h.getNameFromObj(runtimeObj)
+	if err != nil {
+		return
+	}
+
 	defer func() {
 		if status.CurrentState == longhorn.InstanceStateStopped {
 			status.InstanceManagerName = ""
+			status.CurrentTargetNodeID = ""
+			status.TargetInstanceReplacementCreated = false
 		}
 	}()
 
@@ -72,8 +198,11 @@ func (h *InstanceHandler) syncStatusWithInstanceManager(im *longhorn.InstanceMan
 			status.CurrentImage = ""
 		}
 		status.IP = ""
+		status.TargetIP = ""
 		status.StorageIP = ""
+		status.StorageTargetIP = ""
 		status.Port = 0
+		status.TargetPort = 0
 		h.resetInstanceErrorCondition(status)
 		return
 	}
@@ -82,32 +211,65 @@ func (h *InstanceHandler) syncStatusWithInstanceManager(im *longhorn.InstanceMan
 		im.Status.CurrentState == longhorn.InstanceManagerStateError ||
 		im.DeletionTimestamp != nil {
 		if status.Started {
-			if status.CurrentState != longhorn.InstanceStateError {
-				logrus.Warnf("Marking the instance as state ERROR since failed to find the instance manager for the running instance %v", instanceName)
+			if h.isEngineOfV2DataEngine(obj, spec.DataEngine) {
+				if h.isV2DataEngineBeingUpgraded(spec, status) {
+					logrus.Warnf("Skipping the instance %v since the instance manager %v is %v", instanceName, im.Name, im.Status.CurrentState)
+					return
+				}
+
+				if spec.Image == status.CurrentImage {
+					if status.CurrentState != longhorn.InstanceStateError {
+						upgradeRequested, err := h.ds.IsNodeDataEngineUpgradeRequested(spec.NodeID)
+						if err != nil {
+							// TODO: should we return here or mark the instance as error?
+							logrus.WithError(err).Errorf("Failed to check if a data engine upgrade is requested for node %v", spec.NodeID)
+							return
+						}
+						if upgradeRequested {
+							logrus.Warnf("Skipping the instance %v since the instance manager %v is %v and a data engine upgrade is requested for node %v",
+								instanceName, im.Name, im.Status.CurrentState, spec.NodeID)
+							return
+						}
+						logrus.Warnf("Marking the instance as state ERROR since failed to find the instance manager for the running instance %v", instanceName)
+					}
+					status.CurrentState = longhorn.InstanceStateError
+				} else {
+					logrus.Warnf("Skipping the instance %v since the instance manager %v is %v and spec image %v is different from the current image %v",
+						instanceName, im.Name, im.Status.CurrentState, spec.Image, status.CurrentImage)
+					return
+				}
+			} else {
+				if status.CurrentState != longhorn.InstanceStateError {
+					logrus.Warnf("Marking the instance as state ERROR since failed to find the instance manager for the running instance %v", instanceName)
+				}
+				status.CurrentState = longhorn.InstanceStateError
 			}
-			status.CurrentState = longhorn.InstanceStateError
 		} else {
 			status.CurrentState = longhorn.InstanceStateStopped
 		}
 		status.CurrentImage = ""
 		status.IP = ""
+		status.TargetIP = ""
 		status.StorageIP = ""
+		status.StorageTargetIP = ""
 		status.Port = 0
+		status.TargetPort = 0
 		h.resetInstanceErrorCondition(status)
 		return
 	}
 
 	if im.Status.CurrentState == longhorn.InstanceManagerStateStarting {
 		if status.Started {
-			if status.CurrentState != longhorn.InstanceStateError {
-				logrus.Warnf("Marking the instance as state ERROR since the starting instance manager %v shouldn't contain the running instance %v", im.Name, instanceName)
+			if h.isEngineOfV2DataEngine(obj, spec.DataEngine) {
+				if spec.Image == status.CurrentImage {
+					if h.isV2DataEngineBeingUpgraded(spec, status) {
+						return
+					}
+					h.handleIfInstanceManagerStarting(im.Name, instanceName, status)
+				}
+			} else {
+				h.handleIfInstanceManagerStarting(im.Name, instanceName, status)
 			}
-			status.CurrentState = longhorn.InstanceStateError
-			status.CurrentImage = ""
-			status.IP = ""
-			status.StorageIP = ""
-			status.Port = 0
-			h.resetInstanceErrorCondition(status)
 		}
 		return
 	}
@@ -115,23 +277,56 @@ func (h *InstanceHandler) syncStatusWithInstanceManager(im *longhorn.InstanceMan
 	instance, exists := instances[instanceName]
 	if !exists {
 		if status.Started {
-			if status.CurrentState != longhorn.InstanceStateError {
-				logrus.Warnf("Marking the instance as state ERROR since failed to find the instance status in instance manager %v for the running instance %v", im.Name, instanceName)
+			if h.isEngineOfV2DataEngine(obj, spec.DataEngine) {
+				if h.isV2DataEngineBeingUpgraded(spec, status) {
+					logrus.Warnf("Skipping checking the instance %v since the instance manager %v is in %v state",
+						instanceName, im.Name, im.Status.CurrentState)
+					return
+				}
+				if status.CurrentState != longhorn.InstanceStateError {
+					// Because of the async nature, check the instance on the initiator node directly here
+					if spec.TargetNodeID != "" {
+						logrus.Infof("Recreated initiator instance %v is not found in instance manager %v, directly checking it in the instance manager on node %v",
+							instanceName, im.Name, spec.NodeID)
+
+						exist, err := h.isInstanceExist(instanceName, spec)
+						if exist {
+							logrus.Infof("Initiator instance %v is found in the instance manager on node %v", instanceName, spec.NodeID)
+							return
+						}
+						if err != nil {
+							logrus.WithError(err).Errorf("Failed to check if recreated initiator instance %v exists in the instance manager on node %v",
+								instanceName, spec.NodeID)
+							return
+						}
+					}
+					logrus.Warnf("Marking the instance as state ERROR since failed to find the instance status in instance manager %v for the running instance %v",
+						im.Name, instanceName)
+				}
+				status.CurrentState = longhorn.InstanceStateError
+			} else {
+				if status.CurrentState != longhorn.InstanceStateError {
+					logrus.Warnf("Marking the instance as state ERROR since failed to find the instance status in instance manager %v for the running instance %v", im.Name, instanceName)
+				}
+				status.CurrentState = longhorn.InstanceStateError
 			}
-			status.CurrentState = longhorn.InstanceStateError
 		} else {
 			status.CurrentState = longhorn.InstanceStateStopped
 		}
+
 		status.CurrentImage = ""
 		status.IP = ""
+		status.TargetIP = ""
 		status.StorageIP = ""
+		status.StorageTargetIP = ""
 		status.Port = 0
+		status.TargetPort = 0
 		h.resetInstanceErrorCondition(status)
 		return
 	}
 
 	if status.InstanceManagerName != "" && status.InstanceManagerName != im.Name {
-		logrus.Errorf("The related process of instance %v is found in the instance manager %v, but the instance manager name in the instance status is %v. "+
+		logrus.Warnf("The related process of instance %v is found in the instance manager %v, but the instance manager name in the instance status is %v. "+
 			"The instance manager name shouldn't change except for cleanup",
 			instanceName, im.Name, status.InstanceManagerName)
 	}
@@ -146,44 +341,35 @@ func (h *InstanceHandler) syncStatusWithInstanceManager(im *longhorn.InstanceMan
 		status.CurrentState = longhorn.InstanceStateStarting
 		status.CurrentImage = ""
 		status.IP = ""
+		status.TargetIP = ""
 		status.StorageIP = ""
+		status.StorageTargetIP = ""
 		status.Port = 0
+		status.TargetPort = 0
 		h.resetInstanceErrorCondition(status)
 	case longhorn.InstanceStateRunning:
 		status.CurrentState = longhorn.InstanceStateRunning
 
-		imPod, err := h.ds.GetPodRO(im.Namespace, im.Name)
-		if err != nil {
-			logrus.WithError(err).Errorf("Failed to get instance manager pod from %v", im.Name)
-			return
-		}
-
-		if imPod == nil {
-			logrus.Warnf("Instance manager pod from %v not exist in datastore", im.Name)
-			return
-		}
-
-		storageIP := h.ds.GetStorageIPFromPod(imPod)
-		if status.StorageIP != storageIP {
-			status.StorageIP = storageIP
-			logrus.Warnf("Instance %v starts running, Storage IP %v", instanceName, status.StorageIP)
-		}
+		h.syncStatusIPsAndPorts(im, obj, spec, status, instanceName, instance)
 
-		if status.IP != im.Status.IP {
-			status.IP = im.Status.IP
-			logrus.Warnf("Instance %v starts running, IP %v", instanceName, status.IP)
-		}
-		if status.Port != int(instance.Status.PortStart) {
-			status.Port = int(instance.Status.PortStart)
-			logrus.Warnf("Instance %v starts running, Port %d", instanceName, status.Port)
-		}
 		// only set CurrentImage when first started, since later we may specify
 		// different spec.Image for upgrade
 		if status.CurrentImage == "" {
-			status.CurrentImage = spec.Image
+			if types.IsDataEngineV1(spec.DataEngine) {
+				status.CurrentImage = spec.Image
+			} else {
+				if h.instanceManagerHandler.IsEngine(obj) {
+					status.CurrentImage = spec.Image
+				} else {
+					status.CurrentImage = im.Spec.Image
+				}
+			}
 		}
-		h.syncInstanceCondition(instance, status)
 
+		h.syncInstanceCondition(instance, status)
+	case longhorn.InstanceStateSuspended:
+		status.CurrentState = longhorn.InstanceStateSuspended
+		status.CurrentTargetNodeID = spec.TargetNodeID
 	case longhorn.InstanceStateStopping:
 		if status.Started {
 			status.CurrentState = longhorn.InstanceStateError
@@ -192,8 +378,11 @@ func (h *InstanceHandler) syncStatusWithInstanceManager(im *longhorn.InstanceMan
 		}
 		status.CurrentImage = ""
 		status.IP = ""
+		status.TargetIP = ""
 		status.StorageIP = ""
+		status.StorageTargetIP = ""
 		status.Port = 0
+		status.TargetPort = 0
 		h.resetInstanceErrorCondition(status)
 	case longhorn.InstanceStateStopped:
 		if status.Started {
@@ -203,8 +392,11 @@ func (h *InstanceHandler) syncStatusWithInstanceManager(im *longhorn.InstanceMan
 		}
 		status.CurrentImage = ""
 		status.IP = ""
+		status.TargetIP = ""
 		status.StorageIP = ""
+		status.StorageTargetIP = ""
 		status.Port = 0
+		status.TargetPort = 0
 		h.resetInstanceErrorCondition(status)
 	default:
 		if status.CurrentState != longhorn.InstanceStateError {
@@ -213,8 +405,11 @@ func (h *InstanceHandler) syncStatusWithInstanceManager(im *longhorn.InstanceMan
 		status.CurrentState = longhorn.InstanceStateError
 		status.CurrentImage = ""
 		status.IP = ""
+		status.TargetIP = ""
 		status.StorageIP = ""
+		status.StorageTargetIP = ""
 		status.Port = 0
+		status.TargetPort = 0
 		h.resetInstanceErrorCondition(status)
 	}
 }
@@ -266,7 +461,7 @@ func (h *InstanceHandler) ReconcileInstanceState(obj interface{}, spec *longhorn
 		}
 	}
 	// There should be an available instance manager for a scheduled instance when its related engine image is compatible
-	if im == nil && spec.Image != "" && spec.NodeID != "" {
+	if im == nil && spec.NodeID != "" && h.isSpecImageReady(obj, spec) {
 		dataEngineEnabled, err := h.ds.IsDataEngineEnabled(spec.DataEngine)
 		if err != nil {
 			return err
@@ -280,7 +475,7 @@ func (h *InstanceHandler) ReconcileInstanceState(obj interface{}, spec *longhorn
 			return err
 		}
 		if !isNodeDownOrDeleted {
-			im, err = h.ds.GetInstanceManagerByInstanceRO(obj)
+			im, err = h.getInstanceManagerRO(obj, spec, status)
 			if err != nil {
 				return errors.Wrapf(err, "failed to get instance manager for instance %v", instanceName)
 			}
@@ -328,16 +523,45 @@ func (h *InstanceHandler) ReconcileInstanceState(obj interface{}, spec *longhorn
 
 		if i, exists := instances[instanceName]; exists && i.Status.State == longhorn.InstanceStateRunning {
 			status.Started = true
-			break
+			if h.isEngineOfV2DataEngine(obj, spec.DataEngine) {
+				if isDataEngineNotBeingLiveUpgraded(spec, status) {
+					// Not in data engine live upgrade
+					break
+				}
+			} else {
+				break
+			}
 		}
 
 		// there is a delay between createInstance() invocation and InstanceManager update,
 		// createInstance() may be called multiple times.
-		if status.CurrentState != longhorn.InstanceStateStopped {
-			break
+		if h.isEngineOfV2DataEngine(obj, spec.DataEngine) {
+			if status.CurrentState != longhorn.InstanceStateStopped && status.CurrentState != longhorn.InstanceStateSuspended {
+				if status.CurrentState != longhorn.InstanceStateRunning || isDataEngineNotBeingLiveUpgraded(spec, status) {
+					break
+				}
+			}
+		} else {
+			if status.CurrentState != longhorn.InstanceStateStopped {
+				break
+			}
 		}
 
-		err = h.createInstance(instanceName, spec.DataEngine, runtimeObj)
+		if h.isEngineOfV2DataEngine(obj, spec.DataEngine) {
+			if spec.TargetNodeID != "" {
+				if h.isEngineResumeRequired(spec, status) {
+					// Resume the suspended initiator instance
+					err = h.resumeInstance(instanceName, spec.DataEngine, runtimeObj)
+				} else {
+					// Create target instance if it's not created yet
+					err = h.createInstance(instanceName, spec.DataEngine, runtimeObj)
+				}
+			} else {
+				err = h.createInstance(instanceName, spec.DataEngine, runtimeObj)
+			}
+		} else {
+			err = h.createInstance(instanceName, spec.DataEngine, runtimeObj)
+		}
 		if err != nil {
 			return err
 		}
@@ -359,12 +583,52 @@ func (h *InstanceHandler) ReconcileInstanceState(obj interface{}, spec *longhorn
 				}
 			}
 		}
+		if h.isEngineOfV2DataEngine(obj, spec.DataEngine) {
+			if status.CurrentTargetNodeID != "" {
+				targetIM, err := h.ds.GetRunningInstanceManagerByNodeRO(status.CurrentTargetNodeID, spec.DataEngine)
+				if err != nil {
+					return err
+				}
+				if targetIM != nil {
+					if err := h.deleteTarget(instanceName, runtimeObj); err != nil {
+						if !types.ErrorIsNotFound(err) {
+							return err
+						}
+					}
+				}
+			}
+		}
 		status.Started = false
+	case longhorn.InstanceStateSuspended:
+		if !h.isEngineOfV2DataEngine(obj, spec.DataEngine) {
+			return fmt.Errorf("instance %v is not an engine of a v2 volume", instanceName)
+		}
+
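+		// Suspend the instance, switch over to the new target, and delete the old target if the target node changed.
+		// If the switchover fails, resume the instance so it is not left suspended.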
+		if err := h.suspendInstance(instanceName, runtimeObj); err != nil {
+			return err
+		}
+
+		if err := h.switchOverTarget(instanceName, runtimeObj); err != nil {
+			logrus.Infof("Resuming instance %v after failing to switch over target", instanceName)
+			errResume := h.resumeInstance(instanceName, spec.DataEngine, runtimeObj)
+			if errResume != nil {
+				logrus.WithError(errResume).Errorf("Failed to resume instance %v after failing to switch over target", instanceName)
+			}
+			return err
+		}
+
+		if spec.TargetNodeID != status.CurrentTargetNodeID {
+			if err := h.deleteTarget(instanceName, runtimeObj); err != nil {
+				if !types.ErrorIsNotFound(err) {
+					return err
+				}
+			}
+		}
 	default:
 		return fmt.Errorf("unknown instance desire state: desire %v", spec.DesireState)
 	}
 
-	h.syncStatusWithInstanceManager(im, instanceName, spec, status, instances)
+	h.syncStatusWithInstanceManager(im, obj, spec, status, instances)
 
 	switch status.CurrentState {
 	case longhorn.InstanceStateRunning:
@@ -373,7 +637,9 @@ func (h *InstanceHandler) ReconcileInstanceState(obj interface{}, spec *longhorn
 			if spec.NodeID != im.Spec.NodeID {
 				status.CurrentState = longhorn.InstanceStateError
 				status.IP = ""
+				status.TargetIP = ""
 				status.StorageIP = ""
+				status.StorageTargetIP = ""
 				err := fmt.Errorf("instance %v NodeID %v is not the same as the instance manager %v NodeID %v",
 					instanceName, spec.NodeID, im.Name, im.Spec.NodeID)
 				return err
@@ -447,25 +713,83 @@ func (h *InstanceHandler) printInstanceLogs(instanceName string, obj runtime.Obj
 	return nil
 }
 
-func (h *InstanceHandler) createInstance(instanceName string, dataEngine longhorn.DataEngineType, obj runtime.Object) error {
-	_, err := h.instanceManagerHandler.GetInstance(obj)
-	if err == nil {
-		return nil
-	}
-	if !types.ErrorIsNotFound(err) && !(types.IsDataEngineV2(dataEngine) && types.ErrorIsStopped(err)) {
-		return errors.Wrapf(err, "Failed to get instance process %v", instanceName)
-	}
+func (h *InstanceHandler) createInstance(instanceName string, dataEngine longhorn.DataEngineType, obj runtime.Object) (err error) {
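+	// A v2 engine needs an initiator instance and, when a remote target is required, a separate target instance;
+	// create whichever of the two is missing.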
+	if h.isEngineOfV2DataEngine(obj, dataEngine) {
+		instanceExists := false
+		targetInstanceRequired := false
 
-	logrus.Infof("Creating instance %v", instanceName)
-	if _, err := h.instanceManagerHandler.CreateInstance(obj); err != nil {
-		if !types.ErrorAlreadyExists(err) {
-			h.eventRecorder.Eventf(obj, corev1.EventTypeWarning, constant.EventReasonFailedStarting, "Error starting %v: %v", instanceName, err)
-			return err
+		instance, err := h.instanceManagerHandler.GetInstance(obj, false)
+		if err == nil {
+			instanceExists = true
+
+			targetInstanceRequired, err = h.instanceManagerHandler.RequireRemoteTargetInstance(obj)
+			if err != nil {
+				return errors.Wrapf(err, "failed to check if remote target instance for %v is required", instanceName)
+			}
+			if targetInstanceRequired {
+				_, err = h.instanceManagerHandler.GetInstance(obj, true)
+				if err == nil {
+					return nil
+				}
+			} else {
+				if instance.Status.StandbyTargetPortStart != 0 || instance.Status.TargetPortStart != 0 {
+					return nil
+				}
+
+				targetInstanceRequired = true
+				err = fmt.Errorf("cannot find local target instance for %v", instanceName)
+			}
 		}
-		// Already exists, lost track may due to previous datastore conflict
-		return nil
+
+		if !types.ErrorIsNotFound(err) && !(types.IsDataEngineV2(dataEngine) && types.ErrorIsStopped(err)) {
+			return errors.Wrapf(err, "failed to get instance %v", instanceName)
+		}
+
+		if !instanceExists {
+			logrus.Infof("Creating instance %v", instanceName)
+			if _, err := h.instanceManagerHandler.CreateInstance(obj, false); err != nil {
+				if !types.ErrorAlreadyExists(err) {
+					h.eventRecorder.Eventf(obj, corev1.EventTypeWarning, constant.EventReasonFailedStarting, "Error starting %v: %v", instanceName, err)
+					return err
+				}
+				// Already exists; we may have lost track of it due to a previous datastore conflict
+				return nil
+			}
+			h.eventRecorder.Eventf(obj, corev1.EventTypeNormal, constant.EventReasonStart, "Starts instance %v", instanceName)
+		}
+
+		if targetInstanceRequired {
+			logrus.Infof("Creating target instance %v", instanceName)
+			if _, err := h.instanceManagerHandler.CreateInstance(obj, true); err != nil {
+				if !types.ErrorAlreadyExists(err) {
+					h.eventRecorder.Eventf(obj, corev1.EventTypeWarning, constant.EventReasonFailedStarting, "Error starting %v: %v", instanceName, err)
+					return err
+				}
+				// Already exists; we may have lost track of it due to a previous datastore conflict
+				return nil
+			}
+			h.eventRecorder.Eventf(obj, corev1.EventTypeNormal, constant.EventReasonStart, "Starts target instance %v", instanceName)
+		}
+	} else {
+		_, err := h.instanceManagerHandler.GetInstance(obj, false)
+		if err == nil {
+			return nil
+		}
+		if !types.ErrorIsNotFound(err) && !(types.IsDataEngineV2(dataEngine) && types.ErrorIsStopped(err)) {
+			return errors.Wrapf(err, "Failed to get instance process %v", instanceName)
+		}
+
+		logrus.Infof("Creating instance %v", instanceName)
+		if _, err := h.instanceManagerHandler.CreateInstance(obj, false); err != nil {
+			if !types.ErrorAlreadyExists(err) {
+				h.eventRecorder.Eventf(obj, corev1.EventTypeWarning, constant.EventReasonFailedStarting, "Error starting %v: %v", instanceName, err)
+				return err
+			}
+			// Already exists; we may have lost track of it due to a previous datastore conflict
+			return nil
+		}
+		h.eventRecorder.Eventf(obj, corev1.EventTypeNormal, constant.EventReasonStart, "Starts %v", instanceName)
 	}
-	h.eventRecorder.Eventf(obj, corev1.EventTypeNormal, constant.EventReasonStart, "Starts %v", instanceName)
 
 	return nil
 }
@@ -481,3 +805,191 @@ func (h *InstanceHandler) deleteInstance(instanceName string, obj runtime.Object
 
 	return nil
 }
+
+func (h *InstanceHandler) suspendInstance(instanceName string, obj runtime.Object) error {
+	logrus.Infof("Suspending instance %v", instanceName)
+
+	if _, err := h.instanceManagerHandler.GetInstance(obj, false); err != nil {
+		return errors.Wrapf(err, "failed to get instance %v for suspension", instanceName)
+	}
+
+	if err := h.instanceManagerHandler.SuspendInstance(obj); err != nil {
+		return errors.Wrapf(err, "failed to suspend instance %v", instanceName)
+	}
+
+	return nil
+}
+
+func (h *InstanceHandler) resumeInstance(instanceName string, dataEngine longhorn.DataEngineType, obj runtime.Object) error {
+	logrus.Infof("Resuming instance %v", instanceName)
+
+	if types.IsDataEngineV1(dataEngine) {
+		return fmt.Errorf("resuming instance is not supported for data engine %v", dataEngine)
+	}
+
+	if _, err := h.instanceManagerHandler.GetInstance(obj, false); err != nil {
+		return errors.Wrapf(err, "failed to get instance %v for resumption", instanceName)
+	}
+
+	if err := h.instanceManagerHandler.ResumeInstance(obj); err != nil {
+		return errors.Wrapf(err, "failed to resume instance %v", instanceName)
+	}
+
+	return nil
+}
+
+func (h *InstanceHandler) switchOverTarget(instanceName string, obj runtime.Object) error {
+	logrus.Infof("Switching over target for instance %v", instanceName)
+
+	if err := h.instanceManagerHandler.SwitchOverTarget(obj); err != nil {
+		return errors.Wrapf(err, "failed to switch over target for instance %s", instanceName)
+	}
+
+	return nil
+}
+
+func (h *InstanceHandler) deleteTarget(instanceName string, obj runtime.Object) error {
+	logrus.Infof("Deleting target for instance %s", instanceName)
+
+	if err := h.instanceManagerHandler.DeleteTarget(obj); err != nil {
+		return errors.Wrapf(err, "failed to delete target for instance %s", instanceName)
+	}
+
+	return nil
+}
+
+func (h *InstanceHandler) isV2DataEngineBeingUpgraded(spec *longhorn.InstanceSpec, status *longhorn.InstanceStatus) bool {
+	if !types.IsDataEngineV2(spec.DataEngine) {
+		return false
+	}
+
+	upgradeRequested, err := h.ds.IsNodeDataEngineUpgradeRequested(spec.NodeID)
+	if err != nil {
+		logrus.WithError(err).Errorf("Failed to get node %v", spec.NodeID)
+		return false
+	}
+
+	if !upgradeRequested {
+		return false
+	}
+
+	if spec.TargetNodeID == "" {
+		return false
+	}
+
+	return spec.NodeID != spec.TargetNodeID && spec.TargetNodeID == status.CurrentTargetNodeID
+}
+
+func isVolumeBeingSwitchedBack(spec *longhorn.InstanceSpec, status *longhorn.InstanceStatus) bool {
+	return spec.NodeID == spec.TargetNodeID && spec.TargetNodeID != status.CurrentTargetNodeID
+}
+
+func isTargetInstanceReplacementCreated(instance *longhorn.InstanceProcess) bool {
+	return instance.Status.StandbyTargetPortStart != 0
+}
+
+func isTargetInstanceRemote(instance *longhorn.InstanceProcess) bool {
+	return instance.Status.PortStart != 0 && instance.Status.TargetPortStart == 0
+}
+
+func (h *InstanceHandler) getInstanceManagerRO(obj interface{}, spec *longhorn.InstanceSpec, status *longhorn.InstanceStatus) (*longhorn.InstanceManager, error) {
+	// This only happens while the instance manager image is being upgraded
+	if spec.DesireState == longhorn.InstanceStateRunning && status.CurrentState == longhorn.InstanceStateSuspended {
+		return h.ds.GetRunningInstanceManagerByNodeRO(spec.NodeID, spec.DataEngine)
+	}
+
+	return h.ds.GetInstanceManagerByInstanceRO(obj, false)
+}
+
+func (h *InstanceHandler) handleIfInstanceManagerStarting(imName, instanceName string, status *longhorn.InstanceStatus) {
+	if status.CurrentState != longhorn.InstanceStateError {
+		logrus.Warnf("Marking the instance as state ERROR since the starting instance manager %v shouldn't contain the running instance %v", imName, instanceName)
+	}
+	status.CurrentState = longhorn.InstanceStateError
+	status.CurrentImage = ""
+	status.IP = ""
+	status.TargetIP = ""
+	status.StorageIP = ""
+	status.StorageTargetIP = ""
+	status.Port = 0
+	status.TargetPort = 0
+	h.resetInstanceErrorCondition(status)
+}
+
+func (h *InstanceHandler) isInstanceExist(instanceName string, spec *longhorn.InstanceSpec) (bool, error) {
+	var err error
+
+	im, err := h.ds.GetRunningInstanceManagerByNodeRO(spec.NodeID, spec.DataEngine)
+	if err != nil {
+		return false, err
+	}
+
+	client, err := engineapi.NewInstanceManagerClient(im, false)
+	if err != nil {
+		return false, err
+	}
+	defer client.Close()
+
+	_, err = client.InstanceGet(spec.DataEngine, instanceName, string(longhorn.InstanceManagerTypeEngine))
+	if err != nil {
+		return false, err
+	}
+
+	return true, nil
+}
+
+func (h *InstanceHandler) isSpecImageReady(obj interface{}, spec *longhorn.InstanceSpec) bool {
+	if types.IsDataEngineV1(spec.DataEngine) {
+		return spec.Image != ""
+	}
+
+	if h.instanceManagerHandler.IsEngine(obj) {
+		return spec.Image != ""
+	}
+
+	// spec.image is not required for replica because the image of a v2 replica
+	// is the same as the running instance manager.
+	return true
+}
+
+func (h *InstanceHandler) isTargetInstanceReplacementRunning(instanceName string, spec *longhorn.InstanceSpec, status *longhorn.InstanceStatus) (bool, error) {
+	if spec.TargetNodeID == "" {
+		return false, nil
+	}
+
+	logrus.Infof("Checking whether instance %v is running on target node %v", instanceName, spec.TargetNodeID)
+
+	im, err := h.ds.GetRunningInstanceManagerByNodeRO(spec.TargetNodeID, spec.DataEngine)
+	if err != nil {
+		return false, errors.Wrapf(err, "failed to get instance manager on node %v for checking if target instance is running", spec.TargetNodeID)
+	}
+
+	c, err := engineapi.NewInstanceManagerClient(im, false)
+	if err != nil {
+		return false, errors.Wrapf(err, "failed to create instance manager client for target instance")
+	}
+	defer c.Close()
+
+	instance, err := c.InstanceGet(spec.DataEngine, instanceName, string(longhorn.InstanceManagerTypeEngine))
+	if err != nil {
+		return false, errors.Wrapf(err, "failed to get target instance %v on node %v", instanceName, spec.TargetNodeID)
+	}
+
+	if isVolumeBeingSwitchedBack(spec, status) {
+		return isTargetInstanceRemote(instance) && isTargetInstanceReplacementCreated(instance), nil
+	}
+
+	return true, nil
+}
+
+func (h *InstanceHandler) isEngineOfV2DataEngine(obj interface{}, dataEngine longhorn.DataEngineType) bool {
+	return types.IsDataEngineV2(dataEngine) && h.instanceManagerHandler.IsEngine(obj)
+}
+
+func (h *InstanceHandler) isEngineResumeRequired(spec *longhorn.InstanceSpec, status *longhorn.InstanceStatus) bool {
+	return spec.TargetNodeID == status.CurrentTargetNodeID && status.CurrentState == longhorn.InstanceStateSuspended
+}
+
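+// isDataEngineNotBeingLiveUpgraded returns true when no target node is requested and no target instance replacement
+// has been created, i.e. the engine is not in the middle of a v2 data engine live upgrade.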
+func isDataEngineNotBeingLiveUpgraded(spec *longhorn.InstanceSpec, status *longhorn.InstanceStatus) bool {
+	return spec.TargetNodeID == "" && !status.TargetInstanceReplacementCreated
+}
diff --git a/controller/instance_handler_test.go b/controller/instance_handler_test.go
index 68fe69d6a4..2602b4421f 100644
--- a/controller/instance_handler_test.go
+++ b/controller/instance_handler_test.go
@@ -36,7 +36,7 @@ const (
 
 type MockInstanceManagerHandler struct{}
 
-func (imh *MockInstanceManagerHandler) GetInstance(obj interface{}) (*longhorn.InstanceProcess, error) {
+func (imh *MockInstanceManagerHandler) GetInstance(obj interface{}, isInstanceOnRemoteNode bool) (*longhorn.InstanceProcess, error) {
 	metadata, err := meta.Accessor(obj)
 	if err != nil {
 		return nil, err
@@ -48,7 +48,7 @@ func (imh *MockInstanceManagerHandler) GetInstance(obj interface{}) (*longhorn.I
 	return &longhorn.InstanceProcess{}, nil
 }
 
-func (imh *MockInstanceManagerHandler) CreateInstance(obj interface{}) (*longhorn.InstanceProcess, error) {
+func (imh *MockInstanceManagerHandler) CreateInstance(obj interface{}, isInstanceOnRemoteNode bool) (*longhorn.InstanceProcess, error) {
 	metadata, err := meta.Accessor(obj)
 	if err != nil {
 		return nil, err
@@ -72,10 +72,36 @@ func (imh *MockInstanceManagerHandler) DeleteInstance(obj interface{}) error {
 	return nil
 }
 
+func (imh *MockInstanceManagerHandler) SuspendInstance(obj interface{}) error {
+	return fmt.Errorf("SuspendInstance is not mocked")
+}
+
+func (imh *MockInstanceManagerHandler) ResumeInstance(obj interface{}) error {
+	return fmt.Errorf("ResumeInstance is not mocked")
+}
+
+func (imh *MockInstanceManagerHandler) SwitchOverTarget(obj interface{}) error {
+	return fmt.Errorf("SwitchOverTarget is not mocked")
+}
+
+func (imh *MockInstanceManagerHandler) DeleteTarget(obj interface{}) error {
+	// DeleteTarget is not mocked
+	return nil
+}
+
+func (imh *MockInstanceManagerHandler) IsEngine(obj interface{}) bool {
+	_, ok := obj.(*longhorn.Engine)
+	return ok
+}
+
 func (imh *MockInstanceManagerHandler) LogInstance(ctx context.Context, obj interface{}) (*engineapi.InstanceManagerClient, *imapi.LogStream, error) {
 	return nil, nil, fmt.Errorf("LogInstance is not mocked")
 }
 
+func (imh *MockInstanceManagerHandler) RequireRemoteTargetInstance(obj interface{}) (bool, error) {
+	return false, nil
+}
+
 func newEngine(name, currentImage, imName, nodeName, ip string, port int, started bool, currentState, desireState longhorn.InstanceState) *longhorn.Engine {
 	var conditions []longhorn.Condition
 	conditions = types.SetCondition(conditions,
@@ -108,8 +134,11 @@ func newEngine(name, currentImage, imName, nodeName, ip string, port int, starte
 				CurrentImage:        currentImage,
 				InstanceManagerName: imName,
 				IP:                  ip,
+				TargetIP:            ip,
 				StorageIP:           ip,
+				StorageTargetIP:     ip,
 				Port:                port,
+				TargetPort:          0, // v1 volume doesn't set target port
 				Started:             started,
 				Conditions:          conditions,
 			},
diff --git a/controller/utils.go b/controller/utils.go
index 1c27ce1f64..cfecd2855e 100644
--- a/controller/utils.go
+++ b/controller/utils.go
@@ -23,10 +23,6 @@ func hasReplicaEvictionRequested(rs map[string]*longhorn.Replica) bool {
 	return false
 }
 
-func isVolumeUpgrading(v *longhorn.Volume) bool {
-	return v.Status.CurrentImage != v.Spec.Image
-}
-
 // isTargetVolumeOfAnActiveCloning checks if the input volume is the target volume of an on-going cloning process
 func isTargetVolumeOfAnActiveCloning(v *longhorn.Volume) bool {
 	isCloningDesired := types.IsDataFromVolume(v.Spec.DataSource)
diff --git a/controller/volume_controller.go b/controller/volume_controller.go
index 5e32ae1d31..5c64d3a93e 100644
--- a/controller/volume_controller.go
+++ b/controller/volume_controller.go
@@ -1004,9 +1004,12 @@ func (c *VolumeController) cleanupCorruptedOrStaleReplicas(v *longhorn.Volume, r
 				}
 				continue
 			} else if r.Spec.Image != v.Spec.Image {
-				// r.Spec.Active shouldn't be set for the leftover replicas, something must wrong
-				log.WithField("replica", r.Name).Warnf("Replica engine image %v is different from volume engine image %v, "+
-					"but replica spec.Active has been set", r.Spec.Image, v.Spec.Image)
+				// For a v2 volume, the instance manager image of a replica can differ from that of its volume
+				if types.IsDataEngineV1(v.Spec.DataEngine) {
+					// r.Spec.Active shouldn't be set for the leftover replicas, something must be wrong
+					log.WithField("replica", r.Name).Warnf("Replica engine image %v is different from volume engine image %v, "+
+						"but replica spec.Active has been set", r.Spec.Image, v.Spec.Image)
+				}
 			}
 		}
 
@@ -1613,7 +1616,7 @@ func (c *VolumeController) reconcileAttachDetachStateMachine(v *longhorn.Volume,
 						return err
 					}
 					if !c.areVolumeDependentResourcesOpened(e, rs) {
-						log.Warnf("Volume is attached but dependent resources are not opened")
+						log.WithField("e.Status.CurrentState", e.Status.CurrentState).Warn("Volume is attached but dependent resources are not opened")
 					}
 				}
 				return nil
@@ -1821,9 +1824,16 @@ func (c *VolumeController) openVolumeDependentResources(v *longhorn.Volume, e *l
 			return err
 		}
 		if canIMLaunchReplica {
-			if r.Spec.FailedAt == "" && r.Spec.Image == v.Status.CurrentImage {
+			if r.Spec.FailedAt == "" {
 				if r.Status.CurrentState == longhorn.InstanceStateStopped {
-					r.Spec.DesireState = longhorn.InstanceStateRunning
+					if types.IsDataEngineV1(e.Spec.DataEngine) {
+						if r.Spec.Image == v.Status.CurrentImage {
+							r.Spec.DesireState = longhorn.InstanceStateRunning
+						}
+					} else {
+						// For a v2 volume, the replica image does not need to match the volume image
+						r.Spec.DesireState = longhorn.InstanceStateRunning
+					}
 				}
 			}
 		} else {
@@ -1860,8 +1870,11 @@ func (c *VolumeController) openVolumeDependentResources(v *longhorn.Volume, e *l
 		if r.Spec.NodeID == "" {
 			continue
 		}
-		if r.Spec.Image != v.Status.CurrentImage {
-			continue
+		// For a v2 volume, the replica image does not need to match the volume image
+		if types.IsDataEngineV1(v.Spec.DataEngine) {
+			if r.Spec.Image != v.Status.CurrentImage {
+				continue
+			}
 		}
 		if r.Spec.EngineName != e.Name {
 			continue
@@ -1907,7 +1920,15 @@ func (c *VolumeController) openVolumeDependentResources(v *longhorn.Volume, e *l
 	}
 	e.Spec.NodeID = v.Spec.NodeID
 	e.Spec.ReplicaAddressMap = replicaAddressMap
-	e.Spec.DesireState = longhorn.InstanceStateRunning
+
+	if types.IsDataEngineV1(v.Spec.DataEngine) {
+		e.Spec.DesireState = longhorn.InstanceStateRunning
+	} else {
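+		// For a v2 volume, keep the engine stopped while a live upgrade
+		// (image or target node change) is still in progress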
+		if v.Spec.Image == v.Status.CurrentImage && v.Spec.TargetNodeID == v.Status.CurrentTargetNodeID {
+			e.Spec.DesireState = longhorn.InstanceStateRunning
+		}
+	}
+
 	// The volume may be activated
 	e.Spec.DisableFrontend = v.Status.FrontendDisabled
 	e.Spec.Frontend = v.Spec.Frontend
@@ -1950,6 +1971,7 @@ func (c *VolumeController) closeVolumeDependentResources(v *longhorn.Volume, e *
 		}
 		e.Spec.RequestedBackupRestore = ""
 		e.Spec.NodeID = ""
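+		// also clear the v2 live-upgrade target when stopping the engine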
+		e.Spec.TargetNodeID = ""
 		e.Spec.DesireState = longhorn.InstanceStateStopped
 	}
 	// must make sure engine stopped first before stopping replicas
@@ -2061,7 +2083,7 @@ func (c *VolumeController) canInstanceManagerLaunchReplica(r *longhorn.Replica)
 	if r.Status.InstanceManagerName != "" {
 		return true, nil
 	}
-	defaultIM, err := c.ds.GetInstanceManagerByInstanceRO(r)
+	defaultIM, err := c.ds.GetInstanceManagerByInstanceRO(r, false)
 	if err != nil {
 		return false, errors.Wrapf(err, "failed to find instance manager for replica %v", r.Name)
 	}
@@ -2211,6 +2233,20 @@ func hasLocalReplicaOnSameNodeAsEngine(e *longhorn.Engine, rs map[string]*longho
 // It will count all the potentially usable replicas, since some replicas maybe
 // blank or in rebuilding state
 func (c *VolumeController) replenishReplicas(v *longhorn.Volume, e *longhorn.Engine, rs map[string]*longhorn.Replica, hardNodeAffinity string) error {
+	log := getLoggerForVolume(c.logger, v)
+
+	// Skip replica replenishment while the volume's owner node is undergoing a v2 data engine upgrade
+	if types.IsDataEngineV2(v.Spec.DataEngine) {
+		node, err := c.ds.GetNode(v.Status.OwnerID)
+		if err != nil {
+			return errors.Wrapf(err, "failed to get node %v for checking volume upgrade status", v.Status.OwnerID)
+		}
+		if node.Spec.DataEngineUpgradeRequested {
+			log.Info("Node is being upgraded; skipping replica replenishment")
+			return nil
+		}
+	}
+
 	concurrentRebuildingLimit, err := c.ds.GetSettingAsInt(types.SettingNameConcurrentReplicaRebuildPerNodeLimit)
 	if err != nil {
 		return err
@@ -2248,8 +2284,6 @@ func (c *VolumeController) replenishReplicas(v *longhorn.Volume, e *longhorn.Eng
 		return nil
 	}
 
-	log := getLoggerForVolume(c.logger, v)
-
 	replenishCount, updateNodeAffinity := c.getReplenishReplicasCount(v, rs, e)
 	if hardNodeAffinity == "" && updateNodeAffinity != "" {
 		hardNodeAffinity = updateNodeAffinity
@@ -3027,8 +3061,8 @@ func (c *VolumeController) upgradeEngineForVolume(v *longhorn.Volume, es map[str
 	}
 
 	log := getLoggerForVolume(c.logger, v).WithFields(logrus.Fields{
-		"engine":                   e.Name,
-		"volumeDesiredEngineImage": v.Spec.Image,
+		"engine":             e.Name,
+		"volumeDesiredImage": v.Spec.Image,
 	})
 
 	if !isVolumeUpgrading(v) {
@@ -3049,30 +3083,13 @@ func (c *VolumeController) upgradeEngineForVolume(v *longhorn.Volume, es map[str
 	// If volume is detached accidentally during the live upgrade,
 	// the live upgrade info and the inactive replicas are meaningless.
 	if v.Status.State == longhorn.VolumeStateDetached {
-		if e.Spec.Image != v.Spec.Image {
-			e.Spec.Image = v.Spec.Image
-			e.Spec.UpgradedReplicaAddressMap = map[string]string{}
-		}
-		for _, r := range rs {
-			if r.Spec.Image != v.Spec.Image {
-				r.Spec.Image = v.Spec.Image
-				rs[r.Name] = r
-			}
-			if !r.Spec.Active {
-				log.Infof("Removing inactive replica %v when the volume is detached accidentally during the live upgrade", r.Name)
-				if err := c.deleteReplica(r, rs); err != nil {
-					return err
-				}
-			}
-		}
-		// TODO current replicas should be calculated by checking if there is
-		// any other image exists except for the v.Spec.Image
-		v.Status.CurrentImage = v.Spec.Image
-		return nil
+		return c.handleDetachedVolumeUpgrade(v, e, rs, log)
 	}
 
-	// Only start live upgrade if volume is healthy
-	if v.Status.State != longhorn.VolumeStateAttached || v.Status.Robustness != longhorn.VolumeRobustnessHealthy {
+	// Only start a live upgrade if
+	// - a v1 volume is attached and healthy
+	// - a v2 volume is attached and healthy or degraded
+	if isVolumeNotEligibleForLiveUpgrade(v) {
 		if v.Status.State != longhorn.VolumeStateAttached || v.Status.Robustness != longhorn.VolumeRobustnessDegraded {
 			return nil
 		}
@@ -3159,18 +3176,160 @@ func (c *VolumeController) upgradeEngineForVolume(v *longhorn.Volume, es map[str
 				e.Spec.Image = v.Spec.Image
 			}
 		}
-		c.finishLiveEngineUpgrade(v, e, rs, log)
 	} else {
-		// TODO: Implement the logic for data engine v2
+		if isV2DataEngineLiveUpgradeCompleted(v) {
+			return nil
+		}
+
+		log = log.WithField("engine", e.Name)
+
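+		// The v2 data engine live upgrade proceeds in steps across reconciliations:
+		// propagate the new image and target node to the engine spec, wait for the
+		// target instance replacement, suspend the engine, refresh the upgraded
+		// replica address map, resume the engine, and finally record the current
+		// target node in the volume status before finishing the upgrade below.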
+		if v.Spec.TargetNodeID != "" {
+			if e.Spec.TargetNodeID != v.Spec.TargetNodeID {
+				if e.Spec.Image != v.Spec.Image {
+					log.Infof("Updating image from %s to %s for v2 data engine live upgrade", e.Spec.Image, v.Spec.Image)
+					e.Spec.Image = v.Spec.Image
+				}
+
+				log.Infof("Updating target node from %s to %s for v2 data engine live upgrade", e.Spec.TargetNodeID, v.Spec.TargetNodeID)
+				e.Spec.TargetNodeID = v.Spec.TargetNodeID
+				return nil
+			}
+
+			if !e.Status.TargetInstanceReplacementCreated && e.Status.CurrentTargetNodeID != v.Spec.TargetNodeID {
+				log.Debug("Waiting for target instance replacement to be created")
+				return nil
+			}
+
+			if e.Status.CurrentTargetNodeID != v.Spec.TargetNodeID {
+				if e.Status.CurrentState == longhorn.InstanceStateRunning {
+					log.Infof("Suspending engine for v2 data engine live upgrade")
+					e.Spec.DesireState = longhorn.InstanceStateSuspended
+					return nil
+				} else {
+					// TODO: what if e.Status.CurrentState != longhorn.InstanceStateRunning
+				}
+			}
+
+			// At this moment:
+			// 1. the volume is running and healthy
+			// 2. the engine is suspended
+			// 3. the initiator is being switched over to the new target
+			// 4. the old target still exists
+
+			if replicaAddressMap, err := c.constructReplicaAddressMap(v, e, rs); err != nil {
+				return err
+			} else {
+				if !reflect.DeepEqual(e.Spec.UpgradedReplicaAddressMap, replicaAddressMap) {
+					e.Spec.UpgradedReplicaAddressMap = replicaAddressMap
+					return nil
+				}
+			}
+
+			if e.Status.CurrentState == longhorn.InstanceStateSuspended {
+				log.Infof("Resuming engine for live upgrade")
+				e.Spec.DesireState = longhorn.InstanceStateRunning
+				return nil
+			}
+
+			if e.Status.CurrentState != longhorn.InstanceStateRunning {
+				log.Debugf("Engine is in %v, waiting for engine to be running", e.Status.CurrentState)
+				return nil
+			}
+
+			// At this point:
+			// 1. the volume is running and healthy
+			// 2. the engine is running
+			// 3. the initiator has been switched over to the new target
+			// 4. the old target has been deleted
+
+			if v.Status.CurrentTargetNodeID != v.Spec.TargetNodeID {
+				v.Status.CurrentTargetNodeID = v.Spec.TargetNodeID
+				return nil
+			}
+		}
+	}
+
+	c.finishLiveEngineUpgrade(v, e, rs, log)
+
+	return nil
+}
+
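+// handleDetachedVolumeUpgrade finalizes an interrupted live upgrade for a
+// detached volume: it aligns the engine image, clears the upgrade bookkeeping,
+// fixes up or removes leftover replicas, and records the new current image.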
+func (c *VolumeController) handleDetachedVolumeUpgrade(v *longhorn.Volume, e *longhorn.Engine, rs map[string]*longhorn.Replica, log logrus.FieldLogger) error {
+	if e.Spec.Image != v.Spec.Image {
+		e.Spec.Image = v.Spec.Image
+		e.Spec.UpgradedReplicaAddressMap = map[string]string{}
+		e.Spec.TargetNodeID = ""
+	}
+
+	for name, r := range rs {
+		if err := c.updateDetachedReplica(v, r); err != nil {
+			return err
+		}
+		if !r.Spec.Active {
+			log.Infof("Removing inactive replica %v when the volume is detached during live upgrade", r.Name)
+			if err := c.deleteReplica(r, rs); err != nil {
+				return err
+			}
+			continue
+		}
+		rs[name] = r
+	}
+	// TODO: current replicas should be calculated by checking whether any image
+	// other than v.Spec.Image still exists
+	v.Status.CurrentImage = v.Spec.Image
+	return nil
+}
+
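+// updateDetachedReplica aligns a detached replica's image with the volume:
+// a v1 replica takes the volume image directly, while a v2 replica takes the
+// image of the running v2 instance manager on its node.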
+func (c *VolumeController) updateDetachedReplica(v *longhorn.Volume, r *longhorn.Replica) error {
+	if r.Spec.Image == v.Spec.Image {
 		return nil
 	}
+
+	if types.IsDataEngineV1(v.Spec.DataEngine) {
+		r.Spec.Image = v.Spec.Image
+		return nil
+	}
+
+	im, err := c.ds.GetRunningInstanceManagerByNodeRO(r.Spec.NodeID, longhorn.DataEngineTypeV2)
+	if err != nil {
+		return err
+	}
+	r.Spec.Image = im.Spec.Image
 	return nil
 }
 
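+// isVolumeNotEligibleForLiveUpgrade reports whether a live upgrade must be
+// deferred: the volume has to be attached, and additionally healthy for the
+// v1 data engine, or healthy or degraded for the v2 data engine.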
+func isVolumeNotEligibleForLiveUpgrade(v *longhorn.Volume) bool {
+	if v.Status.State != longhorn.VolumeStateAttached {
+		return true
+	}
+
+	if types.IsDataEngineV1(v.Spec.DataEngine) {
+		if v.Status.Robustness != longhorn.VolumeRobustnessHealthy {
+			return true
+		}
+	}
+
+	if types.IsDataEngineV2(v.Spec.DataEngine) {
+		if v.Status.Robustness != longhorn.VolumeRobustnessHealthy &&
+			v.Status.Robustness != longhorn.VolumeRobustnessDegraded {
+			return true
+		}
+	}
+
+	return false
+}
+
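+// isV2DataEngineLiveUpgradeCompleted reports whether the volume is already
+// running the desired image with its node and target node settled, so no
+// further v2 live upgrade steps are required.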
+func isV2DataEngineLiveUpgradeCompleted(v *longhorn.Volume) bool {
+	return v.Spec.TargetNodeID != "" &&
+		v.Spec.Image == v.Status.CurrentImage &&
+		v.Spec.TargetNodeID == v.Status.CurrentTargetNodeID &&
+		v.Spec.NodeID == v.Status.CurrentNodeID
+}
+
 func (c *VolumeController) constructReplicaAddressMap(v *longhorn.Volume, e *longhorn.Engine, dataPathToNewReplica map[string]*longhorn.Replica) (map[string]string, error) {
 	log := getLoggerForVolume(c.logger, v).WithFields(logrus.Fields{
-		"engine":                   e.Name,
-		"volumeDesiredEngineImage": v.Spec.Image,
+		"engine":             e.Name,
+		"volumeDesiredImage": v.Spec.Image,
 	})
 
 	replicaAddressMap := map[string]string{}
@@ -3200,8 +3359,8 @@ func (c *VolumeController) constructReplicaAddressMap(v *longhorn.Volume, e *lon
 
 func (c *VolumeController) groupReplicasByImageAndState(v *longhorn.Volume, e *longhorn.Engine, rs map[string]*longhorn.Replica) (unknownReplicas, dataPathToOldRunningReplica, dataPathToNewReplica map[string]*longhorn.Replica) {
 	log := getLoggerForVolume(c.logger, v).WithFields(logrus.Fields{
-		"engine":                   e.Name,
-		"volumeDesiredEngineImage": v.Spec.Image,
+		"engine":             e.Name,
+		"volumeDesiredImage": v.Spec.Image,
 	})
 
 	unknownReplicas = map[string]*longhorn.Replica{}
@@ -3256,20 +3415,29 @@ func (c *VolumeController) checkOldAndNewEngineImagesForLiveUpgrade(v *longhorn.
 }
 
 func (c *VolumeController) finishLiveEngineUpgrade(v *longhorn.Volume, e *longhorn.Engine, rs map[string]*longhorn.Replica, log *logrus.Entry) {
-	if e.Status.CurrentImage != v.Spec.Image ||
-		e.Status.CurrentState != longhorn.InstanceStateRunning {
+	if e.Status.CurrentImage != v.Spec.Image {
+		return
+	}
+	if e.Status.CurrentState != longhorn.InstanceStateRunning {
 		return
 	}
 
-	c.switchActiveReplicas(rs, func(r *longhorn.Replica, image string) bool {
-		return r.Spec.Image == image && r.DeletionTimestamp.IsZero()
-	}, v.Spec.Image)
+	if types.IsDataEngineV1(v.Spec.DataEngine) {
+		c.switchActiveReplicas(rs, func(r *longhorn.Replica, image string) bool {
+			return r.Spec.Image == image && r.DeletionTimestamp.IsZero()
+		}, v.Spec.Image)
+	}
 
 	e.Spec.ReplicaAddressMap = e.Spec.UpgradedReplicaAddressMap
 	e.Spec.UpgradedReplicaAddressMap = map[string]string{}
 	// cleanupCorruptedOrStaleReplicas() will take care of old replicas
 	log.Infof("Engine %v has been upgraded from %v to %v", e.Name, v.Status.CurrentImage, v.Spec.Image)
 	v.Status.CurrentImage = v.Spec.Image
+
+	if v.Spec.TargetNodeID == "" {
+		v.Status.CurrentTargetNodeID = ""
+		e.Spec.TargetNodeID = ""
+	}
 }
 
 func (c *VolumeController) updateRequestedBackupForVolumeRestore(v *longhorn.Volume, e *longhorn.Engine) (err error) {
@@ -3646,6 +3814,13 @@ func (c *VolumeController) createEngine(v *longhorn.Volume, currentEngineName st
 }
 
 func (c *VolumeController) newReplica(v *longhorn.Volume, e *longhorn.Engine, hardNodeAffinity string) *longhorn.Replica {
+	image := v.Status.CurrentImage
+	if types.IsDataEngineV2(v.Spec.DataEngine) {
+		// spec.image of v2 replica can be empty and different from the volume image,
+		// The spec.image of a v2 replica can be empty or differ from the volume image,
+		// because a v2 replica always uses the image of its running instance manager.
+	}
+
 	return &longhorn.Replica{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:            types.GenerateReplicaNameForVolume(v.Name),
@@ -3655,7 +3830,7 @@ func (c *VolumeController) newReplica(v *longhorn.Volume, e *longhorn.Engine, ha
 			InstanceSpec: longhorn.InstanceSpec{
 				VolumeName:  v.Name,
 				VolumeSize:  v.Spec.Size,
-				Image:       v.Status.CurrentImage,
+				Image:       image,
 				DataEngine:  v.Spec.DataEngine,
 				DesireState: longhorn.InstanceStateStopped,
 			},
@@ -4907,3 +5082,13 @@ func (c *VolumeController) shouldCleanUpFailedReplica(v *longhorn.Volume, r *lon
 	}
 	return false
 }
+
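+// isVolumeUpgrading returns true while the volume's desired image (and, for a
+// v2 volume, its desired target node) has not yet been reflected in the status.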
+func isVolumeUpgrading(v *longhorn.Volume) bool {
+	imageNotUpdated := v.Status.CurrentImage != v.Spec.Image
+
+	if types.IsDataEngineV1(v.Spec.DataEngine) {
+		return imageNotUpdated
+	}
+
+	return imageNotUpdated || v.Spec.TargetNodeID != v.Status.CurrentTargetNodeID
+}
diff --git a/controller/volume_controller_test.go b/controller/volume_controller_test.go
index 6e1eab29f0..f85d321f74 100644
--- a/controller/volume_controller_test.go
+++ b/controller/volume_controller_test.go
@@ -503,8 +503,11 @@ func (s *TestSuite) TestVolumeLifeCycle(c *C) {
 		e.Status.OwnerID = TestNode1
 		e.Status.CurrentState = longhorn.InstanceStateStopped
 		e.Status.IP = ""
+		e.Status.TargetIP = ""
 		e.Status.StorageIP = ""
+		e.Status.StorageTargetIP = ""
 		e.Status.Port = 0
+		e.Status.TargetPort = 0
 		e.Status.Endpoint = ""
 		e.Status.CurrentImage = ""
 		e.Status.ReplicaModeMap = map[string]longhorn.ReplicaMode{}