Skip to content

Commit

Permalink
refactor: Ensure cgroups return sorted slice
Browse files Browse the repository at this point in the history
* This will return cgroups in a deterministic order, which helps in unit and e2e tests

Signed-off-by: Mahendra Paipuri <[email protected]>
  • Loading branch information
mahendrapaipuri committed Oct 20, 2024
1 parent c077888 commit 8356fdf
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 38 deletions.
16 changes: 8 additions & 8 deletions pkg/collector/alloy_targets.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ import (
var (
cgManager = CEEMSExporterApp.Flag(
"discoverer.alloy-targets.resource-manager",
"Discover Grafana Alloy targets from this resource manager [supported: slurm and libvirt].",
).Enum("slurm", "libvirt")
"Discover Grafana Alloy targets from this resource manager [supported: slurm].",
).Enum("slurm")
alloyTargetEnvVars = CEEMSExporterApp.Flag(
"discoverer.alloy-targets.env-var",
"Enable continuous profiling by Pyroscope only on the processes having any of these environment variables.",
"Enable continuous profiling by Grafana Alloy only on the processes having any of these environment variables.",
).Strings()
)

Expand Down Expand Up @@ -185,8 +185,8 @@ func (d *CEEMSAlloyTargetDiscoverer) discover() ([]Target, error) {
// Make targets from cgroups
var targets []Target

for uuid, procs := range dataPtr.cgroups {
for _, proc := range procs {
for _, cgroup := range dataPtr.cgroups {
for _, proc := range cgroup.procs {
exe, _ := proc.Executable()
comm, _ := proc.CmdLine()

Expand All @@ -197,14 +197,14 @@ func (d *CEEMSAlloyTargetDiscoverer) discover() ([]Target, error) {
}

target := Target{
Targets: []string{uuid},
Targets: []string{cgroup.id},
Labels: map[string]string{
"__process_pid__": strconv.FormatInt(int64(proc.PID), 10),
"__process_exe": exe,
"__process_commandline": strings.Join(comm, " "),
"__process_real_uid": strconv.FormatUint(realUID, 10),
"__process_effective_uid": strconv.FormatUint(effecUID, 10),
"service_name": uuid,
"service_name": cgroup.id,
},
}

Expand All @@ -230,7 +230,7 @@ func targetDiscoverer(data interface{}) error {
return security.ErrSecurityCtxDataAssertion
}

cgroups, err := cgroupProcs(d.procfs, d.cgroupManager.idRegex, d.targetEnvVars, d.cgroupManager.procFilter)
cgroups, err := getCgroups(d.procfs, d.cgroupManager.idRegex, d.targetEnvVars, d.cgroupManager.procFilter)
if err != nil {
return err
}
Expand Down
29 changes: 25 additions & 4 deletions pkg/collector/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"path/filepath"
"regexp"
"slices"
"strconv"
"strings"
"syscall"
Expand All @@ -18,6 +19,11 @@ var (
reParens = regexp.MustCompile(`\((.*)\)`)
)

// cgroup pairs a cgroup ID with the processes that were found in that cgroup.
// getCgroups returns a slice of these instead of a map so that callers can
// iterate over cgroups in a deterministic order.
type cgroup struct {
// id is the cgroup ID extracted from the process' cgroup using the ID regex.
id string
// procs are the processes collected for this cgroup ID.
procs []procfs.Proc
}

// SanitizeMetricName sanitizes the given metric name by replacing invalid characters with underscores.
//
// OpenMetrics and the Prometheus exposition format require the metric name
Expand All @@ -33,8 +39,8 @@ func SanitizeMetricName(metricName string) string {
return metricNameRegex.ReplaceAllString(metricName, "_")
}

// cgroupProcs returns a map of active cgroups and processes contained in each cgroup.
func cgroupProcs(fs procfs.FS, idRegex *regexp.Regexp, targetEnvVars []string, procFilter func(string) bool) (map[string][]procfs.Proc, error) {
// getCgroups returns a slice of active cgroups and processes contained in each cgroup.
func getCgroups(fs procfs.FS, idRegex *regexp.Regexp, targetEnvVars []string, procFilter func(string) bool) ([]cgroup, error) {
// Get all active procs
allProcs, err := fs.AllProcs()
if err != nil {
Expand All @@ -46,7 +52,9 @@ func cgroupProcs(fs procfs.FS, idRegex *regexp.Regexp, targetEnvVars []string, p
return nil, errors.New("cgroup IDs cannot be retrieved due to empty regex")
}

cgroups := make(map[string][]procfs.Proc)
cgroupsMap := make(map[string][]procfs.Proc)

var cgroupIDs []string

for _, proc := range allProcs {
// Get cgroup ID from regex
Expand Down Expand Up @@ -119,7 +127,20 @@ func cgroupProcs(fs procfs.FS, idRegex *regexp.Regexp, targetEnvVars []string, p
}
}

cgroups[cgroupID] = append(cgroups[cgroupID], proc)
cgroupsMap[cgroupID] = append(cgroupsMap[cgroupID], proc)
cgroupIDs = append(cgroupIDs, cgroupID)
}

// Sort cgroupIDs and make slice of cgProcs
cgroups := make([]cgroup, len(cgroupsMap))

slices.Sort(cgroupIDs)

for icgroup, cgroupID := range slices.Compact(cgroupIDs) {
cgroups[icgroup] = cgroup{
id: cgroupID,
procs: cgroupsMap[cgroupID],
}
}

return cgroups, nil
Expand Down
22 changes: 11 additions & 11 deletions pkg/collector/perf.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,14 @@ type perfDiscovererSecurityCtxData struct {
procfs procfs.FS
cgroupManager *cgroupManager
targetEnvVars []string
cgroups map[string][]procfs.Proc
cgroups []cgroup
}

// perfProfilerSecurityCtxData contains the input/output data for
// opening/closing profilers inside security context.
type perfProfilerSecurityCtxData struct {
logger log.Logger
cgroups map[string][]procfs.Proc
cgroups []cgroup
activePIDs []int
perfHwProfilers map[int]*perf.HardwareProfiler
perfSwProfilers map[int]*perf.SoftwareProfiler
Expand Down Expand Up @@ -565,12 +565,12 @@ func (c *perfCollector) Update(ch chan<- prometheus.Metric, cgroupIDUUIDMap map[
wg.Add(len(cgroups))

// Update metrics in go routines for each cgroup
for cgroupID, procs := range cgroups {
for _, cgroup := range cgroups {
var uuid string
if cgroupIDUUIDMap != nil {
uuid = cgroupIDUUIDMap[cgroupID]
uuid = cgroupIDUUIDMap[cgroup.id]
} else {
uuid = cgroupID
uuid = cgroup.id
}

go func(u string, ps []procfs.Proc) {
Expand All @@ -587,7 +587,7 @@ func (c *perfCollector) Update(ch chan<- prometheus.Metric, cgroupIDUUIDMap map[
if err := c.updateCacheCounters(u, ps, ch); err != nil {
level.Error(c.logger).Log("msg", "failed to update cache counters", "uuid", u, "err", err)
}
}(uuid, procs)
}(uuid, cgroup.procs)
}

// Wait all go routines
Expand Down Expand Up @@ -828,7 +828,7 @@ func (c *perfCollector) updateCacheCounters(cgroupID string, procs []procfs.Proc

// discoverProcess returns a slice of cgroups, each with its ID and procs. Depending on presence
// of targetEnvVars, this may be executed in a security context.
func (c *perfCollector) discoverProcess() (map[string][]procfs.Proc, error) {
func (c *perfCollector) discoverProcess() ([]cgroup, error) {
// Read discovered cgroups into data pointer
dataPtr := &perfDiscovererSecurityCtxData{
procfs: c.fs,
Expand Down Expand Up @@ -862,7 +862,7 @@ func (c *perfCollector) discoverProcess() (map[string][]procfs.Proc, error) {
}

// newProfilers opens new perf profilers if they are not already in profilers map.
func (c *perfCollector) newProfilers(cgroups map[string][]procfs.Proc) []int {
func (c *perfCollector) newProfilers(cgroups []cgroup) []int {
dataPtr := &perfProfilerSecurityCtxData{
logger: c.logger,
cgroups: cgroups,
Expand Down Expand Up @@ -926,8 +926,8 @@ func openProfilers(data interface{}) error {

var activePIDs []int

for _, procs := range d.cgroups {
for _, proc := range procs {
for _, cgroup := range d.cgroups {
for _, proc := range cgroup.procs {
pid := proc.PID

activePIDs = append(activePIDs, pid)
Expand Down Expand Up @@ -1136,7 +1136,7 @@ func discoverer(data interface{}) error {
return security.ErrSecurityCtxDataAssertion
}

cgroups, err := cgroupProcs(d.procfs, d.cgroupManager.idRegex, d.targetEnvVars, d.cgroupManager.procFilter)
cgroups, err := getCgroups(d.procfs, d.cgroupManager.idRegex, d.targetEnvVars, d.cgroupManager.procFilter)
if err != nil {
return err
}
Expand Down
21 changes: 11 additions & 10 deletions pkg/collector/perf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func TestDiscoverProcess(t *testing.T) {
require.NoError(t, err)

// Discover processes
cgroupIDProcMap, err := collector.discoverProcess()
cgroups, err := collector.discoverProcess()
require.NoError(t, err)

// expected
Expand All @@ -112,15 +112,15 @@ func TestDiscoverProcess(t *testing.T) {

cgroupProcs := make(map[string][]int)

for cgroupID, procs := range cgroupIDProcMap {
cgroupIDs = append(cgroupIDs, cgroupID)
for _, cgroup := range cgroups {
cgroupIDs = append(cgroupIDs, cgroup.id)

var pids []int
for _, proc := range procs {
for _, proc := range cgroup.procs {
pids = append(pids, proc.PID)
}

cgroupProcs[cgroupID] = pids
cgroupProcs[cgroup.id] = pids
}

assert.ElementsMatch(t, cgroupIDs, expectedCgroupIDs)
Expand Down Expand Up @@ -156,10 +156,11 @@ func TestNewProfilers(t *testing.T) {
require.NoError(t, err)

// Use fake cgroupID for current process
cgroupIDProcMap := map[string][]procfs.Proc{
"1234": {
{
PID: os.Getpid(),
cgroups := []cgroup{
{
id: "1234",
procs: []procfs.Proc{
{PID: os.Getpid()},
},
},
}
Expand All @@ -176,7 +177,7 @@ func TestNewProfilers(t *testing.T) {
}()

// make new profilers
pids := collector.newProfilers(cgroupIDProcMap)
pids := collector.newProfilers(cgroups)
assert.ElementsMatch(t, pids, []int{os.Getpid()})

// update counters
Expand Down
10 changes: 5 additions & 5 deletions pkg/collector/rdma.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ func (c *rdmaCollector) update(ch chan<- prometheus.Metric, cgroupIDUUIDMap map[
// procCgroups returns cgroup ID of all relevant processes.
func (c *rdmaCollector) procCgroups(cgroupIDUUIDMap map[string]string) (map[string]string, error) {
// First get cgroups and their associated procs
cgroups, err := cgroupProcs(c.procfs, c.cgroupManager.idRegex, nil, c.cgroupManager.procFilter)
cgroups, err := getCgroups(c.procfs, c.cgroupManager.idRegex, nil, c.cgroupManager.procFilter)
if err != nil {
level.Error(c.logger).Log("msg", "Failed to fetch active cgroups", "err", err)

Expand All @@ -426,15 +426,15 @@ func (c *rdmaCollector) procCgroups(cgroupIDUUIDMap map[string]string) (map[stri
// Make invert mapping of cgroups
procCgroup := make(map[string]string)

for cgroupID, procs := range cgroups {
for _, cgroup := range cgroups {
var uuid string
if cgroupIDUUIDMap != nil {
uuid = cgroupIDUUIDMap[cgroupID]
uuid = cgroupIDUUIDMap[cgroup.id]
} else {
uuid = cgroupID
uuid = cgroup.id
}

for _, proc := range procs {
for _, proc := range cgroup.procs {
p := strconv.FormatInt(int64(proc.PID), 10)
procCgroup[p] = uuid
}
Expand Down

0 comments on commit 8356fdf

Please sign in to comment.