Skip to content

Commit

Permalink
containerInfo
Browse files Browse the repository at this point in the history
  • Loading branch information
orishavit committed Jul 17, 2024
1 parent 66f8e82 commit b751a18
Show file tree
Hide file tree
Showing 9 changed files with 189 additions and 99 deletions.
4 changes: 4 additions & 0 deletions src/node-agent/cmd/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"github.com/otterize/network-mapper/src/node-agent/pkg/container"
"github.com/otterize/network-mapper/src/node-agent/pkg/service"
"github.com/sirupsen/logrus"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -13,11 +14,14 @@ func main() {
mgr, client := service.CreateControllerRuntimeComponentsOrDie()

bpfmanClient := service.ConnectToBpfmanOrDie(signalHandlerCtx)
criClient := service.CreateCRIClientOrDie()
containerManager := container.NewContainerManager(criClient)

service.RegisterReconcilersOrDie(
mgr,
client,
bpfmanClient,
containerManager,
)

if err := mgr.Start(signalHandlerCtx); err != nil {
Expand Down
19 changes: 19 additions & 0 deletions src/node-agent/pkg/container/containerinfo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package container

type ContainerInfo interface {
GetID() string
GetPID() int32
}

type criContainerInfo struct {
Id string
Pid int32 `json:"pid"`
}

func (c criContainerInfo) GetID() string {
return c.Id
}

func (c criContainerInfo) GetPID() int32 {
return c.Pid
}
57 changes: 57 additions & 0 deletions src/node-agent/pkg/container/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package container

import (
"context"
"encoding/json"
"github.com/otterize/intents-operator/src/shared/errors"
"github.com/sirupsen/logrus"
internalapi "k8s.io/cri-api/pkg/apis"
"strings"
)

type ContainerManager struct {
criClient internalapi.RuntimeService
}

func NewContainerManager(criClient internalapi.RuntimeService) *ContainerManager {
return &ContainerManager{
criClient: criClient,
}
}

func (m *ContainerManager) GetContainerInfo(ctx context.Context, containerID string) (ContainerInfo, error) {
containerType, containerId, found := strings.Cut(containerID, "://")

if !found {
return nil, errors.Errorf("Failed to parse container ID: %s", containerID)
}

logrus.WithField("containerType", containerType).
WithField("containerId", containerId).
Debug("Getting container info")

resp, err := m.criClient.ContainerStatus(ctx, containerId, true)

if err != nil {
return nil, errors.Wrap(err)
}

if resp.Info == nil {
return nil, errors.Errorf("invalid container info for %s", containerId)
}

if _, ok := resp.Info["info"]; !ok {
return nil, errors.Errorf("invalid container info for %s", containerId)
}

var info criContainerInfo
err = json.Unmarshal([]byte(resp.Info["info"]), &info)

if err != nil {
return nil, errors.Wrap(err)
}

info.Id = resp.Status.Id

return info, nil
}
3 changes: 3 additions & 0 deletions src/node-agent/pkg/labels/labels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package labels

const EBPFVisibilityLabelKey = "network-mapper.otterize.com/ebpf-visibility"
151 changes: 56 additions & 95 deletions src/node-agent/pkg/reconcilers/ebpfreconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,19 @@ package reconcilers

import (
"context"
"encoding/json"
bpfmanclient "github.com/bpfman/bpfman/clients/gobpfman/v1"
"github.com/otterize/intents-operator/src/shared/errors"
cri "github.com/otterize/network-mapper/src/shared/criclient"
"github.com/otterize/network-mapper/src/node-agent/pkg/container"
"github.com/otterize/network-mapper/src/node-agent/pkg/labels"
"github.com/otterize/network-mapper/src/shared/kubeutils"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"strings"
"time"
)

const (
Expand All @@ -28,22 +26,34 @@ const (
)

type EBPFReconciler struct {
client client.Client
bpfmanClient bpfmanclient.BpfmanClient
client client.Client
bpfmanClient bpfmanclient.BpfmanClient
containersManager *container.ContainerManager
}

func NewEBPFReconciler(client client.Client, bpfmanClient *bpfmanclient.BpfmanClient) *EBPFReconciler {
func NewEBPFReconciler(
client client.Client,
bpfmanClient bpfmanclient.BpfmanClient,
containerManager *container.ContainerManager,
) *EBPFReconciler {
return &EBPFReconciler{
client: client,
bpfmanClient: *bpfmanClient,
client: client,
bpfmanClient: bpfmanClient,
containersManager: containerManager,
}
}

func (r *EBPFReconciler) SetupWithManager(mgr manager.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
Watches(&corev1.Pod{}, &handler.EnqueueRequestForObject{}).
Named("ebpf-reconciler").
Complete(r)
}

func (r *EBPFReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
logrus.WithContext(ctx).
logger := logrus.WithContext(ctx).
WithField("namespace", req.Namespace).
WithField("name", req.Name).
Info("Reconciling EBPF")
WithField("podName", req.Name)

pod := corev1.Pod{}

Expand All @@ -56,106 +66,57 @@ func (r *EBPFReconciler) Reconcile(ctx context.Context, req reconcile.Request) (
}

if pod.Status.Phase != corev1.PodRunning {
logger.Debug("Pod is not running, skipping")
return reconcile.Result{}, nil
}

_, hasEBPFLabel := pod.Labels["ebpf"]

if !hasEBPFLabel {
if !kubeutils.IsEnabledByLabel(pod.Labels, labels.EBPFVisibilityLabelKey) {
return reconcile.Result{}, nil
}

for _, container := range pod.Status.ContainerStatuses {
_ = r.loadBpfProgramToContainer(ctx, &container)
containerInfo, err := r.containersManager.GetContainerInfo(ctx, container.ContainerID)

if err != nil {
return reconcile.Result{}, errors.Wrap(err)
}
_ = r.loadBpfProgramToContainer(ctx, containerInfo)
}

return reconcile.Result{}, nil
}

func (r *EBPFReconciler) SetupWithManager(mgr manager.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
Watches(&corev1.Pod{}, &handler.EnqueueRequestForObject{}).
Named("ebpf-reconciler").
Complete(r)
}

func (r *EBPFReconciler) loadBpfProgramToContainer(ctx context.Context, container *corev1.ContainerStatus) error {
logger := klog.Background()

// TODO: move to main
criClient, err := cri.NewRemoteRuntimeService(
"unix:///var/run/containerd/containerd.sock",
time.Second*5,
&logger,
func (r *EBPFReconciler) loadBpfProgramToContainer(ctx context.Context, containerInfo container.ContainerInfo) error {
fnName := "SSL_write"
pid := containerInfo.GetPID()

_, err := r.bpfmanClient.Load(
ctx,
&bpfmanclient.LoadRequest{
Name: "uprobe_counter",
ProgramType: Kprobe,
Attach: &bpfmanclient.AttachInfo{
Info: &bpfmanclient.AttachInfo_UprobeAttachInfo{
UprobeAttachInfo: &bpfmanclient.UprobeAttachInfo{
FnName: &fnName,
Target: "libssl",
ContainerPid: &pid,
},
},
},
Bytecode: &bpfmanclient.BytecodeLocation{
Location: &bpfmanclient.BytecodeLocation_File{
File: "/otterize/ebpf/uprobe-counter/bpf_x86_bpfel.o",
},
},
},
)

if err != nil {
return errors.Wrap(err)
}

// form is containerd://<container-id>
_, containerId, found := strings.Cut(container.ContainerID, "://")

if !found {
return errors.Errorf("Failed to parse container ID: %s", container.ContainerID)
}

criResp, err := criClient.ContainerStatus(ctx, containerId, true)

if err != nil {
return errors.Wrap(err)
}

containerInfo, found := criResp.Info["info"]

if !found {
return errors.Errorf("Failed to get container info: %s", containerId)
}

type ContainerInfo struct {
Pid int32 `json:"pid"`
}

var containerInfoStruct ContainerInfo
err = json.Unmarshal([]byte(containerInfo), &containerInfoStruct)

if err != nil {
logrus.WithError(err).Error("Failed to unmarshal container info")
return err
}

logrus.WithField("pid", containerInfoStruct.Pid).WithError(err).Info("Container PID")

//fnName := "SSL_write"
//resp, err := r.bpfmanClient.Load(
// ctx,
// &bpfmanclient.LoadRequest{
// Name: "uprobe_counter",
// ProgramType: Kprobe,
// Attach: &bpfmanclient.AttachInfo{
// Info: &bpfmanclient.AttachInfo_UprobeAttachInfo{
// UprobeAttachInfo: &bpfmanclient.UprobeAttachInfo{
// FnName: &fnName,
// Target: "libssl",
// ContainerPid: &containerInfoStruct.Pid,
// },
// },
// },
// Bytecode: &bpfmanclient.BytecodeLocation{
// Location: &bpfmanclient.BytecodeLocation_File{
// File: "/otterize/ebpf/uprobe-counter/bpf_x86_bpfel.o",
// },
// },
// },
//)

err = nil

if err != nil {
return errors.Wrap(err)
}

logrus.WithField("containerId", containerId).Info("Loaded program")
logrus.WithField("containerId", containerInfo.GetID()).Info("Loaded program")

return nil
}
25 changes: 25 additions & 0 deletions src/node-agent/pkg/service/cri.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package service

import (
cri "github.com/otterize/network-mapper/src/shared/criclient"
"github.com/sirupsen/logrus"
internalapi "k8s.io/cri-api/pkg/apis"
"k8s.io/klog/v2"
"time"
)

func CreateCRIClientOrDie() internalapi.RuntimeService {
logger := klog.Background()

criClient, err := cri.NewRemoteRuntimeService(
"unix:///var/run/containerd/containerd.sock",
time.Second*5,
&logger,
)

if err != nil {
logrus.WithError(err).Panic("failed to create CRI client")
}

return criClient
}
4 changes: 2 additions & 2 deletions src/node-agent/pkg/service/ebpfagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

const socketPath = "unix:///run/bpfman-sock/bpfman.sock"

func ConnectToBpfmanOrDie(ctx context.Context) *bpfmanclient.BpfmanClient {
func ConnectToBpfmanOrDie(ctx context.Context) bpfmanclient.BpfmanClient {
conn, err := grpc.NewClient(socketPath, grpc.WithTransportCredentials(insecure.NewCredentials()))

if err != nil {
Expand All @@ -19,5 +19,5 @@ func ConnectToBpfmanOrDie(ctx context.Context) *bpfmanclient.BpfmanClient {

client := bpfmanclient.NewBpfmanClient(conn)

return &client
return client
}
10 changes: 8 additions & 2 deletions src/node-agent/pkg/service/reconcilers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package service

import (
bpfmanclient "github.com/bpfman/bpfman/clients/gobpfman/v1"
"github.com/otterize/network-mapper/src/node-agent/pkg/container"
"github.com/otterize/network-mapper/src/node-agent/pkg/reconcilers"
"github.com/sirupsen/logrus"
"reflect"
Expand All @@ -12,10 +13,15 @@ import (
func RegisterReconcilersOrDie(
mgr manager.Manager,
client crtClient.Client,
bpfmanClient *bpfmanclient.BpfmanClient,
bpfmanClient bpfmanclient.BpfmanClient,
containerManager *container.ContainerManager,
) {
reconcilersToRegister := []reconcilers.Reconciler{
reconcilers.NewEBPFReconciler(client, bpfmanClient),
reconcilers.NewEBPFReconciler(
client,
bpfmanClient,
containerManager,
),
}

for _, r := range reconcilersToRegister {
Expand Down
15 changes: 15 additions & 0 deletions src/shared/kubeutils/labels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package kubeutils

func IsEnabledByLabel(labels map[string]string, labelKey string) bool {
if labels == nil {
return false
}

labelValue, ok := labels[labelKey]

if !ok {
return false
}

return labelValue == "true"
}

0 comments on commit b751a18

Please sign in to comment.