From 812244dd76a3626dad8910c7582ef8864f3d69ae Mon Sep 17 00:00:00 2001 From: Jonathan Perry Date: Thu, 20 Feb 2025 18:39:38 +0000 Subject: [PATCH 01/33] add initial perf ring package --- cmd/collector/go.mod | 8 -- cmd/collector/go.sum | 4 - cmd/cpi-count/go.mod | 13 -- cmd/cpi-count/go.sum | 13 -- cmd/prometheus_metrics/go.mod | 21 --- cmd/prometheus_metrics/go.sum | 31 ----- go.mod | 31 +++++ go.sum | 62 +++++++++ pkg/perf/README.md | 86 ++++++++++++ pkg/perf/ring.go | 202 ++++++++++++++++++++++++++++ pkg/perf/ring_test.go | 244 ++++++++++++++++++++++++++++++++++ 11 files changed, 625 insertions(+), 90 deletions(-) delete mode 100644 cmd/collector/go.mod delete mode 100644 cmd/collector/go.sum delete mode 100644 cmd/cpi-count/go.mod delete mode 100644 cmd/cpi-count/go.sum delete mode 100644 cmd/prometheus_metrics/go.mod delete mode 100644 cmd/prometheus_metrics/go.sum create mode 100644 go.mod create mode 100644 go.sum create mode 100644 pkg/perf/README.md create mode 100644 pkg/perf/ring.go create mode 100644 pkg/perf/ring_test.go diff --git a/cmd/collector/go.mod b/cmd/collector/go.mod deleted file mode 100644 index 166649f..0000000 --- a/cmd/collector/go.mod +++ /dev/null @@ -1,8 +0,0 @@ -module collector - -go 1.22.2 - -require ( - github.com/cilium/ebpf v0.17.3 // indirect - golang.org/x/sys v0.30.0 // indirect -) diff --git a/cmd/collector/go.sum b/cmd/collector/go.sum deleted file mode 100644 index f409c1a..0000000 --- a/cmd/collector/go.sum +++ /dev/null @@ -1,4 +0,0 @@ -github.com/cilium/ebpf v0.17.3 h1:FnP4r16PWYSE4ux6zN+//jMcW4nMVRvuTLVTvCjyyjg= -github.com/cilium/ebpf v0.17.3/go.mod h1:G5EDHij8yiLzaqn0WjyfJHvRa+3aDlReIaLVRMvOyJk= -golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= -golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= diff --git a/cmd/cpi-count/go.mod b/cmd/cpi-count/go.mod deleted file mode 100644 index ef88679..0000000 --- a/cmd/cpi-count/go.mod +++ /dev/null @@ -1,13 +0,0 @@ -module 
cpi-count - -go 1.22.9 - -require github.com/elastic/go-perf v0.0.0-20241029065020-30bec95324b8 - -require ( - github.com/davecgh/go-spew v1.1.1 // indirect - github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/stretchr/testify v1.10.0 // indirect - golang.org/x/sys v0.26.0 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect -) diff --git a/cmd/cpi-count/go.sum b/cmd/cpi-count/go.sum deleted file mode 100644 index a65d4cf..0000000 --- a/cmd/cpi-count/go.sum +++ /dev/null @@ -1,13 +0,0 @@ -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/elastic/go-perf v0.0.0-20241029065020-30bec95324b8 h1:FD01NjsTes0RxZVQ22ebNYJA4KDdInVnR9cn1hmaMwA= -github.com/elastic/go-perf v0.0.0-20241029065020-30bec95324b8/go.mod h1:Nt+pnRYvf0POC+7pXsrv8ubsEOSsaipJP0zlz1Ms1RM= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= -github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= -golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= -gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/cmd/prometheus_metrics/go.mod b/cmd/prometheus_metrics/go.mod deleted file mode 100644 index a065b9e..0000000 --- a/cmd/prometheus_metrics/go.mod +++ /dev/null @@ -1,21 +0,0 @@ -module memory-collector - -go 1.22.9 - -require ( - github.com/beorn7/perks v1.0.1 // indirect - github.com/cespare/xxhash/v2 v2.3.0 
// indirect - github.com/davecgh/go-spew v1.1.1 // indirect - github.com/klauspost/compress v1.17.9 // indirect - github.com/kylelemons/godebug v1.1.0 // indirect - github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/prometheus/client_golang v1.20.5 // indirect - github.com/prometheus/client_model v0.6.1 // indirect - github.com/prometheus/common v0.55.0 // indirect - github.com/prometheus/procfs v0.15.1 // indirect - github.com/stretchr/testify v1.10.0 // indirect - golang.org/x/sys v0.22.0 // indirect - google.golang.org/protobuf v1.34.2 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect -) diff --git a/cmd/prometheus_metrics/go.sum b/cmd/prometheus_metrics/go.sum deleted file mode 100644 index 55e39a6..0000000 --- a/cmd/prometheus_metrics/go.sum +++ /dev/null @@ -1,31 +0,0 @@ -github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= -github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= -github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= -github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= -github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= -github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= -github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= -github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= -github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= 
-github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/client_golang v1.20.5 h1:cxppBPuYhUnsO6yo/aoRol4L7q7UFfdm+bR9r+8l63Y= -github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE= -github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= -github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= -github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G1dc= -github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8= -github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= -github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= -github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= -github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= -golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= -google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= -gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..643ad16 --- /dev/null +++ b/go.mod @@ -0,0 +1,31 @@ +module github.com/unvariance/collector + +go 1.22.2 + +require ( + github.com/cilium/ebpf v0.17.3 + github.com/elastic/go-perf v0.0.0-20241029065020-30bec95324b8 + github.com/go-quicktest/qt 
v1.101.0 + github.com/prometheus/client_golang v1.21.0 + github.com/stretchr/testify v1.10.0 + golang.org/x/sys v0.30.0 +) + +require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/google/go-cmp v0.6.0 // indirect + github.com/klauspost/compress v1.17.11 // indirect + github.com/kr/pretty v0.3.1 // indirect + github.com/kr/text v0.2.0 // indirect + github.com/kylelemons/godebug v1.1.0 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prometheus/client_model v0.6.1 // indirect + github.com/prometheus/common v0.62.0 // indirect + github.com/prometheus/procfs v0.15.1 // indirect + github.com/rogpeppe/go-internal v1.11.0 // indirect + google.golang.org/protobuf v1.36.1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..c110929 --- /dev/null +++ b/go.sum @@ -0,0 +1,62 @@ +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cilium/ebpf v0.17.3 h1:FnP4r16PWYSE4ux6zN+//jMcW4nMVRvuTLVTvCjyyjg= +github.com/cilium/ebpf v0.17.3/go.mod h1:G5EDHij8yiLzaqn0WjyfJHvRa+3aDlReIaLVRMvOyJk= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/elastic/go-perf v0.0.0-20241029065020-30bec95324b8 h1:FD01NjsTes0RxZVQ22ebNYJA4KDdInVnR9cn1hmaMwA= +github.com/elastic/go-perf 
v0.0.0-20241029065020-30bec95324b8/go.mod h1:Nt+pnRYvf0POC+7pXsrv8ubsEOSsaipJP0zlz1Ms1RM= +github.com/go-quicktest/qt v1.101.0 h1:O1K29Txy5P2OK0dGo59b7b0LR6wKfIhttaAhHUyn7eI= +github.com/go-quicktest/qt v1.101.0/go.mod h1:14Bz/f7NwaXPtdYEgzsx46kqSxVwTbzVZsDC26tQJow= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/josharian/native v1.1.0 h1:uuaP0hAbW7Y4l0ZRQ6C9zfb7Mg1mbFKry/xzDAfmtLA= +github.com/josharian/native v1.1.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w= +github.com/jsimonetti/rtnetlink/v2 v2.0.1 h1:xda7qaHDSVOsADNouv7ukSuicKZO7GgVUCXxpaIEIlM= +github.com/jsimonetti/rtnetlink/v2 v2.0.1/go.mod h1:7MoNYNbb3UaDHtF8udiJo/RH6VsTKP1pqKLUTVCvToE= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/mdlayher/netlink v1.7.2 h1:/UtM3ofJap7Vl4QWCPDGXY8d3GIY2UGSDbK+QWmY8/g= +github.com/mdlayher/netlink v1.7.2/go.mod h1:xraEF7uJbxLhc5fpHL4cPe221LI2bdttWlU+ZGLfQSw= +github.com/mdlayher/socket v0.4.1 h1:eM9y2/jlbs1M615oshPQOHZzj6R6wMT7bX5NPiQvn2U= +github.com/mdlayher/socket v0.4.1/go.mod h1:cAqeGjoufqdxWkD7DkpyS+wcefOtmu5OQ8KuoJGIReA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg 
v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.21.0 h1:DIsaGmiaBkSangBgMtWdNfxbMNdku5IK6iNhrEqWvdA= +github.com/prometheus/client_golang v1.21.0/go.mod h1:U9NM32ykUErtVBxdvD3zfi+EuFkkaBvMb09mIfe0Zgg= +github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= +github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= +github.com/prometheus/common v0.62.0 h1:xasJaQlnWAeyHdUBeGjXmutelfJHWMRr+Fg4QszZ2Io= +github.com/prometheus/common v0.62.0/go.mod h1:vyBcEuLSvWos9B1+CyL7JZ2up+uFzXhkqml0W5zIY1I= +github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= +github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= +github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= +golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= +golang.org/x/sys v0.30.0/go.mod 
h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk= +google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/perf/README.md b/pkg/perf/README.md new file mode 100644 index 0000000..fb4246a --- /dev/null +++ b/pkg/perf/README.md @@ -0,0 +1,86 @@ +# Perf Package + +This package provides a Go implementation of the Linux perf ring buffer interface, designed for efficient communication between kernel and userspace. It is particularly useful for collecting eBPF samples from per-CPU perf rings. + +## Layer 1: Basic PerfRing + +The basic PerfRing layer provides core functionality for working with contiguous memory containing shared structure and data. It implements: + +- Reading and writing to PerfRing +- Support for peeking record metadata +- Batched operations for efficient reading and writing +- Proper memory barrier handling for concurrent access + +### Key Components + +#### PerfRing + +The main structure that represents a perf ring buffer. 
It contains: +- Shared metadata page +- Data buffer +- Buffer mask for efficient modulo operations +- Head and tail pointers for reading/writing + +#### Operations + +- `InitContiguous`: Initialize a PerfRing using contiguous memory +- `StartWriteBatch`/`FinishWriteBatch`: Batch write operations +- `StartReadBatch`/`FinishReadBatch`: Batch read operations +- `Write`: Write data to the ring buffer +- `PeekSize`/`PeekType`: Examine record metadata without consuming +- `PeekCopy`: Copy data from the ring without consuming +- `Pop`: Consume the current record +- `BytesRemaining`: Get available bytes to read + +### Usage + +```go +// Initialize a ring buffer +data := make([]byte, pageSize*(1+nPages)) // 1 meta page + n data pages +ring, err := perf.InitContiguous(data, nPages, pageSize) +if err != nil { + // Handle error +} + +// Write data +ring.StartWriteBatch() +offset, err := ring.Write(data, eventType) +ring.FinishWriteBatch() + +// Read data +ring.StartReadBatch() +size, _ := ring.PeekSize() +buf := make([]byte, size) +ring.PeekCopy(buf, 0, uint16(size)) +ring.Pop() +ring.FinishReadBatch() +``` + +### Memory Layout + +The perf ring buffer consists of: +1. A metadata page containing shared information +2. A power-of-2 sized data buffer for the actual ring + +The data buffer is treated as a circular buffer where: +- Writers append to the tail +- Readers consume from the head +- Buffer wrapping is handled automatically + +### Thread Safety + +The implementation uses proper memory barriers through the metadata page to ensure thread safety between kernel writers and userspace readers. The batched operations help minimize cache line bouncing between processors. 
+ +## Testing + +Run the tests with: + +```bash +go test -v ./pkg/perf +``` + +The test suite includes: +- Basic initialization tests +- Write and read operations +- Buffer wraparound handling +- Bytes remaining calculation \ No newline at end of file diff --git a/pkg/perf/ring.go b/pkg/perf/ring.go new file mode 100644 index 0000000..8da2c1e --- /dev/null +++ b/pkg/perf/ring.go @@ -0,0 +1,202 @@ +package perf + +import ( + "fmt" + "sync/atomic" + "unsafe" +) + +// PerfEventHeader represents the header of a perf event +type PerfEventHeader struct { + Type uint32 + Misc uint16 + Size uint16 +} + +// PerfRing represents a perf ring buffer with shared metadata and data pages +type PerfRing struct { + // Shared metadata page + meta *PerfEventMmapPage + // Data buffer + data []byte + // Mask for quick modulo operations (buffer size - 1) + bufMask uint64 + // Current head position for reading + head uint64 + // Current tail position for writing + tail uint64 +} + +// PerfEventMmapPage represents the shared metadata page +type PerfEventMmapPage struct { + Version uint32 // ABI version + Compat_version uint32 // Lowest compatible version + Pad1 [1024 - 8]byte // Pad to 1024 bytes + Data_head uint64 // Head in the data section + Data_tail uint64 // Tail in the data section + Data_offset uint64 // Offset of data section + Data_size uint64 // Size of data section + AuxOffset uint64 // Offset of aux section + AuxSize uint64 // Size of aux section +} + +// InitContiguous initializes a PerfRing using contiguous memory +func InitContiguous(data []byte, nPages uint32, pageSize uint64) (*PerfRing, error) { + if data == nil { + return nil, fmt.Errorf("data buffer cannot be nil") + } + + bufLen := uint64(nPages) * pageSize + if (bufLen&(bufLen-1)) != 0 || bufLen < 8 { + return nil, fmt.Errorf("buffer length must be a power of 2 and at least 8 bytes") + } + + // First page is metadata, rest is data + meta := (*PerfEventMmapPage)(unsafe.Pointer(&data[0])) + // if Data_offset is not 
given (older kernels), we need to skip a full page, otherwise we skip Data_offset bytes + dataStart := meta.Data_offset + if dataStart == 0 { + dataStart = pageSize + } + + ring := &PerfRing{ + meta: meta, + data: data[dataStart : dataStart+bufLen], + bufMask: bufLen - 1, + head: atomic.LoadUint64(&meta.Data_tail), + tail: atomic.LoadUint64(&meta.Data_head), + } + + return ring, nil +} + +// StartWriteBatch starts a write batch operation +func (r *PerfRing) StartWriteBatch() { + // Get the current tail position from shared memory using atomic load + r.head = atomic.LoadUint64(&r.meta.Data_tail) +} + +// Write writes data to the ring buffer with the given type +func (r *PerfRing) Write(data []byte, eventType uint32) (int, error) { + if len(data) == 0 { + return 0, fmt.Errorf("cannot write empty data") + } + + // Calculate total size including header, aligned to 8 bytes + alignedLen := ((uint32(len(data)) + uint32(unsafe.Sizeof(PerfEventHeader{})) + 7) & ^uint32(7)) + if alignedLen > uint32(r.bufMask) { + return 0, fmt.Errorf("data too large for buffer") + } + + // Check if there's enough space + if r.tail+uint64(alignedLen)-r.head > r.bufMask+1 { + return 0, fmt.Errorf("buffer full") + } + + // Write header + header := PerfEventHeader{ + Type: eventType, + Size: uint16(alignedLen), + } + headerPos := r.tail & r.bufMask + *(*PerfEventHeader)(unsafe.Pointer(&r.data[headerPos])) = header + + // Write data + dataPos := (r.tail + uint64(unsafe.Sizeof(header))) & r.bufMask + if dataPos+uint64(len(data)) <= uint64(len(r.data)) { + // Data fits without wrapping + copy(r.data[dataPos:], data) + } else { + // Data wraps around buffer end + firstPart := uint64(len(r.data)) - dataPos + copy(r.data[dataPos:], data[:firstPart]) + copy(r.data[0:], data[firstPart:]) + } + + r.tail += uint64(alignedLen) + return int(dataPos), nil +} + +// FinishWriteBatch commits the write batch +func (r *PerfRing) FinishWriteBatch() { + // Ensure all writes are visible before updating tail using 
atomic store + atomic.StoreUint64(&r.meta.Data_head, r.tail) +} + +// StartReadBatch starts a read batch operation +func (r *PerfRing) StartReadBatch() { + // Get the current head position from shared memory using atomic load + r.tail = atomic.LoadUint64(&r.meta.Data_head) +} + +// PeekSize returns the size of the next event in the ring buffer +func (r *PerfRing) PeekSize() (int, error) { + if r.tail == r.head { + return 0, fmt.Errorf("buffer empty") + } + + header := (*PerfEventHeader)(unsafe.Pointer(&r.data[r.head&r.bufMask])) + return int(header.Size - uint16(unsafe.Sizeof(PerfEventHeader{}))), nil +} + +// PeekType returns the type of the next event +func (r *PerfRing) PeekType() uint32 { + header := (*PerfEventHeader)(unsafe.Pointer(&r.data[r.head&r.bufMask])) + return header.Type +} + +// PeekCopy copies data from the ring buffer without consuming it +func (r *PerfRing) PeekCopy(buf []byte, offset uint16) error { + size, err := r.PeekSize() + if err != nil { + return err + } + + if len(buf) > int(size) { + return fmt.Errorf("buffer too small") + } + + startPos := (r.head + uint64(unsafe.Sizeof(PerfEventHeader{})) + uint64(offset)) & r.bufMask + endPos := (startPos + uint64(len(buf)) - 1) & r.bufMask + + if endPos < startPos { + // Data wraps around buffer end + firstLen := uint64(len(r.data)) - startPos + copy(buf, r.data[startPos:startPos+firstLen]) + copy(buf[firstLen:], r.data[:endPos+1]) + } else { + // Data is contiguous + copy(buf, r.data[startPos:startPos+uint64(len(buf))]) + } + + return nil +} + +// Pop consumes the current event +func (r *PerfRing) Pop() error { + if r.tail == r.head { + return fmt.Errorf("buffer empty") + } + + header := (*PerfEventHeader)(unsafe.Pointer(&r.data[r.head&r.bufMask])) + r.head += uint64(header.Size) + return nil +} + +// FinishReadBatch commits the read batch +func (r *PerfRing) FinishReadBatch() { + // Update tail position using atomic store + atomic.StoreUint64(&r.meta.Data_tail, r.head) +} + +// BytesRemaining 
returns the number of bytes available to read +func (r *PerfRing) BytesRemaining() uint32 { + begin := r.head & r.bufMask + end := r.tail & r.bufMask + + if end < begin { + return uint32((r.bufMask + 1) - begin + end) + } + + return uint32(end - begin) +} diff --git a/pkg/perf/ring_test.go b/pkg/perf/ring_test.go new file mode 100644 index 0000000..d542aab --- /dev/null +++ b/pkg/perf/ring_test.go @@ -0,0 +1,244 @@ +package perf + +import ( + "testing" + "unsafe" +) + +func TestInitContiguous(t *testing.T) { + pageSize := uint64(4096) + nPages := uint32(2) + data := make([]byte, pageSize*(1+uint64(nPages))) // 1 meta page + 2 data pages + + tests := []struct { + name string + data []byte + nPages uint32 + pageSize uint64 + wantError bool + }{ + { + name: "valid initialization", + data: data, + nPages: nPages, + pageSize: pageSize, + wantError: false, + }, + { + name: "nil data", + data: nil, + nPages: nPages, + pageSize: pageSize, + wantError: true, + }, + { + name: "invalid buffer size", + data: make([]byte, 7), // Less than minimum size + nPages: 1, + pageSize: 7, + wantError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ring, err := InitContiguous(tt.data, tt.nPages, tt.pageSize) + if tt.wantError { + if err == nil { + t.Error("expected error, got nil") + } + } else { + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if ring == nil { + t.Error("expected non-nil ring") + } + } + }) + } +} + +func TestWriteAndRead(t *testing.T) { + pageSize := uint64(4096) + nPages := uint32(2) + data := make([]byte, pageSize*(1+uint64(nPages))) + + ring, err := InitContiguous(data, nPages, pageSize) + if err != nil { + t.Fatalf("failed to initialize ring: %v", err) + } + + testData := []byte("test data") + eventType := uint32(1) + + // Start write batch + ring.StartWriteBatch() + + // Write data + offset, err := ring.Write(testData, eventType) + if err != nil { + t.Fatalf("failed to write data: %v", err) + } + + // Verify 
offset is within buffer bounds + if offset < 0 || offset >= int(pageSize*uint64(nPages)) { + t.Errorf("offset %d outside buffer bounds [0, %d)", offset, pageSize*uint64(nPages)) + } + + // Finish write batch + ring.FinishWriteBatch() + + // Start read batch + ring.StartReadBatch() + + // Check size + size, err := ring.PeekSize() + if err != nil { + t.Fatalf("failed to peek size: %v", err) + } + if size != (len(testData)+7)/8*8 { + t.Errorf("expected size %d, got %d", (len(testData)+7)/8*8, size) + } + + // Check type + if typ := ring.PeekType(); typ != eventType { + t.Errorf("expected type %d, got %d", eventType, typ) + } + + // Read data + readBuf := make([]byte, size) + err = ring.PeekCopy(readBuf, 0) + if err != nil { + t.Fatalf("failed to peek copy: %v", err) + } + + // Compare data + if string(readBuf[:len(testData)]) != string(testData) { + t.Errorf("expected data %q, got %q", testData, readBuf) + } + + // Pop the event + if err := ring.Pop(); err != nil { + t.Fatalf("failed to pop event: %v", err) + } + + // Check remaining bytes (should be 0) + if remaining := ring.BytesRemaining(); remaining != 0 { + t.Errorf("expected 0 bytes remaining, got %d", remaining) + } + + // Finish read batch + ring.FinishReadBatch() +} + +func TestBytesRemaining(t *testing.T) { + pageSize := uint64(4096) + nPages := uint32(2) + data := make([]byte, pageSize*(1+uint64(nPages))) + + ring, err := InitContiguous(data, nPages, pageSize) + if err != nil { + t.Fatalf("failed to initialize ring: %v", err) + } + + remaining := ring.BytesRemaining() + + if remaining != 0 { + t.Errorf("expected 0 bytes remaining in empty buffer, got %d", remaining) + } +} + +func TestWraparound(t *testing.T) { + pageSize := uint64(4096) + nPages := uint32(2) + data := make([]byte, pageSize*(1+uint64(nPages))) + + ring, err := InitContiguous(data, nPages, pageSize) + if err != nil { + t.Fatalf("failed to initialize ring: %v", err) + } + + // Write data that will wrap around the buffer + dataSize := 
int(pageSize) - int(unsafe.Sizeof(PerfEventHeader{})) - 10 + testData := make([]byte, dataSize) + for i := range testData { + testData[i] = byte(i) + } + + ring.StartWriteBatch() + + // Write first chunk + _, err = ring.Write(testData, 1) + if err != nil { + t.Fatalf("failed to write first chunk: %v", err) + } + + // Write second chunk + _, err = ring.Write(testData, 2) + if err != nil { + t.Fatalf("failed to write second chunk: %v", err) + } + + ring.FinishWriteBatch() + + // Read and verify both chunks + ring.StartReadBatch() + + // Read first chunk + readBuf := make([]byte, dataSize) + err = ring.PeekCopy(readBuf, 0) + if err != nil { + t.Fatalf("failed to read first chunk: %v", err) + } + for i := range readBuf { + if readBuf[i] != testData[i] { + t.Errorf("first chunk mismatch at index %d: expected %d, got %d", i, testData[i], readBuf[i]) + } + } + ring.Pop() + + ring.FinishReadBatch() + + // there should now be space for one more event, that would wrap around the buffer. Write it. 
+ ring.StartWriteBatch() + _, err = ring.Write(testData, 3) + if err != nil { + t.Fatalf("failed to write third chunk: %v", err) + } + ring.FinishWriteBatch() + + // Now read the second and third chunks and verify they are correct + ring.StartReadBatch() + + // Read second chunk + err = ring.PeekCopy(readBuf, 0) + if err != nil { + t.Fatalf("failed to read second chunk: %v", err) + } + for i := range readBuf { + if readBuf[i] != testData[i] { + t.Errorf("second chunk mismatch at index %d: expected %d, got %d", i, testData[i], readBuf[i]) + } + } + ring.Pop() + + // Read third chunk + err = ring.PeekCopy(readBuf, 0) + if err != nil { + t.Fatalf("failed to read third chunk: %v", err) + } + for i := range readBuf { + if readBuf[i] != testData[i] { + t.Errorf("third chunk mismatch at index %d: expected %d, got %d", i, testData[i], readBuf[i]) + } + } + ring.Pop() + + ring.FinishReadBatch() + + // ring should be empty now + if remaining := ring.BytesRemaining(); remaining != 0 { + t.Errorf("expected 0 bytes remaining, got %d", remaining) + } +} From 8df8b0cbf89d3d39f2ee70091318707d8bdc10ef Mon Sep 17 00:00:00 2001 From: Jonathan Perry Date: Thu, 20 Feb 2025 18:50:57 +0000 Subject: [PATCH 02/33] fix build after moving go.mod to root --- .github/workflows/ci-prometheus-metrics.yaml | 2 +- cmd/prometheus_metrics/Dockerfile | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci-prometheus-metrics.yaml b/.github/workflows/ci-prometheus-metrics.yaml index 1f1d439..9fb7c77 100644 --- a/.github/workflows/ci-prometheus-metrics.yaml +++ b/.github/workflows/ci-prometheus-metrics.yaml @@ -44,7 +44,7 @@ jobs: push: ${{ github.event_name != 'pull_request' }} tags: ${{ steps.docker-metadata.outputs.tags }} labels: ${{ steps.docker-metadata.outputs.labels }} - context: cmd/prometheus_metrics + context: . 
file: cmd/prometheus_metrics/Dockerfile platforms: linux/amd64,linux/arm64 cache-from: type=registry,ref=ghcr.io/${{ github.repository }}/prometheus-metrics:cache diff --git a/cmd/prometheus_metrics/Dockerfile b/cmd/prometheus_metrics/Dockerfile index 96210c1..16d29fc 100644 --- a/cmd/prometheus_metrics/Dockerfile +++ b/cmd/prometheus_metrics/Dockerfile @@ -14,10 +14,10 @@ COPY go.mod go.sum ./ RUN go mod download # Copy the source code -COPY . . +COPY cmd/prometheus_metrics/ cmd/prometheus_metrics/ # Build the application -RUN go build -o prometheus-metrics +RUN go build -o prometheus-metrics ./cmd/prometheus_metrics # Stage 2: Create a lightweight production image FROM alpine:latest From 53645ad942e97dfeec6e16d82d896b5cbd70daf5 Mon Sep 17 00:00:00 2001 From: Jonathan Perry Date: Thu, 20 Feb 2025 19:01:59 +0000 Subject: [PATCH 03/33] fix cpi-count build after moving go.mod to root --- .github/workflows/ci-cpi-count.yaml | 2 +- cmd/cpi-count/Dockerfile | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci-cpi-count.yaml b/.github/workflows/ci-cpi-count.yaml index ecac719..f93400d 100644 --- a/.github/workflows/ci-cpi-count.yaml +++ b/.github/workflows/ci-cpi-count.yaml @@ -44,7 +44,7 @@ jobs: push: ${{ github.event_name != 'pull_request' }} tags: ${{ steps.docker-metadata.outputs.tags }} labels: ${{ steps.docker-metadata.outputs.labels }} - context: cmd/cpi-count + context: . file: cmd/cpi-count/Dockerfile platforms: linux/amd64,linux/arm64 cache-from: type=registry,ref=ghcr.io/${{ github.repository }}/cpi-count:cache # Need to add the repository name diff --git a/cmd/cpi-count/Dockerfile b/cmd/cpi-count/Dockerfile index e0edd76..9f00010 100644 --- a/cmd/cpi-count/Dockerfile +++ b/cmd/cpi-count/Dockerfile @@ -14,10 +14,10 @@ COPY go.mod go.sum ./ RUN go mod download # Copy the source code -COPY . . 
+COPY cmd/cpi-count/ cmd/cpi-count/ # Build the application -RUN go build -o cpi-count +RUN go build -o cpi-count ./cmd/cpi-count # Stage 2: Create a lightweight production image FROM alpine:latest From 9f16ed9e4abdd5e8873bedf33942d55f6def5ffb Mon Sep 17 00:00:00 2001 From: Jonathan Perry Date: Thu, 20 Feb 2025 19:34:21 +0000 Subject: [PATCH 04/33] add ring storage: memory and mmap --- pkg/perf/README.md | 95 +++++++++++++++++++++----- pkg/perf/storage.go | 141 +++++++++++++++++++++++++++++++++++++++ pkg/perf/storage_test.go | 118 ++++++++++++++++++++++++++++++++ 3 files changed, 338 insertions(+), 16 deletions(-) create mode 100644 pkg/perf/storage.go create mode 100644 pkg/perf/storage_test.go diff --git a/pkg/perf/README.md b/pkg/perf/README.md index fb4246a..7a17c2e 100644 --- a/pkg/perf/README.md +++ b/pkg/perf/README.md @@ -32,7 +32,67 @@ The main structure that represents a perf ring buffer. It contains: - `Pop`: Consume the current record - `BytesRemaining`: Get available bytes to read -### Usage +## Layer 2: Storage Layer + +The storage layer provides different implementations for managing the underlying memory of perf ring buffers. It focuses solely on memory management and perf event configuration. 
+ +### RingStorage Interface + +The common interface implemented by all storage types: +```go +type RingStorage interface { + Data() []byte + NumDataPages() uint32 + PageSize() uint64 + Close() error + FileDescriptor() int +} +``` + +### Memory-based Storage + +`MemoryRingStorage` provides a simple memory-based implementation useful for: +- Testing +- Inter-thread communication +- Scenarios not requiring kernel interaction + +```go +storage, err := NewMemoryRingStorage(nPages) +``` + +### Mmap-based Storage + +`MmapRingStorage` provides kernel integration through: +- `perf_event_open` syscall +- Memory mapping of ring buffer +- Support for BPF program output +- Configurable watermark settings + +```go +// Create mmap-based storage with watermark configuration +storage, err := NewMmapRingStorage( + cpu, // CPU to monitor (-1 for any CPU) + nPages, // Number of data pages + watermarkBytes // Bytes to accumulate before waking up (0 for every event) +) +``` + +Features: +- Configurable watermark for event batching +- Proper cleanup with finalizers +- Integration with BPF program output + +### Memory Layout + +The storage layer manages: +1. Metadata page (perf event shared page) +2. Data pages (ring buffer) +3. Memory mapping and permissions +4. Page size alignment + +## Usage + +### Basic Usage ```go // Initialize a ring buffer @@ -56,20 +116,23 @@ ring.Pop() ring.FinishReadBatch() ``` -### Memory Layout - -The perf ring buffer consists of: -1. A metadata page containing shared information -2. 
A power-of-2 sized data buffer for the actual ring - -The data buffer is treated as a circular buffer where: -- Writers append to the tail -- Readers consume from the head -- Buffer wrapping is handled automatically +### Using Mmap Storage -### Thread Safety +```go +// Create mmap-based storage that wakes up after accumulating 4KB of data +storage, err := NewMmapRingStorage(0, 8, 4096) +if err != nil { + // Handle error +} +defer storage.Close() -The implementation uses proper memory barriers through the metadata page to ensure thread safety between kernel writers and userspace readers. The batched operations help minimize cache line bouncing between processors. +// Or wake up on every event +storage, err := NewMmapRingStorage(0, 8, 0) +if err != nil { + // Handle error +} +defer storage.Close() +``` ## Testing @@ -81,6 +144,6 @@ go test -v ./pkg/perf The test suite includes: - Basic initialization tests -- Write and read operations -- Buffer wraparound handling -- Bytes remaining calculation \ No newline at end of file +- Storage implementation tests +- Watermark configuration tests +- Error cases and cleanup \ No newline at end of file diff --git a/pkg/perf/storage.go b/pkg/perf/storage.go new file mode 100644 index 0000000..2de8854 --- /dev/null +++ b/pkg/perf/storage.go @@ -0,0 +1,141 @@ +package perf + +import ( + "fmt" + "os" + "runtime" + + "golang.org/x/sys/unix" +) + +// RingStorage defines the interface for perf ring buffer storage +type RingStorage interface { + // Data returns the raw data buffer containing metadata page and data pages + Data() []byte + // NumDataPages returns the number of data pages in the ring buffer + NumDataPages() uint32 + // PageSize returns the system page size + PageSize() uint64 + // Close releases any resources associated with the storage + Close() error + // FileDescriptor returns the file descriptor if this is a perf event storage, or -1 otherwise + FileDescriptor() int +} + +// MemoryRingStorage implements RingStorage 
using regular memory allocation +// This is useful for testing and inter-thread communication +type MemoryRingStorage struct { + data []byte + nDataPages uint32 + pageSize uint64 +} + +// NewMemoryRingStorage creates a new memory-based ring storage +func NewMemoryRingStorage(nPages uint32) (*MemoryRingStorage, error) { + pageSize := uint64(os.Getpagesize()) + totalSize := pageSize * (1 + uint64(nPages)) // 1 metadata page + data pages + + data := make([]byte, totalSize) + storage := &MemoryRingStorage{ + data: data, + nDataPages: nPages, + pageSize: pageSize, + } + + return storage, nil +} + +func (s *MemoryRingStorage) Data() []byte { return s.data } +func (s *MemoryRingStorage) NumDataPages() uint32 { return s.nDataPages } +func (s *MemoryRingStorage) PageSize() uint64 { return s.pageSize } +func (s *MemoryRingStorage) Close() error { return nil } +func (s *MemoryRingStorage) FileDescriptor() int { return -1 } + +// MmapRingStorage implements RingStorage using perf_event_open and mmap +type MmapRingStorage struct { + data []byte + nDataPages uint32 + pageSize uint64 + fd int +} + +// NewMmapRingStorage creates a new mmap-based ring storage +// cpu: the CPU to monitor (-1 for any CPU) +// nPages: number of data pages in the ring buffer +// nWatermarkBytes: number of bytes to wait before waking up. If 0, wake up on every event. 
+func NewMmapRingStorage(cpu int, nPages uint32, nWatermarkBytes uint32) (*MmapRingStorage, error) { + pageSize := uint64(os.Getpagesize()) + + // Configure perf event attributes + attr := unix.PerfEventAttr{ + Type: unix.PERF_TYPE_SOFTWARE, + Config: unix.PERF_COUNT_SW_BPF_OUTPUT, + Sample_type: unix.PERF_SAMPLE_RAW, + } + + // Configure watermark behavior + if nWatermarkBytes > 0 { + attr.Bits = unix.PerfBitWatermark + attr.Wakeup = nWatermarkBytes + } else { + attr.Wakeup = 1 // Wake up on every event + } + + // Open perf event + fd, err := unix.PerfEventOpen(&attr, -1, cpu, -1, unix.PERF_FLAG_FD_CLOEXEC) + if err != nil { + return nil, fmt.Errorf("perf_event_open failed: %w", err) + } + + // Set up cleanup in case of errors + success := false + defer func() { + if !success { + unix.Close(fd) + } + }() + + // Calculate total size and mmap the buffer + totalSize := pageSize * (1 + uint64(nPages)) // 1 metadata page + data pages + data, err := unix.Mmap(fd, 0, int(totalSize), unix.PROT_READ|unix.PROT_WRITE, unix.MAP_SHARED) + if err != nil { + return nil, fmt.Errorf("mmap failed: %w", err) + } + + storage := &MmapRingStorage{ + data: data, + nDataPages: nPages, + pageSize: pageSize, + fd: fd, + } + + // Set up finalizer to ensure cleanup + runtime.SetFinalizer(storage, (*MmapRingStorage).Close) + success = true + return storage, nil +} + +func (s *MmapRingStorage) Data() []byte { return s.data } +func (s *MmapRingStorage) NumDataPages() uint32 { return s.nDataPages } +func (s *MmapRingStorage) PageSize() uint64 { return s.pageSize } +func (s *MmapRingStorage) FileDescriptor() int { return s.fd } + +// Close releases the mmap'd memory and closes the file descriptor +func (s *MmapRingStorage) Close() error { + if s.data != nil { + if err := unix.Munmap(s.data); err != nil { + return fmt.Errorf("munmap failed: %w", err) + } + s.data = nil + } + + if s.fd != -1 { + if err := unix.Close(s.fd); err != nil { + return fmt.Errorf("close failed: %w", err) + } + s.fd = -1 + 
} + + runtime.SetFinalizer(s, nil) + return nil +} diff --git a/pkg/perf/storage_test.go b/pkg/perf/storage_test.go new file mode 100644 index 0000000..edd82fc --- /dev/null +++ b/pkg/perf/storage_test.go @@ -0,0 +1,118 @@ +package perf + +import ( + "runtime" + "testing" + + "golang.org/x/sys/unix" +) + +func TestMemoryRingStorage(t *testing.T) { + nPages := uint32(2) + storage, err := NewMemoryRingStorage(nPages) + if err != nil { + t.Fatalf("failed to create memory storage: %v", err) + } + defer storage.Close() + + // Check basic properties + if storage.NumDataPages() != nPages { + t.Errorf("expected %d pages, got %d", nPages, storage.NumDataPages()) + } + + if storage.PageSize() != uint64(unix.Getpagesize()) { + t.Errorf("expected page size %d, got %d", unix.Getpagesize(), storage.PageSize()) + } + + expectedSize := storage.PageSize() * (1 + uint64(nPages)) + if uint64(len(storage.Data())) != expectedSize { + t.Errorf("expected data size %d, got %d", expectedSize, len(storage.Data())) + } + + if fd := storage.FileDescriptor(); fd != -1 { + t.Errorf("expected file descriptor -1, got %d", fd) + } +} + +func TestMmapRingStorage(t *testing.T) { + if runtime.GOOS != "linux" { + t.Skip("skipping test on non-linux platform") + } + + nPages := uint32(2) + storage, err := NewMmapRingStorage(0, nPages, 0) // Wake up on every event + if err != nil { + t.Fatalf("failed to create mmap storage: %v", err) + } + defer storage.Close() + + // Check basic properties + if storage.NumDataPages() != nPages { + t.Errorf("expected %d pages, got %d", nPages, storage.NumDataPages()) + } + + if storage.PageSize() != uint64(unix.Getpagesize()) { + t.Errorf("expected page size %d, got %d", unix.Getpagesize(), storage.PageSize()) + } + + expectedSize := storage.PageSize() * (1 + uint64(nPages)) + if uint64(len(storage.Data())) != expectedSize { + t.Errorf("expected data size %d, got %d", expectedSize, len(storage.Data())) + } + + if fd := storage.FileDescriptor(); fd <= 0 { + 
t.Errorf("expected valid file descriptor, got %d", fd) + } +} + +func TestMmapRingStorageClose(t *testing.T) { + if runtime.GOOS != "linux" { + t.Skip("skipping test on non-linux platform") + } + + nPages := uint32(2) + storage, err := NewMmapRingStorage(0, nPages, 0) + if err != nil { + t.Fatalf("failed to create mmap storage: %v", err) + } + + // Test double close + if err := storage.Close(); err != nil { + t.Errorf("first close failed: %v", err) + } + + if err := storage.Close(); err != nil { + t.Errorf("second close failed: %v", err) + } +} + +func TestMmapRingStorageWatermark(t *testing.T) { + if runtime.GOOS != "linux" { + t.Skip("skipping test on non-linux platform") + } + + tests := []struct { + name string + watermarkBytes uint32 + }{ + { + name: "wake up on every event", + watermarkBytes: 0, + }, + { + name: "wake up after 4096 bytes", + watermarkBytes: 4096, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + nPages := uint32(2) + storage, err := NewMmapRingStorage(0, nPages, tt.watermarkBytes) + if err != nil { + t.Fatalf("failed to create mmap storage: %v", err) + } + defer storage.Close() + }) + } +} From 6bb08fd9683539bdfb2611c44631a7517acdf53d Mon Sep 17 00:00:00 2001 From: Jonathan Perry Date: Thu, 20 Feb 2025 19:47:26 +0000 Subject: [PATCH 05/33] perf: Add error constants for ring buffer operations --- pkg/perf/ring.go | 35 ++++++++++++++++++++++++++--------- 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/pkg/perf/ring.go b/pkg/perf/ring.go index 8da2c1e..e4b5d79 100644 --- a/pkg/perf/ring.go +++ b/pkg/perf/ring.go @@ -1,11 +1,28 @@ package perf import ( - "fmt" + "errors" "sync/atomic" "unsafe" ) +var ( + // ErrInvalidBufferLength is returned when the buffer size is invalid (not power of 2 or too small) + ErrInvalidBufferLength = errors.New("buffer length must be a power of 2 and at least 8 bytes") + // ErrNilBuffer is returned when a nil buffer is provided + ErrNilBuffer = errors.New("data buffer cannot 
be nil") + // ErrNoSpace is returned when trying to write to a full ring buffer + ErrNoSpace = errors.New("buffer full") + // ErrBufferEmpty is returned when trying to read from an empty ring buffer + ErrBufferEmpty = errors.New("buffer empty") + // ErrCannotFit is returned when trying to write data larger than the buffer, this data can never fit in the buffer + ErrCannotFit = errors.New("data too large for buffer") + // ErrEmptyWrite is returned when trying to write empty data + ErrEmptyWrite = errors.New("cannot write empty data") + // ErrSizeExceeded is returned when trying to read too much data + ErrSizeExceeded = errors.New("requested read larger than data") +) + // PerfEventHeader represents the header of a perf event type PerfEventHeader struct { Type uint32 @@ -43,12 +60,12 @@ type PerfEventMmapPage struct { // InitContiguous initializes a PerfRing using contiguous memory func InitContiguous(data []byte, nPages uint32, pageSize uint64) (*PerfRing, error) { if data == nil { - return nil, fmt.Errorf("data buffer cannot be nil") + return nil, ErrNilBuffer } bufLen := uint64(nPages) * pageSize if (bufLen&(bufLen-1)) != 0 || bufLen < 8 { - return nil, fmt.Errorf("buffer length must be a power of 2 and at least 8 bytes") + return nil, ErrInvalidBufferLength } // First page is metadata, rest is data @@ -79,18 +96,18 @@ func (r *PerfRing) StartWriteBatch() { // Write writes data to the ring buffer with the given type func (r *PerfRing) Write(data []byte, eventType uint32) (int, error) { if len(data) == 0 { - return 0, fmt.Errorf("cannot write empty data") + return 0, ErrEmptyWrite } // Calculate total size including header, aligned to 8 bytes alignedLen := ((uint32(len(data)) + uint32(unsafe.Sizeof(PerfEventHeader{})) + 7) & ^uint32(7)) if alignedLen > uint32(r.bufMask) { - return 0, fmt.Errorf("data too large for buffer") + return 0, ErrCannotFit } // Check if there's enough space if r.tail+uint64(alignedLen)-r.head > r.bufMask+1 { - return 0, fmt.Errorf("buffer 
full") + return 0, ErrNoSpace } // Write header @@ -132,7 +149,7 @@ func (r *PerfRing) StartReadBatch() { // PeekSize returns the size of the next event in the ring buffer func (r *PerfRing) PeekSize() (int, error) { if r.tail == r.head { - return 0, fmt.Errorf("buffer empty") + return 0, ErrBufferEmpty } header := (*PerfEventHeader)(unsafe.Pointer(&r.data[r.head&r.bufMask])) @@ -153,7 +170,7 @@ func (r *PerfRing) PeekCopy(buf []byte, offset uint16) error { } if len(buf) > int(size) { - return fmt.Errorf("buffer too small") + return ErrSizeExceeded } startPos := (r.head + uint64(unsafe.Sizeof(PerfEventHeader{})) + uint64(offset)) & r.bufMask @@ -175,7 +192,7 @@ func (r *PerfRing) PeekCopy(buf []byte, offset uint16) error { // Pop consumes the current event func (r *PerfRing) Pop() error { if r.tail == r.head { - return fmt.Errorf("buffer empty") + return ErrBufferEmpty } header := (*PerfEventHeader)(unsafe.Pointer(&r.data[r.head&r.bufMask])) From 52daf05ce367f3a2543dace91e763cbc455a9569 Mon Sep 17 00:00:00 2001 From: Jonathan Perry Date: Thu, 20 Feb 2025 19:49:59 +0000 Subject: [PATCH 06/33] ci: add golang package tests --- .github/workflows/test-go-packages.yaml | 54 +++++++++++++++++++++++++ 1 file changed, 54 insertions(+) create mode 100644 .github/workflows/test-go-packages.yaml diff --git a/.github/workflows/test-go-packages.yaml b/.github/workflows/test-go-packages.yaml new file mode 100644 index 0000000..827887d --- /dev/null +++ b/.github/workflows/test-go-packages.yaml @@ -0,0 +1,54 @@ +name: Test Go Packages + +on: + push: + branches: [ main ] + paths: + - 'pkg/**/*.go' + - 'go.mod' + - 'go.sum' + pull_request: + branches: [ main ] + paths: + - 'pkg/**/*.go' + - 'go.mod' + - 'go.sum' + +jobs: + test: + name: Run Go Tests + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version: '1.21' + check-latest: true + cache: true + + - name: Install dependencies + run: go mod download + 
+ - name: Run tests + run: | + # Run tests for all packages under pkg/ + go test -v -race ./pkg/... + + - name: Run linter + uses: golangci/golangci-lint-action@v3 + with: + version: latest + args: --timeout=5m + working-directory: pkg/ + + - name: Check formatting + run: | + # Check if any .go files are not properly formatted + if [ -n "$(gofmt -l ./pkg)" ]; then + echo "The following files are not properly formatted:" + gofmt -l ./pkg + exit 1 + fi \ No newline at end of file From d0f36eb366a57ff00568e34dd5d13c1a140889b7 Mon Sep 17 00:00:00 2001 From: Jonathan Perry Date: Thu, 20 Feb 2025 19:55:26 +0000 Subject: [PATCH 07/33] add permissions to the perf event tests in storage.go --- .github/workflows/test-go-packages.yaml | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/.github/workflows/test-go-packages.yaml b/.github/workflows/test-go-packages.yaml index 827887d..00f0b00 100644 --- a/.github/workflows/test-go-packages.yaml +++ b/.github/workflows/test-go-packages.yaml @@ -32,10 +32,22 @@ jobs: - name: Install dependencies run: go mod download - - name: Run tests + - name: Install libcap + run: sudo apt-get update && sudo apt-get install -y libcap2-bin + + - name: Run perf package tests + run: | + # Compile the test binary + go test -c ./pkg/perf -o perf.test + # Add CAP_PERFMON capability to the test binary + sudo setcap cap_perfmon+ep perf.test + # Run the perf tests with the permissioned binary + ./perf.test -test.v -test.race + + - name: Run other package tests run: | - # Run tests for all packages under pkg/ - go test -v -race ./pkg/... + # Run tests for all packages except perf + go test -v -race $(go list ./pkg/... 
| grep -v pkg/perf) - name: Run linter uses: golangci/golangci-lint-action@v3 From 7b976bd5b6665c473efba399b5c8c07670396a7e Mon Sep 17 00:00:00 2001 From: Jonathan Perry Date: Thu, 20 Feb 2025 19:57:55 +0000 Subject: [PATCH 08/33] enable manual launching of go tests --- .github/workflows/test-go-packages.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/test-go-packages.yaml b/.github/workflows/test-go-packages.yaml index 00f0b00..76e83f6 100644 --- a/.github/workflows/test-go-packages.yaml +++ b/.github/workflows/test-go-packages.yaml @@ -13,6 +13,7 @@ on: - 'pkg/**/*.go' - 'go.mod' - 'go.sum' + workflow_dispatch: jobs: test: From 0dba1e2e5c9b19fcfafc579e4842b5d8ddb56983 Mon Sep 17 00:00:00 2001 From: Jonathan Perry Date: Thu, 20 Feb 2025 20:03:31 +0000 Subject: [PATCH 09/33] use -race flag when building the test --- .github/workflows/test-go-packages.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test-go-packages.yaml b/.github/workflows/test-go-packages.yaml index 76e83f6..3da57d5 100644 --- a/.github/workflows/test-go-packages.yaml +++ b/.github/workflows/test-go-packages.yaml @@ -39,11 +39,11 @@ jobs: - name: Run perf package tests run: | # Compile the test binary - go test -c ./pkg/perf -o perf.test + go test -c ./pkg/perf -race -o perf.test # Add CAP_PERFMON capability to the test binary sudo setcap cap_perfmon+ep perf.test # Run the perf tests with the permissioned binary - ./perf.test -test.v -test.race + ./perf.test -test.v - name: Run other package tests run: | From 5ed2bd6e4ec4f1c333eb6b96ef730d6900fce1df Mon Sep 17 00:00:00 2001 From: Jonathan Perry Date: Thu, 20 Feb 2025 20:08:29 +0000 Subject: [PATCH 10/33] set perf_event_paranoid to enable perf --- .github/workflows/test-go-packages.yaml | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/.github/workflows/test-go-packages.yaml b/.github/workflows/test-go-packages.yaml index 3da57d5..f362220 100644 --- 
a/.github/workflows/test-go-packages.yaml +++ b/.github/workflows/test-go-packages.yaml @@ -42,6 +42,16 @@ jobs: go test -c ./pkg/perf -race -o perf.test # Add CAP_PERFMON capability to the test binary sudo setcap cap_perfmon+ep perf.test + # Verify capability + getcap perf.test + + # Check if perf_event_paranoid is restricting access + cat /proc/sys/kernel/perf_event_paranoid + # Set perf_event_paranoid to 1 + echo 1 | sudo tee /proc/sys/kernel/perf_event_paranoid + # Verify perf_event_paranoid + cat /proc/sys/kernel/perf_event_paranoid + # Run the perf tests with the permissioned binary ./perf.test -test.v From 2b4a9f2b997a45f4cf2e57ea866b05c8a5b0f9bb Mon Sep 17 00:00:00 2001 From: Jonathan Perry Date: Thu, 20 Feb 2025 20:10:46 +0000 Subject: [PATCH 11/33] only run the other package tests if those tests are not empty --- .github/workflows/test-go-packages.yaml | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/.github/workflows/test-go-packages.yaml b/.github/workflows/test-go-packages.yaml index f362220..19225f3 100644 --- a/.github/workflows/test-go-packages.yaml +++ b/.github/workflows/test-go-packages.yaml @@ -44,7 +44,7 @@ jobs: go test -c ./pkg/perf -race -o perf.test # Add CAP_PERFMON capability to the test binary sudo setcap cap_perfmon+ep perf.test - # Verify capability getcap perf.test - + # Check if perf_event_paranoid is restricting access cat /proc/sys/kernel/perf_event_paranoid # Set perf_event_paranoid to 1 @@ -57,8 +57,10 @@ jobs: - name: Run other package tests run: | - # Run tests for all packages except perf - go test -v -race $(go list ./pkg/... | grep -v pkg/perf) + # Run tests for all packages except perf, only if there are other packages + if [ -n "$(go list ./pkg/... | grep -v pkg/perf)" ]; then + go test -v -race $(go list ./pkg/... 
| grep -v pkg/perf) + fi - name: Run linter uses: golangci/golangci-lint-action@v3 From 55d9b13e25d3beea580958a383d50e3569fc4dfd Mon Sep 17 00:00:00 2001 From: Jonathan Perry Date: Thu, 20 Feb 2025 20:11:30 +0000 Subject: [PATCH 12/33] run workflow on changes to the CI workflow --- .github/workflows/test-go-packages.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/test-go-packages.yaml b/.github/workflows/test-go-packages.yaml index 19225f3..f2d1992 100644 --- a/.github/workflows/test-go-packages.yaml +++ b/.github/workflows/test-go-packages.yaml @@ -7,6 +7,7 @@ on: - 'pkg/**/*.go' - 'go.mod' - 'go.sum' + - '.github/workflows/test-go-packages.yaml' pull_request: branches: [ main ] paths: From b53d1298619b39d6c77c36690233786b7a75655e Mon Sep 17 00:00:00 2001 From: Jonathan Perry Date: Thu, 20 Feb 2025 20:20:40 +0000 Subject: [PATCH 13/33] fix checking Pop for errors --- pkg/perf/ring_test.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/pkg/perf/ring_test.go b/pkg/perf/ring_test.go index d542aab..67dd1ba 100644 --- a/pkg/perf/ring_test.go +++ b/pkg/perf/ring_test.go @@ -196,7 +196,10 @@ func TestWraparound(t *testing.T) { t.Errorf("first chunk mismatch at index %d: expected %d, got %d", i, testData[i], readBuf[i]) } } - ring.Pop() + err = ring.Pop() + if err != nil { + t.Fatalf("failed to pop first chunk: %v", err) + } ring.FinishReadBatch() @@ -221,7 +224,10 @@ func TestWraparound(t *testing.T) { t.Errorf("second chunk mismatch at index %d: expected %d, got %d", i, testData[i], readBuf[i]) } } - ring.Pop() + err = ring.Pop() + if err != nil { + t.Fatalf("failed to pop second chunk: %v", err) + } // Read third chunk err = ring.PeekCopy(readBuf, 0) @@ -233,7 +239,10 @@ func TestWraparound(t *testing.T) { t.Errorf("third chunk mismatch at index %d: expected %d, got %d", i, testData[i], readBuf[i]) } } - ring.Pop() + err = ring.Pop() + if err != nil { + t.Fatalf("failed to pop third chunk: %v", err) + } 
ring.FinishReadBatch() From a12041ccb9126123f7fbdf52b56fa260136f6526 Mon Sep 17 00:00:00 2001 From: Jonathan Perry Date: Thu, 20 Feb 2025 22:19:32 +0000 Subject: [PATCH 14/33] devcontainer: add the golang language server --- Dockerfile.devcontainer | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Dockerfile.devcontainer b/Dockerfile.devcontainer index 5af287f..2f6e87b 100644 --- a/Dockerfile.devcontainer +++ b/Dockerfile.devcontainer @@ -34,6 +34,9 @@ RUN apt-get update && apt-get install -y \ ${EBPF_PACKAGES} \ && rm -rf /var/lib/apt/lists/* +# For Go development +RUN go install -v golang.org/x/tools/gopls@latest + # Create architecture-specific symlink for asm RUN arch=$(uname -m) && \ case ${arch} in \ From 370cfc72474e69096f30f18ee55258ae1c140e2e Mon Sep 17 00:00:00 2001 From: Jonathan Perry Date: Fri, 21 Feb 2025 03:17:39 +0000 Subject: [PATCH 15/33] perf: Add reader implementation with multi-ring event sorting --- pkg/perf/README.md | 123 +++++++++++---- pkg/perf/reader.go | 230 +++++++++++++++++++++++++++ pkg/perf/reader_test.go | 341 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 667 insertions(+), 27 deletions(-) create mode 100644 pkg/perf/reader.go create mode 100644 pkg/perf/reader_test.go diff --git a/pkg/perf/README.md b/pkg/perf/README.md index 7a17c2e..6e58451 100644 --- a/pkg/perf/README.md +++ b/pkg/perf/README.md @@ -90,60 +90,129 @@ The storage layer manages: 3. Memory mapping and permissions 4. Page size alignment -## Usage +## Layer 3: Reader Layer + +The reader layer provides functionality for reading from multiple CPU rings and sorting events by timestamp. This layer is particularly useful when dealing with multi-CPU systems where events need to be processed in chronological order. + +### Record Format Requirements + +For `PERF_RECORD_SAMPLE` records, each record must include a timestamp as its first 8 bytes in the form of a uint64. 
This timestamp is used by the reader to maintain chronological order when reading from multiple rings. The timestamp should be placed immediately after the perf event header in the record data. + +For example: +``` +[perf_event_header] // Standard perf event header +[uint64 timestamp] // 8-byte timestamp required for ordering +[remaining data...] // Rest of the record data +``` + +The reader handles the following special cases: +- Non-sample records (e.g., `PERF_RECORD_LOST`): Assigned timestamp 0 to ensure immediate processing +- Malformed sample records (less than 8 bytes): Assigned timestamp 0 to ensure immediate processing +- Failed timestamp reads: Assigned timestamp 0 to ensure immediate processing -### Basic Usage +It is the responsibility of the user of the reader to: +- Ensure proper timestamp placement in sample records +- Handle malformed records appropriately when encountered +- Process non-sample records (like `PERF_RECORD_LOST`) as needed +- Handle records with timestamp 0 according to their application logic + +### Key Components + +#### RingContainer + +Manages multiple CPU rings: +- Maintains a heap of entries sorted by timestamp +- Dynamically grows as rings are added +- Provides efficient timestamp-based access ```go -// Initialize a ring buffer -data := make([]byte, pageSize*(1+nPages)) // 1 meta page + n data pages -ring, err := perf.InitContiguous(data, nPages, pageSize) +container := NewRingContainer() + +// Add ring for CPU 0 +container.AddRing(ring0) +``` + +#### Reader + +Provides sorted access to events: +- Reads events in timestamp order +- Supports maximum timestamp cutoff +- Maintains proper cleanup of resources + +```go +// Create reader with max timestamp +maxTimestamp := uint64(time.Now().UnixNano()) +reader, err := NewReader(container, maxTimestamp) if err != nil { // Handle error } +defer reader.Close() -// Write data -ring.StartWriteBatch() -offset, err := ring.Write(data, eventType) -ring.FinishWriteBatch() - -// Read data 
-ring.StartReadBatch() -size, _ := ring.PeekSize() -buf := make([]byte, size) -ring.PeekCopy(buf, 0, uint16(size)) -ring.Pop() -ring.FinishReadBatch() +// Read events in timestamp order +for !reader.Empty() { + ring := reader.CurrentRing() + // Process event from ring + reader.Pop() +} ``` -### Using Mmap Storage + +## Usage + +### Complete Example ```go -// Create mmap-based storage that wakes up after accumulating 4KB of data +// Create storage storage, err := NewMmapRingStorage(0, 8, 4096) if err != nil { // Handle error } defer storage.Close() -// Or wake up on every event -storage, err := NewMmapRingStorage(0, 8, 0) +// Initialize ring +ring, err := InitContiguous(storage.Data(), storage.NumDataPages(), storage.PageSize()) if err != nil { // Handle error } -defer storage.Close() + +// Create container and add ring +container := NewRingContainer() +if err := container.AddRing(ring); err != nil { + // Handle error +} + +// Create reader +reader, err := NewReader(container, maxTimestamp) +if err != nil { + // Handle error +} +defer reader.Close() + +// Read events in timestamp order +for !reader.Empty() { + ring := reader.CurrentRing() + size, _ := ring.PeekSize() + buf := make([]byte, size) + ring.PeekCopy(buf, 0) + // Process event + reader.Pop() +} ``` ## Testing +The test suite includes: +- Basic ring buffer operations +- Storage implementation tests +- Multi-CPU ring container tests +- Reader timestamp ordering tests +- Error cases and cleanup +- Watermark configuration tests + Run the tests with: ```bash go test -v ./pkg/perf ``` -The test suite includes: -- Basic initialization tests -- Storage implementation tests -- Watermark configuration tests -- Error cases and cleanup \ No newline at end of file +Note: Some tests require root privileges or appropriate capabilities (CAP_PERFMON) to run perf_event_open syscalls. 
\ No newline at end of file diff --git a/pkg/perf/reader.go b/pkg/perf/reader.go new file mode 100644 index 0000000..a48466c --- /dev/null +++ b/pkg/perf/reader.go @@ -0,0 +1,230 @@ +package perf + +import ( + "container/heap" + "errors" + "unsafe" +) + +var ( + // ErrNoRings is returned when trying to read from a container with no rings + ErrNoRings = errors.New("no rings available") + // ErrNotActive is returned when trying to use a reader outside of a batch + ErrNotActive = errors.New("reader is not active") + // ErrAlreadyActive is returned when trying to modify a reader while it's active + ErrAlreadyActive = errors.New("reader is already active") +) + +const ( + // PERF_RECORD_SAMPLE is the type for sample records + PERF_RECORD_SAMPLE = 9 + // PERF_RECORD_LOST is the type for lost sample records + PERF_RECORD_LOST = 2 +) + +// perfEntry represents a timestamped entry from a specific ring +type perfEntry struct { + timestamp uint64 // Event timestamp + ringIndex int // Index of the source ring +} + +// perfEntryHeap implements heap.Interface for perfEntry +type perfEntryHeap struct { + entries []perfEntry + size int // Number of valid entries in the heap +} + +func (h *perfEntryHeap) Len() int { return h.size } +func (h *perfEntryHeap) Less(i, j int) bool { + return h.entries[i].timestamp < h.entries[j].timestamp +} +func (h *perfEntryHeap) Swap(i, j int) { + h.entries[i], h.entries[j] = h.entries[j], h.entries[i] +} +func (h *perfEntryHeap) Push(x interface{}) { + h.entries[h.size] = x.(perfEntry) + h.size++ +} +func (h *perfEntryHeap) Pop() interface{} { + h.size-- + return h.entries[h.size] +} + +// Reader provides sorted access to events from multiple rings +type Reader struct { + rings []*PerfRing // Rings for each CPU + heap perfEntryHeap // Heap of entries sorted by timestamp + inHeap []bool // Tracks whether each ring has an entry in the heap + active bool +} + +// NewReader creates a new reader for accessing events +func NewReader() *Reader { + return 
&Reader{ + rings: make([]*PerfRing, 0), + heap: perfEntryHeap{ + entries: make([]perfEntry, 0), + }, + inHeap: make([]bool, 0), + } +} + +// AddRing adds a ring to the collection +func (r *Reader) AddRing(ring *PerfRing) error { + if r.active { + return ErrAlreadyActive + } + + r.rings = append(r.rings, ring) + r.inHeap = append(r.inHeap, false) + + // Grow the heap entries slice if needed + if cap(r.heap.entries) < len(r.rings) { + newEntries := make([]perfEntry, len(r.rings)) + copy(newEntries, r.heap.entries) + r.heap.entries = newEntries + } + return nil +} + +// Start begins a read batch, initializing the heap with available entries +func (r *Reader) Start() error { + if len(r.rings) == 0 { + return ErrNoRings + } + if r.active { + return ErrAlreadyActive + } + + // Start read batches and initialize the heap + for i, ring := range r.rings { + ring.StartReadBatch() + if !r.inHeap[i] { + r.maintainHeapEntry(i) + } + } + + r.active = true + return nil +} + +// Finish ends the current read batch +func (r *Reader) Finish() error { + if !r.active { + return nil + } + + for _, ring := range r.rings { + ring.FinishReadBatch() + } + + r.active = false + return nil +} + +// Empty returns true if there are no more events to read +func (r *Reader) Empty() bool { + if !r.active { + return true + } + return r.heap.size == 0 +} + +// PeekTimestamp returns the timestamp of the next event +func (r *Reader) PeekTimestamp() (uint64, error) { + if !r.active { + return 0, ErrNotActive + } + if r.heap.size == 0 { + return 0, ErrBufferEmpty + } + return r.heap.entries[0].timestamp, nil +} + +// CurrentRing returns the ring containing the next event +func (r *Reader) CurrentRing() (*PerfRing, error) { + if !r.active { + return nil, ErrNotActive + } + if r.heap.size == 0 { + return nil, ErrBufferEmpty + } + entry := r.heap.entries[0] + return r.rings[entry.ringIndex], nil +} + +// Pop consumes the current event and updates the heap +func (r *Reader) Pop() error { + if !r.active { + 
return ErrNotActive + } + if r.heap.size == 0 { + return ErrBufferEmpty + } + + entry := r.heap.entries[0] + ring := r.rings[entry.ringIndex] + + if err := ring.Pop(); err != nil { + return err + } + + // Update the heap entry for this ring + r.maintainHeapEntry(entry.ringIndex) + + return nil +} + +// maintainHeapEntry manages the heap entry for a ring +// For PERF_RECORD_SAMPLE records, the timestamp is read from the first 8 bytes of the record data. +// A timestamp of 0 is assigned in the following cases: +// - Non-sample records (e.g., PERF_RECORD_LOST) +// - Malformed sample records (less than 8 bytes) +// - Failed timestamp reads +// This ensures such records are processed as soon as possible. +func (r *Reader) maintainHeapEntry(idx int) { + ring := r.rings[idx] + inHeap := r.inHeap[idx] + + // sanity check: if we call maintainHeapEntry, the ring must be *the minimum* in the heap + if inHeap && (r.heap.size == 0 || r.heap.entries[0].ringIndex != idx) { + panic("maintainHeapEntry was called for a ring that is not the minimum in the heap (should never happen)") + } + + // If the ring is empty, remove its entry if it's in the heap + if _, err := ring.PeekSize(); err != nil { + if r.inHeap[idx] { + heap.Remove(&r.heap, 0) + r.inHeap[idx] = false + } + return + } + + // Get the timestamp for the current entry + var timestamp uint64 = 0 + if ring.PeekType() == PERF_RECORD_SAMPLE { + // Sample records have an 8-byte timestamp after the header + // Skip the first 8 bytes (sample record) and read the timestamp + buf := make([]byte, 8) + if err := ring.PeekCopy(buf, 0); err == nil { + timestamp = *(*uint64)(unsafe.Pointer(&buf[0])) + } + } + // if we cannot read the timestamp, set it to 0 (most urgent to process) + + // Update or add the entry + entry := perfEntry{ + timestamp: timestamp, + ringIndex: idx, + } + + if r.inHeap[idx] { + // Update existing entry and fix heap + r.heap.entries[0] = entry + heap.Fix(&r.heap, 0) + } else { + // Add new entry + 
heap.Push(&r.heap, entry) + r.inHeap[idx] = true + } +} diff --git a/pkg/perf/reader_test.go b/pkg/perf/reader_test.go new file mode 100644 index 0000000..69fb525 --- /dev/null +++ b/pkg/perf/reader_test.go @@ -0,0 +1,341 @@ +package perf + +import ( + "encoding/binary" + "fmt" + "slices" + "testing" +) + +func TestReader(t *testing.T) { + reader := NewReader() + + // Create test rings + pageSize := uint64(4096) + nPages := uint32(2) + data1 := make([]byte, pageSize*(1+uint64(nPages))) + data2 := make([]byte, pageSize*(1+uint64(nPages))) + + ring1, err := InitContiguous(data1, nPages, pageSize) + if err != nil { + t.Fatalf("failed to create ring1: %v", err) + } + + ring2, err := InitContiguous(data2, nPages, pageSize) + if err != nil { + t.Fatalf("failed to create ring2: %v", err) + } + + // Add rings to reader + if err := reader.AddRing(ring1); err != nil { + t.Fatalf("failed to add ring1: %v", err) + } + if err := reader.AddRing(ring2); err != nil { + t.Fatalf("failed to add ring2: %v", err) + } + + // Test that adding a ring while active fails + if err := reader.Start(); err != nil { + t.Fatalf("failed to start reader: %v", err) + } + if err := reader.AddRing(ring1); err != ErrAlreadyActive { + t.Errorf("expected ErrAlreadyActive, got %v", err) + } + reader.Finish() + + // Test operations before Start should fail + if !reader.Empty() { + t.Error("expected reader to be empty when not active") + } + if _, err := reader.PeekTimestamp(); err != ErrNotActive { + t.Errorf("expected ErrNotActive, got %v", err) + } + if _, err := reader.CurrentRing(); err != ErrNotActive { + t.Errorf("expected ErrNotActive, got %v", err) + } + if err := reader.Pop(); err != ErrNotActive { + t.Errorf("expected ErrNotActive, got %v", err) + } + + // Start the reader + if err := reader.Start(); err != nil { + t.Fatalf("failed to start reader: %v", err) + } + + // Initially should be empty + if !reader.Empty() { + t.Error("expected reader to be empty") + } + + reader.Finish() + + // Create 
events with timestamps + event1 := make([]byte, 16) // 8 bytes for timestamp + "event1" + binary.LittleEndian.PutUint64(event1[:8], 100) // timestamp 100 + copy(event1[8:], []byte("event1")) + // print the hex of event1 + fmt.Printf("event1: %x\n", event1) + + event2 := make([]byte, 16) // 8 bytes for timestamp + "event2" + binary.LittleEndian.PutUint64(event2[:8], 200) // timestamp 200 + copy(event2[8:], []byte("event2")) + + ring1.StartWriteBatch() + if _, err := ring1.Write(event1, PERF_RECORD_SAMPLE); err != nil { + t.Fatalf("failed to write event1: %v", err) + } + ring1.FinishWriteBatch() + + ring2.StartWriteBatch() + if _, err := ring2.Write(event2, PERF_RECORD_SAMPLE); err != nil { + t.Fatalf("failed to write event2: %v", err) + } + ring2.FinishWriteBatch() + + // Start a new batch to see the new events + if err := reader.Start(); err != nil { + t.Fatalf("failed to restart reader: %v", err) + } + + // Test reading events + if reader.Empty() { + t.Error("expected reader to not be empty") + } + + // Pop events and verify they come in timestamp order + expectedTimestamps := []uint64{100, 200} + expectedRingData := [][]byte{event1, event2} + for i, expected := range expectedTimestamps { + ts, err := reader.PeekTimestamp() + if err != nil { + t.Errorf("failed to peek timestamp %d: %v", i, err) + } + if ts != expected { + t.Errorf("expected timestamp %d, got %d", expected, ts) + } + + // Get current ring and verify it's not nil + ring, err := reader.CurrentRing() + if err != nil { + t.Errorf("failed to get current ring: %v", err) + } + if ring == nil { + t.Error("expected non-nil current ring") + } + + // Copy the ring's data into a new buffer + size, err := ring.PeekSize() + if err != nil { + t.Errorf("failed to peek size: %v", err) + } + ringData := make([]byte, size) + if err := ring.PeekCopy(ringData, 0); err != nil { + t.Errorf("failed to peek copy ring data: %v", err) + } + fmt.Printf("ring data: %x\n", ringData) + + if !slices.Equal(ringData, 
expectedRingData[i]) { + t.Errorf("expected ring data %x, got %x", expectedRingData[i], ringData) + } + + if err := reader.Pop(); err != nil { + t.Errorf("failed to pop event %d: %v", i, err) + } + } + + // Should be empty after reading all events + if !reader.Empty() { + t.Error("expected reader to be empty after reading all events") + } + + // Finish the reader + if err := reader.Finish(); err != nil { + t.Errorf("failed to finish reader: %v", err) + } + + // Test operations after Finish should fail + if !reader.Empty() { + t.Error("expected reader to be empty when not active") + } + if _, err := reader.PeekTimestamp(); err != ErrNotActive { + t.Errorf("expected ErrNotActive, got %v", err) + } + if _, err := reader.CurrentRing(); err != ErrNotActive { + t.Errorf("expected ErrNotActive, got %v", err) + } + if err := reader.Pop(); err != ErrNotActive { + t.Errorf("expected ErrNotActive, got %v", err) + } +} + +func TestReaderLostRecords(t *testing.T) { + reader := NewReader() + + // Create two test rings + pageSize := uint64(4096) + nPages := uint32(2) + data1 := make([]byte, pageSize*(1+uint64(nPages))) + data2 := make([]byte, pageSize*(1+uint64(nPages))) + + ring1, err := InitContiguous(data1, nPages, pageSize) + if err != nil { + t.Fatalf("failed to create ring1: %v", err) + } + + ring2, err := InitContiguous(data2, nPages, pageSize) + if err != nil { + t.Fatalf("failed to create ring2: %v", err) + } + + if err := reader.AddRing(ring1); err != nil { + t.Fatalf("failed to add ring1: %v", err) + } + if err := reader.AddRing(ring2); err != nil { + t.Fatalf("failed to add ring2: %v", err) + } + + // Test 1: Show that events within a single ring maintain their order regardless of type + event1 := make([]byte, 16) + binary.LittleEndian.PutUint64(event1[:8], 100) + copy(event1[8:], []byte("event1")) + + event2 := make([]byte, 16) // Lost event data + copy(event2[8:], []byte("lost!")) + + // Write both events to ring1 + ring1.StartWriteBatch() + if _, err := 
ring1.Write(event1, PERF_RECORD_SAMPLE); err != nil { + t.Fatalf("failed to write event1: %v", err) + } + if _, err := ring1.Write(event2, PERF_RECORD_LOST); err != nil { + t.Fatalf("failed to write event2: %v", err) + } + ring1.FinishWriteBatch() + + // Start reader and verify events come in ring order (not by type) + if err := reader.Start(); err != nil { + t.Fatalf("failed to start reader: %v", err) + } + + // First event should be event1 (timestamp 100) + ts, err := reader.PeekTimestamp() + if err != nil { + t.Errorf("failed to peek timestamp: %v", err) + } + if ts != 100 { + t.Errorf("expected timestamp 100, got %d", ts) + } + + ring, err := reader.CurrentRing() + if err != nil { + t.Errorf("failed to get current ring: %v", err) + } + if typ := ring.PeekType(); typ != PERF_RECORD_SAMPLE { + t.Errorf("expected PERF_RECORD_SAMPLE, got %d", typ) + } + if err := reader.Pop(); err != nil { + t.Errorf("failed to pop event: %v", err) + } + + // Second event should be lost event (timestamp 0) + ts, err = reader.PeekTimestamp() + if err != nil { + t.Errorf("failed to peek timestamp: %v", err) + } + if ts != 0 { + t.Errorf("expected timestamp 0 for lost event, got %d", ts) + } + + ring, err = reader.CurrentRing() + if err != nil { + t.Errorf("failed to get current ring: %v", err) + } + if typ := ring.PeekType(); typ != PERF_RECORD_LOST { + t.Errorf("expected PERF_RECORD_LOST, got %d", typ) + } + if err := reader.Pop(); err != nil { + t.Errorf("failed to pop event: %v", err) + } + + reader.Finish() + + // Test 2: Show that lost events from one ring are processed before normal events from another ring + // Ring1: Normal event with timestamp 100 + // Ring2: Lost event (should get timestamp 0) + normalEvent := make([]byte, 16) + binary.LittleEndian.PutUint64(normalEvent[:8], 100) + copy(normalEvent[8:], []byte("normal")) + + lostEvent := make([]byte, 16) + copy(lostEvent[8:], []byte("lost!")) + + ring1.StartWriteBatch() + if _, err := ring1.Write(normalEvent, 
PERF_RECORD_SAMPLE); err != nil { + t.Fatalf("failed to write normal event: %v", err) + } + ring1.FinishWriteBatch() + + ring2.StartWriteBatch() + if _, err := ring2.Write(lostEvent, PERF_RECORD_LOST); err != nil { + t.Fatalf("failed to write lost event: %v", err) + } + ring2.FinishWriteBatch() + + // Start reader and verify lost event comes first + if err := reader.Start(); err != nil { + t.Fatalf("failed to start reader: %v", err) + } + + // First event should be lost event (timestamp 0) + ts, err = reader.PeekTimestamp() + if err != nil { + t.Errorf("failed to peek timestamp: %v", err) + } + if ts != 0 { + t.Errorf("expected timestamp 0 for lost event, got %d", ts) + } + + ring, err = reader.CurrentRing() + if err != nil { + t.Errorf("failed to get current ring: %v", err) + } + if ring != ring2 { + t.Error("expected lost event from ring2") + } + if typ := ring.PeekType(); typ != PERF_RECORD_LOST { + t.Errorf("expected PERF_RECORD_LOST, got %d", typ) + } + if err := reader.Pop(); err != nil { + t.Errorf("failed to pop lost event: %v", err) + } + + // Second event should be normal event (timestamp 100) + ts, err = reader.PeekTimestamp() + if err != nil { + t.Errorf("failed to peek timestamp: %v", err) + } + if ts != 100 { + t.Errorf("expected timestamp 100 for normal event, got %d", ts) + } + + ring, err = reader.CurrentRing() + if err != nil { + t.Errorf("failed to get current ring: %v", err) + } + if ring != ring1 { + t.Error("expected normal event from ring1") + } + if typ := ring.PeekType(); typ != PERF_RECORD_SAMPLE { + t.Errorf("expected PERF_RECORD_SAMPLE, got %d", typ) + } + if err := reader.Pop(); err != nil { + t.Errorf("failed to pop normal event: %v", err) + } + + // Should be empty after reading all events + if !reader.Empty() { + t.Error("expected reader to be empty after reading all events") + } + + reader.Finish() +} From e6a65dbdc665f565472f08746e05a9677778b376 Mon Sep 17 00:00:00 2001 From: Jonathan Perry Date: Fri, 21 Feb 2025 03:29:54 +0000 
Subject: [PATCH 16/33] update golang linter action to latest version --- .github/workflows/test-go-packages.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test-go-packages.yaml b/.github/workflows/test-go-packages.yaml index f2d1992..c3a492f 100644 --- a/.github/workflows/test-go-packages.yaml +++ b/.github/workflows/test-go-packages.yaml @@ -64,7 +64,7 @@ jobs: fi - name: Run linter - uses: golangci/golangci-lint-action@v3 + uses: golangci/golangci-lint-action@v6 with: version: latest args: --timeout=5m From 9191b54be3d5730deae97bd2469d6f5f9731445e Mon Sep 17 00:00:00 2001 From: Jonathan Perry Date: Fri, 21 Feb 2025 03:33:00 +0000 Subject: [PATCH 17/33] fix linter errors in reader_test.go --- pkg/perf/reader_test.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/pkg/perf/reader_test.go b/pkg/perf/reader_test.go index 69fb525..f812d7d 100644 --- a/pkg/perf/reader_test.go +++ b/pkg/perf/reader_test.go @@ -41,7 +41,9 @@ func TestReader(t *testing.T) { if err := reader.AddRing(ring1); err != ErrAlreadyActive { t.Errorf("expected ErrAlreadyActive, got %v", err) } - reader.Finish() + if err := reader.Finish(); err != nil { + t.Fatalf("failed to finish reader: %v", err) + } // Test operations before Start should fail if !reader.Empty() { @@ -67,7 +69,9 @@ func TestReader(t *testing.T) { t.Error("expected reader to be empty") } - reader.Finish() + if err := reader.Finish(); err != nil { + t.Fatalf("failed to finish reader: %v", err) + } // Create events with timestamps event1 := make([]byte, 16) // 8 bytes for timestamp + "event1" @@ -257,7 +261,9 @@ func TestReaderLostRecords(t *testing.T) { t.Errorf("failed to pop event: %v", err) } - reader.Finish() + if err := reader.Finish(); err != nil { + t.Errorf("failed to finish reader: %v", err) + } // Test 2: Show that lost events from one ring are processed before normal events from another ring // Ring1: Normal event with timestamp 100 From 
5e2b91a4bcd74267286cbda0da91a82811ce640b Mon Sep 17 00:00:00 2001 From: Jonathan Perry Date: Fri, 21 Feb 2025 03:37:49 +0000 Subject: [PATCH 18/33] fixed missed linter error --- pkg/perf/reader_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/perf/reader_test.go b/pkg/perf/reader_test.go index f812d7d..83d1f2e 100644 --- a/pkg/perf/reader_test.go +++ b/pkg/perf/reader_test.go @@ -343,5 +343,7 @@ func TestReaderLostRecords(t *testing.T) { t.Error("expected reader to be empty after reading all events") } - reader.Finish() + if err := reader.Finish(); err != nil { + t.Errorf("failed to finish reader: %v", err) + } } From 166a0ff2a6187d3334ebc72ebc6167bbef0e0cb3 Mon Sep 17 00:00:00 2001 From: Jonathan Perry Date: Fri, 21 Feb 2025 03:54:09 +0000 Subject: [PATCH 19/33] Add eBPF perf map reader with tests and workflow update This commit introduces a new package `perf_ebpf` with a `PerfMapReader` for automating the task of creating and registering per-cpu perf ring buffers with cilium/ebpf --- .github/workflows/test-go-packages.yaml | 20 ++--- pkg/perf_ebpf/reader.go | 114 ++++++++++++++++++++++++ pkg/perf_ebpf/reader_test.go | 77 ++++++++++++++++ 3 files changed, 201 insertions(+), 10 deletions(-) create mode 100644 pkg/perf_ebpf/reader.go create mode 100644 pkg/perf_ebpf/reader_test.go diff --git a/.github/workflows/test-go-packages.yaml b/.github/workflows/test-go-packages.yaml index c3a492f..7a90e9c 100644 --- a/.github/workflows/test-go-packages.yaml +++ b/.github/workflows/test-go-packages.yaml @@ -37,14 +37,14 @@ jobs: - name: Install libcap run: sudo apt-get update && sudo apt-get install -y libcap2-bin - - name: Run perf package tests + - name: Run perf-related package tests run: | - # Compile the test binary - go test -c ./pkg/perf -race -o perf.test + # Compile the test binaries for perf and perf_ebpf packages + go test -c ./pkg/perf ./pkg/perf_ebpf -race -o perf_tests.test # Add CAP_PERFMON capability to the test binary - sudo 
setcap cap_perfmon+ep perf.test - # Verfiy capability - getcap perf.test + sudo setcap cap_perfmon+ep perf_tests.test + # Verify capability + getcap perf_tests.test # Check if perf_event_paranoid is restricting access cat /proc/sys/kernel/perf_event_paranoid @@ -54,13 +54,13 @@ jobs: cat /proc/sys/kernel/perf_event_paranoid # Run the perf tests with the permissioned binary - ./perf.test -test.v + ./perf_tests.test -test.v - name: Run other package tests run: | - # Run tests for all packages except perf, only if there are other packages - if [ -n "$(go list ./pkg/... | grep -v pkg/perf)" ]; then - go test -v -race $(go list ./pkg/... | grep -v pkg/perf) + # Run tests for all packages except perf and perf_ebpf + if [ -n "$(go list ./pkg/... | grep -v 'pkg/perf\|pkg/perf_ebpf')" ]; then + go test -v -race $(go list ./pkg/... | grep -v 'pkg/perf\|pkg/perf_ebpf') fi - name: Run linter diff --git a/pkg/perf_ebpf/reader.go b/pkg/perf_ebpf/reader.go new file mode 100644 index 0000000..8bc00dc --- /dev/null +++ b/pkg/perf_ebpf/reader.go @@ -0,0 +1,114 @@ +// Package perf_ebpf provides integration between perf ring buffers and eBPF maps. 
+package perf_ebpf + +import ( + "fmt" + + "github.com/cilium/ebpf" + "github.com/unvariance/collector/pkg/perf" +) + +// Options controls the behavior of PerfMapReader +type Options struct { + // The size of each per-CPU buffer in bytes + BufferSize int + // The number of bytes that must be written before waking up userspace + // Must be less than BufferSize + WatermarkBytes uint32 +} + +// PerfMapReader manages perf ring buffers connected to an eBPF map +type PerfMapReader struct { + array *ebpf.Map + rings []*perf.PerfRing + storage []*perf.MmapRingStorage + reader *perf.Reader +} + +// NewPerfMapReader creates a new reader connected to an eBPF map +func NewPerfMapReader(array *ebpf.Map, opts Options) (*PerfMapReader, error) { + if array == nil { + return nil, fmt.Errorf("array cannot be nil") + } + + if opts.BufferSize < 1 { + return nil, fmt.Errorf("buffer size must be greater than 0") + } + + if opts.WatermarkBytes >= uint32(opts.BufferSize) { + return nil, fmt.Errorf("watermark must be less than buffer size") + } + + // Get number of possible CPUs from the map + nCPU := int(array.MaxEntries()) + if nCPU < 1 { + return nil, fmt.Errorf("invalid number of CPUs in map: %d", nCPU) + } + + pmr := &PerfMapReader{ + array: array, + rings: make([]*perf.PerfRing, 0, nCPU), + storage: make([]*perf.MmapRingStorage, 0, nCPU), + } + + // Create storage and rings for each CPU + for cpu := 0; cpu < nCPU; cpu++ { + storage, err := perf.NewMmapRingStorage(cpu, uint32(opts.BufferSize/4096), opts.WatermarkBytes) + if err != nil { + pmr.Close() + return nil, fmt.Errorf("failed to create storage for CPU %d: %w", cpu, err) + } + pmr.storage = append(pmr.storage, storage) + + ring, err := perf.InitContiguous(storage.Data(), storage.NumDataPages(), storage.PageSize()) + if err != nil { + pmr.Close() + return nil, fmt.Errorf("failed to init ring for CPU %d: %w", cpu, err) + } + pmr.rings = append(pmr.rings, ring) + + // Store the file descriptor in the eBPF map + if err := 
array.Put(uint32(cpu), storage.FileDescriptor()); err != nil { + pmr.Close() + return nil, fmt.Errorf("failed to update map for CPU %d: %w", cpu, err) + } + } + + // Create reader + reader := perf.NewReader() + for _, ring := range pmr.rings { + if err := reader.AddRing(ring); err != nil { + pmr.Close() + return nil, fmt.Errorf("failed to add ring to reader: %w", err) + } + } + pmr.reader = reader + + return pmr, nil +} + +// Reader returns the underlying perf.Reader +func (pmr *PerfMapReader) Reader() *perf.Reader { + return pmr.reader +} + +// Close releases all resources +func (pmr *PerfMapReader) Close() error { + if pmr.reader != nil { + pmr.reader.Finish() + } + + for _, storage := range pmr.storage { + if storage != nil { + storage.Close() + } + } + + // Clear references + pmr.rings = nil + pmr.storage = nil + pmr.reader = nil + pmr.array = nil + + return nil +} diff --git a/pkg/perf_ebpf/reader_test.go b/pkg/perf_ebpf/reader_test.go new file mode 100644 index 0000000..1acdff1 --- /dev/null +++ b/pkg/perf_ebpf/reader_test.go @@ -0,0 +1,77 @@ +package perf_ebpf + +import ( + "testing" + + "github.com/cilium/ebpf" +) + +func TestNewPerfMapReader(t *testing.T) { + // Create a test eBPF map + mapSpec := &ebpf.MapSpec{ + Type: ebpf.PerfEventArray, + KeySize: 4, + ValueSize: 4, + MaxEntries: 4, // Support 4 CPUs for testing + } + + array, err := ebpf.NewMap(mapSpec) + if err != nil { + t.Fatalf("failed to create map: %v", err) + } + defer array.Close() + + tests := []struct { + name string + array *ebpf.Map + opts Options + wantErr bool + }{ + { + name: "nil array", + array: nil, + opts: Options{BufferSize: 4096, WatermarkBytes: 1024}, + wantErr: true, + }, + { + name: "zero buffer size", + array: array, + opts: Options{BufferSize: 0, WatermarkBytes: 1024}, + wantErr: true, + }, + { + name: "watermark too large", + array: array, + opts: Options{BufferSize: 4096, WatermarkBytes: 4096}, + wantErr: true, + }, + { + name: "valid options", + array: array, + opts: 
Options{BufferSize: 4096, WatermarkBytes: 1024}, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + reader, err := NewPerfMapReader(tt.array, tt.opts) + if tt.wantErr { + if err == nil { + t.Error("expected error, got nil") + } + return + } + if err != nil { + t.Errorf("unexpected error: %v", err) + return + } + defer reader.Close() + + // Verify reader is properly initialized + if reader.Reader() == nil { + t.Error("expected non-nil reader") + } + }) + } +} From 7530f01a419cbf790358daff9ad723252b4b7c67 Mon Sep 17 00:00:00 2001 From: Jonathan Perry Date: Fri, 21 Feb 2025 03:59:27 +0000 Subject: [PATCH 20/33] Update perf test workflow to separately compile and run perf and perf_ebpf tests --- .github/workflows/test-go-packages.yaml | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/.github/workflows/test-go-packages.yaml b/.github/workflows/test-go-packages.yaml index 7a90e9c..9edc0cb 100644 --- a/.github/workflows/test-go-packages.yaml +++ b/.github/workflows/test-go-packages.yaml @@ -40,11 +40,13 @@ jobs: - name: Run perf-related package tests run: | # Compile the test binaries for perf and perf_ebpf packages - go test -c ./pkg/perf ./pkg/perf_ebpf -race -o perf_tests.test + go test -c ./pkg/perf ./pkg/perf_ebpf -race -o . 
# Add CAP_PERFMON capability to the test binary - sudo setcap cap_perfmon+ep perf_tests.test + sudo setcap cap_perfmon+ep perf.test + sudo setcap cap_perfmon+ep perf_ebpf.test # Verify capability - getcap perf_tests.test + getcap perf.test + getcap perf_ebpf.test # Check if perf_event_paranoid is restricting access cat /proc/sys/kernel/perf_event_paranoid @@ -54,7 +56,8 @@ jobs: cat /proc/sys/kernel/perf_event_paranoid # Run the perf tests with the permissioned binary - ./perf_tests.test -test.v + ./perf.test -test.v + ./perf_ebpf.test -test.v - name: Run other package tests run: | From 9ad2ef0c75524cda9ac35c7b23df51f895041370 Mon Sep 17 00:00:00 2001 From: Jonathan Perry Date: Fri, 21 Feb 2025 04:04:03 +0000 Subject: [PATCH 21/33] remove memlock in reader_test.go --- pkg/perf_ebpf/reader_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/perf_ebpf/reader_test.go b/pkg/perf_ebpf/reader_test.go index 1acdff1..e85aa0d 100644 --- a/pkg/perf_ebpf/reader_test.go +++ b/pkg/perf_ebpf/reader_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/cilium/ebpf" + "github.com/cilium/ebpf/rlimit" ) func TestNewPerfMapReader(t *testing.T) { @@ -15,6 +16,11 @@ func TestNewPerfMapReader(t *testing.T) { MaxEntries: 4, // Support 4 CPUs for testing } + // remove memlock limit + if err := rlimit.RemoveMemlock(); err != nil { + t.Fatalf("failed to remove memlock: %v", err) + } + array, err := ebpf.NewMap(mapSpec) if err != nil { t.Fatalf("failed to create map: %v", err) From e3a466a35885ea997a2209c7cc9770d1af67ced3 Mon Sep 17 00:00:00 2001 From: Jonathan Perry Date: Fri, 21 Feb 2025 04:10:32 +0000 Subject: [PATCH 22/33] gitignore: add macOS generated files --- .gitignore | 1 + 1 file changed, 1 insertion(+) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..79b5594 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +**/.DS_Store From fb5cd267274156d37b130dc9203fa44ededdd80c Mon Sep 17 00:00:00 2001 From: Jonathan Perry 
Date: Fri, 21 Feb 2025 04:11:12 +0000 Subject: [PATCH 23/33] ci: add cap_bpf to perf_ebpf tests --- .github/workflows/test-go-packages.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test-go-packages.yaml b/.github/workflows/test-go-packages.yaml index 9edc0cb..14b7fa0 100644 --- a/.github/workflows/test-go-packages.yaml +++ b/.github/workflows/test-go-packages.yaml @@ -43,7 +43,7 @@ jobs: go test -c ./pkg/perf ./pkg/perf_ebpf -race -o . # Add CAP_PERFMON capability to the test binary sudo setcap cap_perfmon+ep perf.test - sudo setcap cap_perfmon+ep perf_ebpf.test + sudo setcap cap_perfmon+ep,cap_bpf+ep perf_ebpf.test # Verify capability getcap perf.test getcap perf_ebpf.test From 972ddf6050bbaec0f2f06253f04b2dfc48f5d286 Mon Sep 17 00:00:00 2001 From: Jonathan Perry Date: Fri, 21 Feb 2025 04:15:50 +0000 Subject: [PATCH 24/33] ci: fix capability string --- .github/workflows/test-go-packages.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test-go-packages.yaml b/.github/workflows/test-go-packages.yaml index 14b7fa0..b6744f4 100644 --- a/.github/workflows/test-go-packages.yaml +++ b/.github/workflows/test-go-packages.yaml @@ -43,7 +43,7 @@ jobs: go test -c ./pkg/perf ./pkg/perf_ebpf -race -o . 
# Add CAP_PERFMON capability to the test binary sudo setcap cap_perfmon+ep perf.test - sudo setcap cap_perfmon+ep,cap_bpf+ep perf_ebpf.test + sudo setcap cap_perfmon,cap_bpf+ep perf_ebpf.test # Verify capability getcap perf.test getcap perf_ebpf.test From 5fe01c8a7e276e3839aecd8bfdcea25f551faa09 Mon Sep 17 00:00:00 2001 From: Jonathan Perry Date: Fri, 21 Feb 2025 04:31:34 +0000 Subject: [PATCH 25/33] use right size of file descriptor for setting ebpf.Map --- pkg/perf_ebpf/reader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/perf_ebpf/reader.go b/pkg/perf_ebpf/reader.go index 8bc00dc..18821a1 100644 --- a/pkg/perf_ebpf/reader.go +++ b/pkg/perf_ebpf/reader.go @@ -68,7 +68,7 @@ func NewPerfMapReader(array *ebpf.Map, opts Options) (*PerfMapReader, error) { pmr.rings = append(pmr.rings, ring) // Store the file descriptor in the eBPF map - if err := array.Put(uint32(cpu), storage.FileDescriptor()); err != nil { + if err := array.Put(uint32(cpu), uint32(storage.FileDescriptor())); err != nil { pmr.Close() return nil, fmt.Errorf("failed to update map for CPU %d: %w", cpu, err) } From 0424c535a1bfc5359e6603554334182d2cd7795e Mon Sep 17 00:00:00 2001 From: Jonathan Perry Date: Fri, 21 Feb 2025 04:33:41 +0000 Subject: [PATCH 26/33] fix lint error (check return value) --- pkg/perf_ebpf/reader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/perf_ebpf/reader.go b/pkg/perf_ebpf/reader.go index 18821a1..ec13ab4 100644 --- a/pkg/perf_ebpf/reader.go +++ b/pkg/perf_ebpf/reader.go @@ -95,7 +95,7 @@ func (pmr *PerfMapReader) Reader() *perf.Reader { // Close releases all resources func (pmr *PerfMapReader) Close() error { if pmr.reader != nil { - pmr.reader.Finish() + _ = pmr.reader.Finish() } for _, storage := range pmr.storage { From 686fa397fd80c76a1810b041f7d9695421e593f1 Mon Sep 17 00:00:00 2001 From: Jonathan Perry Date: Fri, 21 Feb 2025 05:04:08 +0000 Subject: [PATCH 27/33] Refactor collector main to use new perf map reader --- 
cmd/collector/main.go | 115 +++++++++++++++++++++-------------- cmd/collector/task_counter.c | 2 + 2 files changed, 71 insertions(+), 46 deletions(-) diff --git a/cmd/collector/main.go b/cmd/collector/main.go index 8b23acf..7f49dc4 100644 --- a/cmd/collector/main.go +++ b/cmd/collector/main.go @@ -3,16 +3,17 @@ package main import ( "bytes" "encoding/binary" - "errors" "io/ioutil" "log" "os" "os/signal" "time" + "unsafe" "github.com/cilium/ebpf/link" - "github.com/cilium/ebpf/perf" "github.com/cilium/ebpf/rlimit" + ourperf "github.com/unvariance/collector/pkg/perf" + "github.com/unvariance/collector/pkg/perf_ebpf" "golang.org/x/sys/unix" ) @@ -92,12 +93,13 @@ func main() { // Create a ReaderOptions with a large Watermark perCPUBufferSize := 16 * os.Getpagesize() - opts := perf.ReaderOptions{ - Watermark: perCPUBufferSize / 2, + opts := perf_ebpf.Options{ + BufferSize: perCPUBufferSize, + WatermarkBytes: uint32(perCPUBufferSize / 2), } - // Open a perf reader from userspace - rd, err := perf.NewReaderWithOptions(objs.Events, perCPUBufferSize, opts) + // Create our perf map reader + rd, err := perf_ebpf.NewPerfMapReader(objs.Events, opts) if err != nil { log.Fatal(err) } @@ -165,16 +167,17 @@ func main() { signal.Notify(stopper, os.Interrupt) timeout := time.After(5 * time.Second) - - // set deadline in the past for rd, so it will not block - nextDeadline := time.Now().Add(time.Second) - rd.SetDeadline(nextDeadline) - - log.Println("Waiting for events...") + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() // Counter to maintain in userspace var totalEvents uint64 = 0 + // Start the reader + reader := rd.Reader() + + log.Println("Waiting for events...") + for { select { case <-stopper: @@ -185,48 +188,68 @@ func main() { log.Println("Finished counting after 5 seconds") dumpRmidMap(&objs) // Dump RMID map before exiting return - default: - - // if the deadline is in the past, set it to the next deadline - if time.Now().After(nextDeadline) { - 
nextDeadline = nextDeadline.Add(time.Second) - rd.SetDeadline(nextDeadline) - - // output counts - var count uint64 - var key uint32 = 0 - if err := objs.EventCount.Lookup(&key, &count); err != nil { - log.Fatal(err) - } - log.Printf("Event count: userspace %d, eBPF %d\n", totalEvents, count) + case <-ticker.C: + if err := reader.Start(); err != nil { + log.Fatal(err) } - record, err := rd.Read() - if err != nil { - if errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, perf.ErrFlushed) { - break // make for loop check the select statement and set the deadline - } else if errors.Is(err, perf.ErrClosed) { - return + // Process all available events + for !reader.Empty() { + ring, err := reader.CurrentRing() + if err != nil { + log.Printf("Error getting current ring: %s", err) + break + } + + // Check for lost samples + if ring.PeekType() == ourperf.PERF_RECORD_LOST { + var lostCount uint64 + if err := ring.PeekCopy((*[8]byte)(unsafe.Pointer(&lostCount))[:], 8); err != nil { + log.Printf("Error reading lost count: %s", err) + } else { + log.Printf("Lost %d samples", lostCount) + } + reader.Pop() + continue + } + + // Parse the raw event + size, err := ring.PeekSize() + if err != nil { + log.Printf("Error getting event size: %s", err) + break + } + + eventData := make([]byte, size) + if err := ring.PeekCopy(eventData, 0); err != nil { + log.Printf("Error copying event data: %s", err) + break } - log.Printf("Reading from perf event reader: %s", err) - continue - } - if record.LostSamples != 0 { - log.Printf("Lost %d samples", record.LostSamples) - continue + var event taskCounterEvent + if err := binary.Read(bytes.NewReader(eventData), binary.LittleEndian, &event); err != nil { + log.Printf("Failed to parse perf event: %s", err) + break + } + + log.Printf("Event - RMID: %d, Time Delta: %d ns, Cycles Delta: %d, Instructions Delta: %d, LLC Misses Delta: %d", + event.Rmid, event.TimeDeltaNs, event.CyclesDelta, event.InstructionsDelta, event.LlcMissesDelta) + 
totalEvents++ + + reader.Pop() } - // Parse the raw bytes into our Event struct - var event taskCounterEvent - if err := binary.Read(bytes.NewReader(record.RawSample), binary.LittleEndian, &event); err != nil { - log.Printf("Failed to parse perf event: %s", err) - continue + if err := reader.Finish(); err != nil { + log.Printf("Error finishing reader: %s", err) } - log.Printf("Event - CPU: %d, RMID: %d, Time Delta: %d ns, Cycles Delta: %d, Instructions Delta: %d, LLC Misses Delta: %d", - record.CPU, event.Rmid, event.TimeDeltaNs, event.CyclesDelta, event.InstructionsDelta, event.LlcMissesDelta) - totalEvents++ + // Output counts every second + var count uint64 + var key uint32 = 0 + if err := objs.EventCount.Lookup(&key, &count); err != nil { + log.Fatal(err) + } + log.Printf("Event count: userspace %d, eBPF %d\n", totalEvents, count) } } } diff --git a/cmd/collector/task_counter.c b/cmd/collector/task_counter.c index 565028d..5d3f68c 100644 --- a/cmd/collector/task_counter.c +++ b/cmd/collector/task_counter.c @@ -9,6 +9,7 @@ // Define the event structure that matches the Go side struct event { + __u64 timestamp; // Add timestamp as first field __u32 rmid; __u64 cycles_delta; __u64 instructions_delta; @@ -263,6 +264,7 @@ int count_events(void *ctx) { prev->timestamp = now; // Submit the event to the perf event array + e.timestamp = now; bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, &e, sizeof(e)); increase_count(ctx); From 34a3f946e7ab8e99dea575821d0cb1337b4e001e Mon Sep 17 00:00:00 2001 From: Jonathan Perry Date: Fri, 21 Feb 2025 06:04:26 +0000 Subject: [PATCH 28/33] account for u32 size element on PERF_SAMPLE_RAW in ring --- cmd/collector/main.go | 7 +++++-- pkg/perf/reader.go | 2 +- pkg/perf/reader_test.go | 11 +++++++++-- pkg/perf/ring.go | 13 ++++++++++++- 4 files changed, 27 insertions(+), 6 deletions(-) diff --git a/cmd/collector/main.go b/cmd/collector/main.go index 7f49dc4..1fc4e86 100644 --- a/cmd/collector/main.go +++ b/cmd/collector/main.go @@ 
-220,8 +220,8 @@ func main() { break } - eventData := make([]byte, size) - if err := ring.PeekCopy(eventData, 0); err != nil { + eventData := make([]byte, size-4) + if err := ring.PeekCopy(eventData, 4); err != nil { log.Printf("Error copying event data: %s", err) break } @@ -232,6 +232,9 @@ func main() { break } + // print hex encoding of eventData + log.Printf("Event data: %x", eventData) + // print parsed event log.Printf("Event - RMID: %d, Time Delta: %d ns, Cycles Delta: %d, Instructions Delta: %d, LLC Misses Delta: %d", event.Rmid, event.TimeDeltaNs, event.CyclesDelta, event.InstructionsDelta, event.LlcMissesDelta) totalEvents++ diff --git a/pkg/perf/reader.go b/pkg/perf/reader.go index a48466c..6786df0 100644 --- a/pkg/perf/reader.go +++ b/pkg/perf/reader.go @@ -206,7 +206,7 @@ func (r *Reader) maintainHeapEntry(idx int) { // Sample records have an 8-byte timestamp after the header // Skip the first 8 bytes (sample record) and read the timestamp buf := make([]byte, 8) - if err := ring.PeekCopy(buf, 0); err == nil { + if err := ring.PeekCopy(buf, 4); err == nil { timestamp = *(*uint64)(unsafe.Pointer(&buf[0])) } } diff --git a/pkg/perf/reader_test.go b/pkg/perf/reader_test.go index 83d1f2e..a36b0a7 100644 --- a/pkg/perf/reader_test.go +++ b/pkg/perf/reader_test.go @@ -132,8 +132,15 @@ func TestReader(t *testing.T) { if err != nil { t.Errorf("failed to peek size: %v", err) } - ringData := make([]byte, size) - if err := ring.PeekCopy(ringData, 0); err != nil { + + // size should be len(expectedRingData[i]) + sizeof(uint32) rounded up to 8 + expectedSize := uint32(len(expectedRingData[i])+4+7) & ^uint32(7) + if uint32(size) != expectedSize { + t.Errorf("expected size %d, got %d", expectedSize, size) + } + + ringData := make([]byte, len(expectedRingData[i])) + if err := ring.PeekCopy(ringData, 4); err != nil { t.Errorf("failed to peek copy ring data: %v", err) } fmt.Printf("ring data: %x\n", ringData) diff --git a/pkg/perf/ring.go b/pkg/perf/ring.go index 
e4b5d79..715a4bc 100644 --- a/pkg/perf/ring.go +++ b/pkg/perf/ring.go @@ -99,8 +99,14 @@ func (r *PerfRing) Write(data []byte, eventType uint32) (int, error) { return 0, ErrEmptyWrite } + unalignedLen := uint32(len(data)) + uint32(unsafe.Sizeof(PerfEventHeader{})) + + if eventType == PERF_RECORD_SAMPLE { + unalignedLen += 4 // add the u32 size field + } + // Calculate total size including header, aligned to 8 bytes - alignedLen := ((uint32(len(data)) + uint32(unsafe.Sizeof(PerfEventHeader{})) + 7) & ^uint32(7)) + alignedLen := ((unalignedLen + 7) & ^uint32(7)) if alignedLen > uint32(r.bufMask) { return 0, ErrCannotFit } @@ -120,6 +126,11 @@ func (r *PerfRing) Write(data []byte, eventType uint32) (int, error) { // Write data dataPos := (r.tail + uint64(unsafe.Sizeof(header))) & r.bufMask + if eventType == PERF_RECORD_SAMPLE { + // write the u32 size field + (*(*uint32)(unsafe.Pointer(&r.data[dataPos]))) = uint32(len(data)+4+7) & ^uint32(7) + dataPos = (dataPos + 4) & r.bufMask + } if dataPos+uint64(len(data)) <= uint64(len(r.data)) { // Data fits without wrapping copy(r.data[dataPos:], data) From 6c64c160fe411ad677afe306d7e879f4a1e5624b Mon Sep 17 00:00:00 2001 From: Jonathan Perry Date: Fri, 21 Feb 2025 13:45:56 +0000 Subject: [PATCH 29/33] remove verbose printing of received perf ring messages --- cmd/collector/main.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/cmd/collector/main.go b/cmd/collector/main.go index 1fc4e86..5f456d7 100644 --- a/cmd/collector/main.go +++ b/cmd/collector/main.go @@ -232,8 +232,6 @@ func main() { break } - // print hex encoding of eventData - log.Printf("Event data: %x", eventData) // print parsed event log.Printf("Event - RMID: %d, Time Delta: %d ns, Cycles Delta: %d, Instructions Delta: %d, LLC Misses Delta: %d", event.Rmid, event.TimeDeltaNs, event.CyclesDelta, event.InstructionsDelta, event.LlcMissesDelta) From 0ec19f5def0a6a54754179398f625748be10be94 Mon Sep 17 00:00:00 2001 From: Jonathan Perry Date: Fri, 21 Feb 2025 
13:53:49 +0000 Subject: [PATCH 30/33] Add CPU ID tracking to perf reader and event logging --- cmd/collector/main.go | 8 ++++---- pkg/perf/README.md | 7 +++++-- pkg/perf/reader.go | 10 +++++----- pkg/perf/reader_test.go | 35 ++++++++++++++++++++++++++++------- 4 files changed, 42 insertions(+), 18 deletions(-) diff --git a/cmd/collector/main.go b/cmd/collector/main.go index 5f456d7..662287c 100644 --- a/cmd/collector/main.go +++ b/cmd/collector/main.go @@ -195,7 +195,7 @@ func main() { // Process all available events for !reader.Empty() { - ring, err := reader.CurrentRing() + ring, cpuID, err := reader.CurrentRing() if err != nil { log.Printf("Error getting current ring: %s", err) break @@ -207,7 +207,7 @@ func main() { if err := ring.PeekCopy((*[8]byte)(unsafe.Pointer(&lostCount))[:], 8); err != nil { log.Printf("Error reading lost count: %s", err) } else { - log.Printf("Lost %d samples", lostCount) + log.Printf("Lost %d samples on CPU %d", lostCount, cpuID) } reader.Pop() continue @@ -233,8 +233,8 @@ func main() { } // print parsed event - log.Printf("Event - RMID: %d, Time Delta: %d ns, Cycles Delta: %d, Instructions Delta: %d, LLC Misses Delta: %d", - event.Rmid, event.TimeDeltaNs, event.CyclesDelta, event.InstructionsDelta, event.LlcMissesDelta) + log.Printf("Event - CPU: %d, RMID: %d, Time Delta: %d ns, Cycles Delta: %d, Instructions Delta: %d, LLC Misses Delta: %d", + cpuID, event.Rmid, event.TimeDeltaNs, event.CyclesDelta, event.InstructionsDelta, event.LlcMissesDelta) totalEvents++ reader.Pop() diff --git a/pkg/perf/README.md b/pkg/perf/README.md index 6e58451..f62a6fe 100644 --- a/pkg/perf/README.md +++ b/pkg/perf/README.md @@ -150,7 +150,10 @@ defer reader.Close() // Read events in timestamp order for !reader.Empty() { - ring := reader.CurrentRing() + ring, ringIndex, err := reader.CurrentRing() + if err != nil { + // Handle error + } // Process event from ring reader.Pop() } @@ -190,7 +193,7 @@ defer reader.Close() // Read events in timestamp order for 
!reader.Empty() { - ring := reader.CurrentRing() + ring, _, _ := reader.CurrentRing() size, _ := ring.PeekSize() buf := make([]byte, size) ring.PeekCopy(buf, 0) diff --git a/pkg/perf/reader.go b/pkg/perf/reader.go index 6786df0..b062690 100644 --- a/pkg/perf/reader.go +++ b/pkg/perf/reader.go @@ -141,16 +141,16 @@ func (r *Reader) PeekTimestamp() (uint64, error) { return r.heap.entries[0].timestamp, nil } -// CurrentRing returns the ring containing the next event -func (r *Reader) CurrentRing() (*PerfRing, error) { +// CurrentRing returns the ring containing the next event and its index +func (r *Reader) CurrentRing() (*PerfRing, int, error) { if !r.active { - return nil, ErrNotActive + return nil, 0, ErrNotActive } if r.heap.size == 0 { - return nil, ErrBufferEmpty + return nil, 0, ErrBufferEmpty } entry := r.heap.entries[0] - return r.rings[entry.ringIndex], nil + return r.rings[entry.ringIndex], entry.ringIndex, nil } // Pop consumes the current event and updates the heap diff --git a/pkg/perf/reader_test.go b/pkg/perf/reader_test.go index a36b0a7..cd958cc 100644 --- a/pkg/perf/reader_test.go +++ b/pkg/perf/reader_test.go @@ -52,7 +52,7 @@ func TestReader(t *testing.T) { if _, err := reader.PeekTimestamp(); err != ErrNotActive { t.Errorf("expected ErrNotActive, got %v", err) } - if _, err := reader.CurrentRing(); err != ErrNotActive { + if _, _, err := reader.CurrentRing(); err != ErrNotActive { t.Errorf("expected ErrNotActive, got %v", err) } if err := reader.Pop(); err != ErrNotActive { @@ -119,13 +119,16 @@ func TestReader(t *testing.T) { } // Get current ring and verify it's not nil - ring, err := reader.CurrentRing() + ring, idx, err := reader.CurrentRing() if err != nil { t.Errorf("failed to get current ring: %v", err) } if ring == nil { t.Error("expected non-nil current ring") } + if idx < 0 || idx >= len(reader.rings) { + t.Errorf("ring index %d out of bounds [0, %d)", idx, len(reader.rings)) + } // Copy the ring's data into a new buffer size, err := 
ring.PeekSize() @@ -171,7 +174,7 @@ func TestReader(t *testing.T) { if _, err := reader.PeekTimestamp(); err != ErrNotActive { t.Errorf("expected ErrNotActive, got %v", err) } - if _, err := reader.CurrentRing(); err != ErrNotActive { + if _, _, err := reader.CurrentRing(); err != ErrNotActive { t.Errorf("expected ErrNotActive, got %v", err) } if err := reader.Pop(); err != ErrNotActive { @@ -237,10 +240,16 @@ func TestReaderLostRecords(t *testing.T) { t.Errorf("expected timestamp 100, got %d", ts) } - ring, err := reader.CurrentRing() + ring, idx, err := reader.CurrentRing() if err != nil { t.Errorf("failed to get current ring: %v", err) } + if ring != ring1 { + t.Error("expected event from ring1") + } + if idx != 0 { + t.Errorf("expected ring index 0, got %d", idx) + } if typ := ring.PeekType(); typ != PERF_RECORD_SAMPLE { t.Errorf("expected PERF_RECORD_SAMPLE, got %d", typ) } @@ -257,10 +266,16 @@ func TestReaderLostRecords(t *testing.T) { t.Errorf("expected timestamp 0 for lost event, got %d", ts) } - ring, err = reader.CurrentRing() + ring, idx, err = reader.CurrentRing() if err != nil { t.Errorf("failed to get current ring: %v", err) } + if ring != ring1 { + t.Error("expected lost event from ring1") + } + if idx != 0 { + t.Errorf("expected ring index 0, got %d", idx) + } if typ := ring.PeekType(); typ != PERF_RECORD_LOST { t.Errorf("expected PERF_RECORD_LOST, got %d", typ) } @@ -308,13 +323,16 @@ func TestReaderLostRecords(t *testing.T) { t.Errorf("expected timestamp 0 for lost event, got %d", ts) } - ring, err = reader.CurrentRing() + ring, idx, err = reader.CurrentRing() if err != nil { t.Errorf("failed to get current ring: %v", err) } if ring != ring2 { t.Error("expected lost event from ring2") } + if idx != 1 { + t.Errorf("expected ring index 1, got %d", idx) + } if typ := ring.PeekType(); typ != PERF_RECORD_LOST { t.Errorf("expected PERF_RECORD_LOST, got %d", typ) } @@ -331,13 +349,16 @@ func TestReaderLostRecords(t *testing.T) { t.Errorf("expected 
timestamp 100 for normal event, got %d", ts) } - ring, err = reader.CurrentRing() + ring, idx, err = reader.CurrentRing() if err != nil { t.Errorf("failed to get current ring: %v", err) } if ring != ring1 { t.Error("expected normal event from ring1") } + if idx != 0 { + t.Errorf("expected ring index 0, got %d", idx) + } if typ := ring.PeekType(); typ != PERF_RECORD_SAMPLE { t.Errorf("expected PERF_RECORD_SAMPLE, got %d", typ) } From 739058330edec4993c4256015ef475bd100441ea Mon Sep 17 00:00:00 2001 From: Jonathan Perry Date: Fri, 21 Feb 2025 13:55:54 +0000 Subject: [PATCH 31/33] add timestamp in collector verbose prints --- cmd/collector/main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/collector/main.go b/cmd/collector/main.go index 662287c..91a963c 100644 --- a/cmd/collector/main.go +++ b/cmd/collector/main.go @@ -233,8 +233,8 @@ func main() { } // print parsed event - log.Printf("Event - CPU: %d, RMID: %d, Time Delta: %d ns, Cycles Delta: %d, Instructions Delta: %d, LLC Misses Delta: %d", - cpuID, event.Rmid, event.TimeDeltaNs, event.CyclesDelta, event.InstructionsDelta, event.LlcMissesDelta) + log.Printf("Event - CPU: %d, RMID: %d, Time Delta: %d ns, Cycles Delta: %d, Instructions Delta: %d, LLC Misses Delta: %d, Timestamp: %d", + cpuID, event.Rmid, event.TimeDeltaNs, event.CyclesDelta, event.InstructionsDelta, event.LlcMissesDelta, event.Timestamp) totalEvents++ reader.Pop() From 789095952f92c0b7bd0405abc1b828b4a9ccdcb6 Mon Sep 17 00:00:00 2001 From: Jonathan Perry Date: Fri, 21 Feb 2025 14:23:02 +0000 Subject: [PATCH 32/33] add barrier timestamp to improve event ordering --- cmd/collector/main.go | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/cmd/collector/main.go b/cmd/collector/main.go index 91a963c..aef8713 100644 --- a/cmd/collector/main.go +++ b/cmd/collector/main.go @@ -20,6 +20,11 @@ import ( // Note: taskCounterEvent is auto-generated by bpf2go // Note: taskCounterRmidMetadata 
is auto-generated by bpf2go +// nanotime returns monotonic time in nanoseconds. We get this from the runtime +// +//go:linkname nanotime runtime.nanotime +func nanotime() int64 + // dumpRmidMap dumps all valid RMIDs and their metadata func dumpRmidMap(objs *taskCounterObjects) { var key uint32 @@ -189,12 +194,28 @@ func main() { dumpRmidMap(&objs) // Dump RMID map before exiting return case <-ticker.C: + // Get current monotonic timestamp before starting the batch + startTimestamp := uint64(nanotime()) + log.Printf("Starting batch at timestamp: %d", startTimestamp) + if err := reader.Start(); err != nil { log.Fatal(err) } - // Process all available events + // Process all available events that occurred before startTimestamp for !reader.Empty() { + // Check if next event's timestamp is after our start timestamp + ts, err := reader.PeekTimestamp() + if err != nil { + log.Printf("Error peeking timestamp: %s", err) + break + } + + // Skip processing this batch if we see an event from the future + if ts > startTimestamp { + break + } + ring, cpuID, err := reader.CurrentRing() if err != nil { log.Printf("Error getting current ring: %s", err) From 73feea97d3c2c721a3e7f554fa63c79fc2685145 Mon Sep 17 00:00:00 2001 From: Jonathan Perry Date: Fri, 21 Feb 2025 14:42:56 +0000 Subject: [PATCH 33/33] fix large timestamp deltas at ebpf program start --- cmd/collector/task_counter.c | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/cmd/collector/task_counter.c b/cmd/collector/task_counter.c index 5d3f68c..3a9e1ad 100644 --- a/cmd/collector/task_counter.c +++ b/cmd/collector/task_counter.c @@ -226,8 +226,7 @@ int count_events(void *ctx) { e.rmid = args->rmid; - // Get current timestamp - __u64 now = bpf_ktime_get_ns(); + __u64 now; // Get previous counters __u32 zero = 0; @@ -260,13 +259,17 @@ int count_events(void *ctx) { } // Compute time delta and update timestamp - e.time_delta_ns = compute_delta(now, prev->timestamp); + now = 
bpf_ktime_get_ns(); + // if prev->timestamp is 0, this is the first event. We did not have the counter and timestamp values, + // so do not emit this event -- only use it to initialize the counters + if (prev->timestamp != 0) { + e.time_delta_ns = compute_delta(now, prev->timestamp); + e.timestamp = now; + // Submit the event to the perf event array + bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, &e, sizeof(e)); + } prev->timestamp = now; - - // Submit the event to the perf event array - e.timestamp = now; - bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, &e, sizeof(e)); - + increase_count(ctx); return 0;