Skip to content

Commit

Permalink
fixup! Reconcile per-container Endpoints and EndpointSlices for membe…
Browse files Browse the repository at this point in the history
…r Services
  • Loading branch information
zimnx committed Jul 9, 2024
1 parent d8c0574 commit 48d9bb0
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 84 deletions.
23 changes: 12 additions & 11 deletions pkg/controller/scyllacluster/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,10 @@ func MemberService(sc *scyllav1.ScyllaCluster, rackName, name string, oldService
Annotations: svcAnnotations,
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeClusterIP,
Ports: servicePorts(sc),
Type: corev1.ServiceTypeClusterIP,
Ports: servicePorts(sc),
// Disable Kubernetes Endpoints reconciliation by setting `nil` Selector.
// Operator reconciles Endpoints of these Services. Check sync loop for EndpointSlice/Endpoints for more details.
Selector: nil,
},
}
Expand Down Expand Up @@ -780,7 +782,7 @@ func StatefulSetForRack(r scyllav1.RackSpec, c *scyllav1.ScyllaCluster, existing
},
},
{
Name: naming.InterNodeTrafficProbeSidecarContainerName,
Name: naming.InterNodeTrafficProbeContainerName,
Image: sidecarImage,
ImagePullPolicy: corev1.PullIfNotPresent,
Ports: []corev1.ContainerPort{
Expand Down Expand Up @@ -1520,30 +1522,29 @@ func MakeEndpointSlices(sc *scyllav1.ScyllaCluster, services map[string]*corev1.
pod, err := podLister.Pods(sc.Namespace).Get(svc.Name)
if err != nil {
if apierrors.IsNotFound(err) {
klog.V(4).InfoS("Pod behind %q Member Service doesn't exists yet", naming.ObjRef(svc))
continue
}
return endpointSlices, fmt.Errorf("can't get Pod %q: %w", naming.ManualRef(sc.Namespace, svc.Name), err)
}

// Don't publish endpoints for Pod that are being deleted.
// Removing endpoints prevents from new connections being established while still allowing
// Don't publish endpoints for Pods that are being deleted.
// Removing endpoints prevents new connections being established while still allowing
// for existing connections to survive and finish their requests.
// We need to do this early, as gap between when Pod is actually deleted and Endpoint
// is reconciled into forwarding rules, may cause that connection will get stuck on SYN,
// and stale tuple entry will prevent retransmission from timing out early and leading
// to next reconnection attempt (#1077).
// We need to do this early, as a gap between when Pod is actually deleted and Endpoint
// is reconciled into forwarding rules, may cause connection to get stuck on SYN.
// A stale tuple entry prevents retransmissions from timing out early and leading
// to the next reconnection attempt (#1077).
if pod.DeletionTimestamp != nil {
continue
}

endpointSliceLabels := map[string]string{}
maps.Copy(endpointSliceLabels, sc.Labels)
maps.Copy(endpointSliceLabels, svc.Labels)
endpointSliceLabels[discoveryv1.LabelManagedBy] = naming.OperatorAppNameWithDomain
endpointSliceLabels[discoveryv1.LabelServiceName] = svc.Name

endpointSliceAnnotations := map[string]string{}
maps.Copy(endpointSliceAnnotations, sc.Annotations)
maps.Copy(endpointSliceAnnotations, svc.Annotations)

podReference := &corev1.ObjectReference{
Expand Down
11 changes: 5 additions & 6 deletions pkg/controller/scyllacluster/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1035,7 +1035,7 @@ func TestStatefulSetForRack(t *testing.T) {
},
},
{
Name: "inter-node-traffic-probe-sidecar",
Name: "inter-node-traffic-probe",
Image: "scylladb/scylla-operator:latest",
ImagePullPolicy: corev1.PullIfNotPresent,
Ports: []corev1.ContainerPort{
Expand Down Expand Up @@ -3686,7 +3686,7 @@ func TestMakeEndpointSlicesAndEndpoints(t *testing.T) {
},
},
{
name: "EndpointSlices to Service ports backed by container having readiness probe are ready when container is ready regardless of Pod readiness",
name: "EndpointSlices for Service ports backed by container having readiness probe are ready when container is ready regardless of Pod readiness",
cluster: basicScyllaCluster,
pods: []*corev1.Pod{
{
Expand Down Expand Up @@ -4100,7 +4100,7 @@ func TestMakeEndpointSlicesAndEndpoints(t *testing.T) {
},
},
{
name: "EndpointSlice have IPV6 AddressType when PodIP from Pod.Status is an IPv6 address",
name: "EndpointSlice has IPV6 AddressType when PodIP from Pod.Status is an IPv6 address",
cluster: basicScyllaCluster,
pods: []*corev1.Pod{
{
Expand Down Expand Up @@ -4239,8 +4239,7 @@ func TestMakeEndpointSlicesAndEndpoints(t *testing.T) {
},
}

for i := range tt {
tc := tt[i]
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -4332,7 +4331,7 @@ func Test_NoEndpointHashCollisionOnPodContainersAndPorts(t *testing.T) {

collidedContainerName, collision := hashes[hash]
if collision {
t.Errorf("found collision on container endpoint ports, both pod and it's controlled ports and %q container with its serving port generate same hash", collidedContainerName)
t.Errorf("found collision on container endpoint ports, both pod and its controlled ports and %q container with its serving port generate same hash", collidedContainerName)
}
hashes[hash] = ""

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/scyllacluster/sync_endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Endpoints are reconciled additionally to EndpointSlices because Prometheus Operator we rely in ScyllaDBMonitoring
// Endpoints are reconciled additionally to EndpointSlices because Prometheus Operator which we rely on in ScyllaDBMonitoring
// doesn't support EndpointSlices yet.
func (scc *Controller) syncEndpoints(
ctx context.Context,
Expand Down
7 changes: 3 additions & 4 deletions pkg/controllerhelpers/convert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func Test_ConvertEndpointSlicesToEndpoints(t *testing.T) {
},
},
{
name: "multi EndpointSlices per Service",
name: "multiple EndpointSlices per Service",
endpointSlices: []*discoveryv1.EndpointSlice{
{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -289,8 +289,7 @@ func Test_ConvertEndpointSlicesToEndpoints(t *testing.T) {
},
}

for i := range tt {
tc := tt[i]
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

Expand All @@ -299,7 +298,7 @@ func Test_ConvertEndpointSlicesToEndpoints(t *testing.T) {
t.Fatal(err)
}
if !apiequality.Semantic.DeepEqual(endpoints, tc.expectedEndpoints) {
t.Errorf("expected and actual Endpoints(s) differ: %s", cmp.Diff(tc.expectedEndpoints, endpoints))
t.Errorf("expected and actual Endpoints differ: %s", cmp.Diff(tc.expectedEndpoints, endpoints))
}
})
}
Expand Down
26 changes: 8 additions & 18 deletions pkg/controllerhelpers/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,28 +252,18 @@ func GetScyllaContainerID(pod *corev1.Pod) (string, error) {
return cs.ContainerID, nil
}

func FindContainerServingPort(port corev1.ServicePort, containers []corev1.Container) (corev1.Container, bool, error) {
func FindContainerServingPort(svcPort corev1.ServicePort, containers []corev1.Container) (corev1.Container, bool, error) {
var err error

containerServingPort, _, ok := slices.Find(containers, func(c corev1.Container) bool {
_, _, ok := slices.Find(c.Ports, func(cp corev1.ContainerPort) bool {
if port.Name == cp.Name {
return true
}

if port.Port == cp.ContainerPort {
return true
}

if port.TargetPort.String() != "" {
switch port.TargetPort.Type {
case intstr.String:
return cp.Name == port.TargetPort.StrVal
case intstr.Int:
return cp.ContainerPort == port.TargetPort.IntVal
default:
err = fmt.Errorf("unsupported type of intstr.IntOrString %d", port.TargetPort.Type)
}
switch svcPort.TargetPort.Type {
case intstr.String:
return cp.Name == svcPort.TargetPort.StrVal && cp.Protocol == svcPort.Protocol
case intstr.Int:
return cp.ContainerPort == svcPort.TargetPort.IntVal
default:
err = fmt.Errorf("unsupported type of intstr.IntOrString %d", svcPort.TargetPort.Type)
}

return false
Expand Down
43 changes: 4 additions & 39 deletions pkg/controllerhelpers/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,9 +373,9 @@ func TestFindContainerServingPort(t *testing.T) {
expectedError error
}{
{
name: "finds container serving port by port name",
name: "finds container serving port by target port name",
port: corev1.ServicePort{
Name: "foo-port",
TargetPort: intstr.FromString("foo-port"),
},
containers: []corev1.Container{
{
Expand Down Expand Up @@ -408,40 +408,6 @@ func TestFindContainerServingPort(t *testing.T) {
},
{
name: "finds container serving port by port number",
port: corev1.ServicePort{
Port: 1234,
},
containers: []corev1.Container{
{
Name: "foo-container",
Ports: []corev1.ContainerPort{
{
ContainerPort: 1234,
},
},
},
{
Name: "bar-container",
Ports: []corev1.ContainerPort{
{
Name: "bar-port",
},
},
},
},
expectedContainer: corev1.Container{
Name: "foo-container",
Ports: []corev1.ContainerPort{
{
ContainerPort: 1234,
},
},
},
expectedFound: true,
expectedError: nil,
},
{
name: "finds container serving port by target port",
port: corev1.ServicePort{
TargetPort: intstr.FromInt32(1234),
},
Expand Down Expand Up @@ -476,8 +442,7 @@ func TestFindContainerServingPort(t *testing.T) {
},
}

for i := range tt {
tc := tt[i]
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

Expand All @@ -490,7 +455,7 @@ func TestFindContainerServingPort(t *testing.T) {
t.Errorf("expected found %#v, got %#v", tc.expectedFound, found)
}
if !reflect.DeepEqual(container, tc.expectedContainer) {
t.Errorf("expected container %#v, got %#v", tc.expectedContainer, container)
t.Errorf("expected and got container differ , diff: %s", cmp.Diff(tc.expectedContainer, container))
}
})
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/naming/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,11 @@ const (

// Configuration Values
const (
ScyllaContainerName = "scylla"
SidecarInjectorContainerName = "sidecar-injection"
PerftuneContainerName = "perftune"
CleanupContainerName = "cleanup"
InterNodeTrafficProbeSidecarContainerName = "inter-node-traffic-probe-sidecar"
ScyllaContainerName = "scylla"
SidecarInjectorContainerName = "sidecar-injection"
PerftuneContainerName = "perftune"
CleanupContainerName = "cleanup"
InterNodeTrafficProbeContainerName = "inter-node-traffic-probe"

PVCTemplateName = "data"

Expand Down

0 comments on commit 48d9bb0

Please sign in to comment.