Skip to content

Commit

Permalink
Implement inspect and summary commands
Browse files Browse the repository at this point in the history
Signed-off-by: Daichi Sakaue <[email protected]>
  • Loading branch information
yokaze committed Oct 24, 2024
1 parent 940c56a commit 0b8979e
Show file tree
Hide file tree
Showing 14 changed files with 684 additions and 85 deletions.
2 changes: 1 addition & 1 deletion cmd/npv/app/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var dumpCmd = &cobra.Command{
}

func runDump(ctx context.Context, w io.Writer, name string) error {
clientset, dynamicClient, _, err := createClients(ctx, name)
clientset, dynamicClient, err := createK8sClients()
if err != nil {
return err
}
Expand Down
122 changes: 113 additions & 9 deletions cmd/npv/app/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@ import (
"context"
"errors"
"fmt"
"math/rand"
"strconv"
"strings"

"github.com/cilium/cilium/pkg/client"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand All @@ -14,34 +18,56 @@ import (
"k8s.io/client-go/rest"
)

func createClients(ctx context.Context, name string) (*kubernetes.Clientset, *dynamic.DynamicClient, *client.Client, error) {
const (
directionEgress = "Egress"
directionIngress = "Ingress"

policyAllow = "Allow"
policyDeny = "Deny"
)

var cachedCiliumClients map[string]*client.Client

func init() {
cachedCiliumClients = make(map[string]*client.Client)
}

func createK8sClients() (*kubernetes.Clientset, *dynamic.DynamicClient, error) {
config, err := rest.InClusterConfig()
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}

// Create Kubernetes Clients
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}

dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}

// Create Cilium Client
return clientset, dynamicClient, nil
}

func createCiliumClient(ctx context.Context, clientset *kubernetes.Clientset, name string) (*client.Client, error) {
endpoint, err := getProxyEndpoint(ctx, clientset, rootOptions.namespace, name)
if err != nil {
return nil, nil, nil, err
return nil, err
}

if cached, ok := cachedCiliumClients[endpoint]; ok {
return cached, nil
}

ciliumClient, err := client.NewClient(endpoint)
if err != nil {
return nil, nil, nil, err
return nil, err
}
cachedCiliumClients[endpoint] = ciliumClient

return clientset, dynamicClient, ciliumClient, err
return ciliumClient, err
}

func getProxyEndpoint(ctx context.Context, c *kubernetes.Clientset, namespace, name string) (string, error) {
Expand Down Expand Up @@ -89,3 +115,81 @@ func getPodEndpointID(ctx context.Context, d *dynamic.DynamicClient, namespace,

return endpointID, nil
}

func getIdentityMap(ctx context.Context, d *dynamic.DynamicClient) (map[int]*unstructured.Unstructured, error) {
gvr := schema.GroupVersionResource{
Group: "cilium.io",
Version: "v2",
Resource: "ciliumidentities",
}
li, err := d.Resource(gvr).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, err
}

ret := make(map[int]*unstructured.Unstructured)
for _, item := range li.Items {
id, err := strconv.Atoi(item.GetName())
if err != nil {
return nil, err
}
ret[id] = &item
}
return ret, nil
}

func getIdentityExampleMap(ctx context.Context, d *dynamic.DynamicClient) (map[int]string, error) {
gvr := schema.GroupVersionResource{
Group: "cilium.io",
Version: "v2",
Resource: "ciliumendpoints",
}

li, err := d.Resource(gvr).Namespace(corev1.NamespaceAll).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, err
}

ret := make(map[int]string)
for _, ep := range li.Items {
identity, ok, err := unstructured.NestedInt64(ep.Object, "status", "identity", "id")
if err != nil {
return nil, err
}
if !ok {
continue
}
if _, ok := ret[int(identity)]; ok {
ret[int(identity)] += "," + ep.GetName()
} else {
ret[int(identity)] = ep.GetName()
}
}
for k, v := range ret {
if strings.Contains(v, ",") {
samples := strings.Split(v, ",")
i := rand.Intn(len(samples))
ret[k] = samples[i]
}
}
return ret, nil
}

func findPodWithPrefix(ctx context.Context, clientset *kubernetes.Clientset, namespace, name string) (string, error) {
pods, err := clientset.CoreV1().Pods(rootOptions.namespace).List(ctx, metav1.ListOptions{})
if err != nil {
return "", nil
}
found := false
prefix := name
for _, p := range pods.Items {
if strings.HasPrefix(p.GetName(), prefix) {
if found {
return "", errors.New("multiple pods found for the prefix: " + prefix)
}
found = true
name = p.GetName()
}
}
return name, nil
}
226 changes: 226 additions & 0 deletions cmd/npv/app/inspect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
package app

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
"text/tabwriter"

"github.com/cilium/cilium/pkg/identity"
"github.com/cilium/cilium/pkg/u8proto"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
)

var inspectOptions struct {
prefix bool
}

func init() {
inspectCmd.Flags().BoolVarP(&inspectOptions.prefix, "prefix", "p", false, "find pod with specified prefix")
rootCmd.AddCommand(inspectCmd)
}

var inspectCmd = &cobra.Command{
Use: "inspect",
Short: "Inspect network policies applied to a pod",
Long: `Inspect network policies applied to a pod`,

Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
return runInspect(context.Background(), cmd.OutOrStdout(), args[0])
},
}

type policyEntryKey struct {
Identity int `json:"Identity"`
Direction int `json:"TrafficDirection"`
Protocol int `json:"Nexthdr"`
BigPort int `json:"DestPortNetwork"` // big endian
}

// For the meanings of the flags, see:
// https://github.com/cilium/cilium/blob/v1.16.3/bpf/lib/common.h#L394
type policyEntry struct {
Flags int `json:"Flags"`
Packets int `json:"Packets"`
Bytes int `json:"Bytes"`
Key policyEntryKey `json:"Key"`
}

func (p policyEntry) IsDenyRule() bool {
return (p.Flags & 1) > 0
}

func (p policyEntry) IsEgressRule() bool {
return p.Key.Direction > 0
}

func (p policyEntry) IsWildcardProtocol() bool {
return (p.Flags & 2) > 0
}

func (p policyEntry) IsWildcardPort() bool {
return (p.Flags & 4) > 0
}

// This command aims to show the result of "cilium bpf policy get" from a remote pod.
// https://github.com/cilium/cilium/blob/v1.16.3/cilium-dbg/cmd/bpf_policy_get.go
type inspectEntry struct {
Policy string `json:"policy"`
Direction string `json:"direction"`
Namespace string `json:"namespace"`
Example string `json:"example"`
Identity int `json:"identity"`
WildcardProtocol bool `json:"wildcard_protocol"`
WildcardPort bool `json:"wildcard_port"`
Protocol int `json:"protocol"`
Port int `json:"port"`
Bytes int `json:"bytes"`
Packets int `json:"packets"`
}

func queryPolicyMap(ctx context.Context, clientset *kubernetes.Clientset, dynamicClient *dynamic.DynamicClient, namespace, name string) ([]policyEntry, error) {
endpointID, err := getPodEndpointID(ctx, dynamicClient, namespace, name)
if err != nil {
return nil, fmt.Errorf("failed to get pod endpoint ID: %w", err)
}

url, err := getProxyEndpoint(ctx, clientset, namespace, name)
if err != nil {
return nil, fmt.Errorf("failed to get proxy endpoint: %w", err)
}

url = fmt.Sprintf("%s/policy/%d", url, endpointID)
resp, err := http.Get(url)
if err != nil {
return nil, fmt.Errorf("failed to request policy: %w", err)
}
defer resp.Body.Close()

data, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read response: %w", err)
}

policies := make([]policyEntry, 0)
if err = json.Unmarshal(data, &policies); err != nil {
return nil, fmt.Errorf("failed to unmarshal response: %w", err)
}

return policies, nil
}

func runInspect(ctx context.Context, w io.Writer, name string) error {
clientset, dynamicClient, err := createK8sClients()
if err != nil {
return err
}

if inspectOptions.prefix {
newName, err := findPodWithPrefix(ctx, clientset, rootOptions.namespace, name)
if err != nil {
return err
}
name = newName
}

policies, err := queryPolicyMap(ctx, clientset, dynamicClient, rootOptions.namespace, name)
if err != nil {
return err
}

ids, err := getIdentityMap(ctx, dynamicClient)
if err != nil {
return err
}

examples, err := getIdentityExampleMap(ctx, dynamicClient)
if err != nil {
return err
}

arr := make([]inspectEntry, len(policies))
for i, policy := range policies {
var entry inspectEntry
if policy.IsDenyRule() {
entry.Policy = policyDeny
} else {
entry.Policy = policyAllow
}
if policy.IsEgressRule() {
entry.Direction = directionEgress
} else {
entry.Direction = directionIngress
}
entry.Namespace = "-"
if id, ok := ids[policy.Key.Identity]; ok {
ns, ok, err := unstructured.NestedString(id.Object, "security-labels", "k8s:io.kubernetes.pod.namespace")
if err != nil {
return err
}
if ok {
entry.Namespace = ns
}
}
if v, ok := examples[policy.Key.Identity]; ok {
entry.Example = v
} else {
idObj := identity.NumericIdentity(policy.Key.Identity)
if idObj.IsReservedIdentity() {
entry.Example = "reserved:" + idObj.String()
} else {
entry.Example = "-"
}
}
entry.Identity = policy.Key.Identity
entry.WildcardProtocol = policy.IsWildcardProtocol()
entry.WildcardPort = policy.IsWildcardPort()
entry.Protocol = policy.Key.Protocol
entry.Port = ((policy.Key.BigPort & 0xFF) << 8) + ((policy.Key.BigPort & 0xFF00) >> 8)
entry.Bytes = policy.Bytes
entry.Packets = policy.Packets
arr[i] = entry
}

switch rootOptions.output {
case OutputJson:
text, err := json.MarshalIndent(arr, "", " ")
if err != nil {
return err
}
_, err = w.Write(text)
return err
case OutputSimple:
tw := tabwriter.NewWriter(w, 0, 1, 1, ' ', 0)
if !rootOptions.noHeaders {
if _, err := tw.Write([]byte("POLICY\tDIRECTION\tIDENTITY\tNAMESPACE\tEXAMPLE\tPROTOCOL\tPORT\tBYTES\tPACKETS\n")); err != nil {
return err
}
}
for _, p := range arr {
var protocol, port string
if p.WildcardProtocol {
protocol = "ANY"
} else {
protocol = u8proto.U8proto(p.Protocol).String()
}
if p.WildcardPort {
port = "ANY"
} else {
port = strconv.Itoa(p.Port)
}
if _, err := tw.Write([]byte(fmt.Sprintf("%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\n", p.Policy, p.Direction, p.Identity, p.Namespace, p.Example, protocol, port, p.Bytes, p.Packets))); err != nil {
return err
}
}
return tw.Flush()
default:
return fmt.Errorf("unknown format: %s", rootOptions.output)
}
}
Loading

0 comments on commit 0b8979e

Please sign in to comment.