Skip to content

Commit

Permalink
improve pod matching (#188)
Browse files Browse the repository at this point in the history
  • Loading branch information
matheuscscp authored Jul 12, 2024
1 parent 3aaf703 commit 541a823
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 56 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

FROM golang:1.22.2-alpine3.19 as builder
FROM golang:1.22.2-alpine3.19 AS builder

WORKDIR /app

Expand Down
4 changes: 2 additions & 2 deletions helm/gke-metadata-server/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ config:
webhookAddr: :8081 # Network address where the Mutating Webhook server will listen on.
watchPods:
enabled: true # Whether or not to watch and cache the Pods running on the same Node.
disableFallback: false # Whether or not to disable the simple fallback method for retrieving Pods upon cache misses.
disableFallback: false # Whether or not to disable the simple fallback method for looking up Pods upon cache misses.
resyncPeriod: 10m # How often to fully resync.
watchServiceAccounts:
enabled: true # Whether or not to watch and cache all the Service Accounts of the cluster.
disableFallback: false # Whether or not to disable the simple fallback method for retrieving Service Accounts upon cache misses.
disableFallback: false # Whether or not to disable the simple fallback method for looking up Service Accounts upon cache misses.
resyncPeriod: 1h # How often to fully resync.
cacheTokens:
enabled: true # Whether or not to proactively cache tokens for the Service Accounts used by the Pods running in the same Node.
Expand Down
26 changes: 17 additions & 9 deletions internal/pods/list/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,23 +50,31 @@ func NewProvider(opts ProviderOptions) pods.Provider {
}

// GetByIP looks up a pod by its IP address by listing pods across all namespaces
// through the Kubernetes API, using a field selector that restricts the results to
// this provider's node (p.opts.NodeName) and to the given ipAddr.
//
// It returns an error if the list call fails or if the number of matching pods is
// not exactly one. When several pods match, the error message names each of them
// as namespace/name. Pods on the host network are not supported.
func (p *Provider) GetByIP(ctx context.Context, ipAddr string) (*corev1.Pod, error) {
podList, err := p.opts.KubeClient.CoreV1().Pods(corev1.NamespaceAll).List(ctx, metav1.ListOptions{
FieldSelector: fmt.Sprintf("spec.nodeName=%s,status.podIP=%s", p.opts.NodeName, ipAddr),
})
// NOTE(review): spec.hostNetwork as a pod field selector depends on API-server
// support — confirm against the oldest Kubernetes version this must run on.
fieldSelector := strings.Join([]string{
"spec.nodeName=" + p.opts.NodeName,
"spec.hostNetwork=false",
"status.podIP=" + ipAddr,
}, ",")
podList, err := p.opts.KubeClient.
CoreV1().
Pods(corev1.NamespaceAll).
List(ctx, metav1.ListOptions{FieldSelector: fieldSelector})
if err != nil {
return nil, fmt.Errorf("error listing pods in the node matching cluster ip address %q: %w", ipAddr, err)
return nil, fmt.Errorf("error listing pods in the node matching cluster ip %s: %w", ipAddr, err)
}
if n := len(podList.Items); n != 1 || podList.Items[0].Spec.HostNetwork {
if n == 1 { // pods on the host network are not supported, see README.md
podList.Items = nil
n = 0

// Anything other than exactly one match is an error; zero and many get distinct messages.
if n := len(podList.Items); n != 1 {
if n == 0 {
return nil, fmt.Errorf("no pods found in the node matching cluster ip %s", ipAddr)
}

// Collect namespace/name references so the error identifies every conflicting pod.
refs := make([]string, n)
for i, pod := range podList.Items {
refs[i] = fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)
}
return nil, fmt.Errorf("error listing pods in the node matching cluster ip address %q: %v pods found instead of 1 [%s]",
return nil, fmt.Errorf("multiple pods found in the node matching cluster ip %s (%v pods): %s",
ipAddr, n, strings.Join(refs, ", "))
}

return &podList.Items[0], nil
}
75 changes: 44 additions & 31 deletions internal/pods/watch/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,21 @@ func NewProvider(opts ProviderOptions) *Provider {
opts.KubeClient,
corev1.NamespaceAll,
opts.ResyncPeriod,
cache.Indexers{ // pods on the host network are not supported, see README.md
cache.Indexers{
ipIndex: func(obj interface{}) ([]string, error) {
pod := obj.(*corev1.Pod)
return []string{pod.Status.PodIP, fmt.Sprint(pod.Spec.HostNetwork)}, nil
podIP := pod.Status.PodIP
if podIP == "" {
podIP = "<empty>"
}
return []string{podIP}, nil
},
},
func(lo *metav1.ListOptions) {
lo.FieldSelector = fmt.Sprintf("spec.nodeName=%s", opts.NodeName)
lo.FieldSelector = strings.Join([]string{
"spec.nodeName=" + opts.NodeName,
"spec.hostNetwork=false",
}, ",")
},
)

Expand All @@ -111,22 +118,14 @@ func NewProvider(opts ProviderOptions) *Provider {
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
numPods.Inc()
pod := obj.(*corev1.Pod)
if pod.Spec.HostNetwork {
return
}
for _, l := range p.listeners {
l.AddPod(pod)
l.AddPod(obj.(*corev1.Pod))
}
},
DeleteFunc: func(obj interface{}) {
numPods.Dec()
pod := obj.(*corev1.Pod)
if pod.Spec.HostNetwork {
return
}
for _, l := range p.listeners {
l.DeletePod(pod)
l.DeletePod(obj.(*corev1.Pod))
}
},
})
Expand All @@ -135,44 +134,58 @@ func NewProvider(opts ProviderOptions) *Provider {
}

// GetByIP returns the pod with the given IP address, trying the informer cache
// first (via getByIP) and returning the cached pod on success.
//
// If the cache lookup fails and no FallbackSource is configured, the cache error
// is returned wrapped. Otherwise the failure is logged at error level and the
// request is delegated to p.opts.FallbackSource; the cacheMisses counter is
// incremented on this fallback path.
func (p *Provider) GetByIP(ctx context.Context, ipAddr string) (*corev1.Pod, error) {
pod, err := p.getByIP(ctx, ipAddr)
pod, err := p.getByIP(ipAddr)
if err == nil {
return pod, nil
}
if p.opts.FallbackSource == nil {
return nil, fmt.Errorf("error getting pod by ip %q from cache: %w", ipAddr, err)
return nil, fmt.Errorf("error getting pod with cluster ip %s from cache: %w", ipAddr, err)
}
p.cacheMisses.Inc()

logging.
FromContext(ctx).
WithError(err).
Error("error getting pod by ip from cache, delegating request to fallback source")
return p.opts.FallbackSource.GetByIP(ctx, ipAddr)
WithField("cluster_ip", ipAddr).
Error("error getting pod by cluster ip from cache, delegating request to fallback source")

pod, err = p.opts.FallbackSource.GetByIP(ctx, ipAddr)
if err != nil {
// The fallback error is returned as-is, without additional wrapping.
return nil, err
}
// Reached only after the fallback source returned a pod.
p.cacheMisses.Inc()
return pod, nil
}

// getByIP looks up a pod in the informer's indexer through the ipIndex index,
// using a probe Pod whose Status.PodIP is set to ipAddr.
//
// It returns an error if the index lookup fails or if the number of matching
// pods is not exactly one. When several pods match, the error message names each
// of them as namespace/name.
//
// NOTE(review): the index function registered in NewProvider maps an empty PodIP
// to the "<empty>" key, so an empty ipAddr would presumably match pods that have
// no IP — confirm callers never pass an empty string.
func (p *Provider) getByIP(ctx context.Context, ipAddr string) (*corev1.Pod, error) {
v, err := p.informer.GetIndexer().Index(ipIndex, &corev1.Pod{
func (p *Provider) getByIP(ipAddr string) (*corev1.Pod, error) {
list, err := p.informer.GetIndexer().Index(ipIndex, &corev1.Pod{
Status: corev1.PodStatus{PodIP: ipAddr},
})
if err != nil {
return nil, fmt.Errorf("error getting pod from cache by ip address %q: %w", ipAddr, err)
return nil, fmt.Errorf("error listing pods in the node cache matching cluster ip %s: %w", ipAddr, err)
}
var podsMatchingIP []*corev1.Pod
for _, p := range v {
pod := p.(*corev1.Pod)
if pod.Status.PodIP == ipAddr {
podsMatchingIP = append(podsMatchingIP, pod)
}

// Convert the untyped index results into typed pods.
var matchingPods []*corev1.Pod
for _, v := range list {
pod := v.(*corev1.Pod)
matchingPods = append(matchingPods, pod)
// NOTE(review): no per-pod filter is applied here. The informer is created with a
// spec.nodeName/spec.hostNetwork=false field selector and ipIndex keys pods by
// Status.PodIP, so a node/host-network/IP check would presumably be redundant — confirm.
}
if n := len(podsMatchingIP); n != 1 {

// Anything other than exactly one match is an error; zero and many get distinct messages.
if n := len(matchingPods); n != 1 {
if n == 0 {
return nil, fmt.Errorf("no pods found in the node cache matching cluster ip %s", ipAddr)
}

// Collect namespace/name references so the error identifies every conflicting pod.
refs := make([]string, n)
for i, pod := range podsMatchingIP {
for i, pod := range matchingPods {
refs[i] = fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)
}
return nil, fmt.Errorf("error getting pod from cache by ip address %q: %v pods found instead of 1 [%s]",
return nil, fmt.Errorf("multiple pods found in the node cache matching cluster ip %s (%v pods): %s",
ipAddr, n, strings.Join(refs, ", "))
}
return podsMatchingIP[0], nil

return matchingPods[0], nil
}

func (p *Provider) Start(ctx context.Context) {
Expand Down
30 changes: 17 additions & 13 deletions internal/serviceaccounts/watch/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/matheuscscp/gke-metadata-server/internal/serviceaccounts"

"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
informersv1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -98,15 +97,13 @@ func NewProvider(ctx context.Context, opts ProviderOptions) *Provider {
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
numServiceAccounts.Inc()
sa := obj.(*corev1.ServiceAccount)
for _, l := range p.listeners {
l.UpdateServiceAccount(sa)
l.UpdateServiceAccount(obj.(*corev1.ServiceAccount))
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
sa := newObj.(*corev1.ServiceAccount)
for _, l := range p.listeners {
l.UpdateServiceAccount(sa)
l.UpdateServiceAccount(newObj.(*corev1.ServiceAccount))
}
},
DeleteFunc: func(obj interface{}) {
Expand All @@ -118,22 +115,29 @@ func NewProvider(ctx context.Context, opts ProviderOptions) *Provider {
}

// Get returns the service account identified by namespace and name, trying the
// informer cache first (via get) and returning the cached object on success.
//
// If the cache lookup fails and no FallbackSource is configured, the cache error
// is returned wrapped. Otherwise the failure is logged at error level and the
// request is delegated to p.opts.FallbackSource; the cacheMisses counter is
// incremented on this fallback path.
func (p *Provider) Get(ctx context.Context, namespace, name string) (*corev1.ServiceAccount, error) {
sa, err := p.get(ctx, namespace, name)
sa, err := p.get(namespace, name)
if err == nil {
return sa, nil
}
if p.opts.FallbackSource == nil {
return nil, fmt.Errorf("error getting service account '%s/%s' from cache: %w", namespace, name, err)
return nil, fmt.Errorf("error getting service account %s/%s from cache: %w", namespace, name, err)
}

// The service account is logged as a single "namespace/name" string field.
logging.
FromContext(ctx).
WithError(err).
WithField("service_account", fmt.Sprintf("%s/%s", namespace, name)).
Error("error getting service account from cache, delegating request to fallback source")

sa, err = p.opts.FallbackSource.Get(ctx, namespace, name)
if err != nil {
// The fallback error is returned as-is, without additional wrapping.
return nil, err
}
p.cacheMisses.Inc()
logging.FromContext(ctx).WithError(err).WithField("service_account", logrus.Fields{
"namespace": namespace,
"name": name,
}).Error("error getting service account from cache, delegating request to fallback source")
return p.opts.FallbackSource.Get(ctx, namespace, name)
return sa, nil
}

func (p *Provider) get(ctx context.Context, namespace, name string) (*corev1.ServiceAccount, error) {
func (p *Provider) get(namespace, name string) (*corev1.ServiceAccount, error) {
v, ok, err := p.informer.GetStore().GetByKey(fmt.Sprintf("%s/%s", namespace, name))
if err != nil {
return nil, err
Expand Down

0 comments on commit 541a823

Please sign in to comment.