diff --git a/NOTICE.txt b/NOTICE.txt index 61aa6ac7db3c..ecaf09ee5347 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -14216,11 +14216,11 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -------------------------------------------------------------------------------- Dependency : github.com/elastic/go-quark -Version: v0.2.0 +Version: v0.3.0 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/go-quark@v0.2.0/LICENSE.txt: +Contents of probable licence file $GOMODCACHE/github.com/elastic/go-quark@v0.3.0/LICENSE.txt: Apache License @@ -22940,11 +22940,11 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -------------------------------------------------------------------------------- Dependency : github.com/stretchr/testify -Version: v1.9.0 +Version: v1.10.0 Licence type (autodetected): MIT -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/stretchr/testify@v1.9.0/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/stretchr/testify@v1.10.0/LICENSE: MIT License diff --git a/filebeat/channel/runner_mock_test.go b/filebeat/channel/runner_mock_test.go index e784d833e9e3..55cda0f1d480 100644 --- a/filebeat/channel/runner_mock_test.go +++ b/filebeat/channel/runner_mock_test.go @@ -18,6 +18,7 @@ package channel import ( + "reflect" "testing" "github.com/elastic/beats/v7/libbeat/beat" @@ -69,6 +70,10 @@ func (r runnerFactoryMock) Assert(t *testing.T) { // we need to make sure `Assert` is called after `Create` require.Len(t, r.cfgs, r.clientCount) + sameBacking := func(a, b any) bool { + return reflect.ValueOf(a).UnsafePointer() == reflect.ValueOf(b).UnsafePointer() + } + t.Run("new processing configuration each time", func(t *testing.T) { for i, c1 := range r.cfgs { for j, c2 := range r.cfgs { @@ -76,10 +81,8 @@ func (r runnerFactoryMock) Assert(t *testing.T) { continue } - require.NotSamef(t, c1.Processing, c2.Processing, "processing configuration cannot be re-used") - require.NotSamef(t, c1.Processing.Meta, c2.Processing.Meta, "`Processing.Meta` cannot be re-used") - require.NotSamef(t, c1.Processing.Fields, c2.Processing.Fields, "`Processing.Fields` cannot be re-used") - require.NotSamef(t, c1.Processing.Processor, c2.Processing.Processor, "`Processing.Processor` cannot be re-used") + require.Falsef(t, sameBacking(c1.Processing.Meta, c2.Processing.Meta), "`Processing.Meta` cannot be re-used") + require.Falsef(t, sameBacking(c1.Processing.Fields, c2.Processing.Fields), "`Processing.Fields` cannot be re-used") } } }) diff --git a/go.mod b/go.mod index e474081d2096..655fed320c06 100644 --- a/go.mod +++ b/go.mod @@ -118,7 +118,7 @@ require ( github.com/shopspring/decimal v1.3.1 // indirect github.com/spf13/cobra v1.8.1 github.com/spf13/pflag v1.0.5 - github.com/stretchr/testify v1.9.0 + github.com/stretchr/testify v1.10.0 github.com/ugorji/go/codec v1.1.8 github.com/vmware/govmomi v0.39.0 go.elastic.co/ecszap v1.0.2 @@ -179,7 +179,7 @@ require ( github.com/elastic/elastic-agent-libs v0.18.1 github.com/elastic/elastic-agent-system-metrics v0.11.7 github.com/elastic/go-elasticsearch/v8 v8.17.0 - github.com/elastic/go-quark v0.2.0 + github.com/elastic/go-quark v0.3.0 github.com/elastic/go-sfdc v0.0.0-20241010131323-8e176480d727 github.com/elastic/mito v1.16.0 github.com/elastic/mock-es v0.0.0-20240712014503-e5b47ece0015 diff --git a/go.sum b/go.sum index dcfcb934e66e..19de7bd540aa 100644 --- a/go.sum +++ b/go.sum @@ -370,8 +370,8 @@ github.com/elastic/go-lumber v0.1.2-0.20220819171948-335fde24ea0f h1:TsPpU5EAwlt github.com/elastic/go-lumber v0.1.2-0.20220819171948-335fde24ea0f/go.mod h1:HHaWnZamYKWsR9/eZNHqRHob8iQDKnchHmmskT/SKko= github.com/elastic/go-perf v0.0.0-20241029065020-30bec95324b8 h1:FD01NjsTes0RxZVQ22ebNYJA4KDdInVnR9cn1hmaMwA= github.com/elastic/go-perf v0.0.0-20241029065020-30bec95324b8/go.mod h1:Nt+pnRYvf0POC+7pXsrv8ubsEOSsaipJP0zlz1Ms1RM= -github.com/elastic/go-quark v0.2.0 h1:r2BL4NzvhESrrL/yA3AcHt8mwF7fvQDssBAUiOL1sdg= -github.com/elastic/go-quark v0.2.0/go.mod h1:/ngqgumD/Z5vnFZ4XPN2kCbxnEfG5/Uc+bRvOBabVVA= +github.com/elastic/go-quark v0.3.0 h1:d4vokx0psEJo+93fnhvWpTJMggPd9rfMJSleoLva4xA= +github.com/elastic/go-quark v0.3.0/go.mod h1:bO/XIGZBUJGxyiJ9FTsSYn9YlfOTRJnmOP+iBE2FyjA= github.com/elastic/go-seccomp-bpf v1.5.0 h1:gJV+U1iP+YC70ySyGUUNk2YLJW5/IkEw4FZBJfW8ZZY= github.com/elastic/go-seccomp-bpf v1.5.0/go.mod h1:umdhQ/3aybliBF2jjiZwS492I/TOKz+ZRvsLT3hVe1o= github.com/elastic/go-sfdc v0.0.0-20241010131323-8e176480d727 h1:yuiN60oaQUz2PtNpNhDI2H6zrCdfiiptmNdwV5WUaKA= @@ -901,8 +901,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/teambition/rrule-go v1.8.2 h1:lIjpjvWTj9fFUZCmuoVDrKVOtdiyzbzc93qTmRVe/J8= github.com/teambition/rrule-go v1.8.2/go.mod h1:Ieq5AbrKGciP1V//Wq8ktsTXwSwJHDD5mD/wLBGl3p4= github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY= diff --git a/x-pack/auditbeat/module/system/process/config.go b/x-pack/auditbeat/module/system/process/config.go index 52cb6dd98933..f144c5991398 100644 --- a/x-pack/auditbeat/module/system/process/config.go +++ b/x-pack/auditbeat/module/system/process/config.go @@ -5,6 +5,7 @@ package process import ( + "fmt" "time" "github.com/elastic/beats/v7/auditbeat/helper/hasher" @@ -16,11 +17,19 @@ type Config struct { ProcessStatePeriod time.Duration `config:"process.state.period"` HasherConfig hasher.Config `config:"process.hash"` + Backend string `config:"process.backend"` } // Validate validates the config. func (c *Config) Validate() error { - return c.HasherConfig.Validate() + if err := c.HasherConfig.Validate(); err != nil { + return err + } + if c.Backend != "kernel_tracing" && c.Backend != "procfs" { + return fmt.Errorf("invalid process.backend '%s'", c.Backend) + } + + return nil } func (c *Config) effectiveStatePeriod() time.Duration { @@ -40,4 +49,5 @@ var defaultConfig = Config{ ScanRatePerSec: "50 MiB", ScanRateBytesPerSec: 50 * 1024 * 1024, }, + Backend: "procfs", } diff --git a/x-pack/auditbeat/module/system/process/gosysinfo_provider.go b/x-pack/auditbeat/module/system/process/gosysinfo_provider.go index da82a2e18106..b6a0539b8da7 100644 --- a/x-pack/auditbeat/module/system/process/gosysinfo_provider.go +++ b/x-pack/auditbeat/module/system/process/gosysinfo_provider.go @@ -20,7 +20,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common/capabilities" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/x-pack/auditbeat/cache" - "github.com/elastic/beats/v7/x-pack/auditbeat/module/system" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/go-sysinfo" "github.com/elastic/go-sysinfo/types" @@ -33,7 +32,6 @@ const ( // SysinfoMetricSet collects data about the host. type SysInfoMetricSet struct { - system.SystemMetricSet MetricSet hasher *hasher.FileHasher cache *cache.Cache @@ -81,7 +79,7 @@ func (p Process) toMapStr() mapstr.M { } // NewFromSysInfo constructs a new MetricSet backed by go-sysinfo. -func NewFromSysInfo(base mb.BaseMetricSet, ms MetricSet) (mb.MetricSet, error) { +func NewFromSysInfo(ms MetricSet) (mb.MetricSet, error) { bucket, err := datastore.OpenBucket(bucketName) if err != nil { return nil, fmt.Errorf("failed to open persistent datastore: %w", err) @@ -117,12 +115,11 @@ func NewFromSysInfo(base mb.BaseMetricSet, ms MetricSet) (mb.MetricSet, error) { } sm := &SysInfoMetricSet{ - SystemMetricSet: system.NewSystemMetricSet(base), - MetricSet: ms, - cache: cache.New(), - bucket: bucket, - lastState: lastState, - hasher: hasher, + MetricSet: ms, + cache: cache.New(), + bucket: bucket, + lastState: lastState, + hasher: hasher, } return sm, nil @@ -351,27 +348,12 @@ func putIfNotEmpty(mapstr *mapstr.M, key string, value string) { } func processMessage(process *Process, action eventAction) string { - if process.Error != nil { - return fmt.Sprintf("ERROR for PID %d: %v", process.Info.PID, process.Error) - } - - var actionString string - switch action { - case eventActionProcessStarted: - actionString = "STARTED" - case eventActionProcessStopped: - actionString = "STOPPED" - case eventActionExistingProcess: - actionString = "is RUNNING" - } - - var userString string + var username string if process.User != nil { - userString = fmt.Sprintf(" by user %v", process.User.Username) + username = process.User.Username } - return fmt.Sprintf("Process %v (PID: %d)%v %v", - process.Info.Name, process.Info.PID, userString, actionString) + return makeMessage(process.Info.PID, action, process.Info.Name, username, process.Error) } func convertToCacheable(processes []*Process) []cache.Cacheable { diff --git a/x-pack/auditbeat/module/system/process/process.go b/x-pack/auditbeat/module/system/process/process.go index c79e87ce0fad..a45ef2e80826 100644 --- a/x-pack/auditbeat/module/system/process/process.go +++ b/x-pack/auditbeat/module/system/process/process.go @@ -7,6 +7,7 @@ package process import ( "encoding/binary" "fmt" + "runtime" "time" "github.com/elastic/beats/v7/auditbeat/ab" @@ -26,6 +27,7 @@ const ( // MetricSet collects data about the host. type MetricSet struct { + system.SystemMetricSet config Config log *logp.Logger } @@ -36,6 +38,8 @@ const ( eventActionExistingProcess eventAction = iota eventActionProcessStarted eventActionProcessStopped + eventActionProcessRan + eventActionProcessChangedImage eventActionProcessError ) @@ -47,6 +51,10 @@ func (action eventAction) String() string { return "process_started" case eventActionProcessStopped: return "process_stopped" + case eventActionProcessRan: + return "process_ran" + case eventActionProcessChangedImage: + return "process_changed_image" case eventActionProcessError: return "process_error" default: @@ -62,6 +70,8 @@ func (action eventAction) Type() string { return "start" case eventActionProcessStopped: return "end" + case eventActionProcessChangedImage: + return "change" case eventActionProcessError: return "info" default: @@ -84,12 +94,21 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { ms.config = defaultConfig ms.log = logp.NewLogger(metricsetName) + ms.SystemMetricSet = system.NewSystemMetricSet(base) if err := base.Module().UnpackConfig(&ms.config); err != nil { return nil, fmt.Errorf("failed to unpack the %v/%v config: %w", system.ModuleName, metricsetName, err) } - return NewFromSysInfo(base, ms) + if runtime.GOOS == "linux" && ms.config.Backend == "kernel_tracing" { + if qm, err := NewFromQuark(ms); err == nil { + return qm, nil + } else { + ms.log.Errorf("can't use kernel_tracing, falling back to procfs: %v", err) + } + } + + return NewFromSysInfo(ms) } // entityID creates an ID that uniquely identifies this process across machines. @@ -102,3 +121,33 @@ func entityID(hostID string, pid int, startTime time.Time) string { binary.Write(h, binary.LittleEndian, int64(startTime.Nanosecond())) return h.Sum() } + +func makeMessage(pid int, action eventAction, name string, username string, err error) string { + if err != nil { + return fmt.Sprintf("ERROR for PID %d: %v", pid, err) + } + + var actionString string + switch action { + case eventActionProcessStarted: + actionString = "STARTED" + case eventActionProcessStopped: + actionString = "STOPPED" + case eventActionExistingProcess: + actionString = "is RUNNING" + case eventActionProcessRan: + actionString = "RAN" + case eventActionProcessChangedImage: + actionString = "CHANGED IMAGE" + case eventActionProcessError: // NOTREACHABLE as err != nil if action is ProcessError + actionString = "ERROR" + } + + var userString string + if len(username) > 0 { + userString = fmt.Sprintf(" by user %v", username) + } + + return fmt.Sprintf("Process %v (PID: %d)%v %v", + name, pid, userString, actionString) +} diff --git a/x-pack/auditbeat/module/system/process/quark_provider_linux.go b/x-pack/auditbeat/module/system/process/quark_provider_linux.go new file mode 100644 index 000000000000..fbb7e4aba74f --- /dev/null +++ b/x-pack/auditbeat/module/system/process/quark_provider_linux.go @@ -0,0 +1,337 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build linux && (amd64 || arm64) && cgo + +package process + +import ( + "fmt" + "os/user" + "strconv" + "time" + + "github.com/elastic/beats/v7/auditbeat/helper/hasher" + "github.com/elastic/beats/v7/auditbeat/helper/tty" + "github.com/elastic/beats/v7/libbeat/common/capabilities" + "github.com/elastic/beats/v7/metricbeat/mb" + "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/elastic-agent-libs/monitoring" + + quark "github.com/elastic/go-quark" +) + +var quarkMetrics = struct { + insertions *monitoring.Uint + removals *monitoring.Uint + aggregations *monitoring.Uint + nonAggregations *monitoring.Uint + lost *monitoring.Uint + backend *monitoring.String +}{} + +func init() { + reg := monitoring.Default.NewRegistry("process@quark") + quarkMetrics.insertions = monitoring.NewUint(reg, "insertions") + quarkMetrics.removals = monitoring.NewUint(reg, "removals") + quarkMetrics.aggregations = monitoring.NewUint(reg, "aggregations") + quarkMetrics.nonAggregations = monitoring.NewUint(reg, "non_aggregations") + quarkMetrics.lost = monitoring.NewUint(reg, "lost") + quarkMetrics.backend = monitoring.NewString(reg, "backend", monitoring.Report) +} + +// QuarkMetricSet is a MetricSet with added members used only in and by +// quark. QuarkMetricSet uses mb.PushReporterV2 instead of +// mb.ReporterV2. More notably we don't do periodic state reports and +// we don't need a cache as it is provided by quark. +type QuarkMetricSet struct { + MetricSet + queue *quark.Queue // Quark runtime state + selfMntNsIno uint32 // Mnt inode from current process + cachedHasher *hasher.CachedHasher +} + +// Used for testing only and not exposed via config +var quarkForceKprobe bool + +// NewFromQuark instantiates the module with quark's backend. +func NewFromQuark(ms MetricSet) (mb.MetricSet, error) { + var qm QuarkMetricSet + + qm.MetricSet = ms + + ino64, err := selfNsIno("mnt") + if err != nil { + return nil, fmt.Errorf("failed to fetch self mount inode: %w", err) + } + qm.selfMntNsIno = uint32(ino64) + qm.cachedHasher, err = hasher.NewFileHasherWithCache(qm.config.HasherConfig, 4096) + if err != nil { + return nil, fmt.Errorf("can't create hash cache: %w", err) + } + + attr := quark.DefaultQueueAttr() + if quarkForceKprobe { + attr.Flags &= ^quark.QQ_ALL_BACKENDS + attr.Flags |= quark.QQ_KPROBE + } + qm.queue, err = quark.OpenQueue(attr, 1) + if err != nil { + qm.cachedHasher.Close() + return nil, fmt.Errorf("can't open quark queue: %w", err) + } + stats := qm.queue.Stats() + if stats.Backend == quark.QQ_EBPF { + qm.log.Info("quark using EBPF") + } else if stats.Backend == quark.QQ_KPROBE { + qm.log.Info("quark using KPROBES") + } else { + qm.queue.Close() + qm.cachedHasher.Close() + return nil, fmt.Errorf("quark has an invalid backend") + } + + return &qm, nil +} + +// Run reads events from quark's queue and pushes them into output. +// The queue is owned by this goroutine and should not be touched +// from outside as there is no synchronization. +func (ms *QuarkMetricSet) Run(r mb.PushReporterV2) { + ms.log.Info("Quark running") + + metricsStamp := time.Now() + +MainLoop: + for { + // Poll for done + select { + case <-r.Done(): + break MainLoop + default: + } + + ms.maybeUpdateMetrics(&metricsStamp) + + x := time.Now() + quarkEvents, err := ms.queue.GetEvents() + if len(quarkEvents) == 1 { + ms.log.Debugf("getevents took %v", time.Since(x)) + } + if err != nil { + ms.log.Error("quark GetEvents, unrecoverable error", err) + break MainLoop + } + if len(quarkEvents) == 0 { + err = ms.queue.Block() + if err != nil { + ms.log.Error("quark Block, unrecoverable error", err) + break MainLoop + } + continue + } + for _, quarkEvent := range quarkEvents { + if !wantedEvent(quarkEvent) { + continue + } + if event, ok := ms.toEvent(quarkEvent); ok { + r.Event(event) + } + } + } + + // Queue is owned by this goroutine, if we ever access it from + // outside, we need to consider synchronization. + ms.cachedHasher.Close() + ms.queue.Close() + ms.queue = nil +} + +// toEvent converts a quark.Event to a mb.Event, returns true if we +// were able to make an event. +func (ms *QuarkMetricSet) toEvent(quarkEvent quark.Event) (mb.Event, bool) { + action, evtype := actionAndTypeOfEvent(quarkEvent) + process := quarkEvent.Process + event := mb.Event{RootFields: mapstr.M{}} + + var username string + var processErr error + defer func() { + // Fill out root message and error.message + event.RootFields.Put("message", + makeMessage(int(process.Pid), action, process.Comm, username, processErr)) + if processErr != nil { + event.RootFields.Put("error.message", processErr.Error()) + } + }() + + // Values that are independent of Proc.Valid + // Fill out event.* + event.RootFields.Put("event.type", evtype) + event.RootFields.Put("event.action", action.String()) + event.RootFields.Put("event.category", []string{"process"}) + event.RootFields.Put("event.kind", "event") + // Fill out process.* + event.RootFields.Put("process.name", process.Comm) + event.RootFields.Put("process.args", process.Cmdline) + event.RootFields.Put("process.args_count", len(process.Cmdline)) + event.RootFields.Put("process.pid", process.Pid) + event.RootFields.Put("process.working_directory", process.Cwd) + event.RootFields.Put("process.executable", process.Filename) + if process.Exit.Valid { + event.RootFields.Put("process.exit_code", process.Exit.ExitCode) + } + if !process.Proc.Valid { + return event, true + } + + // + // Code below can rely on Proc + // + + // Ids + event.RootFields.Put("process.parent.pid", process.Proc.Ppid) + startTime := time.Unix(0, int64(process.Proc.TimeBoot)) + if ms.HostID() != "" { + // TODO unify with sessionview and guarantee loss of precision + event.RootFields.Put("process.entity_id", + entityID(ms.HostID(), int(process.Pid), startTime)) + } + event.RootFields.Put("process.start", startTime) + event.RootFields.Put("user.id", process.Proc.Uid) + event.RootFields.Put("user.group.id", process.Proc.Gid) + event.RootFields.Put("user.effective.id", process.Proc.Euid) + event.RootFields.Put("user.effective.group.id", process.Proc.Egid) + event.RootFields.Put("user.saved.id", process.Proc.Suid) + event.RootFields.Put("user.saved.group.id", process.Proc.Sgid) + if us, err := user.LookupId(strconv.FormatUint(uint64(process.Proc.Uid), 10)); err == nil { + event.RootFields.Put("user.name", us.Username) + username = us.Username + } + if group, err := user.LookupGroupId(strconv.FormatUint(uint64(process.Proc.Gid), 10)); err == nil { + event.RootFields.Put("user.group.name", group.Name) + } + // Tty things + event.RootFields.Put("process.interactive", + tty.InteractiveFromTTY(tty.TTYDev{ + Major: process.Proc.TtyMajor, + Minor: process.Proc.TtyMinor, + })) + if process.Proc.TtyMajor != 0 { + event.RootFields.Put("process.tty.char_device.major", process.Proc.TtyMajor) + event.RootFields.Put("process.tty.char_device.minor", process.Proc.TtyMinor) + } + // Capabilities + capEffective, _ := capabilities.FromUint64(process.Proc.CapEffective) + if len(capEffective) > 0 { + event.RootFields.Put("process.thread.capabilities.effective", capEffective) + } + capPermitted, _ := capabilities.FromUint64(process.Proc.CapPermitted) + if len(capPermitted) > 0 { + event.RootFields.Put("process.thread.capabilities.permitted", capPermitted) + } + // If we are in the same mount namespace of the process, hash + // the file. When quark is running on kprobes, there are + // limitations concerning the full path of the filename, in + // those cases, the path won't start with a slash. + if process.Proc.MntInonum == ms.selfMntNsIno && len(process.Filename) > 0 && process.Filename[0] == '/' { + hashes, err := ms.cachedHasher.HashFile(process.Filename) + if err != nil { + processErr = fmt.Errorf("failed to hash executable %v for PID %v: %w", + process.Filename, process.Pid, err) + ms.log.Warn(processErr.Error()) + } else { + for hashType, digest := range hashes { + fieldName := "process.hash." + string(hashType) + event.RootFields.Put(fieldName, digest) + } + } + } else { + ms.log.Debugf("skipping hash %s (inonum %d vs %d)", process.Filename, process.Proc.MntInonum, ms.selfMntNsIno) + } + + return event, true +} + +// wantedEvent filters in only the wanted events from quark. +func wantedEvent(quarkEvent quark.Event) bool { + const wanted uint64 = quark.QUARK_EV_FORK | + quark.QUARK_EV_EXEC | + quark.QUARK_EV_EXIT | + quark.QUARK_EV_SNAPSHOT + if quarkEvent.Events&wanted == 0 || + quarkEvent.Process.Pid == 2 || + quarkEvent.Process.Proc.Ppid == 2 { // skip kthreads + + return false + } + + return true +} + +// actionAndTypeOfEvent computes eventAction and event.type out of a quark.Event. +func actionAndTypeOfEvent(quarkEvent quark.Event) (eventAction, []string) { + snap := quarkEvent.Events&quark.QUARK_EV_SNAPSHOT != 0 + fork := quarkEvent.Events&quark.QUARK_EV_FORK != 0 + exec := quarkEvent.Events&quark.QUARK_EV_EXEC != 0 + exit := quarkEvent.Events&quark.QUARK_EV_EXIT != 0 + + // Calculate event.action + // If it's a snap, it's existing + // If it forked + exited and executed or not, we consider ran + // If it execed + exited we consider stopped + // If it execed but didn't fork or exit, we consider changed image + var action eventAction + if snap { + action = eventActionExistingProcess + } else if fork && exit { + action = eventActionProcessRan + } else if fork { + action = eventActionProcessStarted + } else if exit { + action = eventActionProcessStopped + } else if exec { + action = eventActionProcessChangedImage + } else { + action = eventActionProcessError + } + // Calculate event.type + evtype := make([]string, 0, 4) + if snap { + evtype = append(evtype, eventActionExistingProcess.Type()) + } + if fork { + evtype = append(evtype, eventActionProcessStarted.Type()) + } + if exec { + evtype = append(evtype, eventActionProcessChangedImage.Type()) + } + if exit { + evtype = append(evtype, eventActionProcessStopped.Type()) + } + + return action, evtype +} + +func (ms *QuarkMetricSet) maybeUpdateMetrics(stamp *time.Time) { + if time.Since(*stamp) < time.Second*5 { + return + } + + stats := ms.queue.Stats() + quarkMetrics.insertions.Set(stats.Insertions) + quarkMetrics.removals.Set(stats.Removals) + quarkMetrics.aggregations.Set(stats.Aggregations) + quarkMetrics.nonAggregations.Set(stats.NonAggregations) + quarkMetrics.lost.Set(stats.Lost) + if stats.Backend == quark.QQ_EBPF { + quarkMetrics.backend.Set("ebpf") + } else if stats.Backend == quark.QQ_KPROBE { + quarkMetrics.backend.Set("kprobe") + } else { + quarkMetrics.backend.Set("invalid") + } + + *stamp = time.Now() +} diff --git a/x-pack/auditbeat/module/system/process/quark_provider_linux_test.go b/x-pack/auditbeat/module/system/process/quark_provider_linux_test.go new file mode 100644 index 000000000000..92e4707a8fec --- /dev/null +++ b/x-pack/auditbeat/module/system/process/quark_provider_linux_test.go @@ -0,0 +1,411 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build linux && (amd64 || arm64) && cgo + +package process + +import ( + "os" + "os/exec" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/auditbeat/ab" + "github.com/elastic/beats/v7/auditbeat/helper/tty" + "github.com/elastic/beats/v7/libbeat/common/capabilities" + "github.com/elastic/beats/v7/metricbeat/mb" + mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" + "github.com/elastic/beats/v7/x-pack/auditbeat/module/system" + "github.com/elastic/elastic-agent-libs/mapstr" + quark "github.com/elastic/go-quark" +) + +type backend int + +const ( + Ebpf backend = iota + Kprobe +) + +func TestInitialSnapshotEbpf(t *testing.T) { + skipIfNotRoot(t) + testInitialSnapshot(t, Ebpf) +} + +func TestInitialSnapshotKprobe(t *testing.T) { + skipIfNotRoot(t) + testInitialSnapshot(t, Kprobe) +} + +func TestForkExecExitEbpf(t *testing.T) { + skipIfNotRoot(t) + testForkExecExit(t, Ebpf) +} + +func TestForkExecExitKprobe(t *testing.T) { + skipIfNotRoot(t) + testForkExecExit(t, Kprobe) +} + +func TestQuarkMetricSetEbpf(t *testing.T) { + skipIfNotRoot(t) + testQuarkMetricSet(t, Ebpf) +} + +func TestQuarkMetricSetKprobe(t *testing.T) { + skipIfNotRoot(t) + testQuarkMetricSet(t, Kprobe) +} + +// testInitialSnapshot see if quark is generating snapshot events +func testInitialSnapshot(t *testing.T, be backend) { + qq := openQueue(t, be) + defer qq.Close() + + // There should be events of kind quark.QUARK_EV_SNAPSHOT + qevs := drainFor(t, qq, 5*time.Millisecond) + var gotsnap bool + for _, qev := range qevs { + if qev.Events&quark.QUARK_EV_SNAPSHOT != 0 { + gotsnap = true + } + } + + require.True(t, gotsnap) +} + +// testForkExecExit tests if a spawned process shows up in quark +func testForkExecExit(t *testing.T, be backend) { + qq := openQueue(t, be) + defer qq.Close() + + // runNop will fork+exec+exit /bin/true + cmd := runNop(t) + qev := drainFirstOfPid(t, qq, cmd.Process.Pid) + + // We should get at least FORK|EXEC|EXIT in the aggregation + require.Equal(t, + qev.Events&(quark.QUARK_EV_FORK|quark.QUARK_EV_EXEC|quark.QUARK_EV_EXIT), + quark.QUARK_EV_FORK|quark.QUARK_EV_EXEC|quark.QUARK_EV_EXIT) + + // This is virtually impossible to fail, but we're pedantic + require.True(t, qev.Process.Proc.Valid) + + // We need these otherwise nothing works + require.NotZero(t, qev.Process.Proc.MntInonum) + require.NotZero(t, qev.Process.Proc.TimeBoot) + require.NotZero(t, qev.Process.Proc.Ppid) + + // Must be /bin/true + require.Equal(t, qev.Process.Filename, cmd.Path) + require.Equal(t, qev.Process.Filename, cmd.Args[0]) + + // Kprobe cwd path depth is limited + if be != Kprobe { + cwd, err := os.Getwd() + require.NoError(t, err) + + require.Equal(t, cwd, qev.Process.Cwd) + } + + // Check exit + require.True(t, qev.Process.Exit.Valid) + require.Zero(t, qev.Process.Exit.ExitCode) + // Don't care about ExitTime, it's also not precise +} + +// testQuarkMetricSet will start the module and check if the it +// generates the correct event for os.Getpid() (an existing process), +// and for a process we spawn ourselves via runNop(). +func testQuarkMetricSet(t *testing.T, be backend) { + config := getConfigForQuark(be) + + // Start the module, it will open its own queue + f := mbtest.NewPushMetricSetV2WithRegistry(t, config, ab.Registry) + ms, ok := f.(*QuarkMetricSet) + require.True(t, ok) + + // Start our own queue in parallel, so we can compare some + // members we have no other way of fetching + qq := openQueue(t, be) + defer qq.Close() + + // The queue is open, so we can spawn something and it should show up + cmd := runNop(t) + + // Run the main loop, it should get a snapshot event of + // and a fork+exec+exit of the nop we just ran + // XXX sadly we can't control the holdTime of the queue used by beats, + // so we need to wait a full second + events := mbtest.RunPushMetricSetV2(1100*time.Millisecond, 0, ms) + require.NotEmpty(t, events) + + // Lookup self from qq, we need Proc.TimeBoot and Suid/Sgid + selfFromQQ, ok := qq.Lookup(os.Getpid()) + require.True(t, ok) + // Make a fake event from self (Os.getpid()) + selfTarget := makeSelfEvent(t, selfFromQQ, be) + // Lookup what we actually got from beats + selfActual := firstEventOfPid(t, events, os.Getpid()) + // Compare + checkEvent(t, selfTarget, selfActual) + + // Drain until we find the event generated by runNop(), we + // need Proc.TimeBoot and Suid/Sgid + spawnedFromQQ := drainFirstOfPid(t, qq, cmd.Process.Pid) + // Make an event of the spawned cmd + spawnedTarget := makeEventOfCmd(t, cmd, spawnedFromQQ, be) + // Lookup what we actually got from beats + spawnedActual := firstEventOfPid(t, events, cmd.Process.Pid) + // Compare + checkEvent(t, spawnedTarget, spawnedActual) +} + +// checkEvent checks the equality of all the events from target in actual, +// not the other way around since actual is always larger +func checkEvent(t *testing.T, target mb.Event, actual mb.Event) { + for tk, tv := range target.RootFields { + av, err := actual.RootFields.GetValue(tk) + require.NoError(t, err) + require.Equal(t, tv, av) + } +} + +// openQueue opens a quark queue on a specific backend +func openQueue(t *testing.T, be backend) *quark.Queue { + attr := quark.DefaultQueueAttr() + attr.HoldTime = 25 + attr.Flags &= ^quark.QQ_ALL_BACKENDS + if be == Ebpf { + attr.Flags |= quark.QQ_EBPF + } else if be == Kprobe { + attr.Flags |= quark.QQ_KPROBE + } + qq, err := quark.OpenQueue(attr, 1) + require.NoError(t, err) + + return qq +} + +// runNop does fork+exec+exit /bin/true +func runNop(t *testing.T) *exec.Cmd { + cmd := exec.Command("/bin/true") + require.NotNil(t, cmd) + err := cmd.Run() + require.NoError(t, err) + + return cmd +} + +// drainFor drains all events for `d` +func drainFor(t *testing.T, qq *quark.Queue, d time.Duration) []quark.Event { + var allQevs []quark.Event + + start := time.Now() + + for { + qevs, err := qq.GetEvents() + require.NoError(t, err) + for _, qev := range qevs { + if !wantedEvent(qev) { + continue + } + allQevs = append(allQevs, qev) + } + if time.Since(start) > d { + break + } + // Intentionally placed at the end so that we always + // get one more try after the last block + if len(qevs) == 0 { + _ = qq.Block() + } + } + + return allQevs +} + +// drainFirstOfPid returns the first event +func drainFirstOfPid(t *testing.T, qq *quark.Queue, pid int) quark.Event { + start := time.Now() + + for { + qevs, err := qq.GetEvents() + require.NoError(t, err) + for _, qev := range qevs { + if !wantedEvent(qev) { + continue + } + if qev.Process.Pid == uint32(pid) { + return qev + } + } + if time.Since(start) > time.Second { + break + } + // Intentionally placed at the end so that we always + // get one more try after the last block + if len(qevs) == 0 { + _ = qq.Block() + } + } + + t.Fatalf("Can't find event of pid %d", pid) + + return quark.Event{} // NOTREACHED +} + +// firstEventOfPid looks up the first event of `pid` in `events` +func firstEventOfPid(t *testing.T, events []mb.Event, pid int) mb.Event { + for _, event := range events { + pid2, err := event.RootFields.GetValue("process.pid") + require.NoError(t, err) + if pid2.(uint32) == uint32(pid) { + return event + } + } + + t.Fatalf("Can't find event of pid %d", pid) + + return mb.Event{} // NOTREACHED +} + +// makeSelfEvent builds what should be the event that quark will +// generate as an initial snapshot of the current process +func makeSelfEvent(t *testing.T, qp quark.Process, be backend) mb.Event { + exe, err := os.Executable() + require.NoError(t, err) + + interactive := tty.InteractiveFromTTY(tty.TTYDev{ + Major: qp.Proc.TtyMajor, + Minor: qp.Proc.TtyMinor, + }) + + capEff, err := capabilities.FromPid(capabilities.Effective, os.Getpid()) + require.NoError(t, err) + capPer, err := capabilities.FromPid(capabilities.Permitted, os.Getpid()) + require.NoError(t, err) + + self := mb.Event{ + RootFields: mapstr.M{ + "event.type": []string{"info"}, + "event.action": "existing_process", + "event.category": []string{"process"}, + "event.kind": "event", + "process.name": qp.Comm, + "process.args": qp.Cmdline, + "process.args_count": len(qp.Cmdline), + "process.pid": uint32(os.Getpid()), + "process.executable": exe, + "process.parent.pid": uint32(os.Getppid()), + "process.start": time.Unix(0, int64(qp.Proc.TimeBoot)), + "user.id": uint32(0), + "user.group.id": uint32(0), + "user.effective.id": uint32(0), + "user.saved.id": qp.Proc.Suid, + "user.saved.group.id": qp.Proc.Sgid, + "user.name": "root", + "user.group.name": "root", + "process.interactive": interactive, + "process.thread.capabilities.effective": capEff, + "process.thread.capabilities.permitted": capPer, + }, + } + + // Kprobe path depth is limited + if be != Kprobe { + cwd, err := os.Getwd() + require.NoError(t, err) + + self.RootFields["process.working_directory"] = cwd + } + + if qp.Proc.TtyMajor != 0 { + self.RootFields["process.tty.char_device.major"] = qp.Proc.TtyMajor + self.RootFields["process.tty.char_device.minor"] = qp.Proc.TtyMinor + } + + return self +} + +// makeEventOfCmd builds an mb.Event out of cmd and qev +func makeEventOfCmd(t *testing.T, cmd *exec.Cmd, qev quark.Event, be backend) mb.Event { + // We should get at least FORK|EXEC|EXIT in the aggregation + require.Equal(t, + qev.Events&(quark.QUARK_EV_FORK|quark.QUARK_EV_EXEC|quark.QUARK_EV_EXIT), + quark.QUARK_EV_FORK|quark.QUARK_EV_EXEC|quark.QUARK_EV_EXIT) + // This is virtually impossible to fail, but we're pedantic + require.True(t, qev.Process.Proc.Valid) + + qp := qev.Process + + interactive := tty.InteractiveFromTTY(tty.TTYDev{ + Major: qp.Proc.TtyMajor, + Minor: qp.Proc.TtyMinor, + }) + + capEff, err := capabilities.FromPid(capabilities.Effective, os.Getpid()) + require.NoError(t, err) + capPer, err := capabilities.FromPid(capabilities.Permitted, os.Getpid()) + require.NoError(t, err) + + cmdEvent := mb.Event{ + RootFields: mapstr.M{ + "event.type": []string{"start", "change", "end"}, + "event.action": "process_ran", + "event.category": []string{"process"}, + "event.kind": "event", + "process.name": "true", + "process.args": []string{"/bin/true"}, + "process.args_count": 1, + "process.pid": uint32(cmd.Process.Pid), + "process.executable": "/bin/true", + "process.parent.pid": uint32(os.Getpid()), + "process.start": time.Unix(0, int64(qp.Proc.TimeBoot)), + "user.id": uint32(0), + "user.group.id": uint32(0), + "user.effective.id": uint32(0), + "user.saved.id": qp.Proc.Suid, + "user.saved.group.id": qp.Proc.Sgid, + "user.name": "root", + "user.group.name": "root", + "process.interactive": interactive, + "process.thread.capabilities.effective": capEff, + "process.thread.capabilities.permitted": capPer, + }, + } + + // Kprobe path depth is limited + if be != Kprobe { + cwd, err := os.Getwd() + require.NoError(t, err) + + cmdEvent.RootFields["process.working_directory"] = cwd + } + + return cmdEvent +} + +// getConfigForQuark enables quark and allows hashing so we can test +// the cached hasher. +func getConfigForQuark(be backend) map[string]interface{} { + config := map[string]interface{}{ + "module": system.ModuleName, + "datasets": []string{"process"}, + + "process.backend": "kernel_tracing", + } + quarkForceKprobe = be == Kprobe + + return config +} + +func skipIfNotRoot(t *testing.T) { + if os.Getuid() != 0 { + t.Skip("must be root") + } +} diff --git a/x-pack/auditbeat/module/system/process/quark_provider_other.go b/x-pack/auditbeat/module/system/process/quark_provider_other.go new file mode 100644 index 000000000000..513d7e9df514 --- /dev/null +++ b/x-pack/auditbeat/module/system/process/quark_provider_other.go @@ -0,0 +1,18 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build !linux || !(amd64 || arm64) || !cgo + +package process + +import ( + "errors" + + "github.com/elastic/beats/v7/metricbeat/mb" +) + +// NewFromQuark instantiates the module with quark's backend. +func NewFromQuark(ms MetricSet) (mb.MetricSet, error) { + return nil, errors.New("quark is only available on linux on amd64/arm64 and needs cgo") +} diff --git a/x-pack/auditbeat/seccomp_linux.go b/x-pack/auditbeat/seccomp_linux.go index 5dd05618d31c..dc8735f9b94b 100644 --- a/x-pack/auditbeat/seccomp_linux.go +++ b/x-pack/auditbeat/seccomp_linux.go @@ -43,5 +43,12 @@ func init() { ); err != nil { panic(err) } + + // The system/process dataset uses additional syscalls + if err := seccomp.ModifyDefaultPolicy(seccomp.AddSyscall, + "statx", + ); err != nil { + panic(err) + } } }