Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Addition to anupam tarian detector integration 2 #76

Merged
merged 8 commits into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/tarianctl/cmd/install/defaultValues.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func natsHelmDefaultValues(natsValuesFile string) error {
},
FileStorage: StorageOpts{
Enabled: true,
Size: "50Mi",
Size: "200Mi",
},
},
},
Expand Down
4 changes: 2 additions & 2 deletions cmd/tarianctl/cmd/install/defaultValues_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ func TestNatsHelmDefaultValues(t *testing.T) {
t.Errorf("natsValuesFile does not contain 'size: 128Mi' in MemStorage")
}

if !containsSubstring(string(yamlContent), "size: 50Mi") {
t.Errorf("natsValuesFile does not contain 'size: 50Mi' in FileStorage")
if !containsSubstring(string(yamlContent), "size: 200Mi") {
t.Errorf("natsValuesFile does not contain 'size: 200Mi' in FileStorage")
}
}

Expand Down
2 changes: 1 addition & 1 deletion dev/nats-helm-values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ nats:

fileStorage:
enabled: true
size: 50Mi
size: 200Mi
31 changes: 14 additions & 17 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ require (
google.golang.org/grpc v1.58.3
google.golang.org/protobuf v1.33.0
gopkg.in/yaml.v3 v3.0.1
k8s.io/api v0.29.2
k8s.io/apimachinery v0.29.2
k8s.io/client-go v0.29.2
k8s.io/api v0.29.3
k8s.io/apimachinery v0.29.3
k8s.io/client-go v0.29.3
sigs.k8s.io/controller-runtime v0.17.2
)

Expand Down Expand Up @@ -61,28 +61,27 @@ require (
github.com/spf13/pflag v1.0.5
go.mongodb.org/mongo-driver v1.10.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
golang.org/x/net v0.23.0
golang.org/x/oauth2 v0.18.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/term v0.18.0 // indirect
golang.org/x/net v0.24.0
golang.org/x/oauth2 v0.19.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/term v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0
k8s.io/apiextensions-apiserver v0.29.0 // indirect
k8s.io/component-base v0.29.0 // indirect
k8s.io/klog/v2 v2.120.1 // indirect
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
k8s.io/kube-openapi v0.0.0-20240411171206-dc4e619f62f3 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
)

require (
github.com/cilium/ebpf v0.13.2
github.com/cilium/ebpf v0.14.0
github.com/dgraph-io/dgo/v210 v210.0.0-20220113041351-ba0e5dfc4c3e
github.com/intelops/tarian-detector v0.0.0-20240226164335-7701e4e67daa
github.com/intelops/tarian-detector v0.0.0-20240412160221-7dcb666b7f54
github.com/nats-io/nats.go v1.22.1
github.com/sethvargo/go-retry v0.2.4
github.com/sirupsen/logrus v1.9.3
Expand All @@ -99,12 +98,12 @@ require (
github.com/moby/spdystream v0.2.0 // indirect
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
go.opentelemetry.io/otel/metric v1.19.0 // indirect
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 // indirect
golang.org/x/exp v0.0.0-20240409090435-93d18d7e34b8 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect
)

require (
github.com/emicklei/go-restful/v3 v3.11.3 // indirect
github.com/emicklei/go-restful/v3 v3.12.0 // indirect
github.com/evanphx/json-patch/v5 v5.8.0 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
Expand All @@ -114,10 +113,8 @@ require (
github.com/satori/go.uuid v1.2.0
go.opentelemetry.io/otel v1.19.0 // indirect
go.opentelemetry.io/otel/trace v1.19.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/sync v0.5.0
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/sync v0.7.0
k8s.io/utils v0.0.0-20240310230437-4693a0247e57 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
)

replace github.com/intelops/tarian-detector => github.com/andylibrian/tarian-detector v0.0.0-20240314095358-bd4d5419e74a
71 changes: 30 additions & 41 deletions go.sum

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pkg/clusteragent/action_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func newActionHandler(logger *logrus.Logger, tarianServerAddress string, opts []
// QueueEvent queues a Tarian event for processing.
// It logs the queued event and adds it to the events channel.
func (ah *actionHandler) QueueEvent(event *tarianpb.Event) {
ah.logger.WithField("event", event).Debug("event queued")
ah.logger.WithField("event", event).Trace("event queued")
ah.eventsChan <- event
}

Expand All @@ -85,7 +85,7 @@ func (ah *actionHandler) Run() {
// ProcessActions processes Tarian actions based on Tarian events.
// It checks if actions should be executed for each event target and invokes the appropriate action.
func (ah *actionHandler) ProcessActions(event *tarianpb.Event) {
ah.logger.WithField("event", event).Debug("event processed")
ah.logger.WithField("event", event).Trace("event processed")
if event.GetTargets() == nil {
return
}
Expand Down
82 changes: 60 additions & 22 deletions pkg/nodeagent/nodeagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ import (
"context"
"encoding/json"
"fmt"
"path/filepath"
"regexp"
"strconv"
"strings"
"sync"
"time"

"github.com/intelops/tarian-detector/pkg/detector"
ebpf "github.com/intelops/tarian-detector/pkg/eBPF"
"github.com/intelops/tarian-detector/pkg/eventparser"
"github.com/intelops/tarian-detector/tarian"
"github.com/kube-tarian/tarian/pkg/tarianpb"
"github.com/scylladb/go-set/strset"
Expand Down Expand Up @@ -190,7 +193,7 @@ func (n *NodeAgent) loopTarianDetectorReadEvents(ctx context.Context) error {
}
podWatcher.Start()

eventsDetector, err := n.setupEventsDetector()
eventsDetector, tarianDetector, tarianEbpfModule, err := n.setupEventsDetector()
if err != nil {
return err
}
Expand All @@ -203,6 +206,12 @@ func (n *NodeAgent) loopTarianDetectorReadEvents(ctx context.Context) error {
}
defer eventsDetector.Close()

// Attaches tarian module programs to the kernel
err = tarianEbpfModule.Attach(tarianDetector)
if err != nil {
return fmt.Errorf("error while attaching tarian-detector: %w", err)
}

for {
select {
case <-ctx.Done():
Expand All @@ -214,11 +223,11 @@ func (n *NodeAgent) loopTarianDetectorReadEvents(ctx context.Context) error {
continue
}

if event == nil {
if event.EventId == "" {
continue
}

pid := event["hostProcessId"].(uint32)
pid := event.HostProcessId

// Retrieve the container ID.
containerID, err := procsContainerID(pid)
Expand All @@ -230,17 +239,15 @@ func (n *NodeAgent) loopTarianDetectorReadEvents(ctx context.Context) error {
continue
}

detectionDataType := event.EventId

// Find the corresponding Kubernetes Pod.
pod := podWatcher.FindPod(containerID)
if pod == nil {
continue
}

// TODO: sys_execve_entry could be added here
// But for kubectl exec, the detected entry comm is still the wrapper: runc:init
// With sys_execve_exit, the comm is the target process
detectionDataType := event["eventId"].(string)
if detectionDataType == "sys_execve_exit" {
if detectionDataType == "sys_execve_entry" {
execEvent, err2 := n.execEventFromTarianDetector(event, containerID, pod)
if err2 != nil {
n.logger.WithField("err", err2).Error("tarian-detector: error while converting tarian-detector to execEvent")
Expand All @@ -252,6 +259,8 @@ func (n *NodeAgent) loopTarianDetectorReadEvents(ctx context.Context) error {
n.logger.WithField("err", err3).Error("node-agent: error while handling exec event")
}
}

n.logger.WithField("execEvent", execEvent).WithField("event", event).Info("handled exec event")
}

byteData, err := json.Marshal(event)
Expand All @@ -260,24 +269,35 @@ func (n *NodeAgent) loopTarianDetectorReadEvents(ctx context.Context) error {
continue
}

n.SendDetectionEventToClusterAgent(detectionDataType, string(byteData))
n.logger.WithField("binary_file_path", event["directory"]).WithField("hostProcessId", event["hostProcessId"]).
WithField("processId", event["processId"]).WithField("comm", event["processName"]).Info("tarian-detector: ", detectionDataType)
go n.SendDetectionEventToClusterAgent(detectionDataType, string(byteData))
}
}
}

func (n *NodeAgent) setupEventsDetector() (*detector.EventsDetector, error) {
func (n *NodeAgent) setupEventsDetector() (*detector.EventsDetector, *ebpf.Handler, *ebpf.Module, error) {
tarianEbpfModule, err := tarian.GetModule()
if err != nil {
n.logger.Errorf("error while get tarian-detector ebpf module: %v", err)
return nil, fmt.Errorf("error while get tarian-detector ebpf module: %w", err)
return nil, nil, nil, fmt.Errorf("error while getting tarian-detector ebpf module: %w", err)
}

// Temporarily skip read and write syscalls until tarian-detector
// is able to reduce the volume of events generated
for _, p := range tarianEbpfModule.GetPrograms() {
i, err := p.GetName().Info()
if err != nil {
continue
}

if strings.Contains(i.Name, "tdf_write") || strings.Contains(i.Name, "tdf_read") {
p.Disable()
}
}

tarianDetector, err := tarianEbpfModule.Prepare()
if err != nil {
n.logger.Errorf("error while prepare tarian-detector: %v", err)
return nil, fmt.Errorf("error while prepare tarian-detector: %w", err)
return nil, nil, nil, fmt.Errorf("error while preparing tarian-detector: %w", err)
}

// Instantiate event detectors
Expand All @@ -286,7 +306,13 @@ func (n *NodeAgent) setupEventsDetector() (*detector.EventsDetector, error) {
// Add ebpf programs to detectors
eventsDetector.Add(tarianDetector)

return eventsDetector, nil
// Attaches tarian module programs to the kernel
err = tarianEbpfModule.Attach(tarianDetector)
if err != nil {
return nil, nil, nil, fmt.Errorf("error while preparing tarian-detector: %w", err)
}

return eventsDetector, tarianDetector, tarianEbpfModule, nil
}

func (n *NodeAgent) setupPodWatcher() (*PodWatcher, error) {
Expand All @@ -309,8 +335,8 @@ func (n *NodeAgent) setupPodWatcher() (*PodWatcher, error) {
return watcher, nil
}

func (n *NodeAgent) execEventFromTarianDetector(bpfEvt map[string]any, containerID string, pod *corev1.Pod) (*ExecEvent, error) {
pid := bpfEvt["hostProcessId"].(uint32)
func (n *NodeAgent) execEventFromTarianDetector(bpfEvt eventparser.TarianDetectorEvent, containerID string, pod *corev1.Pod) (*ExecEvent, error) {
pid := bpfEvt.HostProcessId

var podName string
var podUID string
Expand All @@ -324,11 +350,23 @@ func (n *NodeAgent) execEventFromTarianDetector(bpfEvt map[string]any, container
podLabels = pod.GetLabels()
podAnnotations = pod.GetAnnotations()

execFileName := bpfEvt.Executable
for _, c := range bpfEvt.Context {
if c.Name == "filename" {
execFileName = c.Value
break
}
}

// Running on kubernetes, bpfEvt["processName"] contains `runc:[2:INIT]`
// So, we take the command from the executable filename instead.
command := filepath.Base(execFileName)

// Create an ExecEvent and send it to the events channel.
execEvent := &ExecEvent{
Pid: pid,
Filename: bpfEvt["directory"].(string) + "/" + bpfEvt["processName"].(string),
Command: bpfEvt["processName"].(string),
Filename: execFileName,
Command: command,
ContainerID: containerID,
K8sPodName: podName,
K8sPodUID: podUID,
Expand Down Expand Up @@ -371,10 +409,10 @@ func (n *NodeAgent) handleExecEvent(evt *ExecEvent) error {

if registerProcess {
n.logger.WithField("comm", evt).Debug("violated process detected, going to register")
n.RegisterViolationsAsNewConstraint(violation)
go n.RegisterViolationsAsNewConstraint(violation)
} else {
n.logger.WithField("comm", evt).Debug("violated process detected")
n.ReportViolationsToClusterAgent(violation)
go n.ReportViolationsToClusterAgent(violation)
}
}

Expand Down Expand Up @@ -402,7 +440,7 @@ func (n *NodeAgent) SendDetectionEventToClusterAgent(detectionDataType, detectio
if err != nil {
n.logger.Error("error while sending detection events ", "err ", err)
} else {
n.logger.Debug("ingest event response", "response", resp)
n.logger.WithField("response", resp).Trace("ingest event response", "response", resp)
}
}

Expand Down
9 changes: 4 additions & 5 deletions pkg/protoqueue/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package protoqueue
import (
"errors"
"fmt"
"log"
"time"

"github.com/nats-io/nats.go"
Expand Down Expand Up @@ -189,14 +188,14 @@ func (j *JetStream) publishWithRetry(subject string, data []byte) error {
RetryInterval := 5 * time.Second
var err error
for i := 0; i < maxRetries; i++ {
// Publish message
_, err = j.Conn.JSContext.Publish(subject, data)
if err == nil {
// Message published successfully
return nil
}
log.Printf("Publish attempt %d failed: %v", i+1, err)
// Wait before retrying

j.logger.Warn("Publish attempt failed")
j.logger.WithError(err).Warnf("Publish attempt %d failed", i+1)

time.Sleep(RetryInterval)
}
return err
Expand Down
25 changes: 16 additions & 9 deletions pkg/server/event_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ import (
// EventServer handles gRPC calls related to event ingestion and retrieval.
type EventServer struct {
tarianpb.UnimplementedEventServer
eventStore store.EventStore
ingestionQueue protoqueue.QueuePublisher
logger *logrus.Logger
eventStore store.EventStore
ingestionQueue protoqueue.QueuePublisher
ingestionQueueForEventDetection protoqueue.QueuePublisher
logger *logrus.Logger
}

// NewEventServer creates a new EventServer instance.
Expand All @@ -28,11 +29,12 @@ type EventServer struct {
//
// Returns:
// - *EventServer: A new instance of EventServer.
func NewEventServer(logger *logrus.Logger, s store.EventStore, ingestionQueue protoqueue.QueuePublisher) *EventServer {
func NewEventServer(logger *logrus.Logger, s store.EventStore, ingestionQueue protoqueue.QueuePublisher, ingestionQueueForEventDetection protoqueue.QueuePublisher) *EventServer {
return &EventServer{
eventStore: s,
ingestionQueue: ingestionQueue,
logger: logger,
eventStore: s,
ingestionQueue: ingestionQueue,
ingestionQueueForEventDetection: ingestionQueueForEventDetection,
logger: logger,
}
}

Expand All @@ -48,14 +50,19 @@ func NewEventServer(logger *logrus.Logger, s store.EventStore, ingestionQueue pr
func (es *EventServer) IngestEvent(ctx context.Context, request *tarianpb.IngestEventRequest) (*tarianpb.IngestEventResponse, error) {
es.logger.WithFields(logrus.Fields{
"request": request,
}).Debug("ingest event")
}).Trace("ingest event")

event := request.GetEvent()
if event == nil {
return nil, status.Error(codes.InvalidArgument, "required event is empty")
}

err := es.ingestionQueue.Publish(request.GetEvent())
var err error
if event.Type == tarianpb.EventTypeDetection {
err = es.ingestionQueueForEventDetection.Publish(request.GetEvent())
} else {
err = es.ingestionQueue.Publish(request.GetEvent())
}

if err != nil {
es.logger.WithError(err).Error("error while handling ingest event")
Expand Down
Loading
Loading