diff --git a/deploy/operator.yaml b/deploy/operator.yaml index 329d93e3937..441d0c1990e 100644 --- a/deploy/operator.yaml +++ b/deploy/operator.yaml @@ -27,7 +27,6 @@ rules: - "" resources: - nodes - - endpoints verbs: - get - list @@ -283,6 +282,30 @@ rules: - patch - update - delete +- apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - "" + resources: + - endpoints + verbs: + - create + - delete + - get + - list + - patch + - update + - watch --- apiVersion: v1 diff --git a/deploy/operator/00_clusterrole_def.yaml b/deploy/operator/00_clusterrole_def.yaml index d4a0c2c0ed9..9a48ebcbb04 100644 --- a/deploy/operator/00_clusterrole_def.yaml +++ b/deploy/operator/00_clusterrole_def.yaml @@ -17,7 +17,6 @@ rules: - "" resources: - nodes - - endpoints verbs: - get - list @@ -273,3 +272,27 @@ rules: - patch - update - delete +- apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - "" + resources: + - endpoints + verbs: + - create + - delete + - get + - list + - patch + - update + - watch diff --git a/helm/scylla-operator/templates/clusterrole_def.yaml b/helm/scylla-operator/templates/clusterrole_def.yaml index d4a0c2c0ed9..9a48ebcbb04 100644 --- a/helm/scylla-operator/templates/clusterrole_def.yaml +++ b/helm/scylla-operator/templates/clusterrole_def.yaml @@ -17,7 +17,6 @@ rules: - "" resources: - nodes - - endpoints verbs: - get - list @@ -273,3 +272,27 @@ rules: - patch - update - delete +- apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - "" + resources: + - endpoints + verbs: + - create + - delete + - get + - list + - patch + - update + - watch diff --git a/pkg/cmd/operator/operator.go b/pkg/cmd/operator/operator.go index f991db4b9d7..1dcb5d9e3ed 
100644 --- a/pkg/cmd/operator/operator.go +++ b/pkg/cmd/operator/operator.go @@ -267,6 +267,8 @@ func (o *OperatorOptions) run(ctx context.Context, streams genericclioptions.IOS kubeInformers.Policy().V1().PodDisruptionBudgets(), kubeInformers.Networking().V1().Ingresses(), kubeInformers.Batch().V1().Jobs(), + kubeInformers.Discovery().V1().EndpointSlices(), + kubeInformers.Core().V1().Endpoints(), scyllaInformers.Scylla().V1().ScyllaClusters(), o.OperatorImage, o.CQLSIngressPort, diff --git a/pkg/controller/scyllacluster/conditions.go b/pkg/controller/scyllacluster/conditions.go index f47a284d26d..a5814797a2a 100644 --- a/pkg/controller/scyllacluster/conditions.go +++ b/pkg/controller/scyllacluster/conditions.go @@ -22,4 +22,8 @@ const ( jobControllerDegradedCondition = "JobControllerDegraded" configControllerProgressingCondition = "ConfigControllerProgressing" configControllerDegradedCondition = "ConfigControllerDegraded" + endpointSliceControllerProgressingCondition = "EndpointSliceControllerProgressing" + endpointSliceControllerDegradedCondition = "EndpointSliceControllerDegraded" + endpointsControllerProgressingCondition = "EndpointsControllerProgressing" + endpointsControllerDegradedCondition = "EndpointsControllerDegraded" ) diff --git a/pkg/controller/scyllacluster/controller.go b/pkg/controller/scyllacluster/controller.go index b84c1b4de8b..6a36f7fe493 100644 --- a/pkg/controller/scyllacluster/controller.go +++ b/pkg/controller/scyllacluster/controller.go @@ -17,6 +17,7 @@ import ( appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" networkingv1 "k8s.io/api/networking/v1" policyv1 "k8s.io/api/policy/v1" rbacv1 "k8s.io/api/rbac/v1" @@ -29,6 +30,7 @@ import ( appsv1informers "k8s.io/client-go/informers/apps/v1" batchv1informers "k8s.io/client-go/informers/batch/v1" corev1informers "k8s.io/client-go/informers/core/v1" + discoveryv1informers "k8s.io/client-go/informers/discovery/v1" 
networkingv1informers "k8s.io/client-go/informers/networking/v1" policyv1informers "k8s.io/client-go/informers/policy/v1" rbacv1informers "k8s.io/client-go/informers/rbac/v1" @@ -37,6 +39,7 @@ import ( appsv1listers "k8s.io/client-go/listers/apps/v1" batchv1listers "k8s.io/client-go/listers/batch/v1" corev1listers "k8s.io/client-go/listers/core/v1" + discoveryv1listers "k8s.io/client-go/listers/discovery/v1" networkingv1listers "k8s.io/client-go/listers/networking/v1" policyv1listers "k8s.io/client-go/listers/policy/v1" rbacv1listers "k8s.io/client-go/listers/rbac/v1" @@ -76,6 +79,8 @@ type Controller struct { ingressLister networkingv1listers.IngressLister scyllaLister scyllav1listers.ScyllaClusterLister jobLister batchv1listers.JobLister + endpointSliceLister discoveryv1listers.EndpointSliceLister + endpointsLister corev1listers.EndpointsLister cachesToSync []cache.InformerSynced @@ -100,6 +105,8 @@ func NewController( pdbInformer policyv1informers.PodDisruptionBudgetInformer, ingressInformer networkingv1informers.IngressInformer, jobInformer batchv1informers.JobInformer, + endpointSliceInformer discoveryv1informers.EndpointSliceInformer, + endpointsInformer corev1informers.EndpointsInformer, scyllaClusterInformer scyllav1informers.ScyllaClusterInformer, operatorImage string, cqlsIngressPort int, @@ -127,6 +134,8 @@ func NewController( ingressLister: ingressInformer.Lister(), scyllaLister: scyllaClusterInformer.Lister(), jobLister: jobInformer.Lister(), + endpointSliceLister: endpointSliceInformer.Lister(), + endpointsLister: endpointsInformer.Lister(), cachesToSync: []cache.InformerSynced{ podInformer.Informer().HasSynced, @@ -140,6 +149,8 @@ func NewController( ingressInformer.Informer().HasSynced, scyllaClusterInformer.Informer().HasSynced, jobInformer.Informer().HasSynced, + endpointSliceInformer.Informer().HasSynced, + endpointsInformer.Informer().HasSynced, }, eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: 
"scyllacluster-controller"}), @@ -235,6 +246,18 @@ func NewController( DeleteFunc: scc.deleteJob, }) + endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: scc.addEndpointSlice, + UpdateFunc: scc.updateEndpointSlice, + DeleteFunc: scc.deleteEndpointSlice, + }) + + endpointsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: scc.addEndpoints, + UpdateFunc: scc.updateEndpoints, + DeleteFunc: scc.deleteEndpoints, + }) + return scc, nil } @@ -625,3 +648,49 @@ func (scc *Controller) deleteJob(obj interface{}) { scc.handlers.EnqueueOwner, ) } + +func (scc *Controller) addEndpointSlice(obj interface{}) { + scc.handlers.HandleAdd( + obj.(*discoveryv1.EndpointSlice), + scc.handlers.EnqueueOwner, + ) +} + +func (scc *Controller) updateEndpointSlice(old, cur interface{}) { + scc.handlers.HandleUpdate( + old.(*discoveryv1.EndpointSlice), + cur.(*discoveryv1.EndpointSlice), + scc.handlers.EnqueueOwner, + scc.deleteEndpointSlice, + ) +} + +func (scc *Controller) deleteEndpointSlice(obj interface{}) { + scc.handlers.HandleDelete( + obj, + scc.handlers.EnqueueOwner, + ) +} + +func (scc *Controller) addEndpoints(obj interface{}) { + scc.handlers.HandleAdd( + obj.(*corev1.Endpoints), + scc.handlers.EnqueueOwner, + ) +} + +func (scc *Controller) updateEndpoints(old, cur interface{}) { + scc.handlers.HandleUpdate( + old.(*corev1.Endpoints), + cur.(*corev1.Endpoints), + scc.handlers.EnqueueOwner, + scc.deleteEndpoints, + ) +} + +func (scc *Controller) deleteEndpoints(obj interface{}) { + scc.handlers.HandleDelete( + obj, + scc.handlers.EnqueueOwner, + ) +} diff --git a/pkg/controller/scyllacluster/resource.go b/pkg/controller/scyllacluster/resource.go index acfbbfe0b5a..5b18864cccc 100644 --- a/pkg/controller/scyllacluster/resource.go +++ b/pkg/controller/scyllacluster/resource.go @@ -10,22 +10,29 @@ import ( scylladbassets "github.com/scylladb/scylla-operator/assets/scylladb" scyllav1 
"github.com/scylladb/scylla-operator/pkg/api/scylla/v1" + "github.com/scylladb/scylla-operator/pkg/controllerhelpers" "github.com/scylladb/scylla-operator/pkg/features" "github.com/scylladb/scylla-operator/pkg/helpers" "github.com/scylladb/scylla-operator/pkg/helpers/slices" "github.com/scylladb/scylla-operator/pkg/naming" "github.com/scylladb/scylla-operator/pkg/pointer" + "github.com/scylladb/scylla-operator/pkg/util/hash" appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" networkingv1 "k8s.io/api/networking/v1" policyv1 "k8s.io/api/policy/v1" rbacv1 "k8s.io/api/rbac/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/rand" utilfeature "k8s.io/apiserver/pkg/util/feature" + corev1listers "k8s.io/client-go/listers/core/v1" "k8s.io/klog/v2" + utilnet "k8s.io/utils/net" ) const ( @@ -146,10 +153,11 @@ func MemberService(sc *scyllav1.ScyllaCluster, rackName, name string, oldService Annotations: svcAnnotations, }, Spec: corev1.ServiceSpec{ - Type: corev1.ServiceTypeClusterIP, - Selector: naming.StatefulSetPodLabel(name), - Ports: servicePorts(sc), - PublishNotReadyAddresses: true, + Type: corev1.ServiceTypeClusterIP, + Ports: servicePorts(sc), + // Disable Kubernetes Endpoints reconciliation by setting `nil` Selector. + // Operator reconciles Endpoints of these Services. Check sync loop for EndpointSlice/Endpoints for more details. 
+ Selector: nil, }, } @@ -181,11 +189,11 @@ func MemberService(sc *scyllav1.ScyllaCluster, rackName, name string, oldService func servicePorts(cluster *scyllav1.ScyllaCluster) []corev1.ServicePort { ports := []corev1.ServicePort{ { - Name: "inter-node-communication", + Name: "inter-node", Port: 7000, }, { - Name: "ssl-inter-node-communication", + Name: "ssl-inter-node", Port: 7001, }, { @@ -209,7 +217,7 @@ func servicePorts(cluster *scyllav1.ScyllaCluster) []corev1.ServicePort { Port: 7199, }, { - Name: "agent-api", + Name: "agent-rest-api", Port: 10001, }, { @@ -539,7 +547,7 @@ func StatefulSetForRack(r scyllav1.RackSpec, c *scyllav1.ScyllaCluster, existing Name: naming.ScyllaContainerName, Image: ImageForCluster(c), ImagePullPolicy: corev1.PullIfNotPresent, - Ports: containerPorts(c), + Ports: scyllaContainerPorts(c), // TODO: unprivileged entrypoint Command: func() []string { cmd := []string{ @@ -773,6 +781,60 @@ func StatefulSetForRack(r scyllav1.RackSpec, c *scyllav1.ScyllaCluster, existing }, }, }, + { + Name: naming.InterNodeTrafficProbeContainerName, + Image: sidecarImage, + ImagePullPolicy: corev1.PullIfNotPresent, + Ports: []corev1.ContainerPort{ + { + Name: "inter-node", + ContainerPort: naming.StoragePort, + }, + { + Name: "ssl-inter-node", + ContainerPort: 7001, + }, + }, + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("10m"), + corev1.ResourceMemory: resource.MustParse("40Mi"), + }, + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("10m"), + corev1.ResourceMemory: resource.MustParse("40Mi"), + }, + }, + Command: []string{ + "/usr/bin/bash", + "-euExo", + "pipefail", + "-O", + "inherit_errexit", + "-c", + }, + Args: []string{ + ` +function handle-exit { + echo "Shutting down" +} + +trap handle-exit EXIT + +sleep infinity +`, + }, + ReadinessProbe: &corev1.Probe{ + TimeoutSeconds: int32(30), + FailureThreshold: int32(1), + PeriodSeconds: int32(5), + ProbeHandler: 
corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.FromInt32(naming.StoragePort), + }, + }, + }, + }, }, ServiceAccountName: naming.MemberServiceAccountNameForScyllaCluster(c.Name), Affinity: &corev1.Affinity{ @@ -840,16 +902,8 @@ func StatefulSetForRack(r scyllav1.RackSpec, c *scyllav1.ScyllaCluster, existing return sts, nil } -func containerPorts(c *scyllav1.ScyllaCluster) []corev1.ContainerPort { +func scyllaContainerPorts(c *scyllav1.ScyllaCluster) []corev1.ContainerPort { ports := []corev1.ContainerPort{ - { - Name: "intra-node", - ContainerPort: 7000, - }, - { - Name: "tls-intra-node", - ContainerPort: 7001, - }, { Name: "cql", ContainerPort: 9042, @@ -858,6 +912,14 @@ func containerPorts(c *scyllav1.ScyllaCluster) []corev1.ContainerPort { Name: "cql-ssl", ContainerPort: 9142, }, + { + Name: "cql-sa", + ContainerPort: 19042, + }, + { + Name: "cql-ssl-sa", + ContainerPort: 19142, + }, { Name: "jmx", ContainerPort: 7199, @@ -946,6 +1008,10 @@ func agentContainer(r scyllav1.RackSpec, c *scyllav1.ScyllaCluster) corev1.Conta Name: "agent-rest-api", ContainerPort: 10001, }, + { + Name: "agent-metrics", + ContainerPort: 5090, + }, }, VolumeMounts: []corev1.VolumeMount{ { @@ -1440,3 +1506,199 @@ func MakeManagedScyllaDBConfig(sc *scyllav1.ScyllaCluster) (*corev1.ConfigMap, e return cm, nil } + +func MakeEndpointSlices(sc *scyllav1.ScyllaCluster, services map[string]*corev1.Service, podLister corev1listers.PodLister) ([]*discoveryv1.EndpointSlice, error) { + var endpointSlices []*discoveryv1.EndpointSlice + + for _, svc := range services { + if svc.Labels[naming.ScyllaServiceTypeLabel] != string(naming.ScyllaServiceTypeMember) { + continue + } + + if svc.Spec.Selector != nil { + return endpointSlices, fmt.Errorf("member service %q has unexpected non-nil Selector %v", naming.ObjRef(svc), svc.Spec.Selector) + } + + pod, err := podLister.Pods(sc.Namespace).Get(svc.Name) + if err != nil { + if apierrors.IsNotFound(err) { + klog.V(4).InfoS("Pod 
behind %q Member Service doesn't exists yet", naming.ObjRef(svc)) + continue + } + return endpointSlices, fmt.Errorf("can't get Pod %q: %w", naming.ManualRef(sc.Namespace, svc.Name), err) + } + + // Don't publish endpoints for Pods that are being deleted. + // Removing endpoints prevents new connections being established while still allowing + // for existing connections to survive and finish their requests. + // We need to do this early, as a gap between when Pod is actually deleted and Endpoint + // is reconciled into forwarding rules, may cause connection to get stuck on SYN. + // A stale tuple entry prevents retransmissions from timing out early and leading + // to the next reconnection attempt (#1077). + if pod.DeletionTimestamp != nil { + continue + } + + endpointSliceLabels := map[string]string{} + maps.Copy(endpointSliceLabels, svc.Labels) + endpointSliceLabels[discoveryv1.LabelManagedBy] = naming.OperatorAppNameWithDomain + endpointSliceLabels[discoveryv1.LabelServiceName] = svc.Name + + endpointSliceAnnotations := map[string]string{} + maps.Copy(endpointSliceAnnotations, svc.Annotations) + + podReference := &corev1.ObjectReference{ + Kind: "Pod", + Namespace: pod.Namespace, + Name: pod.Name, + UID: pod.UID, + } + + containerPortMap, podPorts, err := mapServicePortsToReadinessResponsibleContainer(svc, pod.Spec.Containers) + if err != nil { + return endpointSlices, fmt.Errorf("can't map service ports to readiness responsible ") + } + + type endpointPortsConditions struct { + Conditions discoveryv1.EndpointConditions + Ports []discoveryv1.EndpointPort + } + + epPortsConditions := make(map[string]*endpointPortsConditions) + + for containerName, ports := range containerPortMap { + containerPortsHash, err := endpointPortsHash(containerName, ports) + if err != nil { + return endpointSlices, fmt.Errorf("can't hash container ports: %w", err) + } + + var ready, serving bool + + containerStatus, _, ok := slices.Find(pod.Status.ContainerStatuses, func(status 
corev1.ContainerStatus) bool { + return status.Name == containerName + }) + if ok { + ready = containerStatus.Ready && pod.DeletionTimestamp == nil + serving = containerStatus.Ready + } + + epPortsConditions[containerPortsHash] = &endpointPortsConditions{ + Conditions: discoveryv1.EndpointConditions{ + Ready: pointer.Ptr(ready), + Serving: pointer.Ptr(serving), + Terminating: pointer.Ptr(pod.DeletionTimestamp != nil), + }, + Ports: ports, + } + } + + if len(podPorts) != 0 { + podPortsHash, err := endpointPortsHash("", podPorts) + if err != nil { + return endpointSlices, fmt.Errorf("can't hash pod ports: %w", err) + } + + epPortsConditions[podPortsHash] = &endpointPortsConditions{ + Conditions: discoveryv1.EndpointConditions{ + Ready: pointer.Ptr(controllerhelpers.IsPodReady(pod) && pod.DeletionTimestamp == nil), + Serving: pointer.Ptr(controllerhelpers.IsPodReady(pod)), + Terminating: pointer.Ptr(pod.DeletionTimestamp != nil), + }, + Ports: podPorts, + } + } + + for epHash, epc := range epPortsConditions { + addressType := discoveryv1.AddressTypeIPv4 + + if utilnet.IsIPv6String(pod.Status.PodIP) { + addressType = discoveryv1.AddressTypeIPv6 + } + + endpointSlices = append(endpointSlices, &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s", svc.Name, epHash), + Namespace: sc.Namespace, + Labels: endpointSliceLabels, + Annotations: endpointSliceAnnotations, + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(sc, scyllaClusterControllerGVK), + }, + }, + AddressType: addressType, + Ports: epc.Ports, + Endpoints: func() []discoveryv1.Endpoint { + if len(pod.Status.PodIP) != 0 { + ep := discoveryv1.Endpoint{ + Addresses: []string{pod.Status.PodIP}, + Conditions: epc.Conditions, + TargetRef: podReference, + Hostname: func() *string { + if shouldSetHostname(pod, svc) { + return pointer.Ptr(pod.Spec.Hostname) + } + return nil + }(), + } + + if len(pod.Spec.NodeName) != 0 { + ep.NodeName = pointer.Ptr(pod.Spec.NodeName) 
+ } + + return []discoveryv1.Endpoint{ep} + } + return nil + }(), + }) + } + } + + sort.Slice(endpointSlices, func(i, j int) bool { + return endpointSlices[i].Name < endpointSlices[j].Name + }) + + return endpointSlices, nil +} + +// shouldSetHostname returns true if the Hostname attribute should be set on an +// Endpoints Address or EndpointSlice Endpoint. +func shouldSetHostname(pod *corev1.Pod, svc *corev1.Service) bool { + return len(pod.Spec.Hostname) > 0 && pod.Spec.Subdomain == svc.Name && svc.Namespace == pod.Namespace +} + +func endpointPortsHash(name string, ports []discoveryv1.EndpointPort) (string, error) { + h, err := hash.HashObjectsShort(name, ports) + if err != nil { + return h, err + } + + return rand.SafeEncodeString(h), nil +} + +func mapServicePortsToReadinessResponsibleContainer(svc *corev1.Service, containers []corev1.Container) (map[string][]discoveryv1.EndpointPort, []discoveryv1.EndpointPort, error) { + var podPorts []discoveryv1.EndpointPort + containerPortMap := map[string][]discoveryv1.EndpointPort{} + + for _, servicePort := range svc.Spec.Ports { + ep := discoveryv1.EndpointPort{ + Name: pointer.Ptr(servicePort.Name), + Port: pointer.Ptr(servicePort.Port), + Protocol: pointer.Ptr(servicePort.Protocol), + AppProtocol: servicePort.AppProtocol, + } + + containerServingPort, ok, err := controllerhelpers.FindContainerServingPort(servicePort, containers) + if err != nil { + return containerPortMap, podPorts, fmt.Errorf("can't find container serving port: %w", err) + } + + // Container serves given Service port and decides about readiness + if ok && containerServingPort.ReadinessProbe != nil { + containerPortMap[containerServingPort.Name] = append(containerPortMap[containerServingPort.Name], ep) + } else { + podPorts = append(podPorts, ep) + } + } + + return containerPortMap, podPorts, nil +} diff --git a/pkg/controller/scyllacluster/resource_test.go b/pkg/controller/scyllacluster/resource_test.go index fcf5d4ca7a9..46997ebd11b 100644 --- 
a/pkg/controller/scyllacluster/resource_test.go +++ b/pkg/controller/scyllacluster/resource_test.go @@ -14,12 +14,15 @@ import ( appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" networkingv1 "k8s.io/api/networking/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" utilfeature "k8s.io/apiserver/pkg/util/feature" + corev1listers "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" featuregatetesting "k8s.io/component-base/featuregate/testing" ) @@ -56,9 +59,6 @@ func TestMemberService(t *testing.T) { } basicRackName := "rack" basicSVCName := "member" - basicSVCSelector := map[string]string{ - "statefulset.kubernetes.io/pod-name": "member", - } basicSVCLabels := func() map[string]string { return map[string]string{ "app": "scylla", @@ -78,11 +78,11 @@ func TestMemberService(t *testing.T) { } basicPorts := []corev1.ServicePort{ { - Name: "inter-node-communication", + Name: "inter-node", Port: 7000, }, { - Name: "ssl-inter-node-communication", + Name: "ssl-inter-node", Port: 7001, }, { @@ -106,7 +106,7 @@ func TestMemberService(t *testing.T) { Port: 7199, }, { - Name: "agent-api", + Name: "agent-rest-api", Port: 10001, }, { @@ -152,8 +152,8 @@ func TestMemberService(t *testing.T) { }, Spec: corev1.ServiceSpec{ Type: corev1.ServiceTypeClusterIP, - Selector: basicSVCSelector, - PublishNotReadyAddresses: true, + Selector: nil, + PublishNotReadyAddresses: false, Ports: basicPorts, }, }, @@ -186,8 +186,8 @@ func TestMemberService(t *testing.T) { }, Spec: corev1.ServiceSpec{ Type: corev1.ServiceTypeClusterIP, - Selector: basicSVCSelector, - PublishNotReadyAddresses: true, + Selector: nil, + PublishNotReadyAddresses: false, Ports: basicPorts, }, }, @@ -226,8 +226,8 @@ func TestMemberService(t *testing.T) { }, Spec: corev1.ServiceSpec{ Type: 
corev1.ServiceTypeClusterIP, - Selector: basicSVCSelector, - PublishNotReadyAddresses: true, + Selector: nil, + PublishNotReadyAddresses: false, Ports: basicPorts, }, }, @@ -256,8 +256,8 @@ func TestMemberService(t *testing.T) { }, Spec: corev1.ServiceSpec{ Type: corev1.ServiceTypeClusterIP, - Selector: basicSVCSelector, - PublishNotReadyAddresses: true, + Selector: nil, + PublishNotReadyAddresses: false, Ports: basicPorts, }, }, @@ -292,8 +292,8 @@ func TestMemberService(t *testing.T) { }, Spec: corev1.ServiceSpec{ Type: corev1.ServiceTypeClusterIP, - Selector: basicSVCSelector, - PublishNotReadyAddresses: true, + Selector: nil, + PublishNotReadyAddresses: false, Ports: basicPorts, }, }, @@ -318,8 +318,8 @@ func TestMemberService(t *testing.T) { }, Spec: corev1.ServiceSpec{ Type: corev1.ServiceTypeClusterIP, - Selector: basicSVCSelector, - PublishNotReadyAddresses: true, + Selector: nil, + PublishNotReadyAddresses: false, Ports: basicPorts, }, }, @@ -346,8 +346,8 @@ func TestMemberService(t *testing.T) { }, Spec: corev1.ServiceSpec{ Type: corev1.ServiceTypeClusterIP, - Selector: basicSVCSelector, - PublishNotReadyAddresses: true, + Selector: nil, + PublishNotReadyAddresses: false, Ports: basicPorts, }, }, @@ -377,8 +377,8 @@ func TestMemberService(t *testing.T) { }, Spec: corev1.ServiceSpec{ Type: corev1.ServiceTypeClusterIP, - Selector: basicSVCSelector, - PublishNotReadyAddresses: true, + Selector: nil, + PublishNotReadyAddresses: false, Ports: basicPorts, }, }, @@ -414,8 +414,8 @@ func TestMemberService(t *testing.T) { }, Spec: corev1.ServiceSpec{ Type: corev1.ServiceTypeClusterIP, - Selector: basicSVCSelector, - PublishNotReadyAddresses: true, + Selector: nil, + PublishNotReadyAddresses: false, Ports: basicPorts, }, }, @@ -468,8 +468,8 @@ func TestMemberService(t *testing.T) { }, Spec: corev1.ServiceSpec{ Type: corev1.ServiceTypeLoadBalancer, - Selector: basicSVCSelector, - PublishNotReadyAddresses: true, + Selector: nil, + PublishNotReadyAddresses: false, 
Ports: basicPorts, ExternalTrafficPolicy: corev1.ServiceExternalTrafficPolicyLocal, AllocateLoadBalancerNodePorts: pointer.Ptr(true), @@ -503,8 +503,8 @@ func TestMemberService(t *testing.T) { }, Spec: corev1.ServiceSpec{ Type: corev1.ServiceTypeClusterIP, - Selector: basicSVCSelector, - PublishNotReadyAddresses: true, + Selector: nil, + PublishNotReadyAddresses: false, Ports: basicPorts, ClusterIP: corev1.ClusterIPNone, }, @@ -535,8 +535,8 @@ func TestMemberService(t *testing.T) { }, Spec: corev1.ServiceSpec{ Type: corev1.ServiceTypeClusterIP, - Selector: basicSVCSelector, - PublishNotReadyAddresses: true, + Selector: nil, + PublishNotReadyAddresses: false, Ports: basicPorts, }, }, @@ -566,8 +566,8 @@ func TestMemberService(t *testing.T) { }, Spec: corev1.ServiceSpec{ Type: corev1.ServiceTypeLoadBalancer, - Selector: basicSVCSelector, - PublishNotReadyAddresses: true, + Selector: nil, + PublishNotReadyAddresses: false, Ports: basicPorts, }, }, @@ -811,14 +811,6 @@ func TestStatefulSetForRack(t *testing.T) { Image: ":", ImagePullPolicy: corev1.PullIfNotPresent, Ports: []corev1.ContainerPort{ - { - Name: "intra-node", - ContainerPort: 7000, - }, - { - Name: "tls-intra-node", - ContainerPort: 7001, - }, { Name: "cql", ContainerPort: 9042, @@ -827,6 +819,14 @@ func TestStatefulSetForRack(t *testing.T) { Name: "cql-ssl", ContainerPort: 9142, }, + { + Name: "cql-sa", + ContainerPort: 19042, + }, + { + Name: "cql-ssl-sa", + ContainerPort: 19142, + }, { Name: "jmx", ContainerPort: 7199, @@ -1033,6 +1033,60 @@ func TestStatefulSetForRack(t *testing.T) { }, }, }, + { + Name: "inter-node-traffic-probe", + Image: "scylladb/scylla-operator:latest", + ImagePullPolicy: corev1.PullIfNotPresent, + Ports: []corev1.ContainerPort{ + { + Name: "inter-node", + ContainerPort: 7000, + }, + { + Name: "ssl-inter-node", + ContainerPort: 7001, + }, + }, + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("10m"), + 
corev1.ResourceMemory: resource.MustParse("40Mi"), + }, + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("10m"), + corev1.ResourceMemory: resource.MustParse("40Mi"), + }, + }, + Command: []string{ + "/usr/bin/bash", + "-euExo", + "pipefail", + "-O", + "inherit_errexit", + "-c", + }, + Args: []string{ + ` +function handle-exit { + echo "Shutting down" +} + +trap handle-exit EXIT + +sleep infinity +`, + }, + ReadinessProbe: &corev1.Probe{ + TimeoutSeconds: int32(30), + FailureThreshold: int32(1), + PeriodSeconds: int32(5), + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.FromInt32(7000), + }, + }, + }, + }, { Name: "scylla-manager-agent", Image: ":", @@ -1050,6 +1104,10 @@ func TestStatefulSetForRack(t *testing.T) { Name: "agent-rest-api", ContainerPort: 10001, }, + { + Name: "agent-metrics", + ContainerPort: 5090, + }, }, VolumeMounts: []corev1.VolumeMount{ { @@ -3380,3 +3438,899 @@ alternator_encryption_options: }) } } + +func TestMakeEndpointSlicesAndEndpoints(t *testing.T) { + t.Parallel() + + basicScyllaCluster := &scyllav1.ScyllaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "basic", + Namespace: "default", + UID: "the-uid", + }, + Spec: scyllav1.ScyllaClusterSpec{ + Datacenter: scyllav1.DatacenterSpec{ + Name: "dc", + Racks: []scyllav1.RackSpec{ + { + Name: "rack", + Storage: scyllav1.Storage{ + Capacity: "1Gi", + }, + Members: 1, + }, + }, + }, + }, + } + + tt := []struct { + name string + cluster *scyllav1.ScyllaCluster + pods []*corev1.Pod + services map[string]*corev1.Service + expectedEndpointSlices []*discoveryv1.EndpointSlice + expectedEndpoints []*corev1.Endpoints + }{ + { + name: "EndpointSlices for non-member Service is not reconciled", + cluster: basicScyllaCluster, + pods: []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "basic-dc-rack-0", + Namespace: "default", + UID: "pod-uid", + }, + Spec: corev1.PodSpec{ + NodeName: "node-a", + }, + Status: corev1.PodStatus{ + 
Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }, + PodIP: "1.1.1.1", + }, + }, + }, + services: map[string]*corev1.Service{ + "basic-dc-rack-0": { + ObjectMeta: metav1.ObjectMeta{ + Name: "basic-dc-rack-0", + }, + }, + }, + expectedEndpointSlices: []*discoveryv1.EndpointSlice{}, + expectedEndpoints: []*corev1.Endpoints{}, + }, + { + name: "no EndpointSlice when member Service doesn't have a backing Pod", + cluster: basicScyllaCluster, + pods: []*corev1.Pod{}, + services: map[string]*corev1.Service{ + "basic-dc-rack-0": { + ObjectMeta: metav1.ObjectMeta{ + Name: "basic-dc-rack-0", + Labels: map[string]string{ + "scylla-operator.scylladb.com/scylla-service-type": "member", + }, + }, + }, + }, + expectedEndpointSlices: []*discoveryv1.EndpointSlice{}, + expectedEndpoints: []*corev1.Endpoints{}, + }, + { + name: "no EndpointSlice for Service which backing Pod is being terminated", + cluster: basicScyllaCluster, + pods: []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "basic-dc-rack-0", + Namespace: "default", + DeletionTimestamp: pointer.Ptr(metav1.Now()), + }, + }, + }, + services: map[string]*corev1.Service{ + "basic-dc-rack-0": { + ObjectMeta: metav1.ObjectMeta{ + Name: "basic-dc-rack-0", + Labels: map[string]string{ + "scylla-operator.scylladb.com/scylla-service-type": "member", + }, + }, + }, + }, + expectedEndpointSlices: []*discoveryv1.EndpointSlice{}, + expectedEndpoints: []*corev1.Endpoints{}, + }, + { + name: "EndpointSlice for Service ports not backed by any container having readiness probe are ready when entire Pod is ready", + cluster: basicScyllaCluster, + pods: []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "basic-dc-rack-0", + Namespace: "default", + UID: "pod-uid", + }, + Spec: corev1.PodSpec{ + NodeName: "node-a", + }, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }, + PodIP: "1.1.1.1", 
+ }, + }, + }, + services: map[string]*corev1.Service{ + "basic-dc-rack-0": { + ObjectMeta: metav1.ObjectMeta{ + Name: "basic-dc-rack-0", + Labels: map[string]string{ + "scylla-operator.scylladb.com/scylla-service-type": "member", + }, + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + { + Name: "port-1", + Port: 666, + Protocol: corev1.ProtocolTCP, + }, + }, + }, + }, + }, + expectedEndpointSlices: []*discoveryv1.EndpointSlice{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "basic-dc-rack-0-8fw4fbkk", + Namespace: "default", + Labels: map[string]string{ + "scylla-operator.scylladb.com/scylla-service-type": "member", + "kubernetes.io/service-name": "basic-dc-rack-0", + "endpointslice.kubernetes.io/managed-by": "scylla-operator.scylladb.com", + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "scylla.scylladb.com/v1", + Kind: "ScyllaCluster", + Name: "basic", + UID: "the-uid", + Controller: pointer.Ptr(true), + BlockOwnerDeletion: pointer.Ptr(true), + }, + }, + }, + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"1.1.1.1"}, + Conditions: discoveryv1.EndpointConditions{ + Ready: pointer.Ptr(true), + Serving: pointer.Ptr(true), + Terminating: pointer.Ptr(false), + }, + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Namespace: "default", + Name: "basic-dc-rack-0", + UID: "pod-uid", + }, + NodeName: pointer.Ptr("node-a"), + }, + }, + Ports: []discoveryv1.EndpointPort{ + { + Name: pointer.Ptr("port-1"), + Protocol: pointer.Ptr(corev1.ProtocolTCP), + Port: pointer.Ptr(int32(666)), + }, + }, + }, + }, + expectedEndpoints: []*corev1.Endpoints{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "basic-dc-rack-0", + Namespace: "default", + Labels: map[string]string{ + "scylla-operator.scylladb.com/scylla-service-type": "member", + "endpointslice.kubernetes.io/skip-mirror": "true", + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "scylla.scylladb.com/v1", + Kind: 
"ScyllaCluster", + Name: "basic", + UID: "the-uid", + Controller: pointer.Ptr(true), + BlockOwnerDeletion: pointer.Ptr(true), + }, + }, + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + IP: "1.1.1.1", + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Namespace: "default", + Name: "basic-dc-rack-0", + UID: "pod-uid", + }, + NodeName: pointer.Ptr("node-a"), + }, + }, + NotReadyAddresses: nil, + Ports: []corev1.EndpointPort{ + { + Name: "port-1", + Port: 666, + Protocol: "TCP", + }, + }, + }, + }, + }, + }, + }, + { + name: "EndpointSlices for Service ports backed by container having readiness probe are ready when container is ready regardless of Pod readiness", + cluster: basicScyllaCluster, + pods: []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "basic-dc-rack-0", + Namespace: "default", + UID: "pod-uid", + }, + Spec: corev1.PodSpec{ + NodeName: "node-a", + Containers: []corev1.Container{ + { + Name: "container-1", + Ports: []corev1.ContainerPort{ + { + Name: "port-2", + ContainerPort: 777, + Protocol: corev1.ProtocolTCP, + }, + }, + ReadinessProbe: &corev1.Probe{}, + }, + }, + }, + Status: corev1.PodStatus{ + PodIP: "1.1.1.1", + ContainerStatuses: []corev1.ContainerStatus{ + { + Name: "container-1", + Ready: true, + }, + }, + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionFalse, + }, + }, + }, + }, + }, + services: map[string]*corev1.Service{ + "basic-dc-rack-0": { + ObjectMeta: metav1.ObjectMeta{ + Name: "basic-dc-rack-0", + Labels: map[string]string{ + "scylla-operator.scylladb.com/scylla-service-type": "member", + }, + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + { + Name: "port-1", + Port: 666, + Protocol: corev1.ProtocolTCP, + TargetPort: intstr.FromInt32(666), + }, + { + Name: "port-2", + Port: 777, + Protocol: corev1.ProtocolTCP, + TargetPort: intstr.FromInt32(777), + }, + }, + }, + }, + }, + expectedEndpointSlices: 
[]*discoveryv1.EndpointSlice{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "basic-dc-rack-0-8fw4fbkk", + Namespace: "default", + Labels: map[string]string{ + "scylla-operator.scylladb.com/scylla-service-type": "member", + "kubernetes.io/service-name": "basic-dc-rack-0", + "endpointslice.kubernetes.io/managed-by": "scylla-operator.scylladb.com", + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "scylla.scylladb.com/v1", + Kind: "ScyllaCluster", + Name: "basic", + UID: "the-uid", + Controller: pointer.Ptr(true), + BlockOwnerDeletion: pointer.Ptr(true), + }, + }, + }, + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"1.1.1.1"}, + Conditions: discoveryv1.EndpointConditions{ + Ready: pointer.Ptr(false), + Serving: pointer.Ptr(false), + Terminating: pointer.Ptr(false), + }, + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Namespace: "default", + Name: "basic-dc-rack-0", + UID: "pod-uid", + }, + NodeName: pointer.Ptr("node-a"), + }, + }, + Ports: []discoveryv1.EndpointPort{ + { + Name: pointer.Ptr("port-1"), + Protocol: pointer.Ptr(corev1.ProtocolTCP), + Port: pointer.Ptr(int32(666)), + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "basic-dc-rack-0-zdd9fpkk", + Namespace: "default", + Labels: map[string]string{ + "scylla-operator.scylladb.com/scylla-service-type": "member", + "kubernetes.io/service-name": "basic-dc-rack-0", + "endpointslice.kubernetes.io/managed-by": "scylla-operator.scylladb.com", + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "scylla.scylladb.com/v1", + Kind: "ScyllaCluster", + Name: "basic", + UID: "the-uid", + Controller: pointer.Ptr(true), + BlockOwnerDeletion: pointer.Ptr(true), + }, + }, + }, + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"1.1.1.1"}, + Conditions: discoveryv1.EndpointConditions{ + Ready: pointer.Ptr(true), + Serving: pointer.Ptr(true), + Terminating: 
pointer.Ptr(false), + }, + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Namespace: "default", + Name: "basic-dc-rack-0", + UID: "pod-uid", + }, + NodeName: pointer.Ptr("node-a"), + }, + }, + Ports: []discoveryv1.EndpointPort{ + { + Name: pointer.Ptr("port-2"), + Protocol: pointer.Ptr(corev1.ProtocolTCP), + Port: pointer.Ptr(int32(777)), + }, + }, + }, + }, + expectedEndpoints: []*corev1.Endpoints{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "basic-dc-rack-0", + Namespace: "default", + Labels: map[string]string{ + "scylla-operator.scylladb.com/scylla-service-type": "member", + "endpointslice.kubernetes.io/skip-mirror": "true", + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "scylla.scylladb.com/v1", + Kind: "ScyllaCluster", + Name: "basic", + UID: "the-uid", + Controller: pointer.Ptr(true), + BlockOwnerDeletion: pointer.Ptr(true), + }, + }, + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + IP: "1.1.1.1", + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Namespace: "default", + Name: "basic-dc-rack-0", + UID: "pod-uid", + }, + NodeName: pointer.Ptr("node-a"), + }, + }, + NotReadyAddresses: nil, + Ports: []corev1.EndpointPort{ + { + Name: "port-2", + Port: 777, + Protocol: "TCP", + }, + }, + }, + { + Addresses: nil, + NotReadyAddresses: []corev1.EndpointAddress{ + { + IP: "1.1.1.1", + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Namespace: "default", + Name: "basic-dc-rack-0", + UID: "pod-uid", + }, + NodeName: pointer.Ptr("node-a"), + }, + }, + Ports: []corev1.EndpointPort{ + { + Name: "port-1", + Port: 666, + Protocol: "TCP", + }, + }, + }, + }, + }, + }, + }, + { + name: "Single EndpointSlice when all Service ports are backed by container having readiness probe in multi-container Pod", + cluster: basicScyllaCluster, + pods: []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "basic-dc-rack-0", + Namespace: "default", + UID: "pod-uid", + }, + Spec: corev1.PodSpec{ + NodeName: 
"node-a", + Containers: []corev1.Container{ + { + Name: "container-1", + Ports: []corev1.ContainerPort{ + { + Name: "port-1", + ContainerPort: 666, + Protocol: corev1.ProtocolTCP, + }, + { + Name: "port-2", + ContainerPort: 777, + Protocol: corev1.ProtocolTCP, + }, + }, + ReadinessProbe: &corev1.Probe{}, + }, + { + Name: "container-2", + }, + }, + }, + Status: corev1.PodStatus{ + PodIP: "1.1.1.1", + ContainerStatuses: []corev1.ContainerStatus{ + { + Name: "container-1", + Ready: true, + }, + { + Name: "container-2", + Ready: true, + }, + }, + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionFalse, + }, + }, + }, + }, + }, + services: map[string]*corev1.Service{ + "basic-dc-rack-0": { + ObjectMeta: metav1.ObjectMeta{ + Name: "basic-dc-rack-0", + Labels: map[string]string{ + "scylla-operator.scylladb.com/scylla-service-type": "member", + }, + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + { + Name: "port-1", + Port: 666, + Protocol: corev1.ProtocolTCP, + TargetPort: intstr.FromInt32(666), + }, + { + Name: "port-2", + Port: 777, + Protocol: corev1.ProtocolTCP, + TargetPort: intstr.FromInt32(777), + }, + }, + }, + }, + }, + expectedEndpointSlices: []*discoveryv1.EndpointSlice{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "basic-dc-rack-0-nwwwppkk", + Namespace: "default", + Labels: map[string]string{ + "scylla-operator.scylladb.com/scylla-service-type": "member", + "kubernetes.io/service-name": "basic-dc-rack-0", + "endpointslice.kubernetes.io/managed-by": "scylla-operator.scylladb.com", + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "scylla.scylladb.com/v1", + Kind: "ScyllaCluster", + Name: "basic", + UID: "the-uid", + Controller: pointer.Ptr(true), + BlockOwnerDeletion: pointer.Ptr(true), + }, + }, + }, + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"1.1.1.1"}, + Conditions: discoveryv1.EndpointConditions{ + Ready: 
pointer.Ptr(true), + Serving: pointer.Ptr(true), + Terminating: pointer.Ptr(false), + }, + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Namespace: "default", + Name: "basic-dc-rack-0", + UID: "pod-uid", + }, + NodeName: pointer.Ptr("node-a"), + }, + }, + Ports: []discoveryv1.EndpointPort{ + { + Name: pointer.Ptr("port-1"), + Protocol: pointer.Ptr(corev1.ProtocolTCP), + Port: pointer.Ptr(int32(666)), + }, + { + Name: pointer.Ptr("port-2"), + Protocol: pointer.Ptr(corev1.ProtocolTCP), + Port: pointer.Ptr(int32(777)), + }, + }, + }, + }, + expectedEndpoints: []*corev1.Endpoints{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "basic-dc-rack-0", + Namespace: "default", + Labels: map[string]string{ + "scylla-operator.scylladb.com/scylla-service-type": "member", + "endpointslice.kubernetes.io/skip-mirror": "true", + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "scylla.scylladb.com/v1", + Kind: "ScyllaCluster", + Name: "basic", + UID: "the-uid", + Controller: pointer.Ptr(true), + BlockOwnerDeletion: pointer.Ptr(true), + }, + }, + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + IP: "1.1.1.1", + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Namespace: "default", + Name: "basic-dc-rack-0", + UID: "pod-uid", + }, + NodeName: pointer.Ptr("node-a"), + }, + }, + NotReadyAddresses: nil, + Ports: []corev1.EndpointPort{ + { + Name: "port-1", + Port: 666, + Protocol: "TCP", + }, + { + Name: "port-2", + Port: 777, + Protocol: "TCP", + }, + }, + }, + }, + }, + }, + }, + { + name: "EndpointSlice has IPV6 AddressType when PodIP from Pod.Status is an IPv6 address", + cluster: basicScyllaCluster, + pods: []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "basic-dc-rack-0", + Namespace: "default", + UID: "pod-uid", + }, + Spec: corev1.PodSpec{ + NodeName: "node-a", + }, + Status: corev1.PodStatus{ + PodIP: "2001:db8::", + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: 
corev1.ConditionTrue, + }, + }, + }, + }, + }, + services: map[string]*corev1.Service{ + "basic-dc-rack-0": { + ObjectMeta: metav1.ObjectMeta{ + Name: "basic-dc-rack-0", + Labels: map[string]string{ + "scylla-operator.scylladb.com/scylla-service-type": "member", + }, + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + { + Name: "port-1", + Port: 666, + Protocol: corev1.ProtocolTCP, + }, + }, + }, + }, + }, + expectedEndpointSlices: []*discoveryv1.EndpointSlice{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "basic-dc-rack-0-8fw4fbkk", + Namespace: "default", + Labels: map[string]string{ + "scylla-operator.scylladb.com/scylla-service-type": "member", + "kubernetes.io/service-name": "basic-dc-rack-0", + "endpointslice.kubernetes.io/managed-by": "scylla-operator.scylladb.com", + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "scylla.scylladb.com/v1", + Kind: "ScyllaCluster", + Name: "basic", + UID: "the-uid", + Controller: pointer.Ptr(true), + BlockOwnerDeletion: pointer.Ptr(true), + }, + }, + }, + AddressType: discoveryv1.AddressTypeIPv6, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"2001:db8::"}, + Conditions: discoveryv1.EndpointConditions{ + Ready: pointer.Ptr(true), + Serving: pointer.Ptr(true), + Terminating: pointer.Ptr(false), + }, + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Namespace: "default", + Name: "basic-dc-rack-0", + UID: "pod-uid", + }, + NodeName: pointer.Ptr("node-a"), + }, + }, + Ports: []discoveryv1.EndpointPort{ + { + Name: pointer.Ptr("port-1"), + Protocol: pointer.Ptr(corev1.ProtocolTCP), + Port: pointer.Ptr(int32(666)), + }, + }, + }, + }, + expectedEndpoints: []*corev1.Endpoints{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "basic-dc-rack-0", + Namespace: "default", + Labels: map[string]string{ + "scylla-operator.scylladb.com/scylla-service-type": "member", + "endpointslice.kubernetes.io/skip-mirror": "true", + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: 
"scylla.scylladb.com/v1", + Kind: "ScyllaCluster", + Name: "basic", + UID: "the-uid", + Controller: pointer.Ptr(true), + BlockOwnerDeletion: pointer.Ptr(true), + }, + }, + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + IP: "2001:db8::", + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Namespace: "default", + Name: "basic-dc-rack-0", + UID: "pod-uid", + }, + NodeName: pointer.Ptr("node-a"), + }, + }, + NotReadyAddresses: nil, + Ports: []corev1.EndpointPort{ + { + Name: "port-1", + Port: 666, + Protocol: "TCP", + }, + }, + }, + }, + }, + }, + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + podCache := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + + for _, obj := range tc.pods { + err := podCache.Add(obj) + if err != nil { + t.Fatal(err) + } + } + + podLister := corev1listers.NewPodLister(podCache) + + gotEndpointSlices, err := MakeEndpointSlices(tc.cluster, tc.services, podLister) + if err != nil { + t.Error(err) + } + if !apiequality.Semantic.DeepEqual(gotEndpointSlices, tc.expectedEndpointSlices) { + t.Errorf("expected and actual EndpointSlice(s) differ: %s", cmp.Diff(tc.expectedEndpointSlices, gotEndpointSlices)) + + } + }) + } +} + +func Test_NoEndpointHashCollisionOnPodContainersAndPorts(t *testing.T) { + rack := scyllav1.RackSpec{ + Name: "rack", + Storage: scyllav1.Storage{ + Capacity: "1Gi", + }, + Members: 123, + } + + sc := &scyllav1.ScyllaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "basic", + }, + Spec: scyllav1.ScyllaClusterSpec{ + Datacenter: scyllav1.DatacenterSpec{ + Name: "dc", + Racks: []scyllav1.RackSpec{ + rack, + }, + }, + }, + } + + sts, err := StatefulSetForRack(rack, sc, nil, "operator-image", 0, "") + if err != nil { + t.Fatal(err) + } + + svc, err := MemberService(sc, rack.Name, "name", nil, nil) + if err != nil { + t.Fatal(err) + } + + containerPortMap, podPorts, err := 
mapServicePortsToReadinessResponsibleContainer(svc, sts.Spec.Template.Spec.Containers) + if err != nil { + t.Fatal(err) + } + hashes := make(map[string]string) + + hash, err := endpointPortsHash("", podPorts) + if err != nil { + t.Fatal(err) + } + + collidedContainerName, collision := hashes[hash] + if collision { + t.Errorf("found collision on container endpoint ports, both pod and its controlled ports and %q container with its serving port generate same hash", collidedContainerName) + } + hashes[hash] = "" + + for containerName, servingPorts := range containerPortMap { + hash, err := endpointPortsHash(containerName, servingPorts) + if err != nil { + t.Fatal(err) + } + + colliedContainerName, collision := hashes[hash] + if collision { + t.Errorf("found collision on container endpoint ports, both %q and %q container and their serving port generate same hash", containerName, colliedContainerName) + } + hashes[hash] = containerName + } +} diff --git a/pkg/controller/scyllacluster/sync.go b/pkg/controller/scyllacluster/sync.go index 8ad8ce895b0..ea7861bc5f0 100644 --- a/pkg/controller/scyllacluster/sync.go +++ b/pkg/controller/scyllacluster/sync.go @@ -11,6 +11,7 @@ import ( appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" networkingv1 "k8s.io/api/networking/v1" policyv1 "k8s.io/api/policy/v1" rbacv1 "k8s.io/api/rbac/v1" @@ -186,6 +187,42 @@ func (scc *Controller) sync(ctx context.Context, key string) error { objectErrs = append(objectErrs, err) } + endpointSliceMap, err := controllerhelpers.GetObjects[CT, *discoveryv1.EndpointSlice]( + ctx, + sc, + scyllaClusterControllerGVK, + labels.SelectorFromSet(labels.Set{ + naming.ClusterNameLabel: sc.Name, + discoveryv1.LabelManagedBy: naming.OperatorAppNameWithDomain, + }), + controllerhelpers.ControlleeManagerGetObjectsFuncs[CT, *discoveryv1.EndpointSlice]{ + GetControllerUncachedFunc: scc.scyllaClient.ScyllaClusters(sc.Namespace).Get, + 
ListObjectsFunc: scc.endpointSliceLister.EndpointSlices(sc.Namespace).List, + PatchObjectFunc: scc.kubeClient.DiscoveryV1().EndpointSlices(sc.Namespace).Patch, + }, + ) + if err != nil { + objectErrs = append(objectErrs, err) + } + + endpointsMap, err := controllerhelpers.GetObjects[CT, *corev1.Endpoints]( + ctx, + sc, + scyllaClusterControllerGVK, + labels.SelectorFromSet(labels.Set{ + naming.ClusterNameLabel: sc.Name, + discoveryv1.LabelManagedBy: naming.OperatorAppNameWithDomain, + }), + controllerhelpers.ControlleeManagerGetObjectsFuncs[CT, *corev1.Endpoints]{ + GetControllerUncachedFunc: scc.scyllaClient.ScyllaClusters(sc.Namespace).Get, + ListObjectsFunc: scc.endpointsLister.Endpoints(sc.Namespace).List, + PatchObjectFunc: scc.kubeClient.CoreV1().Endpoints(sc.Namespace).Patch, + }, + ) + if err != nil { + objectErrs = append(objectErrs, err) + } + objectErr := utilerrors.NewAggregate(objectErrs) if objectErr != nil { return objectErr @@ -340,6 +377,32 @@ func (scc *Controller) sync(ctx context.Context, key string) error { errs = append(errs, fmt.Errorf("can't sync jobs: %w", err)) } + err = controllerhelpers.RunSync( + &status.Conditions, + endpointSliceControllerProgressingCondition, + endpointSliceControllerDegradedCondition, + sc.Generation, + func() ([]metav1.Condition, error) { + return scc.syncEndpointSlices(ctx, sc, endpointSliceMap, serviceMap) + }, + ) + if err != nil { + errs = append(errs, fmt.Errorf("can't sync endpointslices: %w", err)) + } + + err = controllerhelpers.RunSync( + &status.Conditions, + endpointsControllerProgressingCondition, + endpointsControllerDegradedCondition, + sc.Generation, + func() ([]metav1.Condition, error) { + return scc.syncEndpoints(ctx, sc, endpointsMap, serviceMap) + }, + ) + if err != nil { + errs = append(errs, fmt.Errorf("can't sync endpoints: %w", err)) + } + // Aggregate conditions. 
err = controllerhelpers.SetAggregatedWorkloadConditions(&status.Conditions, sc.Generation) if err != nil { diff --git a/pkg/controller/scyllacluster/sync_endpoints.go b/pkg/controller/scyllacluster/sync_endpoints.go new file mode 100644 index 00000000000..f88df45c58c --- /dev/null +++ b/pkg/controller/scyllacluster/sync_endpoints.go @@ -0,0 +1,74 @@ +package scyllacluster + +import ( + "context" + "fmt" + + scyllav1 "github.com/scylladb/scylla-operator/pkg/api/scylla/v1" + "github.com/scylladb/scylla-operator/pkg/controllerhelpers" + "github.com/scylladb/scylla-operator/pkg/helpers/slices" + "github.com/scylladb/scylla-operator/pkg/naming" + "github.com/scylladb/scylla-operator/pkg/resourceapply" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// Endpoints are reconciled additionally to EndpointSlices because Prometheus Operator which we rely on in ScyllaDBMonitoring +// doesn't support EndpointSlices yet. +func (scc *Controller) syncEndpoints( + ctx context.Context, + sc *scyllav1.ScyllaCluster, + endpoints map[string]*corev1.Endpoints, + services map[string]*corev1.Service, +) ([]metav1.Condition, error) { + var progressingConditions []metav1.Condition + + // EndpointSlices supersedes Endpoints, so to make sure reconciling logic is the same, + // convert from one to the other. 
+ requiredEndpointSlices, err := MakeEndpointSlices(sc, services, scc.podLister) + if err != nil { + return progressingConditions, fmt.Errorf("can't make endpointslices: %w", err) + } + + requiredEndpoints, err := controllerhelpers.ConvertEndpointSlicesToEndpoints(requiredEndpointSlices) + if err != nil { + return progressingConditions, fmt.Errorf("can't convert endpointslices to endpoints: %w", err) + } + + err = controllerhelpers.Prune( + ctx, + requiredEndpoints, + endpoints, + &controllerhelpers.PruneControlFuncs{ + DeleteFunc: scc.kubeClient.CoreV1().Endpoints(sc.Namespace).Delete, + }, + scc.eventRecorder) + if err != nil { + return progressingConditions, fmt.Errorf("can't prune endpoints: %w", err) + } + + for _, rs := range requiredEndpoints { + updated, changed, err := resourceapply.ApplyEndpoints(ctx, scc.kubeClient.CoreV1(), scc.endpointsLister, scc.eventRecorder, rs, resourceapply.ApplyOptions{}) + if changed { + controllerhelpers.AddGenericProgressingStatusCondition(&progressingConditions, endpointsControllerProgressingCondition, rs, "apply", sc.Generation) + } + if err != nil { + return progressingConditions, fmt.Errorf("can't apply endpoints: %w", err) + } + + _, _, hasUnreadySubset := slices.Find(updated.Subsets, func(sn corev1.EndpointSubset) bool { + return len(sn.NotReadyAddresses) != 0 + }) + if len(updated.Subsets) == 0 || hasUnreadySubset { + progressingConditions = append(progressingConditions, metav1.Condition{ + Type: endpointsControllerProgressingCondition, + Status: metav1.ConditionTrue, + Reason: "EndpointsNotReady", + Message: fmt.Sprintf("Endpoints %q is not yet ready", naming.ObjRef(updated)), + ObservedGeneration: sc.Generation, + }) + } + } + + return progressingConditions, nil +} diff --git a/pkg/controller/scyllacluster/sync_endpointslices.go b/pkg/controller/scyllacluster/sync_endpointslices.go new file mode 100644 index 00000000000..c36b42edd1b --- /dev/null +++ b/pkg/controller/scyllacluster/sync_endpointslices.go @@ -0,0 +1,67 
@@ +package scyllacluster + +import ( + "context" + "fmt" + + scyllav1 "github.com/scylladb/scylla-operator/pkg/api/scylla/v1" + "github.com/scylladb/scylla-operator/pkg/controllerhelpers" + "github.com/scylladb/scylla-operator/pkg/helpers/slices" + "github.com/scylladb/scylla-operator/pkg/naming" + "github.com/scylladb/scylla-operator/pkg/resourceapply" + corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func (scc *Controller) syncEndpointSlices( + ctx context.Context, + sc *scyllav1.ScyllaCluster, + endpointSlices map[string]*discoveryv1.EndpointSlice, + services map[string]*corev1.Service, +) ([]metav1.Condition, error) { + var progressingConditions []metav1.Condition + + requiredEndpointSlices, err := MakeEndpointSlices(sc, services, scc.podLister) + if err != nil { + return progressingConditions, fmt.Errorf("can't make endpointslices: %w", err) + } + + err = controllerhelpers.Prune( + ctx, + requiredEndpointSlices, + endpointSlices, + &controllerhelpers.PruneControlFuncs{ + DeleteFunc: scc.kubeClient.DiscoveryV1().EndpointSlices(sc.Namespace).Delete, + }, + scc.eventRecorder) + if err != nil { + return progressingConditions, fmt.Errorf("can't prune endpointslice(s): %w", err) + } + + for _, requiredEndpointSlice := range requiredEndpointSlices { + updated, changed, err := resourceapply.ApplyEndpointSlice(ctx, scc.kubeClient.DiscoveryV1(), scc.endpointSliceLister, scc.eventRecorder, requiredEndpointSlice, resourceapply.ApplyOptions{}) + if changed { + controllerhelpers.AddGenericProgressingStatusCondition(&progressingConditions, endpointSliceControllerProgressingCondition, requiredEndpointSlice, "apply", sc.Generation) + } + if err != nil { + return progressingConditions, fmt.Errorf("can't apply endpointslice: %w", err) + } + + _, _, hasUnreadyEndpoint := slices.Find(updated.Endpoints, func(ep discoveryv1.Endpoint) bool { + return ep.Conditions.Ready == nil || !*ep.Conditions.Ready + }) + + 
if len(updated.Endpoints) == 0 || hasUnreadyEndpoint { + progressingConditions = append(progressingConditions, metav1.Condition{ + Type: endpointSliceControllerProgressingCondition, + Status: metav1.ConditionTrue, + Reason: "EndpointSliceNotReady", + Message: fmt.Sprintf("EndpointSlice %q is not yet ready", naming.ObjRef(updated)), + ObservedGeneration: sc.Generation, + }) + } + } + + return progressingConditions, nil +} diff --git a/pkg/controllerhelpers/convert.go b/pkg/controllerhelpers/convert.go new file mode 100644 index 00000000000..1155628f8bf --- /dev/null +++ b/pkg/controllerhelpers/convert.go @@ -0,0 +1,102 @@ +// Copyright (c) 2024 ScyllaDB. + +package controllerhelpers + +import ( + "fmt" + "maps" + + "github.com/scylladb/scylla-operator/pkg/helpers/slices" + "github.com/scylladb/scylla-operator/pkg/naming" + corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func ConvertEndpointSlicesToEndpoints(endpointSlices []*discoveryv1.EndpointSlice) ([]*corev1.Endpoints, error) { + serviceToEndpointSlices := map[string][]*discoveryv1.EndpointSlice{} + for _, es := range endpointSlices { + svcName, ok := es.Labels[discoveryv1.LabelServiceName] + if !ok { + return nil, fmt.Errorf("EndpointSlice is missing service name in label %q", naming.ObjRef(es)) + } + serviceToEndpointSlices[svcName] = append(serviceToEndpointSlices[svcName], es) + } + + var endpoints []*corev1.Endpoints + for svcName, ess := range serviceToEndpointSlices { + var subsets []corev1.EndpointSubset + for _, es := range ess { + var addresses []corev1.EndpointAddress + var notReadyAddresses []corev1.EndpointAddress + + for _, ep := range es.Endpoints { + for _, epa := range ep.Addresses { + ea := corev1.EndpointAddress{ + IP: epa, + Hostname: func() string { + if ep.Hostname != nil { + return *ep.Hostname + } + return "" + }(), + NodeName: ep.NodeName, + TargetRef: ep.TargetRef, + } + if ep.Conditions.Ready != nil && 
*ep.Conditions.Ready { + addresses = append(addresses, ea) + } else { + notReadyAddresses = append(notReadyAddresses, ea) + } + } + } + + ports := slices.ConvertSlice[corev1.EndpointPort, discoveryv1.EndpointPort](es.Ports, func(port discoveryv1.EndpointPort) corev1.EndpointPort { + ep := corev1.EndpointPort{ + AppProtocol: port.AppProtocol, + } + if port.Name != nil { + ep.Name = *port.Name + } + if port.Port != nil { + ep.Port = *port.Port + } + if port.Protocol != nil { + ep.Protocol = *port.Protocol + } + return ep + }) + + if len(addresses) == 0 && len(notReadyAddresses) == 0 { + continue + } + + subsets = append(subsets, corev1.EndpointSubset{ + Addresses: addresses, + NotReadyAddresses: notReadyAddresses, + Ports: ports, + }) + } + + // Copy metadata from first EndpointSlice of given Service, they should all be the same. + es := ess[0] + endpointsLabels := map[string]string{} + maps.Copy(endpointsLabels, es.Labels) + endpointsLabels[discoveryv1.LabelSkipMirror] = "true" + + endpointsAnnotations := maps.Clone(es.Annotations) + + endpoints = append(endpoints, &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: svcName, + Namespace: es.Namespace, + Labels: endpointsLabels, + Annotations: endpointsAnnotations, + OwnerReferences: es.OwnerReferences, + }, + Subsets: subsets, + }) + } + + return endpoints, nil +} diff --git a/pkg/controllerhelpers/convert_test.go b/pkg/controllerhelpers/convert_test.go new file mode 100644 index 00000000000..8cde5d00087 --- /dev/null +++ b/pkg/controllerhelpers/convert_test.go @@ -0,0 +1,305 @@ +// Copyright (c) 2024 ScyllaDB. 
// Test_ConvertEndpointSlicesToEndpoints exercises the EndpointSlice -> Endpoints conversion
// with table-driven cases: one slice per Service, and multiple slices per Service folded
// into a single Endpoints object with one subset per slice.
func Test_ConvertEndpointSlicesToEndpoints(t *testing.T) {
	t.Parallel()

	tt := []struct {
		name              string
		endpointSlices    []*discoveryv1.EndpointSlice
		expectedEndpoints []*corev1.Endpoints
	}{
		{
			// A lone slice becomes one Endpoints with one ready subset; labels and owner
			// references are carried over and the skip-mirror label is added.
			name: "single EndpointSlice per Service",
			endpointSlices: []*discoveryv1.EndpointSlice{
				{
					ObjectMeta: metav1.ObjectMeta{
						Name:      "basic-dc-rack-0-12345678",
						Namespace: "default",
						Labels: map[string]string{
							"foo":                                    "bar",
							"kubernetes.io/service-name":             "basic-dc-rack-0",
							"endpointslice.kubernetes.io/managed-by": "scylla-operator.scylladb.com/scylla-operator",
						},
						OwnerReferences: []metav1.OwnerReference{
							{
								APIVersion:         "scylla.scylladb.com/v1",
								Kind:               "ScyllaCluster",
								Name:               "basic",
								UID:                "the-uid",
								Controller:         pointer.Ptr(true),
								BlockOwnerDeletion: pointer.Ptr(true),
							},
						},
					},
					AddressType: discoveryv1.AddressTypeIPv4,
					Endpoints: []discoveryv1.Endpoint{
						{
							Addresses: []string{"1.2.3.4"},
							Conditions: discoveryv1.EndpointConditions{
								Ready:       pointer.Ptr(true),
								Serving:     pointer.Ptr(true),
								Terminating: pointer.Ptr(false),
							},
							TargetRef: &corev1.ObjectReference{
								Kind:      "Pod",
								Namespace: "default",
								Name:      "basic-dc-rack-0",
								UID:       "pod-uid",
							},
							NodeName: pointer.Ptr("node-a"),
						},
					},
					Ports: []discoveryv1.EndpointPort{
						{
							Name:     pointer.Ptr("port-1"),
							Protocol: pointer.Ptr(corev1.ProtocolTCP),
							Port:     pointer.Ptr(int32(777)),
						},
					},
				},
			},
			expectedEndpoints: []*corev1.Endpoints{
				{
					ObjectMeta: metav1.ObjectMeta{
						// The Endpoints object is named after the Service, not the slice.
						Name:      "basic-dc-rack-0",
						Namespace: "default",
						Labels: map[string]string{
							"foo":                                     "bar",
							"kubernetes.io/service-name":              "basic-dc-rack-0",
							"endpointslice.kubernetes.io/managed-by":  "scylla-operator.scylladb.com/scylla-operator",
							"endpointslice.kubernetes.io/skip-mirror": "true",
						},
						OwnerReferences: []metav1.OwnerReference{
							{
								APIVersion:         "scylla.scylladb.com/v1",
								Kind:               "ScyllaCluster",
								Name:               "basic",
								UID:                "the-uid",
								Controller:         pointer.Ptr(true),
								BlockOwnerDeletion: pointer.Ptr(true),
							},
						},
					},
					Subsets: []corev1.EndpointSubset{
						{
							Addresses: []corev1.EndpointAddress{
								{
									IP: "1.2.3.4",
									TargetRef: &corev1.ObjectReference{
										Kind:      "Pod",
										Namespace: "default",
										Name:      "basic-dc-rack-0",
										UID:       "pod-uid",
									},
									NodeName: pointer.Ptr("node-a"),
								},
							},
							NotReadyAddresses: nil,
							Ports: []corev1.EndpointPort{
								{
									Name:     "port-1",
									Port:     777,
									Protocol: "TCP",
								},
							},
						},
					},
				},
			},
		},
		{
			// Two slices of the same Service fold into one Endpoints: the ready slice becomes
			// a subset with Addresses, the not-ready slice a subset with NotReadyAddresses.
			name: "multiple EndpointSlices per Service",
			endpointSlices: []*discoveryv1.EndpointSlice{
				{
					ObjectMeta: metav1.ObjectMeta{
						Name:      "basic-dc-rack-0-abc",
						Namespace: "default",
						Labels: map[string]string{
							"foo":                                    "bar",
							"kubernetes.io/service-name":             "basic-dc-rack-0",
							"endpointslice.kubernetes.io/managed-by": "scylla-operator.scylladb.com/scylla-operator",
						},
						OwnerReferences: []metav1.OwnerReference{
							{
								APIVersion:         "scylla.scylladb.com/v1",
								Kind:               "ScyllaCluster",
								Name:               "basic",
								UID:                "the-uid",
								Controller:         pointer.Ptr(true),
								BlockOwnerDeletion: pointer.Ptr(true),
							},
						},
					},
					AddressType: discoveryv1.AddressTypeIPv4,
					Endpoints: []discoveryv1.Endpoint{
						{
							Addresses: []string{"1.2.3.4"},
							Conditions: discoveryv1.EndpointConditions{
								Ready:       pointer.Ptr(true),
								Serving:     pointer.Ptr(true),
								Terminating: pointer.Ptr(false),
							},
							TargetRef: &corev1.ObjectReference{
								Kind:      "Pod",
								Namespace: "default",
								Name:      "basic-dc-rack-0",
								UID:       "pod-uid",
							},
							NodeName: pointer.Ptr("node-a"),
						},
					},
					Ports: []discoveryv1.EndpointPort{
						{
							Name:     pointer.Ptr("port-1"),
							Protocol: pointer.Ptr(corev1.ProtocolTCP),
							Port:     pointer.Ptr(int32(666)),
						},
					},
				},
				{
					ObjectMeta: metav1.ObjectMeta{
						Name:      "basic-dc-rack-0-def",
						Namespace: "default",
						Labels: map[string]string{
							"foo":                                    "bar",
							"kubernetes.io/service-name":             "basic-dc-rack-0",
							"endpointslice.kubernetes.io/managed-by": "scylla-operator.scylladb.com/scylla-operator",
						},
						OwnerReferences: []metav1.OwnerReference{
							{
								APIVersion:         "scylla.scylladb.com/v1",
								Kind:               "ScyllaCluster",
								Name:               "basic",
								UID:                "the-uid",
								Controller:         pointer.Ptr(true),
								BlockOwnerDeletion: pointer.Ptr(true),
							},
						},
					},
					AddressType: discoveryv1.AddressTypeIPv4,
					Endpoints: []discoveryv1.Endpoint{
						{
							Addresses: []string{"1.2.3.4"},
							Conditions: discoveryv1.EndpointConditions{
								Ready:       pointer.Ptr(false),
								Serving:     pointer.Ptr(false),
								Terminating: pointer.Ptr(false),
							},
							TargetRef: &corev1.ObjectReference{
								Kind:      "Pod",
								Namespace: "default",
								Name:      "basic-dc-rack-0",
								UID:       "pod-uid",
							},
							NodeName: pointer.Ptr("node-a"),
						},
					},
					Ports: []discoveryv1.EndpointPort{
						{
							Name:     pointer.Ptr("port-2"),
							Protocol: pointer.Ptr(corev1.ProtocolTCP),
							Port:     pointer.Ptr(int32(777)),
						},
					},
				},
			},
			expectedEndpoints: []*corev1.Endpoints{
				{
					ObjectMeta: metav1.ObjectMeta{
						Name:      "basic-dc-rack-0",
						Namespace: "default",
						Labels: map[string]string{
							"foo":                                     "bar",
							"kubernetes.io/service-name":              "basic-dc-rack-0",
							"endpointslice.kubernetes.io/managed-by":  "scylla-operator.scylladb.com/scylla-operator",
							"endpointslice.kubernetes.io/skip-mirror": "true",
						},
						OwnerReferences: []metav1.OwnerReference{
							{
								APIVersion:         "scylla.scylladb.com/v1",
								Kind:               "ScyllaCluster",
								Name:               "basic",
								UID:                "the-uid",
								Controller:         pointer.Ptr(true),
								BlockOwnerDeletion: pointer.Ptr(true),
							},
						},
					},
					Subsets: []corev1.EndpointSubset{
						{
							Addresses: []corev1.EndpointAddress{
								{
									IP: "1.2.3.4",
									TargetRef: &corev1.ObjectReference{
										Kind:      "Pod",
										Namespace: "default",
										Name:      "basic-dc-rack-0",
										UID:       "pod-uid",
									},
									NodeName: pointer.Ptr("node-a"),
								},
							},
							NotReadyAddresses: nil,
							Ports: []corev1.EndpointPort{
								{
									Name:     "port-1",
									Port:     666,
									Protocol: "TCP",
								},
							},
						},
						{
							Addresses: nil,
							NotReadyAddresses: []corev1.EndpointAddress{
								{
									IP: "1.2.3.4",
									TargetRef: &corev1.ObjectReference{
										Kind:      "Pod",
										Namespace: "default",
										Name:      "basic-dc-rack-0",
										UID:       "pod-uid",
									},
									NodeName: pointer.Ptr("node-a"),
								},
							},
							Ports: []corev1.EndpointPort{
								{
									Name:     "port-2",
									Port:     777,
									Protocol: "TCP",
								},
							},
						},
					},
				},
			},
		},
	}

	for _, tc := range tt {
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()

			endpoints, err := ConvertEndpointSlicesToEndpoints(tc.endpointSlices)
			if err != nil {
				t.Fatal(err)
			}
			// Semantic equality tolerates nil-vs-empty differences that DeepEqual would not.
			if !apiequality.Semantic.DeepEqual(endpoints, tc.expectedEndpoints) {
				t.Errorf("expected and actual Endpoints differ: %s", cmp.Diff(tc.expectedEndpoints, endpoints))
			}
		})
	}
}
containerServingPort, _, ok := slices.Find(containers, func(c corev1.Container) bool { + _, _, ok := slices.Find(c.Ports, func(cp corev1.ContainerPort) bool { + switch svcPort.TargetPort.Type { + case intstr.String: + return cp.Name == svcPort.TargetPort.StrVal && cp.Protocol == svcPort.Protocol + case intstr.Int: + return cp.ContainerPort == svcPort.TargetPort.IntVal + default: + err = fmt.Errorf("unsupported type of intstr.IntOrString %d", svcPort.TargetPort.Type) + } + + return false + }) + return ok + }) + if err != nil { + return corev1.Container{}, false, fmt.Errorf("can't find container serving port: %w", err) + } + + return containerServingPort, ok, nil +} diff --git a/pkg/controllerhelpers/core_test.go b/pkg/controllerhelpers/core_test.go index cd67040eeec..4b9090c2eb4 100644 --- a/pkg/controllerhelpers/core_test.go +++ b/pkg/controllerhelpers/core_test.go @@ -6,8 +6,10 @@ import ( "testing" "github.com/google/go-cmp/cmp" + corev1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" ) func TestFindStatusConditionsWithSuffix(t *testing.T) { @@ -358,3 +360,103 @@ func TestAggregateStatusConditions(t *testing.T) { }) } } + +func TestFindContainerServingPort(t *testing.T) { + t.Parallel() + + tt := []struct { + name string + port corev1.ServicePort + containers []corev1.Container + expectedContainer corev1.Container + expectedFound bool + expectedError error + }{ + { + name: "finds container serving port by target port name", + port: corev1.ServicePort{ + TargetPort: intstr.FromString("foo-port"), + }, + containers: []corev1.Container{ + { + Name: "foo-container", + Ports: []corev1.ContainerPort{ + { + Name: "foo-port", + }, + }, + }, + { + Name: "bar-container", + Ports: []corev1.ContainerPort{ + { + Name: "bar-port", + }, + }, + }, + }, + expectedContainer: corev1.Container{ + Name: "foo-container", + Ports: []corev1.ContainerPort{ + { + Name: "foo-port", 
+ }, + }, + }, + expectedFound: true, + expectedError: nil, + }, + { + name: "finds container serving port by port number", + port: corev1.ServicePort{ + TargetPort: intstr.FromInt32(1234), + }, + containers: []corev1.Container{ + { + Name: "foo-container", + Ports: []corev1.ContainerPort{ + { + ContainerPort: 1234, + }, + }, + }, + { + Name: "bar-container", + Ports: []corev1.ContainerPort{ + { + Name: "bar-port", + }, + }, + }, + }, + expectedContainer: corev1.Container{ + Name: "foo-container", + Ports: []corev1.ContainerPort{ + { + ContainerPort: 1234, + }, + }, + }, + expectedFound: true, + expectedError: nil, + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + container, found, err := FindContainerServingPort(tc.port, tc.containers) + + if !reflect.DeepEqual(err, tc.expectedError) { + t.Errorf("expected error %#v, got %#v", tc.expectedError, err) + } + if !reflect.DeepEqual(found, tc.expectedFound) { + t.Errorf("expected found %#v, got %#v", tc.expectedFound, found) + } + if !reflect.DeepEqual(container, tc.expectedContainer) { + t.Errorf("expected and got container differ , diff: %s", cmp.Diff(tc.expectedContainer, container)) + } + }) + } +} diff --git a/pkg/naming/constants.go b/pkg/naming/constants.go index 38388f311b8..89ed83bc9e3 100644 --- a/pkg/naming/constants.go +++ b/pkg/naming/constants.go @@ -80,10 +80,11 @@ const ( NodeJobLabel = "scylla-operator.scylladb.com/node-job" NodeJobTypeLabel = "scylla-operator.scylladb.com/node-job-type" - AppName = "scylla" - OperatorAppName = "scylla-operator" - ManagerAppName = "scylla-manager" - NodeConfigAppName = "scylla-node-config" + AppName = "scylla" + OperatorAppName = "scylla-operator" + ManagerAppName = "scylla-manager" + NodeConfigAppName = "scylla-node-config" + OperatorAppNameWithDomain = "scylla-operator.scylladb.com" PrometheusScrapeAnnotation = "prometheus.io/scrape" PrometheusPortAnnotation = "prometheus.io/port" @@ -98,10 +99,11 @@ const ( // Configuration 
// HashObjectsShort returns a short, deterministic hash of the JSON
// serialization of the given objects, computed with 32-bit FNV-1a.
// It is intended for compact identifiers (e.g. object name/label suffixes),
// not for cryptographic or strongly collision-resistant use.
func HashObjectsShort(objs ...interface{}) (string, error) {
	return hashObjects(fnv.New32a(), objs...)
}

// HashObjects returns a deterministic SHA-512 hash of the JSON serialization
// of the given objects.
func HashObjects(objs ...interface{}) (string, error) {
	return hashObjects(sha512.New(), objs...)
}
+} + +func hashObjects(hasher hash.Hash, objs ...interface{}) (string, error) { encoder := json.NewEncoder(hasher) for _, obj := range objs { if err := encoder.Encode(obj); err != nil { diff --git a/pkg/util/hash/hash_test.go b/pkg/util/hash/hash_test.go index 7f7a4c11f2b..ff40a4ebb59 100644 --- a/pkg/util/hash/hash_test.go +++ b/pkg/util/hash/hash_test.go @@ -48,3 +48,47 @@ func TestHashObjects(t *testing.T) { }) } } + +func TestHashObjectsShort(t *testing.T) { + tt := []struct { + name string + sets [][]interface{} + expectedHash string + expectedError error + }{ + { + name: "object's map order doesn't matter", + sets: [][]interface{}{ + { + map[string]string{ + "key_1": "val_1", + "key_2": "val_2", + }, + }, + { + map[string]string{ + "key_2": "val_2", + "key_1": "val_1", + }, + }, + }, + expectedHash: "SK1zSQ==", + expectedError: nil, + }, + } + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + for _, objs := range tc.sets { + got, err := HashObjectsShort(objs...) + + if !reflect.DeepEqual(err, tc.expectedError) { + t.Errorf("expected error %v, got %v", tc.expectedError, err) + } + + if got != tc.expectedHash { + t.Errorf("expected hash %q, got %q", tc.expectedHash, got) + } + } + }) + } +} diff --git a/test/e2e/set/scyllacluster/verify.go b/test/e2e/set/scyllacluster/verify.go index d7f94e7c96f..0c8dd85b3b9 100644 --- a/test/e2e/set/scyllacluster/verify.go +++ b/test/e2e/set/scyllacluster/verify.go @@ -16,8 +16,10 @@ import ( "github.com/scylladb/scylla-operator/test/e2e/utils" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" policyv1 "k8s.io/api/policy/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/kubernetes" @@ -201,6 +203,22 @@ func verifyScyllaCluster(ctx context.Context, kubeClient kubernetes.Interface, s condType: "ConfigControllerDegraded", status: 
metav1.ConditionFalse, }, + { + condType: "EndpointSliceControllerProgressing", + status: metav1.ConditionFalse, + }, + { + condType: "EndpointSliceControllerDegraded", + status: metav1.ConditionFalse, + }, + { + condType: "EndpointsControllerProgressing", + status: metav1.ConditionFalse, + }, + { + condType: "EndpointsControllerDegraded", + status: metav1.ConditionFalse, + }, } if utilfeature.DefaultMutableFeatureGate.Enabled(features.AutomaticTLSCertificates) || sc.Spec.Alternator != nil { @@ -248,6 +266,67 @@ func verifyScyllaCluster(ctx context.Context, kubeClient kubernetes.Interface, s o.Expect(sc.Status.Racks[r.Name].ReadyMembers).To(o.Equal(s.Status.ReadyReplicas)) o.Expect(sc.Status.Racks[r.Name].UpdatedMembers).NotTo(o.BeNil()) o.Expect(*sc.Status.Racks[r.Name].UpdatedMembers).To(o.Equal(s.Status.UpdatedReplicas)) + + for idx := 0; idx < int(r.Members); idx++ { + serviceName := naming.MemberServiceName(r, sc, idx) + service, err := kubeClient.CoreV1().Services(sc.Namespace).Get(ctx, serviceName, metav1.GetOptions{}) + o.Expect(err).NotTo(o.HaveOccurred()) + + pod, err := kubeClient.CoreV1().Pods(sc.Namespace).Get(ctx, service.Name, metav1.GetOptions{}) + o.Expect(err).NotTo(o.HaveOccurred()) + + o.Expect(pod.Status.PodIP).ToNot(o.BeEmpty()) + + endpointSlices, err := kubeClient.DiscoveryV1().EndpointSlices(sc.Namespace).List(ctx, metav1.ListOptions{ + LabelSelector: labels.SelectorFromSet(map[string]string{ + discoveryv1.LabelManagedBy: naming.OperatorAppNameWithDomain, + discoveryv1.LabelServiceName: service.Name, + }).String(), + }) + o.Expect(err).NotTo(o.HaveOccurred()) + + o.Expect(endpointSlices.Items).ToNot(o.BeEmpty()) + + for _, es := range endpointSlices.Items { + o.Expect(es.Endpoints).NotTo(o.BeEmpty()) + + for _, ep := range es.Endpoints { + o.Expect(ep.Addresses).To(o.HaveLen(1)) + o.Expect(ep.Addresses[0]).To(o.Equal(pod.Status.PodIP)) + + o.Expect(ep.Conditions.Ready).NotTo(o.BeNil()) + o.Expect(*ep.Conditions.Ready).To(o.BeTrue()) + + 
o.Expect(ep.Conditions.Serving).NotTo(o.BeNil()) + o.Expect(*ep.Conditions.Serving).To(o.BeTrue()) + + o.Expect(ep.NodeName).NotTo(o.BeNil()) + o.Expect(*ep.NodeName).To(o.Equal(pod.Spec.NodeName)) + + o.Expect(ep.TargetRef).NotTo(o.BeNil()) + o.Expect(ep.TargetRef.Namespace).To(o.Equal(pod.Namespace)) + o.Expect(ep.TargetRef.Name).To(o.Equal(pod.Name)) + o.Expect(ep.TargetRef.UID).To(o.Equal(pod.UID)) + } + } + + endpoints, err := kubeClient.CoreV1().Endpoints(sc.Namespace).Get(ctx, service.Name, metav1.GetOptions{}) + o.Expect(err).NotTo(o.HaveOccurred()) + + o.Expect(endpoints.Subsets).NotTo(o.BeEmpty()) + + for _, subset := range endpoints.Subsets { + o.Expect(subset.NotReadyAddresses).To(o.BeEmpty()) + + o.Expect(subset.Addresses).To(o.HaveLen(1)) + o.Expect(subset.Addresses[0].IP).To(o.Equal(pod.Status.PodIP)) + o.Expect(*subset.Addresses[0].NodeName).To(o.Equal(pod.Spec.NodeName)) + o.Expect(subset.Addresses[0].TargetRef).NotTo(o.BeNil()) + o.Expect(subset.Addresses[0].TargetRef.Namespace).To(o.Equal(pod.Namespace)) + o.Expect(subset.Addresses[0].TargetRef.Name).To(o.Equal(pod.Name)) + o.Expect(subset.Addresses[0].TargetRef.UID).To(o.Equal(pod.UID)) + } + } } if sc.Status.Upgrade != nil {