From 9c0c1aa3e096efd156109b3db5fe40a3ebbe7035 Mon Sep 17 00:00:00 2001 From: Daichi Sakaue Date: Mon, 9 Dec 2024 17:16:26 +0900 Subject: [PATCH] Move proxy-related helper functions to a dedicated file Signed-off-by: Daichi Sakaue --- cmd/npv/app/helper.go | 42 ------------ cmd/npv/app/helper_proxy.go | 123 ++++++++++++++++++++++++++++++++++++ cmd/npv/app/inspect.go | 69 +------------------- 3 files changed, 124 insertions(+), 110 deletions(-) create mode 100644 cmd/npv/app/helper_proxy.go diff --git a/cmd/npv/app/helper.go b/cmd/npv/app/helper.go index e0025ac..dc53b57 100644 --- a/cmd/npv/app/helper.go +++ b/cmd/npv/app/helper.go @@ -45,48 +45,6 @@ func createK8sClients() (*kubernetes.Clientset, *dynamic.DynamicClient, error) { return clientset, dynamicClient, nil } -func createCiliumClient(ctx context.Context, clientset *kubernetes.Clientset, namespace, name string) (*client.Client, error) { - endpoint, err := getProxyEndpoint(ctx, clientset, namespace, name) - if err != nil { - return nil, err - } - - if cached, ok := cachedCiliumClients[endpoint]; ok { - return cached, nil - } - - ciliumClient, err := client.NewClient(endpoint) - if err != nil { - return nil, err - } - cachedCiliumClients[endpoint] = ciliumClient - - return ciliumClient, err -} - -func getProxyEndpoint(ctx context.Context, c *kubernetes.Clientset, namespace, name string) (string, error) { - targetPod, err := c.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{}) - if err != nil { - return "", err - } - targetNode := targetPod.Spec.NodeName - - pods, err := c.CoreV1().Pods("kube-system").List(ctx, metav1.ListOptions{ - FieldSelector: "spec.nodeName=" + targetNode, - LabelSelector: rootOptions.proxySelector, - }) - if err != nil { - return "", err - } - if num := len(pods.Items); num != 1 { - err := fmt.Errorf("failed to find cilium-agent-proxy. found %d pods", num) - return "", err - } - - podIP := pods.Items[0].Status.PodIP - return fmt.Sprintf("http://%s:%d", podIP, rootOptions.proxyPort), nil -} - func getPodEndpointID(ctx context.Context, d *dynamic.DynamicClient, namespace, name string) (int64, error) { ep, err := d.Resource(gvrEndpoint).Namespace(namespace).Get(ctx, name, metav1.GetOptions{}) if err != nil { diff --git a/cmd/npv/app/helper_proxy.go b/cmd/npv/app/helper_proxy.go new file mode 100644 index 0000000..444412b --- /dev/null +++ b/cmd/npv/app/helper_proxy.go @@ -0,0 +1,123 @@ +package app + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + + "github.com/cilium/cilium/pkg/client" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes" +) + +func getProxyEndpoint(ctx context.Context, c *kubernetes.Clientset, namespace, name string) (string, error) { + targetPod, err := c.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{}) + if err != nil { + return "", err + } + targetNode := targetPod.Spec.NodeName + + pods, err := c.CoreV1().Pods("kube-system").List(ctx, metav1.ListOptions{ + FieldSelector: "spec.nodeName=" + targetNode, + LabelSelector: rootOptions.proxySelector, + }) + if err != nil { + return "", err + } + if num := len(pods.Items); num != 1 { + err := fmt.Errorf("failed to find cilium-agent-proxy. found %d pods", num) + return "", err + } + + podIP := pods.Items[0].Status.PodIP + return fmt.Sprintf("http://%s:%d", podIP, rootOptions.proxyPort), nil +} + +func createCiliumClient(ctx context.Context, clientset *kubernetes.Clientset, namespace, name string) (*client.Client, error) { + endpoint, err := getProxyEndpoint(ctx, clientset, namespace, name) + if err != nil { + return nil, err + } + + if cached, ok := cachedCiliumClients[endpoint]; ok { + return cached, nil + } + + ciliumClient, err := client.NewClient(endpoint) + if err != nil { + return nil, err + } + cachedCiliumClients[endpoint] = ciliumClient + + return ciliumClient, err +} + +type policyEntryKey struct { + Identity int `json:"Identity"` + Direction int `json:"TrafficDirection"` + Protocol int `json:"Nexthdr"` + BigPort int `json:"DestPortNetwork"` // big endian +} + +func (p policyEntryKey) Port() int { + return ((p.BigPort & 0xFF) << 8) + ((p.BigPort & 0xFF00) >> 8) +} + +// For the meanings of the flags, see: +// https://github.com/cilium/cilium/blob/v1.16.3/bpf/lib/common.h#L394 +type policyEntry struct { + Flags int `json:"Flags"` + Packets int `json:"Packets"` + Bytes int `json:"Bytes"` + Key policyEntryKey `json:"Key"` +} + +func (p policyEntry) IsDenyRule() bool { + return (p.Flags & 1) > 0 +} + +func (p policyEntry) IsEgressRule() bool { + return p.Key.Direction > 0 +} + +func (p policyEntry) IsWildcardProtocol() bool { + return (p.Flags & 2) > 0 +} + +func (p policyEntry) IsWildcardPort() bool { + return (p.Flags & 4) > 0 +} + +func queryPolicyMap(ctx context.Context, clientset *kubernetes.Clientset, dynamicClient *dynamic.DynamicClient, namespace, name string) ([]policyEntry, error) { + endpointID, err := getPodEndpointID(ctx, dynamicClient, namespace, name) + if err != nil { + return nil, fmt.Errorf("failed to get pod endpoint ID: %w", err) + } + + url, err := getProxyEndpoint(ctx, clientset, namespace, name) + if err != nil { + return nil, fmt.Errorf("failed to get proxy endpoint: %w", err) + } + + url = fmt.Sprintf("%s/policy/%d", url, endpointID) + resp, err := http.Get(url) + if err != nil { + return nil, fmt.Errorf("failed to request policy: %w", err) + } + defer resp.Body.Close() + + data, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response: %w", err) + } + + policies := make([]policyEntry, 0) + if err = json.Unmarshal(data, &policies); err != nil { + return nil, fmt.Errorf("failed to unmarshal response: %w", err) + } + + return policies, nil +} diff --git a/cmd/npv/app/inspect.go b/cmd/npv/app/inspect.go index 1d8253f..a8caf52 100644 --- a/cmd/npv/app/inspect.go +++ b/cmd/npv/app/inspect.go @@ -2,10 +2,8 @@ package app import ( "context" - "encoding/json" "fmt" "io" - "net/http" "slices" "strconv" "strings" @@ -16,8 +14,6 @@ import ( "github.com/spf13/cobra" "golang.org/x/exp/rand" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/client-go/dynamic" - "k8s.io/client-go/kubernetes" ) func init() { @@ -36,38 +32,6 @@ var inspectCmd = &cobra.Command{ ValidArgsFunction: completePods, } -type policyEntryKey struct { - Identity int `json:"Identity"` - Direction int `json:"TrafficDirection"` - Protocol int `json:"Nexthdr"` - BigPort int `json:"DestPortNetwork"` // big endian -} - -// For the meanings of the flags, see: -// https://github.com/cilium/cilium/blob/v1.16.3/bpf/lib/common.h#L394 -type policyEntry struct { - Flags int `json:"Flags"` - Packets int `json:"Packets"` - Bytes int `json:"Bytes"` - Key policyEntryKey `json:"Key"` -} - -func (p policyEntry) IsDenyRule() bool { - return (p.Flags & 1) > 0 -} - -func (p policyEntry) IsEgressRule() bool { - return p.Key.Direction > 0 -} - -func (p policyEntry) IsWildcardProtocol() bool { - return (p.Flags & 2) > 0 -} - -func (p policyEntry) IsWildcardPort() bool { - return (p.Flags & 4) > 0 -} - // This command aims to show the result of "cilium bpf policy get" from a remote pod. // https://github.com/cilium/cilium/blob/v1.16.3/cilium-dbg/cmd/bpf_policy_get.go type inspectEntry struct { @@ -84,37 +48,6 @@ type inspectEntry struct { Packets int `json:"packets"` } -func queryPolicyMap(ctx context.Context, clientset *kubernetes.Clientset, dynamicClient *dynamic.DynamicClient, namespace, name string) ([]policyEntry, error) { - endpointID, err := getPodEndpointID(ctx, dynamicClient, namespace, name) - if err != nil { - return nil, fmt.Errorf("failed to get pod endpoint ID: %w", err) - } - - url, err := getProxyEndpoint(ctx, clientset, namespace, name) - if err != nil { - return nil, fmt.Errorf("failed to get proxy endpoint: %w", err) - } - - url = fmt.Sprintf("%s/policy/%d", url, endpointID) - resp, err := http.Get(url) - if err != nil { - return nil, fmt.Errorf("failed to request policy: %w", err) - } - defer resp.Body.Close() - - data, err := io.ReadAll(resp.Body) - if err != nil { - return nil, fmt.Errorf("failed to read response: %w", err) - } - - policies := make([]policyEntry, 0) - if err = json.Unmarshal(data, &policies); err != nil { - return nil, fmt.Errorf("failed to unmarshal response: %w", err) - } - - return policies, nil -} - func runInspect(ctx context.Context, w io.Writer, name string) error { clientset, dynamicClient, err := createK8sClients() if err != nil { @@ -204,7 +137,7 @@ func runInspect(ctx context.Context, w io.Writer, name string) error { entry.WildcardProtocol = p.IsWildcardProtocol() entry.WildcardPort = p.IsWildcardPort() entry.Protocol = p.Key.Protocol - entry.Port = ((p.Key.BigPort & 0xFF) << 8) + ((p.Key.BigPort & 0xFF00) >> 8) + entry.Port = p.Key.Port() entry.Bytes = p.Bytes entry.Packets = p.Packets arr[i] = entry