From 4ee161af6b98b7dcc756609c5cfa73d4d6836594 Mon Sep 17 00:00:00 2001 From: Cyclinder Kuo Date: Wed, 15 Jan 2025 15:51:37 +0800 Subject: [PATCH] Detect IPConflicting and gatewayReachable in ipam without coordinator Signed-off-by: Cyclinder Kuo --- Makefile | 4 +- api/v1/agent/models/coordinator_config.go | 6 - api/v1/agent/models/ip_config.go | 6 + api/v1/agent/openapi.yaml | 8 +- api/v1/agent/server/embedded_spec.go | 24 +- charts/spiderpool/templates/configmap.yaml | 2 + charts/spiderpool/values.yaml | 6 + cmd/coordinator/cmd/cni_types.go | 81 +---- cmd/coordinator/cmd/command_add.go | 117 +------ cmd/coordinator/cmd/utils.go | 36 +++ cmd/spiderpool-agent/cmd/coordinator.go | 38 --- cmd/spiderpool-agent/cmd/daemon.go | 8 +- cmd/spiderpool-agent/cmd/ipam.go | 4 +- cmd/spiderpool/cmd/command_add.go | 39 ++- cmd/spiderpool/cmd/command_test.go | 7 +- cmd/spiderpool/cmd/ipam_detection.go | 347 +++++++++++++++++++++ cmd/spiderpool/cmd/utils.go | 23 +- cmd/spiderpool/main.go | 9 + pkg/ipam/allocate.go | 53 +++- pkg/ipam/config.go | 2 + pkg/ippoolmanager/config.go | 6 +- pkg/ippoolmanager/ippool_manager.go | 3 + pkg/networking/gwconnection/connection.go | 182 ----------- pkg/networking/ipchecking/ipchecking.go | 249 --------------- pkg/networking/networking/packet.go | 159 ++++++++++ pkg/types/k8s.go | 2 + pkg/utils/convert/convert.go | 16 +- test/Makefile | 4 + test/Makefile.defs | 1 + 29 files changed, 754 insertions(+), 688 deletions(-) create mode 100644 cmd/spiderpool/cmd/ipam_detection.go delete mode 100644 pkg/networking/gwconnection/connection.go delete mode 100644 pkg/networking/ipchecking/ipchecking.go create mode 100644 pkg/networking/networking/packet.go diff --git a/Makefile b/Makefile index cf43b8731c..5a87b19541 100644 --- a/Makefile +++ b/Makefile @@ -330,12 +330,12 @@ e2e_init_cilium_ebpfservice: .PHONY: e2e_init_calico e2e_init_calico: $(QUIET) make e2e_init -e INSTALL_OVERLAY_CNI=true -e INSTALL_CALICO=true -e INSTALL_CILIUM=false -e 
E2E_SPIDERPOOL_ENABLE_SUBNET=false \ - -e E2E_SPIDERPOOL_ENABLE_DRA=true -e INSTALL_OVS=false + -e E2E_SPIDERPOOL_ENABLE_DRA=true -e INSTALL_OVS=false -e E2E_SPIDERPOOL_ENABLE_IPAM_DETECTION=true .PHONY: e2e_init_cilium_legacyservice e2e_init_cilium_legacyservice: $(QUIET) make e2e_init -e INSTALL_OVERLAY_CNI=true -e INSTALL_CALICO=false -e INSTALL_CILIUM=true -e DISABLE_KUBE_PROXY=false \ - -e E2E_SPIDERPOOL_ENABLE_SUBNET=false -e INSTALL_OVS=false + -e E2E_SPIDERPOOL_ENABLE_SUBNET=false -e INSTALL_OVS=false -e E2E_SPIDERPOOL_ENABLE_IPAM_DETECTION=true .PHONY: e2e_test e2e_test: diff --git a/api/v1/agent/models/coordinator_config.go b/api/v1/agent/models/coordinator_config.go index 3654a25c81..041c56ecad 100644 --- a/api/v1/agent/models/coordinator_config.go +++ b/api/v1/agent/models/coordinator_config.go @@ -22,12 +22,6 @@ import ( // swagger:model CoordinatorConfig type CoordinatorConfig struct { - // detect gateway - DetectGateway bool `json:"detectGateway,omitempty"` - - // detect IP conflict - DetectIPConflict bool `json:"detectIPConflict,omitempty"` - // hijack c ID r HijackCIDR []string `json:"hijackCIDR"` diff --git a/api/v1/agent/models/ip_config.go b/api/v1/agent/models/ip_config.go index 0e06e0c506..938d1c0e95 100644 --- a/api/v1/agent/models/ip_config.go +++ b/api/v1/agent/models/ip_config.go @@ -27,6 +27,12 @@ type IPConfig struct { // Required: true Address *string `json:"address"` + // enable gateway detection + EnableGatewayDetection bool `json:"enableGatewayDetection,omitempty"` + + // enable IP conflict detection + EnableIPConflictDetection bool `json:"enableIPConflictDetection,omitempty"` + // gateway Gateway string `json:"gateway,omitempty"` diff --git a/api/v1/agent/openapi.yaml b/api/v1/agent/openapi.yaml index ea32dcd9da..9580c29f9e 100644 --- a/api/v1/agent/openapi.yaml +++ b/api/v1/agent/openapi.yaml @@ -306,6 +306,10 @@ definitions: type: string vlan: type: integer + enableGatewayDetection: + type: boolean + enableIPConflictDetection: 
+ type: boolean required: - version - address @@ -340,10 +344,6 @@ definitions: type: integer txQueueLen: type: integer - detectIPConflict: - type: boolean - detectGateway: - type: boolean vethLinkAddress: type: string required: diff --git a/api/v1/agent/server/embedded_spec.go b/api/v1/agent/server/embedded_spec.go index dd718cb184..23a0fe9112 100644 --- a/api/v1/agent/server/embedded_spec.go +++ b/api/v1/agent/server/embedded_spec.go @@ -291,12 +291,6 @@ func init() { "tunePodRoutes" ], "properties": { - "detectGateway": { - "type": "boolean" - }, - "detectIPConflict": { - "type": "boolean" - }, "hijackCIDR": { "type": "array", "items": { @@ -396,6 +390,12 @@ func init() { "address": { "type": "string" }, + "enableGatewayDetection": { + "type": "boolean" + }, + "enableIPConflictDetection": { + "type": "boolean" + }, "gateway": { "type": "string" }, @@ -846,12 +846,6 @@ func init() { "tunePodRoutes" ], "properties": { - "detectGateway": { - "type": "boolean" - }, - "detectIPConflict": { - "type": "boolean" - }, "hijackCIDR": { "type": "array", "items": { @@ -951,6 +945,12 @@ func init() { "address": { "type": "string" }, + "enableGatewayDetection": { + "type": "boolean" + }, + "enableIPConflictDetection": { + "type": "boolean" + }, "gateway": { "type": "string" }, diff --git a/charts/spiderpool/templates/configmap.yaml b/charts/spiderpool/templates/configmap.yaml index f51ae95a31..57a2158404 100644 --- a/charts/spiderpool/templates/configmap.yaml +++ b/charts/spiderpool/templates/configmap.yaml @@ -22,6 +22,8 @@ data: enableKubevirtStaticIP: {{ .Values.ipam.enableKubevirtStaticIP }} enableSpiderSubnet: {{ .Values.ipam.spiderSubnet.enable }} enableAutoPoolForApplication: {{ .Values.ipam.spiderSubnet.autoPool.enable }} + enableIPConflictDetection: {{ .Values.ipam.enableIPConflictDetection }} + enableGatewayDetection: {{ .Values.ipam.enableGatewayDetection }} {{- if and .Values.ipam.spiderSubnet.enable .Values.ipam.spiderSubnet.autoPool.enable }} 
clusterSubnetDefaultFlexibleIPNumber: {{ .Values.ipam.spiderSubnet.autoPool.defaultRedundantIPNumber }} {{- else}} diff --git a/charts/spiderpool/values.yaml b/charts/spiderpool/values.yaml index 70ac21eac0..711913b371 100644 --- a/charts/spiderpool/values.yaml +++ b/charts/spiderpool/values.yaml @@ -53,6 +53,12 @@ ipam: ## @param ipam.enableKubevirtStaticIP the feature to keep kubevirt vm pod static IP enableKubevirtStaticIP: true + ## @param ipam.enableIPConflictDetection enable IP conflict detection + enableIPConflictDetection: false + + ## @param ipam.enableGatewayDetection enable gateway detection + enableGatewayDetection: false + spiderSubnet: ## @param ipam.spiderSubnet.enable SpiderSubnet feature. enable: true diff --git a/cmd/coordinator/cmd/cni_types.go b/cmd/coordinator/cmd/cni_types.go index 430a5dd4b9..5ea005d359 100644 --- a/cmd/coordinator/cmd/cni_types.go +++ b/cmd/coordinator/cmd/cni_types.go @@ -11,7 +11,6 @@ import ( "path/filepath" "regexp" "strings" - "time" "github.com/containernetworking/cni/pkg/types" "github.com/containernetworking/cni/pkg/version" @@ -44,24 +43,21 @@ const ( type Config struct { types.NetConf - DetectGateway *bool `json:"detectGateway,omitempty"` - VethLinkAddress string `json:"vethLinkAddress,omitempty"` - MacPrefix string `json:"podMACPrefix,omitempty"` - MultusNicPrefix string `json:"multusNicPrefix,omitempty"` - PodDefaultCniNic string `json:"podDefaultCniNic,omitempty"` - OverlayPodCIDR []string `json:"overlayPodCIDR,omitempty"` - ServiceCIDR []string `json:"serviceCIDR,omitempty"` - HijackCIDR []string `json:"hijackCIDR,omitempty"` - TunePodRoutes *bool `json:"tunePodRoutes,omitempty"` - PodDefaultRouteNIC string `json:"podDefaultRouteNic,omitempty"` - Mode Mode `json:"mode,omitempty"` - HostRuleTable *int64 `json:"hostRuleTable,omitempty"` - HostRPFilter *int32 `json:"hostRPFilter,omitempty" ` - PodRPFilter *int32 `json:"podRPFilter,omitempty" ` - TxQueueLen *int64 `json:"txQueueLen,omitempty"` - IPConflict *bool 
`json:"detectIPConflict,omitempty"` - DetectOptions *DetectOptions `json:"detectOptions,omitempty"` - LogOptions *LogOptions `json:"logOptions,omitempty"` + VethLinkAddress string `json:"vethLinkAddress,omitempty"` + MacPrefix string `json:"podMACPrefix,omitempty"` + MultusNicPrefix string `json:"multusNicPrefix,omitempty"` + PodDefaultCniNic string `json:"podDefaultCniNic,omitempty"` + OverlayPodCIDR []string `json:"overlayPodCIDR,omitempty"` + ServiceCIDR []string `json:"serviceCIDR,omitempty"` + HijackCIDR []string `json:"hijackCIDR,omitempty"` + TunePodRoutes *bool `json:"tunePodRoutes,omitempty"` + PodDefaultRouteNIC string `json:"podDefaultRouteNic,omitempty"` + Mode Mode `json:"mode,omitempty"` + HostRuleTable *int64 `json:"hostRuleTable,omitempty"` + HostRPFilter *int32 `json:"hostRPFilter,omitempty" ` + PodRPFilter *int32 `json:"podRPFilter,omitempty" ` + TxQueueLen *int64 `json:"txQueueLen,omitempty"` + LogOptions *LogOptions `json:"logOptions,omitempty"` } // DetectOptions enable ip conflicting check for pod's ip @@ -142,15 +138,6 @@ func ParseConfig(stdin []byte, coordinatorConfig *models.CoordinatorConfig) (*Co return nil, err } - if conf.IPConflict == nil && coordinatorConfig.DetectIPConflict { - conf.IPConflict = ptr.To(true) - } - - conf.DetectOptions, err = ValidateDelectOptions(conf.DetectOptions) - if err != nil { - return nil, err - } - if conf.HostRuleTable == nil && coordinatorConfig.HostRuleTable > 0 { conf.HostRuleTable = ptr.To(coordinatorConfig.HostRuleTable) } @@ -163,10 +150,6 @@ func ParseConfig(stdin []byte, coordinatorConfig *models.CoordinatorConfig) (*Co conf.HostRuleTable = ptr.To(int64(500)) } - if conf.DetectGateway == nil { - conf.DetectGateway = ptr.To(coordinatorConfig.DetectGateway) - } - if conf.TunePodRoutes == nil { conf.TunePodRoutes = coordinatorConfig.TunePodRoutes } @@ -270,37 +253,3 @@ func validateRPFilterConfig(rpfilter *int32, coordinatorConfig int64) (*int32, e } return rpfilter, nil } - -func 
ValidateDelectOptions(config *DetectOptions) (*DetectOptions, error) { - if config == nil { - return &DetectOptions{ - Interval: "10ms", - TimeOut: "100ms", - Retry: 3, - }, nil - } - - if config.Retry == 0 { - config.Retry = 3 - } - - if config.Interval == "" { - config.Interval = "10ms" - } - - if config.TimeOut == "" { - config.TimeOut = "500ms" - } - - _, err := time.ParseDuration(config.Interval) - if err != nil { - return nil, fmt.Errorf("invalid detectOptions.interval %s: %v, input like: 1s or 1m", config.Interval, err) - } - - _, err = time.ParseDuration(config.TimeOut) - if err != nil { - return nil, fmt.Errorf("invalid detectOptions.timeout %s: %v, input like: 1s or 1m", config.TimeOut, err) - } - - return config, nil -} diff --git a/cmd/coordinator/cmd/command_add.go b/cmd/coordinator/cmd/command_add.go index 1d0057c75c..5c6a165e51 100644 --- a/cmd/coordinator/cmd/command_add.go +++ b/cmd/coordinator/cmd/command_add.go @@ -4,10 +4,7 @@ package cmd import ( - "context" - "errors" "fmt" - "net" "time" "github.com/containernetworking/cni/pkg/skel" @@ -15,17 +12,13 @@ import ( current "github.com/containernetworking/cni/pkg/types/100" "github.com/containernetworking/plugins/pkg/ns" "github.com/vishvananda/netlink" - "go.uber.org/multierr" "go.uber.org/zap" "github.com/spidernet-io/spiderpool/api/v1/agent/client/daemonset" "github.com/spidernet-io/spiderpool/api/v1/agent/models" plugincmd "github.com/spidernet-io/spiderpool/cmd/spiderpool/cmd" "github.com/spidernet-io/spiderpool/pkg/constant" - "github.com/spidernet-io/spiderpool/pkg/errgroup" "github.com/spidernet-io/spiderpool/pkg/logutils" - "github.com/spidernet-io/spiderpool/pkg/networking/gwconnection" - "github.com/spidernet-io/spiderpool/pkg/networking/ipchecking" "github.com/spidernet-io/spiderpool/pkg/networking/networking" "github.com/spidernet-io/spiderpool/pkg/networking/sysctl" "github.com/spidernet-io/spiderpool/pkg/openapi" @@ -118,6 +111,7 @@ func CmdAdd(args *skel.CmdArgs) (err error) { if 
err != nil { return fmt.Errorf("failed to get current netns: %v", err) } + defer c.hostNs.Close() logger.Sugar().Debugf("Get current host netns: %v", c.hostNs.Path()) // checking if the nic is in up state @@ -189,43 +183,14 @@ func CmdAdd(args *skel.CmdArgs) (err error) { logger.Sugar().Infof("Get coordinator config: %+v", c) - errgConflict := errgroup.Group{} - // IP conflict detection must precede gateway detection, which avoids the - // possibility that gateway detection may update arp table entries first and cause - // communication problems when IP conflict detection fails - // see https://github.com/spidernet-io/spiderpool/issues/4475 - var ipc *ipchecking.IPChecker - if conf.IPConflict != nil && *conf.IPConflict { - logger.Debug("Try to detect ip conflict") - ipc, err = ipchecking.NewIPChecker(conf.DetectOptions.Retry, conf.DetectOptions.Interval, conf.DetectOptions.TimeOut, c.hostNs, c.netns, logger) - if err != nil { - return fmt.Errorf("failed to run NewIPChecker: %w", err) - } - ipc.DoIPConflictChecking(prevResult.IPs, c.currentInterface, &errgConflict) - } else { - logger.Debug("disable detect ip conflict") + // get ips of this interface(preInterfaceName) from, including ipv4 and ipv6 + c.currentAddress, err = networking.IPAddressByName(c.netns, args.IfName, ipFamily) + if err != nil { + logger.Error(err.Error()) + return fmt.Errorf("failed to IPAddressByName for pod %s : %v", args.IfName, err) } - if err = errgConflict.Wait(); err != nil { - logger.Error("failed to detect ip conflict", zap.Error(err)) - if errors.Is(err, constant.ErrIPConflict) { - logger.Info("ip conflict detected, clean up conflict IPs") - _, innerErr := client.Daemonset.DeleteIpamIps(daemonset.NewDeleteIpamIpsParams().WithContext(context.TODO()).WithIpamBatchDelArgs( - &models.IpamBatchDelArgs{ - ContainerID: &args.ContainerID, - NetNamespace: args.Netns, - PodName: (*string)(&k8sArgs.K8S_POD_NAME), - PodNamespace: (*string)(&k8sArgs.K8S_POD_NAMESPACE), - PodUID: 
(*string)(&k8sArgs.K8S_POD_UID), - }, - )) - if innerErr != nil { - logger.Sugar().Errorf("failed to clean up conflict IPs, error: %v", innerErr) - return multierr.Append(err, innerErr) - } - } - return err - } + logger.Debug("Get currentAddress", zap.Any("currentAddress", c.currentAddress)) // Fixed Mac addresses must come after IP conflict detection, otherwise the switch learns to communicate // with the wrong Mac address when IP conflict detection fails @@ -235,65 +200,10 @@ func CmdAdd(args *skel.CmdArgs) (err error) { return fmt.Errorf("failed to update hardware address for interface %s, maybe hardware_prefix(%s) is invalid: %v", args.IfName, conf.MacPrefix, err) } logger.Info("Fix mac address successfully", zap.String("interface", args.IfName), zap.String("macAddress", hwAddr)) - } - - // we do detect gateway connection lastly - // Finally, there is gateway detection, which updates the correct arp table entries - // once there are no IP address conflicts and fixed Mac addresses - errgGateway := errgroup.Group{} - if conf.DetectGateway != nil && *conf.DetectGateway { - logger.Debug("Try to detect gateway") - var gws []net.IP - err = c.netns.Do(func(netNS ns.NetNS) error { - gws, err = networking.GetDefaultGatewayByName(c.currentInterface, c.ipFamily) - if err != nil { - logger.Error("failed to GetDefaultGatewayByName", zap.Error(err)) - return fmt.Errorf("failed to GetDefaultGatewayByName: %v", err) - } - logger.Debug("Get GetDefaultGatewayByName", zap.Any("Gws", gws)) - p, err := gwconnection.New(conf.DetectOptions.Retry, conf.DetectOptions.Interval, conf.DetectOptions.TimeOut, c.currentInterface, logger) - if err != nil { - return fmt.Errorf("failed to init the gateway client: %v", err) - } - p.ParseAddrFromPreresult(prevResult.IPs) - for _, gw := range gws { - if gw.To4() != nil { - p.V4Gw = gw - errgGateway.Go(c.hostNs, c.netns, p.ArpingOverIface) - } else { - p.V6Gw = gw - errgGateway.Go(c.hostNs, c.netns, p.NDPingOverIface) - } - } - return nil + 
c.netns.Do(func(_ ns.NetNS) error { + return c.AnnounceIPs(logger) }) - if err != nil { - return err - } - } else { - logger.Debug("disable detect gateway") - } - - if err = errgGateway.Wait(); err != nil { - logger.Error("failed to detect gateway reachable", zap.Error(err)) - if errors.Is(err, constant.ErrGatewayUnreachable) { - logger.Info("gateway unreachable detected, clean up conflict IPs") - _, innerErr := client.Daemonset.DeleteIpamIps(daemonset.NewDeleteIpamIpsParams().WithContext(context.TODO()).WithIpamBatchDelArgs( - &models.IpamBatchDelArgs{ - ContainerID: &args.ContainerID, - NetNamespace: args.Netns, - PodName: (*string)(&k8sArgs.K8S_POD_NAME), - PodNamespace: (*string)(&k8sArgs.K8S_POD_NAMESPACE), - PodUID: (*string)(&k8sArgs.K8S_POD_UID), - }, - )) - if innerErr != nil { - logger.Sugar().Errorf("failed to clean up conflict IPs, error: %v", innerErr) - return multierr.Append(err, innerErr) - } - } - return err } // set txqueuelen @@ -305,15 +215,6 @@ func CmdAdd(args *skel.CmdArgs) (err error) { // ================================= - // get ips of this interface(preInterfaceName) from, including ipv4 and ipv6 - c.currentAddress, err = networking.IPAddressByName(c.netns, args.IfName, ipFamily) - if err != nil { - logger.Error(err.Error()) - return fmt.Errorf("failed to IPAddressByName for pod %s : %v", args.IfName, err) - } - - logger.Debug("Get currentAddress", zap.Any("currentAddress", c.currentAddress)) - if ipFamily != netlink.FAMILY_V4 { // ensure ipv6 is enable if err := sysctl.EnableIpv6Sysctl(c.netns, 0); err != nil { diff --git a/cmd/coordinator/cmd/utils.go b/cmd/coordinator/cmd/utils.go index c7696ca15c..461e4ad26f 100644 --- a/cmd/coordinator/cmd/utils.go +++ b/cmd/coordinator/cmd/utils.go @@ -11,6 +11,7 @@ import ( "github.com/cilium/cilium/pkg/mac" "github.com/containernetworking/plugins/pkg/ip" "github.com/containernetworking/plugins/pkg/ns" + "github.com/mdlayher/ndp" "github.com/vishvananda/netlink" "go.uber.org/zap" 
"golang.org/x/sys/unix" @@ -703,3 +704,38 @@ OUTER2: return finalNodeIpList, nil } + +func (c *coordinator) AnnounceIPs(logger *zap.Logger) error { + l, err := netlink.LinkByName(c.currentInterface) + if err != nil { + return err + } + + for _, addr := range c.currentAddress { + if addr.IP.To4() != nil { + // send an gratuitous arp to announce the new mac address + if err = networking.SendARPReuqest(l, addr.IP, addr.IP); err != nil { + logger.Error("failed to send gratuitous arps", zap.Error(err)) + } else { + logger.Debug("Send gratuitous arps successfully", zap.String("interface", c.currentInterface)) + } + } else { + ifi, err := net.InterfaceByName(c.currentInterface) + if err != nil { + return fmt.Errorf("failed to InterfaceByName %s: %w", c.currentInterface, err) + } + + ndpClient, _, err := ndp.Listen(ifi, ndp.LinkLocal) + if err != nil { + return fmt.Errorf("failed to init ndp client: %w", err) + } + defer ndpClient.Close() + if err = networking.SendUnsolicitedNeighborAdvertisement(addr.IP, ifi, ndpClient); err != nil { + logger.Error("failed to send unsolicited neighbor advertisements", zap.Error(err)) + } else { + logger.Debug("Send unsolicited neighbor advertisements successfully", zap.String("interface", c.currentInterface)) + } + } + } + return nil +} diff --git a/cmd/spiderpool-agent/cmd/coordinator.go b/cmd/spiderpool-agent/cmd/coordinator.go index 1697d2a8ca..a90a4d0857 100644 --- a/cmd/spiderpool-agent/cmd/coordinator.go +++ b/cmd/spiderpool-agent/cmd/coordinator.go @@ -8,9 +8,6 @@ import ( "github.com/go-openapi/runtime/middleware" corev1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - kubevirtv1 "kubevirt.io/api/core/v1" "github.com/spidernet-io/spiderpool/api/v1/agent/models" "github.com/spidernet-io/spiderpool/api/v1/agent/server/restapi/daemonset" @@ -28,7 +25,6 @@ func (g *_unixGetCoordinatorConfig) Handle(params daemonset.GetCoordinatorConfig ctx := 
params.HTTPRequest.Context() crdClient := agentContext.CRDManager.GetClient() podClient := agentContext.PodManager - kubevirtMgr := agentContext.KubevirtManager var coordList spiderpoolv2beta1.SpiderCoordinatorList if err := crdClient.List(ctx, &coordList); err != nil { @@ -52,38 +48,6 @@ func (g *_unixGetCoordinatorConfig) Handle(params daemonset.GetCoordinatorConfig return daemonset.NewGetCoordinatorConfigFailure().WithPayload(models.Error(fmt.Sprintf("failed to get pod %s/%s", params.GetCoordinatorConfig.PodNamespace, params.GetCoordinatorConfig.PodName))) } - isVMPod := false - // kubevirt vm pod corresponding SpiderEndpoint uses kubevirt VM/VMI name - ownerReference := metav1.GetControllerOf(pod) - if ownerReference != nil && agentContext.Cfg.EnableKubevirtStaticIP && ownerReference.APIVersion == kubevirtv1.SchemeGroupVersion.String() && ownerReference.Kind == constant.KindKubevirtVMI { - isVMPod = true - } - - // cancel IP conflict detection for the kubevirt vm live migration new pod - detectIPConflict := *coord.Spec.DetectIPConflict - if detectIPConflict && isVMPod { - // the live migration new pod has the annotation "kubevirt.io/migrationJobName" - // we just only cancel IP conflict detection for the live migration new pod. 
- podAnnos := pod.GetAnnotations() - vmimName, ok := podAnnos[kubevirtv1.MigrationJobNameAnnotation] - if ok { - _, err := kubevirtMgr.GetVMIMByName(ctx, pod.Namespace, vmimName, false) - if nil != err { - if apierrors.IsNotFound(err) { - logger.Sugar().Warnf("no kubevirt vm pod '%s/%s' corresponding VirtualMachineInstanceMigration '%s/%s' found, still execute IP conflict detection", - pod.Namespace, pod.Name, pod.Namespace, vmimName) - } else { - return daemonset.NewGetCoordinatorConfigFailure().WithPayload(models.Error(fmt.Sprintf("failed to get kubevirt vm pod '%s/%s' corresponding VirtualMachineInstanceMigration '%s/%s', error: %v", - pod.Namespace, pod.Name, pod.Namespace, vmimName, err))) - } - } else { - // cancel IP conflict detection because there's a moment the old vm pod still running during the vm live migration phase - logger.Sugar().Infof("cancel IP conflict detection for live migration new pod '%s/%s'", pod.Namespace, pod.Name) - detectIPConflict = false - } - } - } - var prefix string if coord.Spec.PodMACPrefix != nil { prefix = *coord.Spec.PodMACPrefix @@ -116,8 +80,6 @@ func (g *_unixGetCoordinatorConfig) Handle(params daemonset.GetCoordinatorConfig HostRuleTable: int64(*coord.Spec.HostRuleTable), PodRPFilter: int64(*coord.Spec.PodRPFilter), TxQueueLen: int64(*coord.Spec.TxQueueLen), - DetectGateway: *coord.Spec.DetectGateway, - DetectIPConflict: detectIPConflict, } if config.OverlayPodCIDR == nil { diff --git a/cmd/spiderpool-agent/cmd/daemon.go b/cmd/spiderpool-agent/cmd/daemon.go index 8196bfaef2..0aae5c48d4 100644 --- a/cmd/spiderpool-agent/cmd/daemon.go +++ b/cmd/spiderpool-agent/cmd/daemon.go @@ -170,6 +170,8 @@ func DaemonMain() { EnableStatefulSet: agentContext.Cfg.EnableStatefulSet, EnableKubevirtStaticIP: agentContext.Cfg.EnableKubevirtStaticIP, EnableReleaseConflictIPsForStateless: agentContext.Cfg.EnableReleaseConflictIPsForStateless, + EnableIPConflictDetection: agentContext.Cfg.EnableIPConflictDetection, + EnableGatewayDetection: 
agentContext.Cfg.EnableGatewayDetection, OperationRetries: agentContext.Cfg.WaitSubnetPoolMaxRetries, OperationGapDuration: time.Duration(agentContext.Cfg.WaitSubnetPoolTime) * time.Second, AgentNamespace: agentContext.Cfg.AgentPodNamespace, @@ -406,8 +408,10 @@ func initAgentServiceManagers(ctx context.Context) { logger.Debug("Begin to initialize IPPool manager") ipPoolManager, err := ippoolmanager.NewIPPoolManager( ippoolmanager.IPPoolManagerConfig{ - MaxAllocatedIPs: &agentContext.Cfg.IPPoolMaxAllocatedIPs, - EnableKubevirtStaticIP: agentContext.Cfg.EnableKubevirtStaticIP, + MaxAllocatedIPs: &agentContext.Cfg.IPPoolMaxAllocatedIPs, + EnableKubevirtStaticIP: agentContext.Cfg.EnableKubevirtStaticIP, + EnableIPConflictDetection: agentContext.Cfg.EnableIPConflictDetection, + EnableGatewayDetection: agentContext.Cfg.EnableGatewayDetection, }, agentContext.CRDManager.GetClient(), agentContext.CRDManager.GetAPIReader(), diff --git a/cmd/spiderpool-agent/cmd/ipam.go b/cmd/spiderpool-agent/cmd/ipam.go index 21cadca475..3615bef37a 100644 --- a/cmd/spiderpool-agent/cmd/ipam.go +++ b/cmd/spiderpool-agent/cmd/ipam.go @@ -123,7 +123,7 @@ type _unixDeleteAgentIpamIps struct{} // Handle handles DELETE requests for /ipam/ips. 
func (g *_unixDeleteAgentIpamIps) Handle(params daemonset.DeleteIpamIpsParams) middleware.Responder { err := params.IpamBatchDelArgs.Validate(strfmt.Default) - if nil != err { + if err != nil { return daemonset.NewDeleteIpamIpsFailure().WithPayload(models.Error(err.Error())) } @@ -160,6 +160,8 @@ func (g *_unixDeleteAgentIpamIps) Handle(params daemonset.DeleteIpamIpsParams) m return daemonset.NewDeleteIpamIpsOK() } +type _unixGetIpamIPDetectionConfigs struct{} + func gatherIPAMAllocationErrMetric(ctx context.Context, err error) { internal := true if errors.Is(err, constant.ErrWrongInput) { diff --git a/cmd/spiderpool/cmd/command_add.go b/cmd/spiderpool/cmd/command_add.go index 079a747183..c4c30bf320 100644 --- a/cmd/spiderpool/cmd/command_add.go +++ b/cmd/spiderpool/cmd/command_add.go @@ -5,6 +5,7 @@ package cmd import ( "context" + "errors" "fmt" "net" "runtime/debug" @@ -13,20 +14,23 @@ import ( "github.com/containernetworking/cni/pkg/skel" "github.com/containernetworking/cni/pkg/types" current "github.com/containernetworking/cni/pkg/types/100" + "github.com/containernetworking/plugins/pkg/ns" "github.com/go-openapi/strfmt" + "go.uber.org/multierr" "go.uber.org/zap" "github.com/spidernet-io/spiderpool/api/v1/agent/client/connectivity" "github.com/spidernet-io/spiderpool/api/v1/agent/client/daemonset" "github.com/spidernet-io/spiderpool/api/v1/agent/models" + "github.com/spidernet-io/spiderpool/pkg/constant" spiderpoolip "github.com/spidernet-io/spiderpool/pkg/ip" "github.com/spidernet-io/spiderpool/pkg/openapi" ) +var logger *zap.Logger + // CmdAdd follows CNI SPEC cmdAdd. func CmdAdd(args *skel.CmdArgs) (err error) { - var logger *zap.Logger - // Defer a panic recover, so that in case we panic we can still return // a proper error to the runtime. 
defer func() { @@ -39,7 +43,7 @@ func CmdAdd(args *skel.CmdArgs) (err error) { msg = fmt.Sprintf("%s: error=%v", msg, err.Error()) } - if nil != logger { + if logger != nil { logger.Sugar().Errorf("%s\n\n%s", msg, debug.Stack()) } } @@ -112,7 +116,7 @@ func CmdAdd(args *skel.CmdArgs) (err error) { logger.Debug("Send IPAM request") ipamResponse, err := spiderpoolAgentAPI.Daemonset.PostIpamIP(params) - if nil != err { + if err != nil { err := fmt.Errorf("%w: %v", ErrPostIPAM, err) logger.Error(err.Error()) return err @@ -125,6 +129,33 @@ func CmdAdd(args *skel.CmdArgs) (err error) { return err } + // do ip conflict and gateway detection + logger.Sugar().Debugf("postIpam response: %+v", *ipamResponse.Payload) + netns, err := ns.GetNS(args.Netns) + if err != nil { + logger.Error(err.Error()) + return fmt.Errorf("failed to GetNS %q for pod: %v", args.Netns, err) + } + defer netns.Close() + + hostNs, err := ns.GetCurrentNS() + if err != nil { + return fmt.Errorf("failed to get current netns: %v", err) + } + defer hostNs.Close() + + if err = DetectIPConflictAndGatewayReachable(args.IfName, hostNs, netns, ipamResponse.Payload.Ips); err != nil { + if errors.Is(err, constant.ErrIPConflict) || errors.Is(err, constant.ErrGatewayUnreachable) { + logger.Info("failed to detect IP conflict or gateway unreachable, clean up IPs") + if e := deleteIpamIps(spiderpoolAgentAPI, args, k8sArgs); e != nil { + logger.Sugar().Errorf("failed to clean up conflict IPs, error: %v", e) + return multierr.Append(err, e) + } + logger.Info("Successfully cleaned up IPs") + } + return err + } + // Assemble the result of IPAM request response. 
result, err := assembleResult(conf.CNIVersion, args.IfName, ipamResponse) if err != nil { diff --git a/cmd/spiderpool/cmd/command_test.go b/cmd/spiderpool/cmd/command_test.go index 6e784989ca..9aec307457 100644 --- a/cmd/spiderpool/cmd/command_test.go +++ b/cmd/spiderpool/cmd/command_test.go @@ -32,7 +32,6 @@ import ( ) const ifName string = "eth0" -const nsPath string = "/some/where" const containerID string = "dummy" const CNITimeoutSec = 120 const CNILogFilePath = "/tmp/spiderpool.log" @@ -49,6 +48,7 @@ var cniVersion string var args *skel.CmdArgs var netConf cmd.NetConf var sockPath string +var nsPath string var addChan, delChan chan struct{} @@ -69,6 +69,11 @@ var _ = Describe("spiderpool plugin", Label("unittest", "ipam_plugin_test"), fun tempDir := GinkgoT().TempDir() sockPath = tempDir + "/tmp.sock" + fakeNs, err := testutils.NewNS() + Expect(err).NotTo(HaveOccurred()) + nsPath = fakeNs.Path() + defer fakeNs.Close() + // cleanup the temp unix file at the end. DeferCleanup(func() { err := os.RemoveAll(sockPath) diff --git a/cmd/spiderpool/cmd/ipam_detection.go b/cmd/spiderpool/cmd/ipam_detection.go new file mode 100644 index 0000000000..75a834cea8 --- /dev/null +++ b/cmd/spiderpool/cmd/ipam_detection.go @@ -0,0 +1,347 @@ +// Copyright 2025 Authors of spidernet-io +// SPDX-License-Identifier: Apache-2.0 + +package cmd + +import ( + "fmt" + "net" + "time" + + "github.com/containernetworking/plugins/pkg/ns" + "github.com/mdlayher/arp" + "github.com/mdlayher/ndp" + "github.com/vishvananda/netlink" + "go.uber.org/zap" + + "github.com/spidernet-io/spiderpool/api/v1/agent/models" + "github.com/spidernet-io/spiderpool/pkg/constant" + "github.com/spidernet-io/spiderpool/pkg/errgroup" + "github.com/spidernet-io/spiderpool/pkg/networking/networking" +) + +var ( + retryNum = 3 + timeOut = 100 * time.Millisecond +) + +type Detector struct { + enableIPConflictDetection bool + enableGatewayReachableDetection bool + retries int + iface string + timeout time.Duration + 
netns, hostNs ns.NetNS + ip4, ip6, v4Gw, v6Gw net.IP + logger *zap.Logger +} + +func DetectIPConflictAndGatewayReachable(iface string, hostNs ns.NetNS, netns ns.NetNS, ipconfigs []*models.IPConfig) error { + d := &Detector{ + retries: retryNum, + timeout: timeOut, + netns: netns, + hostNs: hostNs, + logger: logger, + } + + errg := errgroup.Group{} + _ = d.netns.Do(func(netNS ns.NetNS) error { + for _, ipa := range ipconfigs { + if *ipa.Nic != iface { + // spiderpool assigns IPs to all NICs in advance of the first call to ipam. + // different NICs come from different pools, so we only need to focus on the current NIC's ipconfig. + continue + } + + d.enableIPConflictDetection = ipa.EnableIPConflictDetection + d.enableGatewayReachableDetection = ipa.EnableGatewayDetection && ipa.Gateway != "" + + if !d.enableIPConflictDetection && !d.enableGatewayReachableDetection { + continue + } + + // When IPAM is invoked, the NIC is down and must be set up in order to detect IP conflicts and + // gateway reachability. + l, err := netlink.LinkByName(iface) + if err != nil { + return fmt.Errorf("failed to get link: %v", err) + } + + if err = netlink.LinkSetUp(l); err != nil { + return fmt.Errorf("failed to set link up: %v", err) + } + logger.Sugar().Debugf("set link %s up for IP conflict and gateway detection", iface) + + // The interface might not yet have carrier. Wait for it for a short time. 
+ networking.WaitForCarrier(l, 200*time.Second) + + target := net.ParseIP(*ipa.Address) + if target.To4() != nil { + d.ip4 = target + errg.Go(hostNs, netns, d.ARPDetect) + } else { + d.ip6 = target + errg.Go(d.hostNs, d.netns, d.NDPDetect) + } + } + return nil + }) + + return errg.Wait() +} + +func (d *Detector) ARPDetect() error { + l, err := netlink.LinkByName(d.iface) + if err != nil { + return err + } + + ifi, err := net.InterfaceByName(d.iface) + if err != nil { + return fmt.Errorf("failed to InterfaceByName %s: %w", d.iface, err) + } + + arpClient, err := arp.Dial(ifi) + if err != nil { + return err + } + defer arpClient.Close() + + // IP conflict detection must precede gateway detection, which avoids the + // possibility that gateway detection may update arp table entries first and cause + // communication problems when IP conflict detection fails + // see https://github.com/spidernet-io/spiderpool/issues/4475 + // call ip conflict detection + if d.enableIPConflictDetection { + d.logger.Debug("Detect IPAddress If conflict for IPv4", zap.String("IPPool", d.ip4.String())) + err = d.detectIP4Conflicting(l, arpClient) + if err != nil { + return err + } + } else { + d.logger.Debug("IPConflitingDetection is disabled for IPv4", zap.String("IPPool", d.ip4.String())) + } + + // we do detect gateway connection lastly + // Finally, there is gateway detection, which updates the correct arp table entries + // once there are no IP address conflicts and fixed Mac addresses + // call gateway detection + if d.enableGatewayReachableDetection { + d.logger.Debug("Detect Gateway If reachable for IPv4", zap.String("IPPool", d.ip4.String()), zap.String("Gateway", d.v4Gw.String())) + if err = d.detectGateway4Reachable(l, arpClient); err != nil { + return err + } + } else { + d.logger.Debug("GatewayDetection is disabled for IPv4", zap.String("IPPool", d.ip4.String()), zap.String("Gateway", d.v4Gw.String())) + } + return nil +} + +func (d *Detector) NDPDetect() error { + ifi, err := 
net.InterfaceByName(d.iface) + if err != nil { + return fmt.Errorf("failed to InterfaceByName %s: %w", d.iface, err) + } + + ndpClient, _, err := ndp.Listen(ifi, ndp.LinkLocal) + if err != nil { + return fmt.Errorf("failed to init ndp client: %w", err) + } + defer ndpClient.Close() + + // IP conflict detection must precede gateway detection, which avoids the + // possibility that gateway detection may update arp table entries first and cause + // communication problems when IP conflict detection fails + // see https://github.com/spidernet-io/spiderpool/issues/4475 + // call ip conflict detection + if d.enableIPConflictDetection { + d.logger.Debug("Detect IPAddress If conflict for IPv6", zap.String("IPPool", d.ip6.String())) + err = d.detectIP6Conflicting(ifi, ndpClient) + if err != nil { + return err + } + } else { + d.logger.Debug("IPConflitingDetection is disabled for IPv6", zap.String("IPPool", d.ip6.String())) + } + + // we do detect gateway connection lastly + // Finally, there is gateway detection, which updates the correct arp table entries + // once there are no IP address conflicts and fixed Mac addresses + // call gateway detection + if d.enableGatewayReachableDetection { + d.logger.Debug("Detecting Gateway if reachable for IPv6", zap.String("IPPool", d.v6Gw.String()), zap.String("Gateway", d.v6Gw.String())) + d.detectGateway6Reachable(ifi, ndpClient) + } else { + d.logger.Debug("GatewayDetection is disabled for IPv6", zap.String("IPPool", d.v6Gw.String()), zap.String("Gateway", d.v6Gw.String())) + } + return nil +} + +func (d *Detector) detectIP4Conflicting(l netlink.Link, arpClient *arp.Client) error { + var err error + arpClient.SetDeadline(time.Now().Add(d.timeout)) + for i := 0; i < d.retries; i++ { + // we send a gratuitous arp to checking if ip is conflict + // we use dad mode(duplicate address detection mode), so + // we set source ip to 0.0.0.0 + err = networking.SendARPReuqest(l, net.ParseIP("0.0.0.0"), d.ip4) + if err == nil { + 
d.logger.Info("success to send ARP request") + break + } + d.logger.Error("failed to send ARP request, retrying...", zap.Error(err)) + } + + if err != nil { + d.logger.Error("after failed to send three ARP request packages, can't detect IPv4 address conflicting", zap.Error(err)) + return fmt.Errorf("after failed to send three ARP request packages, can't detect IPv4 address conflicting: %w", err) + } + + for { + packet, _, err := arpClient.Read() + if err == nil { + if packet.Operation != arp.OperationReply || packet.SenderIP.String() != d.ip4.String() { + continue + } + + // found ip conflicting + d.logger.Error("Found IPv4 address conflicting", zap.String("Conflicting IP", d.ip4.String()), zap.String("Host", packet.SenderHardwareAddr.String())) + return fmt.Errorf("%w: pod's interface %s with an conflicting ip %s, %s is located at %s", + constant.ErrIPConflict, d.iface, d.ip4.String(), d.ip4.String(), packet.SenderHardwareAddr.String()) + } + + // no arp response unitil timeout, no ipv4 conflicting + if err.(net.Error).Timeout() { + d.logger.Debug("No ipv4 ipAddress conflicting", zap.String("IPv4 address", d.ip4.String())) + return nil + } + // retry it if is other error + d.logger.Error("failed to receive ARP message, retrying...", zap.Error(err)) + } +} + +func (d *Detector) detectGateway4Reachable(l netlink.Link, arpClient *arp.Client) error { + var err error + arpClient.SetDeadline(time.Now().Add(d.timeout)) + for i := 0; i < d.retries; i++ { + if err = networking.SendARPReuqest(l, d.ip4, d.v4Gw); err == nil { + d.logger.Info("success to send ARP request") + break + } + d.logger.Error("failed to send ARP request, retrying...", zap.Error(err)) + continue + } + + if err != nil { + d.logger.Error("after failed to send three ARP request packages, can't detect gateway reachable", zap.Error(err)) + return fmt.Errorf("after failed to send three ARP request packages, can't detect gateway reachable: %w", err) + } + + // Loop and wait for replies + for { + res, _, err 
:= arpClient.Read() + if err == nil { + if res.Operation != arp.OperationReply || res.SenderIP.String() != d.v4Gw.String() { + continue + } + + d.logger.Sugar().Infof("Gateway %s is reachable, gateway is located at %v", d.v4Gw, res.SenderHardwareAddr.String()) + return nil + } + + if err.(net.Error).Timeout() { + d.logger.Sugar().Errorf("gateway %s is %v, reason: %v", d.v4Gw.String(), constant.ErrGatewayUnreachable, err) + return fmt.Errorf("gateway %s is %v", d.v4Gw.String(), constant.ErrGatewayUnreachable) + } + // retry it if is other error + d.logger.Error("failed to receive ARP message, retrying...", zap.Error(err)) + } +} + +func (d *Detector) detectIP6Conflicting(ifi *net.Interface, ndpClient *ndp.Conn) error { + var err error + ndpClient.SetDeadline(time.Now().Add(d.timeout)) + for i := 0; i < d.retries; i++ { + err = networking.SendUnsolicitedNeighborAdvertisement(d.ip6, ifi, ndpClient) + if err == nil { + d.logger.Info("success to send unsolicited neighbor advertisement") + break + } + d.logger.Error("failed to send unsolicited neighbor advertisement, retrying...", zap.Error(err)) + } + + if err != nil { + d.logger.Error("after failed to send three unsolicited neighbor advertisement packages, can't detect IPv6 address conflicting", zap.Error(err)) + return fmt.Errorf("after failed to send three unsolicited neighbor advertisement packages, can't detect IPv6 address conflicting: %w", err) + } + + for { + msg, _, _, err := ndpClient.ReadFrom() + if err == nil { + na, ok := msg.(*ndp.NeighborAdvertisement) + if !ok || na.TargetAddress.String() != d.ip6.String() || len(na.Options) != 1 { + continue + } + + option, ok := na.Options[0].(*ndp.LinkLayerAddress) + if ok { + d.logger.Error("Found IPv6 address conflicting", zap.String("Conflicting IP", d.ip6.String()), zap.String("Host", option.Addr.String())) + return fmt.Errorf("%w: pod's interface %s with an conflicting ip %s, %s is located at %s", + constant.ErrIPConflict, d.iface, d.ip6.String(), d.ip6.String(), 
option.Addr.String()) + } + continue + } + + // no ndp response unitil timeout, no ipv6 conflicting + if err.(net.Error).Timeout() { + d.logger.Debug("No ipv6 ipAddress conflicting", zap.String("IPv6 address", d.ip6.String())) + return nil + } + // retry it if is other error + d.logger.Error("failed to receive unsolicited neighbor advertisement message, retrying...", zap.Error(err)) + } +} + +func (d *Detector) detectGateway6Reachable(ifi *net.Interface, ndpClient *ndp.Conn) error { + var err error + ndpClient.SetDeadline(time.Now().Add(d.timeout)) + for i := 0; i < d.retries; i++ { + err = networking.SendUnsolicitedNeighborAdvertisement(d.v6Gw, ifi, ndpClient) + if err == nil { + d.logger.Info("success to send unsolicited neighbor advertisement") + break + } + d.logger.Error("failed to send unsolicited neighbor advertisement, retrying...", zap.Error(err)) + } + + if err != nil { + d.logger.Error("after failed to send three unsolicited neighbor advertisement packages, can't detect IPv6 Gateway if reachable", zap.Error(err)) + return fmt.Errorf("after failed to send three unsolicited neighbor advertisement packages, can't detect IPv6 Gateway if reachable: %w", err) + } + + for { + msg, _, _, err := ndpClient.ReadFrom() + if err == nil { + na, ok := msg.(*ndp.NeighborAdvertisement) + if !ok || na.TargetAddress.String() != d.v6Gw.String() || len(na.Options) != 1 { + continue + } + + option, ok := na.Options[0].(*ndp.LinkLayerAddress) + if ok { + d.logger.Sugar().Infof("gateway %s is reachable, it's located at %s", d.v6Gw.String(), option.Addr.String()) + return nil + } + continue + } + + // no ndp response unitil timeout, indicates gateway unreachable + if err.(net.Error).Timeout() { + d.logger.Sugar().Errorf("gateway %s is %s, reason: %v", d.v6Gw.String(), constant.ErrGatewayUnreachable, err) + return fmt.Errorf("gateway %s is %w", d.v6Gw.String(), constant.ErrGatewayUnreachable) + } + // retry it if is other error + d.logger.Error("failed to receive unsolicited 
neighbor advertisement message, retrying...", zap.Error(err)) + } +} diff --git a/cmd/spiderpool/cmd/utils.go b/cmd/spiderpool/cmd/utils.go index 5720ace474..515765de10 100644 --- a/cmd/spiderpool/cmd/utils.go +++ b/cmd/spiderpool/cmd/utils.go @@ -1,13 +1,18 @@ -// Copyright 2022 Authors of spidernet-io +// Copyright 2025 Authors of spidernet-io // SPDX-License-Identifier: Apache-2.0 package cmd import ( + "context" "fmt" + "github.com/containernetworking/cni/pkg/skel" "go.uber.org/zap" + agentOpenAPIClient "github.com/spidernet-io/spiderpool/api/v1/agent/client" + "github.com/spidernet-io/spiderpool/api/v1/agent/client/daemonset" + "github.com/spidernet-io/spiderpool/api/v1/agent/models" "github.com/spidernet-io/spiderpool/pkg/logutils" ) @@ -26,3 +31,19 @@ func setupFileLogging(conf *NetConf) (*zap.Logger, error) { conf.IPAM.LogFileMaxCount, ) } + +func deleteIpamIps(spiderpoolAgentAPI *agentOpenAPIClient.SpiderpoolAgentAPI, args *skel.CmdArgs, k8sArgs K8sArgs) error { + _, err := spiderpoolAgentAPI.Daemonset.DeleteIpamIps(daemonset.NewDeleteIpamIpsParams().WithContext(context.TODO()).WithIpamBatchDelArgs( + &models.IpamBatchDelArgs{ + ContainerID: &args.ContainerID, + NetNamespace: args.Netns, + PodName: (*string)(&k8sArgs.K8S_POD_NAME), + PodNamespace: (*string)(&k8sArgs.K8S_POD_NAMESPACE), + PodUID: (*string)(&k8sArgs.K8S_POD_UID), + }, + )) + if err != nil { + return fmt.Errorf("failed to clean up conflict IPs: %v", err) + } + return err +} diff --git a/cmd/spiderpool/main.go b/cmd/spiderpool/main.go index b493042057..a7dd09fa28 100644 --- a/cmd/spiderpool/main.go +++ b/cmd/spiderpool/main.go @@ -4,6 +4,8 @@ package main import ( + "runtime" + "github.com/containernetworking/cni/pkg/skel" cniSpecVersion "github.com/containernetworking/cni/pkg/version" "github.com/spidernet-io/spiderpool/cmd/spiderpool/cmd" @@ -12,6 +14,13 @@ import ( // version means spiderpool released version. 
var version string
 
+func init() {
+	// this ensures that main runs only on main thread (thread group leader).
+	// since namespace ops (unshare, setns) are done for a single thread, we
+	// must ensure that the goroutine does not jump from OS thread to thread
+	runtime.LockOSThread()
+}
+
 func main() {
 	skel.PluginMain(cmd.CmdAdd, cmdCheck, cmd.CmdDel,
 		cniSpecVersion.PluginSupports(cmd.SupportCNIVersions...),
diff --git a/pkg/ipam/allocate.go b/pkg/ipam/allocate.go
index b7314cf1dc..f1871252a3 100644
--- a/pkg/ipam/allocate.go
+++ b/pkg/ipam/allocate.go
@@ -14,6 +14,7 @@ import (
 	"go.uber.org/zap"
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	utilerrors "k8s.io/apimachinery/pkg/util/errors"
@@ -49,7 +50,7 @@ func (i *ipam) Allocate(ctx context.Context, addArgs *models.IpamAddArgs) (*mode
 	}
 
 	podTopController, err := i.podManager.GetPodTopController(ctx, pod)
-	if nil != err {
+	if err != nil {
 		return nil, fmt.Errorf("failed to get the top controller of the Pod %s/%s: %v", pod.Namespace, pod.Name, err)
 	}
 	logger.Sugar().Debugf("%s %s/%s is the top controller of the Pod", podTopController.Kind, podTopController.Namespace, podTopController.Name)
@@ -248,7 +249,12 @@ func (i *ipam) retrieveStaticIPAllocation(ctx context.Context, nic string, pod *
 		return nil, fmt.Errorf("failed to refresh the current IP allocation of %s: %w", endpoint.Status.OwnerControllerType, err)
 	}
 
-	ips, routes := convert.ConvertIPDetailsToIPConfigsAndAllRoutes(endpoint.Status.Current.IPs)
+	enableGatewayDetection, err := i.IsDetectGatewayReachableForKubeVirtPod(ctx, pod)
+	if err != nil {
+		return nil, err
+	}
+
+	ips, routes := convert.ConvertIPDetailsToIPConfigsAndAllRoutes(endpoint.Status.Current.IPs, i.config.EnableIPConflictDetection, enableGatewayDetection)
 	addResp := &models.IpamAddResponse{
 		Ips:    ips,
 		Routes: routes,
@@ -338,7 +344,7 @@ func (i *ipam) retrieveExistingIPAllocation(ctx context.Context, uid, nic string
 		}
 	}
 
-	ips, routes := convert.ConvertIPDetailsToIPConfigsAndAllRoutes(allocation.IPs)
+	ips, routes := convert.ConvertIPDetailsToIPConfigsAndAllRoutes(allocation.IPs, i.config.EnableIPConflictDetection, i.config.EnableGatewayDetection)
 	addResp := &models.IpamAddResponse{
 		Ips:    ips,
 		Routes: routes,
@@ -819,6 +825,47 @@ func (i *ipam) verifyPoolCandidates(tt ToBeAllocateds) error {
 	return nil
 }
 
+// IsDetectGatewayReachableForKubeVirtPod returns false when IP conflict detection is disabled or when pod is a KubeVirt live-migration target (otherwise the migration pod would never start), and true otherwise.
+// NOTE(review): the result is named enableGatewayDetection but is gated on EnableIPConflictDetection and never reads EnableGatewayDetection; confirm which switch is intended.
+func (i *ipam) IsDetectGatewayReachableForKubeVirtPod(ctx context.Context, pod *corev1.Pod) (enableGatewayDetection bool, err error) {
+	if !i.config.EnableIPConflictDetection {
+		return false, nil
+	}
+
+	// only a pod controlled by a KubeVirt VMI, with EnableKubevirtStaticIP on, needs the live-migration check;
+	// every other pod keeps detection enabled
+	ownerReference := metav1.GetControllerOf(pod)
+	if ownerReference == nil || !i.config.EnableKubevirtStaticIP || ownerReference.APIVersion != kubevirtv1.SchemeGroupVersion.String() || ownerReference.Kind != constant.KindKubevirtVMI {
+		return true, nil
+	}
+
+	logger := logutils.FromContext(ctx)
+	// the new pod created by a live migration has the annotation "kubevirt.io/migrationJobName";
+	// detection is cancelled only for that new pod.
+	podAnnos := pod.GetAnnotations()
+	vmimName, ok := podAnnos[kubevirtv1.MigrationJobNameAnnotation]
+	if ok {
+		// look up the VirtualMachineInstanceMigration named by the annotation in the pod's namespace
+		_, err := i.kubevirtManager.GetVMIMByName(ctx, pod.Namespace, vmimName, false)
+		if err == nil {
+			// cancel detection: during the live migration phase there is a moment when the old vm pod is still running
+			logger.Sugar().Infof("cancel IP conflict detection for live migration new pod '%s/%s'", pod.Namespace, pod.Name)
+			return false, nil
+		}
+
+		if apierrors.IsNotFound(err) {
+			// no matching VirtualMachineInstanceMigration was found, so detection still runs
+			logger.Sugar().Warnf("no kubevirt vm pod '%s/%s' corresponding VirtualMachineInstanceMigration '%s/%s' found, still execute IP conflict detection",
+				pod.Namespace, pod.Name, pod.Namespace, vmimName)
+			return true, nil
+		}
+
+		return false, fmt.Errorf("failed to get kubevirt vm pod '%s/%s' corresponding VirtualMachineInstanceMigration '%s/%s', error: %v",
+			pod.Namespace, pod.Name, pod.Namespace, vmimName, err)
+	}
+	return true, nil
+}
+
 // sortPoolCandidates would sort IPPool candidates sequence depends on the IPPool multiple affinities.
func sortPoolCandidates(preliminary ToBeAllocateds) { for _, toBeAllocate := range preliminary { diff --git a/pkg/ipam/config.go b/pkg/ipam/config.go index 5bd07ca6d8..e89e6f64ff 100644 --- a/pkg/ipam/config.go +++ b/pkg/ipam/config.go @@ -24,6 +24,8 @@ type IPAMConfig struct { EnableStatefulSet bool EnableKubevirtStaticIP bool EnableReleaseConflictIPsForStateless bool + EnableIPConflictDetection bool + EnableGatewayDetection bool OperationRetries int OperationGapDuration time.Duration diff --git a/pkg/ippoolmanager/config.go b/pkg/ippoolmanager/config.go index 56a4a3d139..3a19928b21 100644 --- a/pkg/ippoolmanager/config.go +++ b/pkg/ippoolmanager/config.go @@ -8,8 +8,10 @@ const ( ) type IPPoolManagerConfig struct { - MaxAllocatedIPs *int - EnableKubevirtStaticIP bool + MaxAllocatedIPs *int + EnableKubevirtStaticIP bool + EnableGatewayDetection bool + EnableIPConflictDetection bool } func setDefaultsForIPPoolManagerConfig(config IPPoolManagerConfig) IPPoolManagerConfig { diff --git a/pkg/ippoolmanager/ippool_manager.go b/pkg/ippoolmanager/ippool_manager.go index d63d9997ae..1871199cfd 100644 --- a/pkg/ippoolmanager/ippool_manager.go +++ b/pkg/ippoolmanager/ippool_manager.go @@ -137,6 +137,9 @@ func (im *ipPoolManager) AllocateIP(ctx context.Context, poolName, nic string, p return nil, err } + // TODO: set these values from ippool.spec + ipConfig.EnableGatewayDetection = im.config.EnableGatewayDetection + ipConfig.EnableIPConflictDetection = im.config.EnableIPConflictDetection return ipConfig, nil } diff --git a/pkg/networking/gwconnection/connection.go b/pkg/networking/gwconnection/connection.go deleted file mode 100644 index 4df3970645..0000000000 --- a/pkg/networking/gwconnection/connection.go +++ /dev/null @@ -1,182 +0,0 @@ -// Copyright 2023 Authors of spidernet-io -// SPDX-License-Identifier: Apache-2.0 - -package gwconnection - -import ( - "fmt" - "net" - "net/netip" - "time" - - "go.uber.org/zap" - - types100 
"github.com/containernetworking/cni/pkg/types/100" - "github.com/mdlayher/arp" - _ "github.com/mdlayher/ethernet" - "github.com/mdlayher/ndp" - "github.com/spidernet-io/spiderpool/pkg/constant" -) - -type DetectGateway struct { - retries int - iface string - interval time.Duration - timeout time.Duration - v4Addr, v6Addr, V4Gw, V6Gw net.IP - logger *zap.Logger -} - -func New(retries int, interval, timeout, iface string, logger *zap.Logger) (*DetectGateway, error) { - var err error - dg := &DetectGateway{ - retries: retries, - iface: iface, - } - - dg.interval, err = time.ParseDuration(interval) - if err != nil { - return nil, err - } - - dg.timeout, err = time.ParseDuration(timeout) - if err != nil { - return nil, err - } - dg.logger = logger - - return dg, nil -} - -func (dg *DetectGateway) ParseAddrFromPreresult(ipconfigs []*types100.IPConfig) { - for _, ipconfig := range ipconfigs { - if ipconfig.Address.IP.To4() != nil { - dg.v4Addr = ipconfig.Address.IP - } else { - dg.v6Addr = ipconfig.Address.IP - } - } -} - -// PingOverIface sends an arp ping over interface 'iface' to 'dstIP' -func (dg *DetectGateway) ArpingOverIface() error { - ifi, err := net.InterfaceByName(dg.iface) - if err != nil { - return err - } - - client, err := arp.Dial(ifi) - if err != nil { - return err - } - defer client.Close() - - gwNetIP := netip.MustParseAddr(dg.V4Gw.String()) - if err = client.SetDeadline(time.Now().Add(dg.timeout)); err != nil { - dg.logger.Sugar().Errorf("failed to set deadline: %v", err) - return err - } - - for i := 0; i < dg.retries; i++ { - dg.logger.Sugar().Debugf("[Retry: %v]try to send the arp request", i+1) - err := client.Request(gwNetIP) - if err != nil { - dg.logger.Sugar().Errorf("[Retry: %v]failed to send the arp request: %v", i+1, err) - continue - } - - } - - // Loop and wait for replies - for { - res, _, err := client.Read() - if err != nil { - dg.logger.Sugar().Errorf("gateway %s is %v, reason: %v", dg.V4Gw.String(), constant.ErrGatewayUnreachable, 
err) - return fmt.Errorf("gateway %s is %v", dg.V4Gw.String(), constant.ErrGatewayUnreachable) - } - - if res.Operation != arp.OperationReply || res.SenderIP != gwNetIP { - continue - } - - dg.logger.Sugar().Infof("Gateway %s is reachable, gateway is located at %v", gwNetIP, res.SenderHardwareAddr.String()) - return nil - } -} - -func (dg *DetectGateway) NDPingOverIface() error { - ifi, err := net.InterfaceByName(dg.iface) - if err != nil { - return err - } - - client, _, err := ndp.Listen(ifi, ndp.LinkLocal) - if err != nil { - return err - } - defer client.Close() - - msg := &ndp.NeighborSolicitation{ - TargetAddress: netip.MustParseAddr(dg.V6Gw.String()), - Options: []ndp.Option{ - &ndp.LinkLayerAddress{ - Direction: ndp.Source, - Addr: ifi.HardwareAddr, - }, - }, - } - - var gwHwAddr string - for i := 0; i < dg.retries && gwHwAddr == ""; i++ { - gwHwAddr, err = dg.sendReceive(client, msg) - if err != nil { - dg.logger.Sugar().Errorf("[retry number: %v]error detect if gateway is reachable: %v", i+1, err) - } else if gwHwAddr != "" { - dg.logger.Sugar().Infof("gateway %s is reachable, it is located at %s", dg.V6Gw.String(), gwHwAddr) - return nil - } - } - - dg.logger.Sugar().Errorf("gateway %s is %s, reason: %v", dg.V6Gw.String(), constant.ErrGatewayUnreachable, err) - return fmt.Errorf("gateway %s is %w", dg.V6Gw.String(), constant.ErrGatewayUnreachable) -} - -func (dg *DetectGateway) sendReceive(client *ndp.Conn, m ndp.Message) (string, error) { - gwNetIP := netip.MustParseAddr(dg.V6Gw.String()) - // Always multicast the message to the target's solicited-node multicast - // group as if we have no knowledge of its MAC address. 
- snm, err := ndp.SolicitedNodeMulticast(gwNetIP) - if err != nil { - dg.logger.Error("[NDP]failed to determine solicited-node multicast address", zap.Error(err)) - return "", fmt.Errorf("failed to determine solicited-node multicast address: %v", err) - } - - if err := client.SetDeadline(time.Now().Add(dg.timeout)); err != nil { - dg.logger.Error("[NDP]failed to set deadline", zap.Error(err)) - return "", fmt.Errorf("failed to set deadline: %v", err) - } - - // we send a gratuitous neighbor solicitation to checking if ip is conflict - err = client.WriteTo(m, nil, snm) - if err != nil { - dg.logger.Error("[NDP]failed to send message", zap.Error(err)) - return "", fmt.Errorf("failed to send message: %v", err) - } - - msg, _, _, err := client.ReadFrom() - if err != nil { - return "", err - } - - gwAddr := netip.MustParseAddr(dg.V6Gw.String()) - na, ok := msg.(*ndp.NeighborAdvertisement) - if ok && na.TargetAddress.Compare(gwAddr) == 0 && len(na.Options) == 1 { - dg.logger.Debug("Detect gateway: found the response", zap.String("TargetAddress", na.TargetAddress.String())) - // found ndp reply what we want - option, ok := na.Options[0].(*ndp.LinkLayerAddress) - if ok { - return option.Addr.String(), nil - } - } - return "", nil -} diff --git a/pkg/networking/ipchecking/ipchecking.go b/pkg/networking/ipchecking/ipchecking.go deleted file mode 100644 index 1ac5c44409..0000000000 --- a/pkg/networking/ipchecking/ipchecking.go +++ /dev/null @@ -1,249 +0,0 @@ -// Copyright 2023 Authors of spidernet-io -// SPDX-License-Identifier: Apache-2.0 - -package ipchecking - -import ( - "errors" - "fmt" - "net" - "net/netip" - "time" - - types100 "github.com/containernetworking/cni/pkg/types/100" - "github.com/containernetworking/plugins/pkg/ns" - "github.com/mdlayher/arp" - "github.com/mdlayher/ethernet" - "github.com/mdlayher/ndp" - "go.uber.org/zap" - - "github.com/spidernet-io/spiderpool/pkg/constant" - "github.com/spidernet-io/spiderpool/pkg/errgroup" -) - -type IPChecker struct { - 
retries int - interval time.Duration - timeout time.Duration - netns, hostNs ns.NetNS - ip4, ip6 netip.Addr - ifi *net.Interface - arpClient *arp.Client - ndpClient *ndp.Conn - logger *zap.Logger -} - -func NewIPChecker(retries int, interval, timeout string, hostNs, netns ns.NetNS, logger *zap.Logger) (*IPChecker, error) { - var err error - - ipc := new(IPChecker) - ipc.retries = retries - ipc.interval, err = time.ParseDuration(interval) - if err != nil { - return nil, fmt.Errorf("failed to parse interval %v: %v", interval, err) - } - - ipc.timeout, err = time.ParseDuration(timeout) - if err != nil { - return nil, fmt.Errorf("failed to parse timeoute %v: %v", timeout, err) - } - - if err != nil { - return nil, err - } - - ipc.hostNs = hostNs - ipc.netns = netns - ipc.logger = logger - return ipc, nil -} - -func (ipc *IPChecker) DoIPConflictChecking(ipconfigs []*types100.IPConfig, iface string, errg *errgroup.Group) { - ipc.logger.Debug("DoIPConflictChecking", zap.String("interval", ipc.interval.String()), zap.Int("retries", ipc.retries), zap.String("timeout", ipc.timeout.String())) - if len(ipconfigs) == 0 { - ipc.logger.Info("No ips found in pod, ignore pod ip's conflict checking") - return - } - - var err error - _ = ipc.netns.Do(func(netNS ns.NetNS) error { - ipc.ifi, err = net.InterfaceByName(iface) - if err != nil { - return fmt.Errorf("failed to InterfaceByName %s: %w", iface, err) - } - - for idx := range ipconfigs { - target := netip.MustParseAddr(ipconfigs[idx].Address.IP.String()) - if target.Is4() { - ipc.logger.Debug("IPCheckingByARP", zap.String("ipv4 address", target.String())) - ipc.ip4 = target - ipc.arpClient, err = arp.Dial(ipc.ifi) - if err != nil { - return fmt.Errorf("failed to init arp client: %w", err) - } - errg.Go(ipc.hostNs, ipc.netns, ipc.ipCheckingByARP) - } else { - ipc.logger.Debug("IPCheckingByNDP", zap.String("ipv6 address", target.String())) - ipc.ip6 = target - ipc.ndpClient, _, err = ndp.Listen(ipc.ifi, ndp.LinkLocal) - if err != 
nil { - return fmt.Errorf("failed to init ndp client: %w", err) - } - errg.Go(ipc.hostNs, ipc.netns, ipc.ipCheckingByNDP) - } - } - return nil - }) -} - -func (ipc *IPChecker) ipCheckingByARP() error { - defer ipc.arpClient.Close() - - var err error - for i := 0; i < ipc.retries; i++ { - ipc.logger.Sugar().Debugf("[Retry: %v]try to arping the ip", i+1) - if err = ipc.arpClient.SetDeadline(time.Now().Add(ipc.timeout)); err != nil { - ipc.logger.Error("[ARP]failed to set deadline", zap.Error(err)) - continue - } - - // we send a gratuitous arp to checking if ip is conflict - // we use dad mode(duplicate address detection mode), so - // we set source ip to 0.0.0.0 - packet, err := arp.NewPacket(arp.OperationRequest, ipc.ifi.HardwareAddr, netip.MustParseAddr("0.0.0.0"), ethernet.Broadcast, ipc.ip4) - if err != nil { - return err - } - - err = ipc.arpClient.WriteTo(packet, ethernet.Broadcast) - if err != nil { - ipc.logger.Error("[ARP]failed to send message", zap.Error(err)) - continue - } - - packet, _, err = ipc.arpClient.Read() - if err != nil { - ipc.logger.Error("[ARP]failed to receive message", zap.Error(err)) - continue - } - - if packet.Operation != arp.OperationReply || packet.SenderIP.Compare(ipc.ip4) != 0 { - continue - } - - // found ip conflicting - ipc.logger.Error("Found IPv4 address conflicting", zap.String("Conflicting IP", ipc.ip4.String()), zap.String("Host", packet.SenderHardwareAddr.String())) - return fmt.Errorf("%w: pod's interface %s with an conflicting ip %s, %s is located at %s", - constant.ErrIPConflict, ipc.ifi.Name, ipc.ip4.String(), ipc.ip4.String(), packet.SenderHardwareAddr.String()) - } - - if err != nil { - if neterr, ok := err.(net.Error); ok && !neterr.Timeout() { - return fmt.Errorf("failed to checking ip %s if it's conflicting: %v", ipc.ip4.String(), err) - } - } - - ipc.logger.Debug("No ipv4 address conflict", zap.String("IPv4 address", ipc.ip4.String())) - return nil -} - -var errRetry = errors.New("retry") -var NDPFoundReply 
error = errors.New("found ndp reply") - -func (ipc *IPChecker) ipCheckingByNDP() error { - var err error - defer ipc.ndpClient.Close() - - m := &ndp.NeighborSolicitation{ - TargetAddress: ipc.ip6, - Options: []ndp.Option{ - &ndp.LinkLayerAddress{ - Direction: ndp.Source, - Addr: ipc.ifi.HardwareAddr, - }, - }, - } - - var replyMac string - replyMac, err = ipc.sendReceiveLoop(m) - if err != nil { - if err.Error() == NDPFoundReply.Error() { - if replyMac != ipc.ifi.HardwareAddr.String() { - ipc.logger.Error("Found IPv6 address conflicting", zap.String("Conflicting IP", ipc.ip6.String()), zap.String("Host", replyMac)) - return fmt.Errorf("%w: pod's interface %s with an conflicting ip %s, %s is located at %s", - constant.ErrIPConflict, ipc.ifi.Name, ipc.ip6.String(), ipc.ip6.String(), replyMac) - } - } - } - - // no ipv6 conflicting - ipc.logger.Debug("No ipv6 address conflicting", zap.String("ipv6 address", ipc.ip6.String())) - return nil -} - -// sendReceiveLoop send ndp message and waiting for receive. -// Copyright Authors of mdlayher/ndp: https://github.com/mdlayher/ndp/ -func (ipc *IPChecker) sendReceiveLoop(msg ndp.Message) (string, error) { - var hwAddr string - var err error - - for i := 0; i < ipc.retries; i++ { - ipc.logger.Sugar().Debugf("[Retry: %v]try to ndping the ip", i+1) - hwAddr, err = ipc.sendReceive(msg) - switch err { - case errRetry: - continue - case nil: - return hwAddr, NDPFoundReply - default: - // Was the error caused by a read timeout, and should the loop continue? - if neterr, ok := err.(net.Error); ok && neterr.Timeout() { - ipc.logger.Error(err.Error()) - continue - } - return "", err - } - } - - return "", nil -} - -// sendReceive send and receive ndp message,return error if error occurred. -// if the returned string isn't empty, it indicates that there are an -// IPv6 address conflict. 
-// Copyright Authors of mdlayher/ndp: https://github.com/mdlayher/ndp/ -func (ipc *IPChecker) sendReceive(m ndp.Message) (string, error) { - // Always multicast the message to the target's solicited-node multicast - // group as if we have no knowledge of its MAC address. - snm, err := ndp.SolicitedNodeMulticast(ipc.ip6) - if err != nil { - ipc.logger.Error("[NDP]failed to determine solicited-node multicast address", zap.Error(err)) - return "", fmt.Errorf("failed to determine solicited-node multicast address: %v", err) - } - - // we send a gratuitous neighbor solicitation to checking if ip is conflict - err = ipc.ndpClient.WriteTo(m, nil, snm) - if err != nil { - ipc.logger.Error("[NDP]failed to send message", zap.Error(err)) - return "", fmt.Errorf("failed to send message: %v", err) - } - - if err := ipc.ndpClient.SetReadDeadline(time.Now().Add(ipc.timeout)); err != nil { - ipc.logger.Error("[NDP]failed to set deadline", zap.Error(err)) - return "", fmt.Errorf("failed to set deadline: %v", err) - } - - msg, _, _, err := ipc.ndpClient.ReadFrom() - if err == nil { - na, ok := msg.(*ndp.NeighborAdvertisement) - if ok && na.TargetAddress.Compare(ipc.ip6) == 0 && len(na.Options) == 1 { - // found ndp reply what we want - option, ok := na.Options[0].(*ndp.LinkLayerAddress) - if ok { - return option.Addr.String(), nil - } - } - return "", errRetry - } - return "", err -} diff --git a/pkg/networking/networking/packet.go b/pkg/networking/networking/packet.go new file mode 100644 index 0000000000..0c5619dd19 --- /dev/null +++ b/pkg/networking/networking/packet.go @@ -0,0 +1,159 @@ +// Copyright 2025 Authors of spidernet-io +// SPDX-License-Identifier: Apache-2.0 + +// Note: The following source files are come from the latest version of https://github.com/k8snetworkplumbingwg/sriov-cni/blob/master/pkg/utils/packet.go. 
+// We can't directly go mod import package, it reports error: +// "require github.com/k8snetworkplumbingwg/sriov-cni: version “v2.8.0” invalid: should be v0 or v1, not v2."" + +// So we copied the source files here and made some code changes. + +package networking + +import ( + "bytes" + "encoding/binary" + "fmt" + "golang.org/x/sys/unix" + "net" + "net/netip" + "syscall" + "time" + + "github.com/vishvananda/netlink" + + "github.com/mdlayher/ndp" +) + +var ( + arpPacketName = "ARP" + icmpV6PacketName = "ICMPv6" +) + +// SendARPReuqest sends a gratuitous ARP packet with the provided source IP over the provided interface. +// UPDATE: the golang arp library requires an IPv4 address to exist for the NIC to send ARP request packets. +func SendARPReuqest(l netlink.Link, srcIP, dstIP net.IP) error { + /* As per RFC 5944 section 4.6, a gratuitous ARP packet can be sent by a node in order to spontaneously cause other nodes to update + * an entry in their ARP cache. In the case of SRIOV-CNI, an address can be reused for different pods. Each pod could likely have a + * different link-layer address in this scenario, which makes the ARP cache entries residing in the other nodes to be an invalid. + * The gratuitous ARP packet should update the link-layer address accordingly for the invalid ARP cache. + */ + + // Construct the ARP packet following RFC 5944 section 4.6. 
+ arpPacket := new(bytes.Buffer) + if writeErr := binary.Write(arpPacket, binary.BigEndian, uint16(1)); writeErr != nil { // Hardware Type: 1 is Ethernet + return formatPacketFieldWriteError("Hardware Type", arpPacketName, writeErr) + } + if writeErr := binary.Write(arpPacket, binary.BigEndian, uint16(syscall.ETH_P_IP)); writeErr != nil { // Protocol Type: 0x0800 is IPv4 + return formatPacketFieldWriteError("Protocol Type", arpPacketName, writeErr) + } + if writeErr := binary.Write(arpPacket, binary.BigEndian, uint8(6)); writeErr != nil { // Hardware address Length: 6 bytes for MAC address + return formatPacketFieldWriteError("Hardware address Length", arpPacketName, writeErr) + } + if writeErr := binary.Write(arpPacket, binary.BigEndian, uint8(4)); writeErr != nil { // Protocol address length: 4 bytes for IPv4 address + return formatPacketFieldWriteError("Protocol address length", arpPacketName, writeErr) + } + if writeErr := binary.Write(arpPacket, binary.BigEndian, uint16(1)); writeErr != nil { // Operation: 1 is request, 2 is response + return formatPacketFieldWriteError("Operation", arpPacketName, writeErr) + } + if _, writeErr := arpPacket.Write(l.Attrs().HardwareAddr); writeErr != nil { // Sender hardware address + return formatPacketFieldWriteError("Sender hardware address", arpPacketName, writeErr) + } + if _, writeErr := arpPacket.Write(srcIP); writeErr != nil { // Sender protocol address + return formatPacketFieldWriteError("Sender protocol address", arpPacketName, writeErr) + } + if _, writeErr := arpPacket.Write([]byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff}); writeErr != nil { // Target hardware address is the Broadcast MAC. 
+ return formatPacketFieldWriteError("Target hardware address", arpPacketName, writeErr) + } + if _, writeErr := arpPacket.Write(dstIP); writeErr != nil { // Target protocol address + return formatPacketFieldWriteError("Target protocol address", arpPacketName, writeErr) + } + + sockAddr := syscall.SockaddrLinklayer{ + Protocol: htons(syscall.ETH_P_ARP), // Ethertype of ARP (0x0806) + Ifindex: l.Attrs().Index, // Interface Index + Hatype: 1, // Hardware Type: 1 is Ethernet + Pkttype: 0, // Packet Type. + Halen: 6, // Hardware address Length: 6 bytes for MAC address + Addr: [8]byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}, // Address is the broadcast MAC address. + } + + // Create a socket such that the Ethernet header would be constructed by the OS. The arpPacket only contains the ARP payload. + soc, err := syscall.Socket(syscall.AF_PACKET, syscall.SOCK_DGRAM, int(htons(syscall.ETH_P_ARP))) + if err != nil { + return fmt.Errorf("failed to create AF_PACKET datagram socket: %v", err) + } + defer syscall.Close(soc) + + if err := syscall.Sendto(soc, arpPacket.Bytes(), 0, &sockAddr); err != nil { + return fmt.Errorf("failed to send ARP request for IPv4 %s on Interface %s: %v", srcIP.String(), l.Attrs().Name, err) + } + + return nil +} + +func SendUnsolicitedNeighborAdvertisement(dstIP net.IP, ifi *net.Interface, ndpClient *ndp.Conn) error { + nDstIP := netip.MustParseAddr(dstIP.String()) + m := &ndp.NeighborSolicitation{ + TargetAddress: nDstIP, + Options: []ndp.Option{ + &ndp.LinkLayerAddress{ + Direction: ndp.Source, + Addr: ifi.HardwareAddr, + }, + }, + } + + // Always multicast the message to the target's solicited-node multicast + // group as if we have no knowledge of its MAC address.
+ snm, err := ndp.SolicitedNodeMulticast(nDstIP) + if err != nil { + return err + } + + // Send a gratuitous neighbor solicitation to check whether the IP is in conflict. + err = ndpClient.WriteTo(m, nil, snm) + if err != nil { + return fmt.Errorf("failed to send ndp message: %v", err) + } + + return nil +} + +// WaitForCarrier blocks until link l reports carrier (IFF_UP and IFF_RUNNING both set) or waitTime has elapsed. +func WaitForCarrier(l netlink.Link, waitTime time.Duration) bool { + var nextSleepDuration time.Duration + + start := time.Now() + + for nextSleepDuration == 0 || time.Since(start) < waitTime { + if nextSleepDuration == 0 { + nextSleepDuration = 2 * time.Millisecond + } else { + time.Sleep(nextSleepDuration) + /* Grow wait time exponentially (factor 1.5). */ + nextSleepDuration += nextSleepDuration / 2 + } + + /* Wait for carrier, i.e. IFF_UP|IFF_RUNNING. Note that there is also + * IFF_LOWER_UP, but we follow iproute2 ([1]). + * + * [1] https://git.kernel.org/pub/scm/network/iproute2/iproute2.git/tree/ip/ipaddress.c?id=f9601b10c21145f76c3d46c163bac39515ed2061#n86 + */ + + if l.Attrs().RawFlags&(unix.IFF_UP|unix.IFF_RUNNING) == (unix.IFF_UP | unix.IFF_RUNNING) { + return true + } + } + + return false +} + +// htons converts a uint16 from host to network byte order. +func htons(i uint16) uint16 { + return (i<<8)&0xff00 | i>>8 +} + +// formatPacketFieldWriteError builds an error for the cases when writing to a field of a packet fails.
+func formatPacketFieldWriteError(field string, packetType string, writeErr error) error { + return fmt.Errorf("failed to write the %s field in the %s packet: %v", field, packetType, writeErr) +} diff --git a/pkg/types/k8s.go b/pkg/types/k8s.go index f034de56ba..67b57856c7 100644 --- a/pkg/types/k8s.go +++ b/pkg/types/k8s.go @@ -117,6 +117,8 @@ type SpiderpoolConfigmapConfig struct { EnableKubevirtStaticIP bool `yaml:"enableKubevirtStaticIP"` EnableSpiderSubnet bool `yaml:"enableSpiderSubnet"` EnableAutoPoolForApplication bool `yaml:"enableAutoPoolForApplication"` + EnableIPConflictDetection bool `yaml:"enableIPConflictDetection"` + EnableGatewayDetection bool `yaml:"enableGatewayDetection"` ClusterSubnetAutoPoolDefaultRedundantIPNumber int `yaml:"clusterSubnetAutoPoolDefaultRedundantIPNumber"` PodResourceInjectConfig PodResourceInjectConfig `yaml:"podResourceInject"` } diff --git a/pkg/utils/convert/convert.go b/pkg/utils/convert/convert.go index ad565437e2..8480af685a 100644 --- a/pkg/utils/convert/convert.go +++ b/pkg/utils/convert/convert.go @@ -20,7 +20,7 @@ import ( "github.com/spidernet-io/spiderpool/pkg/types" ) -func ConvertIPDetailsToIPConfigsAndAllRoutes(details []spiderpoolv2beta1.IPAllocationDetail) ([]*models.IPConfig, []*models.Route) { +func ConvertIPDetailsToIPConfigsAndAllRoutes(details []spiderpoolv2beta1.IPAllocationDetail, enableIPConflictDetection, enableGatewayDetection bool) ([]*models.IPConfig, []*models.Route) { var ips []*models.IPConfig var routes []*models.Route for _, d := range details { @@ -36,12 +36,14 @@ func ConvertIPDetailsToIPConfigsAndAllRoutes(details []spiderpoolv2beta1.IPAlloc } } ips = append(ips, &models.IPConfig{ - Address: d.IPv4, - Gateway: ipv4Gateway, - IPPool: *d.IPv4Pool, - Nic: &nic, - Version: &version, - Vlan: *d.Vlan, + Address: d.IPv4, + Gateway: ipv4Gateway, + IPPool: *d.IPv4Pool, + Nic: &nic, + Version: &version, + Vlan: *d.Vlan, + EnableGatewayDetection: enableGatewayDetection, + EnableIPConflictDetection: 
enableIPConflictDetection, }) } diff --git a/test/Makefile b/test/Makefile index 4c3e405e49..50f30d39c6 100644 --- a/test/Makefile +++ b/test/Makefile @@ -280,6 +280,10 @@ setup_spiderpool: else \ HELM_OPTION+=" --set ipam.spiderSubnet.enable=false " ; \ fi ; \ + if [ "$(E2E_SPIDERPOOL_ENABLE_IPAM_DETECTION)" == "true" ] ; then \ + HELM_OPTION+=" --set ipam.enableIPConflictDetection=true " ; \ + HELM_OPTION+=" --set ipam.enableGatewayDetection=true " ; \ + fi ; \ if [ "$(INSTALL_SRIOV)" == "true" ] ; then \ HELM_OPTION+=" --set sriov.install=true " ; \ else \ diff --git a/test/Makefile.defs b/test/Makefile.defs index fb26baa785..1a655a90ec 100644 --- a/test/Makefile.defs +++ b/test/Makefile.defs @@ -86,6 +86,7 @@ E2E_SPIDERPOOL_ENABLE_SUBNET ?= true E2E_SPIDERPOOL_ENABLE_COORDINATOR ?= true E2E_SPIDERPOOL_ENABLE_MULTUSCONFIG ?= true E2E_SPIDERPOOL_ENABLE_DRA ?= false +E2E_SPIDERPOOL_ENABLE_IPAM_DETECTION ?= false E2E_SPIDERPOOL_DRA_SOLIBRARY_PATH ?= /usr/lib/libtest.so E2E_HELM_ADDITIONAL_OPTIONS ?=