ambient: support service without selector workloads (istio#51829) (istio#51921)

(cherry picked from commit fea8abb)
howardjohn authored Jul 9, 2024
1 parent 0dad71e commit 1e0dc8d
Showing 2 changed files with 279 additions and 3 deletions.
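For context, the sketch below is not part of this commit; it is an illustrative Go fixture (modeled on the test case added in workloads_test.go, with hypothetical names) showing the pair of Kubernetes objects this change is about: a Service with no selector, and the manually managed EndpointSlice that binds a pod IP to it.

package example

import (
	v1 "k8s.io/api/core/v1"
	discovery "k8s.io/api/discovery/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"istio.io/istio/pkg/ptr"
)

// selectorlessServiceFixture returns a Service without a selector and an
// EndpointSlice that manually attaches pod IP 1.2.3.4 to it. All names and
// values are illustrative.
func selectorlessServiceFixture() (*v1.Service, *discovery.EndpointSlice) {
	svc := &v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "svc", Namespace: "default"},
		Spec: v1.ServiceSpec{
			// Spec.Selector is intentionally unset: Kubernetes will not manage
			// EndpointSlices for this Service, so a user or controller must.
			Ports: []v1.ServicePort{{Name: "http", Port: 80}},
		},
	}
	es := &discovery.EndpointSlice{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "svc-123",
			Namespace: "default",
			// The kubernetes.io/service-name label ties the slice to "svc".
			Labels: map[string]string{discovery.LabelServiceName: "svc"},
		},
		AddressType: discovery.AddressTypeIPv4,
		Endpoints: []discovery.Endpoint{{
			Addresses:  []string{"1.2.3.4"}, // the pod IP being attached manually
			Conditions: discovery.EndpointConditions{Ready: ptr.Of(true)},
		}},
		Ports: []discovery.EndpointPort{{
			Name:     ptr.Of("http"),
			Protocol: ptr.Of(v1.ProtocolTCP),
			Port:     ptr.Of(int32(80)),
		}},
	}
	return svc, es
}

With objects like these, the new matchingServicesWithoutSelectors path in workloads.go looks up EndpointSlices by the pod's IP and attaches the selector-less Service to the pod's Workload, as exercised by the new "pod as part of selectorless service" test case.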
111 changes: 109 additions & 2 deletions pilot/pkg/serviceregistry/kube/controller/ambient/workloads.go
@@ -45,6 +45,9 @@ import (
"istio.io/istio/pkg/workloadapi"
)

// WorkloadsCollection builds out the core Workload object type used in ambient mode.
// A Workload represents a single addressable unit of compute -- typically a Pod or a VM.
// Workloads can come from a variety of sources; these are joined together to build one complete `Collection[WorkloadInfo]`.
func (a *index) WorkloadsCollection(
Pods krt.Collection[*v1.Pod],
Nodes krt.Collection[*v1.Node],
@@ -59,16 +62,32 @@ func (a *index) WorkloadsCollection(
Namespaces krt.Collection[*v1.Namespace],
) krt.Collection[model.WorkloadInfo] {
WorkloadServicesNamespaceIndex := krt.NewNamespaceIndex(WorkloadServices)
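// Index EndpointSlices by the addresses they contain, so pods can be matched to Services without selectors by IP.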
EndpointSlicesByIPIndex := endpointSliceAddressIndex(EndpointSlices)
// Workloads coming from Pods. There should be one workload for each (running) Pod.
PodWorkloads := krt.NewCollection(
Pods,
a.podWorkloadBuilder(MeshConfig, AuthorizationPolicies, PeerAuths, Waypoints, WorkloadServices, WorkloadServicesNamespaceIndex, Namespaces, Nodes),
a.podWorkloadBuilder(
MeshConfig,
AuthorizationPolicies,
PeerAuths,
Waypoints,
WorkloadServices,
WorkloadServicesNamespaceIndex,
EndpointSlices,
EndpointSlicesByIPIndex,
Namespaces,
Nodes,
),
krt.WithName("PodWorkloads"),
)
// Workloads coming from WorkloadEntries. These are 1:1 with WorkloadEntry.
WorkloadEntryWorkloads := krt.NewCollection(
WorkloadEntries,
a.workloadEntryWorkloadBuilder(MeshConfig, AuthorizationPolicies, PeerAuths, Waypoints, WorkloadServices, WorkloadServicesNamespaceIndex, Namespaces),
krt.WithName("WorkloadEntryWorkloads"),
)
// Workloads coming from ServiceEntries. These are inlined WorkloadEntries (under `spec.endpoints`); these ServiceEntries will
// also be generating `workloadapi.Service` definitions in the `ServicesCollection` logic.
ServiceEntryWorkloads := krt.NewManyCollection(ServiceEntries, func(ctx krt.HandlerContext, se *networkingclient.ServiceEntry) []model.WorkloadInfo {
if len(se.Spec.Endpoints) == 0 {
return nil
@@ -143,6 +162,11 @@ func (a *index) WorkloadsCollection(
}
return res
}, krt.WithName("ServiceEntryWorkloads"))
// Workloads coming from EndpointSlices. These are for *manually added* endpoints. Typically, Kubernetes will insert each pod
// into the EndpointSlice. This is because Kubernetes has 3 APIs in its model: Service, Pod, and EndpointSlice.
// In our API, we only have two: Service and Workload.
// Pod provides much more information than EndpointSlice, so typically we just consume that directly; see method for more details
// on when we will build from an EndpointSlice.
EndpointSliceWorkloads := krt.NewManyCollection(
EndpointSlices,
a.endpointSlicesBuilder(MeshConfig, WorkloadServices),
@@ -229,6 +253,8 @@ func (a *index) podWorkloadBuilder(
Waypoints krt.Collection[Waypoint],
WorkloadServices krt.Collection[model.ServiceInfo],
WorkloadServicesNamespaceIndex *krt.Index[model.ServiceInfo, string],
EndpointSlices krt.Collection[*discovery.EndpointSlice],
EndpointSlicesAddressIndex *krt.Index[*discovery.EndpointSlice, string],
Namespaces krt.Collection[*v1.Namespace],
Nodes krt.Collection[*v1.Node],
) func(ctx krt.HandlerContext, p *v1.Pod) *model.WorkloadInfo {
@@ -253,7 +279,8 @@
}))
}
services := krt.Fetch(ctx, WorkloadServices, fo...)
// Logic from https://github.com/howardjohn/kubernetes/blob/7c873327b679a70337288da62b96dd610858181d/staging/src/k8s.io/endpointslice/utils.go#L37
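// Also pick up Services without selectors that reference this pod through a manually managed EndpointSlice.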
services = append(services, a.matchingServicesWithoutSelectors(ctx, p, services, WorkloadServices, EndpointSlices, EndpointSlicesAddressIndex)...)
// Logic from https://github.com/kubernetes/kubernetes/blob/7c873327b679a70337288da62b96dd610858181d/staging/src/k8s.io/endpointslice/utils.go#L37
// Kubernetes has Ready, Serving, and Terminating. We only have a boolean, which is sufficient for our cases
status := workloadapi.WorkloadStatus_HEALTHY
if !IsPodReady(p) || p.DeletionTimestamp != nil {
@@ -304,6 +331,72 @@ func (a *index) podWorkloadBuilder(
}
}

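// getPodIPs returns the pod's IPs, falling back to the legacy single Status.PodIP field when Status.PodIPs is not set.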
func getPodIPs(p *v1.Pod) []v1.PodIP {
k8sPodIPs := p.Status.PodIPs
if len(k8sPodIPs) == 0 && p.Status.PodIP != "" {
k8sPodIPs = []v1.PodIP{{IP: p.Status.PodIP}}
}
return k8sPodIPs
}

// matchingServicesWithoutSelectors finds all Services that match a given pod that do not use selectors.
// See https://kubernetes.io/docs/concepts/services-networking/service/#services-without-selectors for more info.
// For services with selectors, we query by the selector elsewhere, so this only handles the services that are NOT already found
// by a selector.
func (a *index) matchingServicesWithoutSelectors(
ctx krt.HandlerContext,
p *v1.Pod,
alreadyMatchingServices []model.ServiceInfo,
WorkloadServices krt.Collection[model.ServiceInfo],
EndpointSlices krt.Collection[*discovery.EndpointSlice],
EndpointSlicesAddressIndex *krt.Index[*discovery.EndpointSlice, string],
) []model.ServiceInfo {
k8sPodIPs := getPodIPs(p)
if len(k8sPodIPs) == 0 {
return nil
}
var res []model.ServiceInfo
// Build out our set of already-matched services to avoid double-selecting a service
seen := sets.NewWithLength[string](len(alreadyMatchingServices))
for _, s := range alreadyMatchingServices {
seen.Insert(s.Hostname)
}
for _, ip := range k8sPodIPs {
// For each IP, find any EndpointSlices referencing it.
matchedSlices := krt.Fetch(ctx, EndpointSlices, krt.FilterIndex(EndpointSlicesAddressIndex, ip.IP))
for _, es := range matchedSlices {
if es.AddressType == discovery.AddressTypeFQDN {
// Currently we do not support FQDN.
continue
}
serviceName, f := es.Labels[discovery.LabelServiceName]
if !f {
// Not for a service; we don't care about it.
continue
}
hostname := string(kube.ServiceHostname(serviceName, es.Namespace, a.DomainSuffix))
if seen.Contains(hostname) {
// We already know about this service
continue
}
// This pod is included in the EndpointSlice. We need to fetch the Service object for it, by key.
serviceKey := es.Namespace + "/" + hostname
svcs := krt.Fetch(ctx, WorkloadServices, krt.FilterKey(serviceKey), krt.FilterGeneric(func(a any) bool {
// Only find Service, not Service Entry
return a.(model.ServiceInfo).Source == kind.Service
}))
if len(svcs) == 0 {
// no service found
continue
}
// There SHOULD only be one. This is only for `Service` which has unique hostnames.
svc := svcs[0]
res = append(res, svc)
}
}
return res
}

func (a *index) buildWorkloadPolicies(
ctx krt.HandlerContext,
AuthorizationPolicies krt.Collection[model.WorkloadAuthorization],
@@ -650,3 +743,17 @@ func implicitWaypointPolicies(ctx krt.HandlerContext, Waypoints krt.Collection[W
return ptr.Of(w.Namespace + "/" + policy)
})
}

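// endpointSliceAddressIndex builds an index from endpoint address to the EndpointSlices that contain it. FQDN slices are skipped, as FQDN addresses are not currently supported.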
func endpointSliceAddressIndex(EndpointSlices krt.Collection[*discovery.EndpointSlice]) *krt.Index[*discovery.EndpointSlice, string] {
return krt.NewIndex(EndpointSlices, func(es *discovery.EndpointSlice) []string {
if es.AddressType == discovery.AddressTypeFQDN {
// Currently we do not support FQDN.
return nil
}
res := make([]string, 0, len(es.Endpoints))
for _, ep := range es.Endpoints {
res = append(res, ep.Addresses...)
}
return res
})
}
171 changes: 170 additions & 1 deletion pilot/pkg/serviceregistry/kube/controller/ambient/workloads_test.go
@@ -20,6 +20,7 @@ import (
"testing"

v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

meshapi "istio.io/api/mesh/v1alpha1"
@@ -32,6 +33,7 @@
"istio.io/istio/pkg/config/schema/kind"
"istio.io/istio/pkg/kube/krt"
"istio.io/istio/pkg/network"
"istio.io/istio/pkg/ptr"
"istio.io/istio/pkg/slices"
"istio.io/istio/pkg/test/util/assert"
"istio.io/istio/pkg/workloadapi"
@@ -354,6 +356,98 @@ func TestPodWorkloads(t *testing.T) {
},
},
},
{
name: "pod as part of selectorless service",
inputs: []any{
model.ServiceInfo{
Service: &workloadapi.Service{
Name: "svc",
Namespace: "default",
Hostname: "svc.default.svc.domain.suffix",
Ports: []*workloadapi.Port{
{
ServicePort: 80,
TargetPort: 80,
},
},
},
PortNames: map[int32]model.ServicePortName{
80: {PortName: "80"},
},
// no selector!
LabelSelector: model.LabelSelector{},
Source: kind.Service,
},
// EndpointSlice manually associates the pod with a service
&discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: "svc-123",
Namespace: "default",
Labels: map[string]string{
discovery.LabelServiceName: "svc",
},
},
AddressType: discovery.AddressTypeIPv4,
Endpoints: []discovery.Endpoint{
{
Addresses: []string{"1.2.3.4"},
Conditions: discovery.EndpointConditions{
Ready: ptr.Of(true),
},
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Name: "pod-123",
Namespace: "default",
},
},
},
Ports: []discovery.EndpointPort{
{
Name: ptr.Of("http"),
Protocol: ptr.Of(v1.ProtocolTCP),
Port: ptr.Of(int32(80)),
},
},
},
},
pod: &v1.Pod{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Name: "pod-123",
Namespace: "default",
Labels: map[string]string{
"app": "foo",
},
},
Spec: v1.PodSpec{},
Status: v1.PodStatus{
Phase: v1.PodRunning,
Conditions: podReady,
PodIP: "1.2.3.4",
},
},
result: &workloadapi.Workload{
Uid: "cluster0//Pod/default/pod-123",
Name: "pod-123",
Namespace: "default",
Addresses: [][]byte{netip.AddrFrom4([4]byte{1, 2, 3, 4}).AsSlice()},
Network: testNW,
CanonicalName: "foo",
CanonicalRevision: "latest",
WorkloadType: workloadapi.WorkloadType_POD,
WorkloadName: "pod-123",
Status: workloadapi.WorkloadStatus_HEALTHY,
ClusterId: testC,
Services: map[string]*workloadapi.PortList{
"default/svc.default.svc.domain.suffix": {
Ports: []*workloadapi.Port{{
ServicePort: 80,
TargetPort: 80,
}},
},
},
},
},
}
for _, tt := range cases {
t.Run(tt.name, func(t *testing.T) {
@@ -370,9 +464,22 @@
MeshConfig := krt.NewStatic(&MeshConfig{m})
Namespaces := krt.NewStaticCollection(extractType[*v1.Namespace](&inputs))
Nodes := krt.NewStaticCollection(extractType[*v1.Node](&inputs))
EndpointSlices := krt.NewStaticCollection(extractType[*discovery.EndpointSlice](&inputs))
assert.Equal(t, len(inputs), 0, fmt.Sprintf("some inputs were not consumed: %v", inputs))
WorkloadServicesNamespaceIndex := krt.NewNamespaceIndex(WorkloadServices)
builder := a.podWorkloadBuilder(MeshConfig, AuthorizationPolicies, PeerAuths, Waypoints, WorkloadServices, WorkloadServicesNamespaceIndex, Namespaces, Nodes)
EndpointSlicesAddressIndex := endpointSliceAddressIndex(EndpointSlices)
builder := a.podWorkloadBuilder(
MeshConfig,
AuthorizationPolicies,
PeerAuths,
Waypoints,
WorkloadServices,
WorkloadServicesNamespaceIndex,
EndpointSlices,
EndpointSlicesAddressIndex,
Namespaces,
Nodes,
)
wrapper := builder(krt.TestingDummyContext{}, tt.pod)
var res *workloadapi.Workload
if wrapper != nil {
@@ -642,13 +749,75 @@ func TestWorkloadEntryWorkloads(t *testing.T) {
}
}

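// TestEndpointSliceWorkloads covers workloads built directly from EndpointSlices (manually added endpoints).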
func TestEndpointSliceWorkloads(t *testing.T) {
cases := []struct {
name string
inputs []any
slice *discovery.EndpointSlice
result []*workloadapi.Workload
}{
{
name: "api server",
inputs: []any{},
slice: &discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: "kubernetes",
Namespace: "default",
Labels: map[string]string{
discovery.LabelServiceName: "kubernetes",
},
},
AddressType: discovery.AddressTypeIPv4,
Endpoints: []discovery.Endpoint{
{
Addresses: []string{"172.18.0.5"},
Conditions: discovery.EndpointConditions{
Ready: ptr.Of(true),
},
},
},
Ports: []discovery.EndpointPort{
{
Name: ptr.Of("https"),
Protocol: ptr.Of(v1.ProtocolTCP),
Port: ptr.Of(int32(6443)),
},
},
},
result: nil,
},
}
for _, tt := range cases {
t.Run(tt.name, func(t *testing.T) {
inputs := tt.inputs
a := newAmbientUnitTest()
WorkloadServices := krt.NewStaticCollection(extractType[model.ServiceInfo](&inputs))
m := slices.First(extractType[meshapi.MeshConfig](&inputs))
if m == nil {
m = mesh.DefaultMeshConfig()
}
MeshConfig := krt.NewStatic(&MeshConfig{m})
builder := a.endpointSlicesBuilder(
MeshConfig,
WorkloadServices,
)
res := builder(krt.TestingDummyContext{}, tt.slice)
wl := slices.Map(res, func(e model.WorkloadInfo) *workloadapi.Workload {
return e.Workload
})
assert.Equal(t, wl, tt.result)
})
}
}

func newAmbientUnitTest() *index {
return &index{
networkUpdateTrigger: krt.NewRecomputeTrigger(),
ClusterID: testC,
Network: func(endpointIP string, labels labels.Instance) network.ID {
return testNW
},
DomainSuffix: "domain.suffix",
}
}
