Skip to content

Commit

Permalink
feat(v2 upgrade/engineapi): get replica addresses according to initia…
Browse files Browse the repository at this point in the history
…tor and target IPs

If initiatorIP is not equal to targetIP, exclude the replica on the node
where the initiator is running, because the node is going to be upgraded
and the instance-manager will be deleted.

Longhorn 9104

Signed-off-by: Derek Su <[email protected]>
  • Loading branch information
derekbit committed Dec 15, 2024
1 parent aad1878 commit 8ca5078
Show file tree
Hide file tree
Showing 2 changed files with 225 additions and 9 deletions.
138 changes: 129 additions & 9 deletions engineapi/instance_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package engineapi
import (
"context"
"fmt"
"net"
"path/filepath"
"strconv"

Expand Down Expand Up @@ -279,14 +280,16 @@ func parseInstance(p *imapi.Instance) *longhorn.InstanceProcess {
DataEngine: getDataEngineFromInstanceProcess(p),
},
Status: longhorn.InstanceProcessStatus{
Type: getTypeForInstance(longhorn.InstanceType(p.Type), p.Name),
State: longhorn.InstanceState(p.InstanceStatus.State),
ErrorMsg: p.InstanceStatus.ErrorMsg,
Conditions: p.InstanceStatus.Conditions,
PortStart: p.InstanceStatus.PortStart,
PortEnd: p.InstanceStatus.PortEnd,
TargetPortStart: p.InstanceStatus.TargetPortStart,
TargetPortEnd: p.InstanceStatus.TargetPortEnd,
Type: getTypeForInstance(longhorn.InstanceType(p.Type), p.Name),
State: longhorn.InstanceState(p.InstanceStatus.State),
ErrorMsg: p.InstanceStatus.ErrorMsg,
Conditions: p.InstanceStatus.Conditions,
PortStart: p.InstanceStatus.PortStart,
PortEnd: p.InstanceStatus.PortEnd,
TargetPortStart: p.InstanceStatus.TargetPortStart,
TargetPortEnd: p.InstanceStatus.TargetPortEnd,
StandbyTargetPortEnd: p.InstanceStatus.StandbyTargetPortEnd,
StandbyTargetPortStart: p.InstanceStatus.StandbyTargetPortStart,

// FIXME: These fields are not used, maybe we can deprecate them later.
Listen: "",
Expand Down Expand Up @@ -482,7 +485,10 @@ func (c *InstanceManagerClient) EngineInstanceCreate(req *EngineInstanceCreateRe
return nil, err
}
case longhorn.DataEngineTypeV2:
replicaAddresses = req.Engine.Status.CurrentReplicaAddressMap
replicaAddresses, err = getReplicaAddresses(req.Engine.Status.CurrentReplicaAddressMap, req.InitiatorAddress, req.TargetAddress)
if err != nil {
return nil, err
}
}

if c.GetAPIVersion() < 4 {
Expand Down Expand Up @@ -523,6 +529,31 @@ func (c *InstanceManagerClient) EngineInstanceCreate(req *EngineInstanceCreateRe
return parseInstance(instance), nil
}

func getReplicaAddresses(replicaAddresses map[string]string, initiatorAddress, targetAddress string) (map[string]string, error) {
initiatorIP, _, err := net.SplitHostPort(initiatorAddress)
if err != nil {
return nil, errors.New("invalid initiator address format")
}

targetIP, _, err := net.SplitHostPort(targetAddress)
if err != nil {
return nil, errors.New("invalid target address format")
}

addresses := make(map[string]string)
for name, addr := range replicaAddresses {
replicaIP, _, err := net.SplitHostPort(addr)
if err != nil {
return nil, errors.New("invalid replica address format")
}
if initiatorIP != targetIP && initiatorIP == replicaIP {
continue
}
addresses[name] = addr
}
return addresses, nil
}

type ReplicaInstanceCreateRequest struct {
Replica *longhorn.Replica
DiskName string
Expand Down Expand Up @@ -829,3 +860,92 @@ func (c *InstanceManagerClient) LogSetFlags(dataEngine longhorn.DataEngineType,

return c.instanceServiceGrpcClient.LogSetFlags(string(dataEngine), component, flags)
}

type EngineInstanceSuspendRequest struct {
Engine *longhorn.Engine
}

// EngineInstanceSuspend suspends engine instance
func (c *InstanceManagerClient) EngineInstanceSuspend(req *EngineInstanceSuspendRequest) error {
if req.Engine == nil {
return errors.New("EngineInstanceSuspend: engine is nil")
}

engine := req.Engine
switch engine.Spec.DataEngine {
case longhorn.DataEngineTypeV1:
return fmt.Errorf("engine suspension for data engine %v is not supported yet", longhorn.DataEngineTypeV1)
case longhorn.DataEngineTypeV2:
return c.instanceServiceGrpcClient.InstanceSuspend(string(engine.Spec.DataEngine), req.Engine.Name, string(longhorn.InstanceManagerTypeEngine))
default:
return fmt.Errorf("unknown data engine %v", engine.Spec.DataEngine)
}
}

type EngineInstanceResumeRequest struct {
Engine *longhorn.Engine
}

// EngineInstanceResume resumes engine instance
func (c *InstanceManagerClient) EngineInstanceResume(req *EngineInstanceResumeRequest) error {
if req.Engine == nil {
return errors.New("EngineInstanceResume: engine is nil")
}

engine := req.Engine
switch engine.Spec.DataEngine {
case longhorn.DataEngineTypeV1:
return fmt.Errorf("engine resumption for data engine %v is not supported yet", longhorn.DataEngineTypeV1)
case longhorn.DataEngineTypeV2:
return c.instanceServiceGrpcClient.InstanceResume(string(engine.Spec.DataEngine), req.Engine.Name, string(longhorn.InstanceManagerTypeEngine))
default:
return fmt.Errorf("unknown data engine %v", engine.Spec.DataEngine)
}
}

type EngineInstanceSwitchOverTargetRequest struct {
Engine *longhorn.Engine
TargetAddress string
}

// EngineInstanceSwitchOverTarget switches over target for engine instance
func (c *InstanceManagerClient) EngineInstanceSwitchOverTarget(req *EngineInstanceSwitchOverTargetRequest) error {
if req.Engine == nil {
return errors.New("EngineInstanceSwitchOverTarget: engine is nil")
}

if req.TargetAddress == "" {
return errors.New("EngineInstanceSwitchOverTarget: targetAddress is empty")
}

engine := req.Engine
switch engine.Spec.DataEngine {
case longhorn.DataEngineTypeV1:
return fmt.Errorf("target switchover for data engine %v is not supported yet", longhorn.DataEngineTypeV1)
case longhorn.DataEngineTypeV2:
return c.instanceServiceGrpcClient.InstanceSwitchOverTarget(string(engine.Spec.DataEngine), req.Engine.Name, string(longhorn.InstanceManagerTypeEngine), req.TargetAddress)
default:
return fmt.Errorf("unknown data engine %v", engine.Spec.DataEngine)
}
}

type EngineInstanceDeleteTargetRequest struct {
Engine *longhorn.Engine
}

// EngineInstanceDeleteTarget deletes target for engine instance
func (c *InstanceManagerClient) EngineInstanceDeleteTarget(req *EngineInstanceDeleteTargetRequest) error {
if req.Engine == nil {
return errors.New("EngineInstanceDeleteTarget: engine is nil")
}

engine := req.Engine
switch engine.Spec.DataEngine {
case longhorn.DataEngineTypeV1:
return fmt.Errorf("target deletion for data engine %v is not supported yet", longhorn.DataEngineTypeV1)
case longhorn.DataEngineTypeV2:
return c.instanceServiceGrpcClient.InstanceDeleteTarget(string(engine.Spec.DataEngine), req.Engine.Name, string(longhorn.InstanceManagerTypeEngine))
default:
return fmt.Errorf("unknown data engine %v", engine.Spec.DataEngine)
}
}
96 changes: 96 additions & 0 deletions engineapi/instance_manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package engineapi

import "testing"

func TestGetReplicaAddresses(t *testing.T) {
tests := []struct {
name string
replicaAddresses map[string]string
initiatorAddress string
targetAddress string
expected map[string]string
expectError bool
}{
{
name: "No filtering needed",
replicaAddresses: map[string]string{
"replica1": "192.168.1.1:9502",
"replica2": "192.168.1.2:9502",
},
initiatorAddress: "192.168.1.3:9502",
targetAddress: "192.168.1.3:9502",
expected: map[string]string{
"replica1": "192.168.1.1:9502",
"replica2": "192.168.1.2:9502",
},
expectError: false,
},
{
name: "Filter out initiator IP",
replicaAddresses: map[string]string{
"replica1": "192.168.1.1:9502",
"replica2": "192.168.1.2:9502",
},
initiatorAddress: "192.168.1.1:9502",
targetAddress: "192.168.1.3:9502",
expected: map[string]string{
"replica2": "192.168.1.2:9502",
},
expectError: false,
},
{
name: "Invalid initiator address format",
replicaAddresses: map[string]string{
"replica1": "192.168.1.1:9502",
},
initiatorAddress: "192.168.1.1",
targetAddress: "192.168.1.3:9502",
expected: nil,
expectError: true,
},
{
name: "Invalid target address format",
replicaAddresses: map[string]string{
"replica1": "192.168.1.1:9502",
},
initiatorAddress: "192.168.1.3:9502",
targetAddress: "192.168.1.3",
expected: nil,
expectError: true,
},
{
name: "Invalid replica address format",
replicaAddresses: map[string]string{
"replica1": "192.168.1.1",
},
initiatorAddress: "192.168.1.3:9502",
targetAddress: "192.168.1.3:9502",
expected: nil,
expectError: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result, err := getReplicaAddresses(tt.replicaAddresses, tt.initiatorAddress, tt.targetAddress)
if (err != nil) != tt.expectError {
t.Errorf("expected error: %v, got: %v", tt.expectError, err)
}
if !tt.expectError && !equalMaps(result, tt.expected) {
t.Errorf("expected: %v, got: %v", tt.expected, result)
}
})
}
}

func equalMaps(a, b map[string]string) bool {
if len(a) != len(b) {
return false
}
for k, v := range a {
if b[k] != v {
return false
}
}
return true
}

0 comments on commit 8ca5078

Please sign in to comment.