Skip to content

Commit

Permalink
chore: open some more syscalls from tarian-detector
Browse files Browse the repository at this point in the history
- add 2nd queue for tarian-detection event
- skip eBPF events from this process ID
- send events to tarian-server with Go routine
  • Loading branch information
andylibrian committed Apr 1, 2024
1 parent ffe5df6 commit ea22db3
Show file tree
Hide file tree
Showing 10 changed files with 178 additions and 89 deletions.
2 changes: 1 addition & 1 deletion cmd/tarianctl/cmd/install/defaultValues.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func natsHelmDefaultValues(natsValuesFile string) error {
},
FileStorage: StorageOpts{
Enabled: true,
Size: "50Mi",
Size: "200Mi",
},
},
},
Expand Down
4 changes: 2 additions & 2 deletions cmd/tarianctl/cmd/install/defaultValues_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ func TestNatsHelmDefaultValues(t *testing.T) {
t.Errorf("natsValuesFile does not contain 'size: 128Mi' in MemStorage")
}

if !containsSubstring(string(yamlContent), "size: 50Mi") {
t.Errorf("natsValuesFile does not contain 'size: 50Mi' in FileStorage")
if !containsSubstring(string(yamlContent), "size: 200Mi") {
t.Errorf("natsValuesFile does not contain 'size: 200Mi' in FileStorage")
}
}

Expand Down
2 changes: 1 addition & 1 deletion dev/nats-helm-values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ nats:

fileStorage:
enabled: true
size: 50Mi
size: 200Mi
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,4 @@ require (
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
)

replace github.com/intelops/tarian-detector => github.com/andylibrian/tarian-detector v0.0.0-20240328042403-0ad0b29f56cf
replace github.com/intelops/tarian-detector => github.com/andylibrian/tarian-detector v0.0.0-20240401225603-ad8e890004a8
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE=
github.com/andylibrian/tarian-detector v0.0.0-20240328042403-0ad0b29f56cf h1:sCIBD/c64HW9pW41ws1hDzFSwiQ53etRTl35zzYQzuw=
github.com/andylibrian/tarian-detector v0.0.0-20240328042403-0ad0b29f56cf/go.mod h1:dXcRWq8AHABseHsjcnM8iJqwXCGX+dGGOR8kiXw1acY=
github.com/andylibrian/tarian-detector v0.0.0-20240401225603-ad8e890004a8 h1:qHPCT8fRp50sbB1s2NWsP2B6wPRDqFJOjeYwbMMIuKk=
github.com/andylibrian/tarian-detector v0.0.0-20240401225603-ad8e890004a8/go.mod h1:dXcRWq8AHABseHsjcnM8iJqwXCGX+dGGOR8kiXw1acY=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
Expand Down
18 changes: 10 additions & 8 deletions pkg/nodeagent/nodeagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,11 @@ func (n *NodeAgent) loopTarianDetectorReadEvents(ctx context.Context) error {
continue
}

thisPID := event["processId"].(uint32)
if thisPID == 1 {

Check warning on line 224 in pkg/nodeagent/nodeagent.go

View check run for this annotation

Codecov / codecov/patch

pkg/nodeagent/nodeagent.go#L223-L224

Added lines #L223 - L224 were not covered by tests
continue
}

pid := event["hostProcessId"].(uint32)

// Retrieve the container ID.
Expand All @@ -238,9 +243,6 @@ func (n *NodeAgent) loopTarianDetectorReadEvents(ctx context.Context) error {
continue

Check warning on line 243 in pkg/nodeagent/nodeagent.go

View check run for this annotation

Codecov / codecov/patch

pkg/nodeagent/nodeagent.go#L241-L243

Added lines #L241 - L243 were not covered by tests
}

// TODO: sys_execve_entry could be added here
// But for kubectl exec, the detected entry comm is still the wrapper: runc:init
// With sys_execve_exit, the comm is the target process
detectionDataType := event["eventId"].(string)
if detectionDataType == "sys_execve_entry" {
execEvent, err2 := n.execEventFromTarianDetector(event, containerID, pod)
Expand All @@ -255,7 +257,7 @@ func (n *NodeAgent) loopTarianDetectorReadEvents(ctx context.Context) error {
}

Check warning on line 257 in pkg/nodeagent/nodeagent.go

View check run for this annotation

Codecov / codecov/patch

pkg/nodeagent/nodeagent.go#L253-L257

Added lines #L253 - L257 were not covered by tests
}

n.logger.WithField("execEvent", execEvent).WithField("event", event).Info("DEBUG")
n.logger.WithField("execEvent", execEvent).WithField("event", event).Info("=============== DEBUG")

Check warning on line 260 in pkg/nodeagent/nodeagent.go

View check run for this annotation

Codecov / codecov/patch

pkg/nodeagent/nodeagent.go#L260

Added line #L260 was not covered by tests
}

byteData, err := json.Marshal(event)
Expand All @@ -264,7 +266,7 @@ func (n *NodeAgent) loopTarianDetectorReadEvents(ctx context.Context) error {
continue

Check warning on line 266 in pkg/nodeagent/nodeagent.go

View check run for this annotation

Codecov / codecov/patch

pkg/nodeagent/nodeagent.go#L263-L266

Added lines #L263 - L266 were not covered by tests
}

n.SendDetectionEventToClusterAgent(detectionDataType, string(byteData))
go n.SendDetectionEventToClusterAgent(detectionDataType, string(byteData))

Check warning on line 269 in pkg/nodeagent/nodeagent.go

View check run for this annotation

Codecov / codecov/patch

pkg/nodeagent/nodeagent.go#L269

Added line #L269 was not covered by tests
}
}
}
Expand Down Expand Up @@ -387,10 +389,10 @@ func (n *NodeAgent) handleExecEvent(evt *ExecEvent) error {

if registerProcess {
n.logger.WithField("comm", evt).Debug("violated process detected, going to register")
n.RegisterViolationsAsNewConstraint(violation)
go n.RegisterViolationsAsNewConstraint(violation)
} else {
n.logger.WithField("comm", evt).Debug("violated process detected")
n.ReportViolationsToClusterAgent(violation)
go n.ReportViolationsToClusterAgent(violation)
}

Check warning on line 396 in pkg/nodeagent/nodeagent.go

View check run for this annotation

Codecov / codecov/patch

pkg/nodeagent/nodeagent.go#L390-L396

Added lines #L390 - L396 were not covered by tests
}

Expand Down Expand Up @@ -418,7 +420,7 @@ func (n *NodeAgent) SendDetectionEventToClusterAgent(detectionDataType, detectio
if err != nil {
n.logger.Error("error while sending detection events", "err", err)
} else {
n.logger.Debug("ingest event response", "response", resp)
n.logger.WithField("response", resp).Trace("ingest event response", "response", resp)
}

Check warning on line 424 in pkg/nodeagent/nodeagent.go

View check run for this annotation

Codecov / codecov/patch

pkg/nodeagent/nodeagent.go#L405-L424

Added lines #L405 - L424 were not covered by tests
}

Expand Down
23 changes: 15 additions & 8 deletions pkg/server/event_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ import (
// EventServer handles gRPC calls related to event ingestion and retrieval.
type EventServer struct {
tarianpb.UnimplementedEventServer
eventStore store.EventStore
ingestionQueue protoqueue.QueuePublisher
logger *logrus.Logger
eventStore store.EventStore
ingestionQueue protoqueue.QueuePublisher
ingestionQueueForEventDetection protoqueue.QueuePublisher
logger *logrus.Logger
}

// NewEventServer creates a new EventServer instance.
Expand All @@ -28,11 +29,12 @@ type EventServer struct {
//
// Returns:
// - *EventServer: A new instance of EventServer.
func NewEventServer(logger *logrus.Logger, s store.EventStore, ingestionQueue protoqueue.QueuePublisher) *EventServer {
func NewEventServer(logger *logrus.Logger, s store.EventStore, ingestionQueue protoqueue.QueuePublisher, ingestionQueueForEventDetection protoqueue.QueuePublisher) *EventServer {

Check warning on line 32 in pkg/server/event_server.go

View check run for this annotation

Codecov / codecov/patch

pkg/server/event_server.go#L32

Added line #L32 was not covered by tests
return &EventServer{
eventStore: s,
ingestionQueue: ingestionQueue,
logger: logger,
eventStore: s,
ingestionQueue: ingestionQueue,
ingestionQueueForEventDetection: ingestionQueueForEventDetection,
logger: logger,

Check warning on line 37 in pkg/server/event_server.go

View check run for this annotation

Codecov / codecov/patch

pkg/server/event_server.go#L34-L37

Added lines #L34 - L37 were not covered by tests
}
}

Expand All @@ -55,7 +57,12 @@ func (es *EventServer) IngestEvent(ctx context.Context, request *tarianpb.Ingest
return nil, status.Error(codes.InvalidArgument, "required event is empty")
}

err := es.ingestionQueue.Publish(request.GetEvent())
var err error
if event.Type == tarianpb.EventTypeDetection {
err = es.ingestionQueueForEventDetection.Publish(request.GetEvent())
} else {
err = es.ingestionQueue.Publish(request.GetEvent())
}

Check warning on line 65 in pkg/server/event_server.go

View check run for this annotation

Codecov / codecov/patch

pkg/server/event_server.go#L60-L65

Added lines #L60 - L65 were not covered by tests

if err != nil {
es.logger.WithError(err).Error("error while handling ingest event")
Expand Down
47 changes: 39 additions & 8 deletions pkg/server/ingestion_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import (

// IngestionWorker handles the ingestion of events from a message queue.
type IngestionWorker struct {
eventStore store.EventStore
IngestionQueue protoqueue.QueueSubscriber
logger *logrus.Logger
eventStore store.EventStore
IngestionQueue protoqueue.QueueSubscriber
ingestionQueueForEventDetection protoqueue.QueueSubscriber
logger *logrus.Logger
}

// NewIngestionWorker creates a new IngestionWorker instance.
Expand All @@ -24,11 +25,12 @@ type IngestionWorker struct {
//
// Returns:
// - *IngestionWorker: A new instance of IngestionWorker.
func NewIngestionWorker(logger *logrus.Logger, eventStore store.EventStore, queueSubscriber protoqueue.QueueSubscriber) *IngestionWorker {
func NewIngestionWorker(logger *logrus.Logger, eventStore store.EventStore, queueSubscriber protoqueue.QueueSubscriber, queueSubscriberForEventDetection protoqueue.QueueSubscriber) *IngestionWorker {

Check warning on line 28 in pkg/server/ingestion_worker.go

View check run for this annotation

Codecov / codecov/patch

pkg/server/ingestion_worker.go#L28

Added line #L28 was not covered by tests
return &IngestionWorker{
eventStore: eventStore,
IngestionQueue: queueSubscriber,
logger: logger,
eventStore: eventStore,
IngestionQueue: queueSubscriber,
ingestionQueueForEventDetection: queueSubscriberForEventDetection,
logger: logger,

Check warning on line 33 in pkg/server/ingestion_worker.go

View check run for this annotation

Codecov / codecov/patch

pkg/server/ingestion_worker.go#L30-L33

Added lines #L30 - L33 were not covered by tests
}
}

Expand All @@ -40,8 +42,14 @@ func NewIngestionWorker(logger *logrus.Logger, eventStore store.EventStore, queu
// - If it is a valid event, it updates the server timestamp and stores the event in the event store.
// - If there are errors during processing, they are logged.
func (iw *IngestionWorker) Start() {
go iw.loopConsumeQueue(iw.IngestionQueue)
go iw.loopConsumeQueueEventDetection(iw.ingestionQueueForEventDetection)

Check warning on line 46 in pkg/server/ingestion_worker.go

View check run for this annotation

Codecov / codecov/patch

pkg/server/ingestion_worker.go#L45-L46

Added lines #L45 - L46 were not covered by tests
}

func (iw *IngestionWorker) loopConsumeQueue(queue protoqueue.QueueSubscriber) {

Check warning on line 49 in pkg/server/ingestion_worker.go

View check run for this annotation

Codecov / codecov/patch

pkg/server/ingestion_worker.go#L49

Added line #L49 was not covered by tests
for {
msg, err := iw.IngestionQueue.NextMessage(&tarianpb.Event{})
msg, err := queue.NextMessage(&tarianpb.Event{})
iw.logger.WithField("event", msg).Infof("loopConsumeQueue: got message")

Check warning on line 52 in pkg/server/ingestion_worker.go

View check run for this annotation

Codecov / codecov/patch

pkg/server/ingestion_worker.go#L51-L52

Added lines #L51 - L52 were not covered by tests
if err != nil {
iw.logger.WithError(err).Error("error while processing event")
continue
Expand All @@ -61,3 +69,26 @@ func (iw *IngestionWorker) Start() {
}
}
}

func (iw *IngestionWorker) loopConsumeQueueEventDetection(queue protoqueue.QueueSubscriber) {
for {
msg, err := queue.NextMessage(&tarianpb.Event{})
if err != nil {
// iw.logger.WithError(err).Error("error while processing event")
continue

Check warning on line 78 in pkg/server/ingestion_worker.go

View check run for this annotation

Codecov / codecov/patch

pkg/server/ingestion_worker.go#L73-L78

Added lines #L73 - L78 were not covered by tests
}

event, ok := msg.(*tarianpb.Event)
if !ok {
// iw.logger.WithError(err).Error("error while processing event")
continue

Check warning on line 84 in pkg/server/ingestion_worker.go

View check run for this annotation

Codecov / codecov/patch

pkg/server/ingestion_worker.go#L81-L84

Added lines #L81 - L84 were not covered by tests
}

event.ServerTimestamp = timestamppb.Now()
_ = iw.eventStore.Add(event)

Check warning on line 88 in pkg/server/ingestion_worker.go

View check run for this annotation

Codecov / codecov/patch

pkg/server/ingestion_worker.go#L87-L88

Added lines #L87 - L88 were not covered by tests

// if err != nil {
// iw.logger.WithError(err).Error("error while processing event")
// }
}
}
Loading

0 comments on commit ea22db3

Please sign in to comment.