Skip to content

Commit

Permalink
internal/appsec/waf: metrics fixes (#1247)
Browse files Browse the repository at this point in the history
  • Loading branch information
Julio-Guerra authored Apr 18, 2022
1 parent e2c7741 commit 243d731
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 63 deletions.
87 changes: 47 additions & 40 deletions internal/appsec/waf.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/dyngo"
"gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/dyngo/instrumentation"
"gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/dyngo/instrumentation/grpcsec"
"gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/dyngo/instrumentation/httpsec"
"gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/waf"
Expand Down Expand Up @@ -95,12 +94,15 @@ func registerWAF(rules []byte, timeout time.Duration, limiter Limiter, obfCfg *O

// newHTTPWAFEventListener returns the HTTP WAF event listener to register in order to enable it.
func newHTTPWAFEventListener(handle *waf.Handle, addresses []string, timeout time.Duration, limiter Limiter) dyngo.EventListener {
var once sync.Once
var monitorRulesOnce sync.Once // per instantiation

return httpsec.OnHandlerOperationStart(func(op *httpsec.Operation, args httpsec.HandlerOperationArgs) {
var body interface{}

op.On(httpsec.OnSDKBodyOperationStart(func(op *httpsec.SDKBodyOperation, args httpsec.SDKBodyOperationArgs) {
body = args.Body
}))

// At the moment, AppSec doesn't block the requests, and so we can use the fact we are in monitoring-only mode
// to call the WAF only once at the end of the handler operation.
op.On(httpsec.OnHandlerOperationFinish(func(op *httpsec.Operation, res httpsec.HandlerOperationRes) {
Expand Down Expand Up @@ -141,22 +143,17 @@ func newHTTPWAFEventListener(handle *waf.Handle, addresses []string, timeout tim
values[serverResponseStatusAddr] = res.Status
}
}
wafRunStartTime := time.Now()
matches := runWAF(wafCtx, values, timeout)
overallWAFRunDuration := time.Since(wafRunStartTime)

// Log WAF metrics.
// Add WAF metrics.
rInfo := handle.RulesetInfo()
op.AddTag(wafTimeoutTag, float64(wafCtx.TotalTimeouts()))
// Rules version is set for every request to help the backend associate WAF duration metrics with rule version
op.AddTag(eventRulesVersionTag, rInfo.Version)
// time.Duration.Microseconds() is only as of go1.13, so we do it manually here
addWAFDurationTags(&op.TagsHolder, float64(wafCtx.TotalRuntime()), float64(overallWAFRunDuration.Nanoseconds()))
// Log the following metrics once per instantiation of a WAF handle
once.Do(func() {
addRulesetInfoTags(&op.TagsHolder, rInfo)
overallRuntimeNs, internalRuntimeNs := wafCtx.TotalRuntime()
addWAFMonitoringTags(op, rInfo.Version, overallRuntimeNs, internalRuntimeNs, wafCtx.TotalTimeouts())

// Add the following metrics once per instantiation of a WAF handle
monitorRulesOnce.Do(func() {
addRulesMonitoringTags(op, rInfo)
op.AddTag(ext.ManualKeep, samplernames.AppSec)
op.AddTag(wafVersionTag, waf.Version())
})

// Log the attacks if any
Expand All @@ -168,28 +165,29 @@ func newHTTPWAFEventListener(handle *waf.Handle, addresses []string, timeout tim
op.AddSecurityEvents(matches)
}
}))

})
}

// newGRPCWAFEventListener returns the WAF event listener to register in order
// to enable it.
func newGRPCWAFEventListener(handle *waf.Handle, _ []string, timeout time.Duration, limiter Limiter) dyngo.EventListener {
var monitorRulesOnce sync.Once // per instantiation

return grpcsec.OnHandlerOperationStart(func(op *grpcsec.HandlerOperation, handlerArgs grpcsec.HandlerOperationArgs) {
// Limit the maximum number of security events, as a streaming RPC could
// receive unlimited number of messages where we could find security events
const maxWAFEventsPerRequest = 10
var (
nbEvents uint32
logOnce sync.Once
metricsOnce sync.Once
wafRunDuration waf.AtomicU64
wafBindingsRunDuration waf.AtomicU64
wafTimeouts waf.AtomicU64
nbEvents uint32
logOnce sync.Once // per request
overallRuntimeNs waf.AtomicU64
internalRuntimeNs waf.AtomicU64
nbTimeouts waf.AtomicU64

events []json.RawMessage
mu sync.Mutex
mu sync.Mutex // events mutex
)

op.On(grpcsec.OnReceiveOperationFinish(func(_ grpcsec.ReceiveOperation, res grpcsec.ReceiveOperationRes) {
if atomic.LoadUint32(&nbEvents) == maxWAFEventsPerRequest {
logOnce.Do(func() {
Expand Down Expand Up @@ -219,14 +217,16 @@ func newGRPCWAFEventListener(handle *waf.Handle, _ []string, timeout time.Durati
if md := handlerArgs.Metadata; len(md) > 0 {
values[grpcServerRequestMetadata] = md
}
now := time.Now()
event := runWAF(wafCtx, values, timeout)

// WAF run durations are WAF context bound. As of now we need to keep track of those externally since
// we use a new WAF context for each callback. When we are able to re-use the same WAF context across
// callbacks, we can get rid of these variables and simply use the WAF bindings in OnHandlerOperationFinish.
wafBindingsRunDuration.Add(uint64(time.Since(now).Nanoseconds()))
wafRunDuration.Add(wafCtx.TotalRuntime())
wafTimeouts.Add(wafCtx.TotalTimeouts())
overall, internal := wafCtx.TotalRuntime()
overallRuntimeNs.Add(overall)
internalRuntimeNs.Add(internal)
nbTimeouts.Add(wafCtx.TotalTimeouts())

if len(event) == 0 {
return
}
Expand All @@ -236,18 +236,18 @@ func newGRPCWAFEventListener(handle *waf.Handle, _ []string, timeout time.Durati
events = append(events, event)
mu.Unlock()
}))

op.On(grpcsec.OnHandlerOperationFinish(func(op *grpcsec.HandlerOperation, _ grpcsec.HandlerOperationRes) {
rInfo := handle.RulesetInfo()
// Rules version is set for every request to help the backend associate WAF duration metrics with rule version
op.AddTag(eventRulesVersionTag, rInfo.Version)
op.AddTag(wafTimeoutTag, float64(wafTimeouts))
addWAFDurationTags(&op.TagsHolder, float64(wafRunDuration), float64(wafBindingsRunDuration))
addWAFMonitoringTags(op, rInfo.Version, overallRuntimeNs.Load(), internalRuntimeNs.Load(), nbTimeouts.Load())

// Log the following metrics once per instantiation of a WAF handle
metricsOnce.Do(func() {
addRulesetInfoTags(&op.TagsHolder, rInfo)
monitorRulesOnce.Do(func() {
addRulesMonitoringTags(op, rInfo)
op.AddTag(ext.ManualKeep, samplernames.AppSec)
op.AddTag(wafVersionTag, waf.Version())
})

// Log the events if any
if len(events) > 0 && limiter.Allow() {
op.AddSecurityEvents(events...)
}
Expand Down Expand Up @@ -324,9 +324,12 @@ func supportedAddresses(ruleAddresses []string) (supportedHTTP, supportedGRPC, n
return
}

// addRulesetInfoTags adds information retrieved from `rInfo` as tags in `th`
func addRulesetInfoTags(th *instrumentation.TagsHolder, rInfo waf.RulesetInfo) {
// Set map to nil if empty to help with json encoding
// tagsHolder is the minimal interface required by the monitoring helpers of
// this file: anything a tag can be added to through an AddTag method.
type tagsHolder interface {
	// AddTag adds the tag `key` with the given `value`.
	AddTag(key string, value interface{})
}

// addRulesMonitoringTags adds the tags related to security rules monitoring to `th`.
func addRulesMonitoringTags(th tagsHolder, rInfo waf.RulesetInfo) {
if len(rInfo.Errors) == 0 {
rInfo.Errors = nil
}
Expand All @@ -337,10 +340,14 @@ func addRulesetInfoTags(th *instrumentation.TagsHolder, rInfo waf.RulesetInfo) {
th.AddTag(eventRulesErrorsTag, string(rulesetErrors)) // avoid the tracer's call to fmt.Sprintf on the value
th.AddTag(eventRulesLoadedTag, float64(rInfo.Loaded))
th.AddTag(eventRulesFailedTag, float64(rInfo.Failed))
th.AddTag(wafVersionTag, waf.Version())
}

// addWAFDurationTags converts the provided durations (expected in ns) to ms and adds them as tags in `th`
func addWAFDurationTags(th *instrumentation.TagsHolder, runtime float64, totalRuntime float64) {
th.AddTag(wafDurationTag, runtime/1e3)
th.AddTag(wafDurationExtTag, totalRuntime/1e3)
// addWAFMonitoringTags adds the tags related to the monitoring of the WAF to
// `th`: the rules version, the number of timeouts, and the internal and
// overall run times. The run times are expected in nanoseconds and are
// converted to microseconds before being added.
func addWAFMonitoringTags(th tagsHolder, rulesVersion string, overallRuntimeNs, internalRuntimeNs, timeouts uint64) {
	// Number of nanoseconds in one microsecond.
	const nsPerUs = 1e3
	var (
		internalUs = float64(internalRuntimeNs) / nsPerUs
		overallUs  = float64(overallRuntimeNs) / nsPerUs
	)

	// The rules version is set for every request to help the backend
	// associate the WAF duration metrics with a rules version.
	th.AddTag(eventRulesVersionTag, rulesVersion)
	th.AddTag(wafTimeoutTag, float64(timeouts))
	th.AddTag(wafDurationTag, internalUs)
	th.AddTag(wafDurationExtTag, overallUs)
}
11 changes: 8 additions & 3 deletions internal/appsec/waf/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@ import (
// RunError the WAF can return when running it.
type RunError int

// AtomicU64 can be used to perform atomic operations on an uint64 type
type AtomicU64 uint64

// RulesetInfo stores the information - provided by the WAF - about WAF rules initialization.
type RulesetInfo struct {
// Number of rules successfully loaded
Expand Down Expand Up @@ -59,6 +56,9 @@ func (e RunError) Error() string {
}
}

// AtomicU64 can be used to perform atomic operations on an uint64 type
type AtomicU64 uint64

// Add atomically sums the current atomic value with the provided value `v`.
func (a *AtomicU64) Add(v uint64) {
atomic.AddUint64((*uint64)(a), v)
Expand All @@ -68,3 +68,8 @@ func (a *AtomicU64) Add(v uint64) {
func (a *AtomicU64) Inc() {
	// An increment is an atomic addition of one to the underlying uint64.
	const step = 1
	ptr := (*uint64)(a)
	atomic.AddUint64(ptr, step)
}

// Load atomically reads the current value and returns it.
func (a *AtomicU64) Load() uint64 {
	// View the receiver as the plain uint64 expected by sync/atomic.
	ptr := (*uint64)(a)
	return atomic.LoadUint64(ptr)
}
32 changes: 27 additions & 5 deletions internal/appsec/waf/waf.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func NewHandle(jsonRule []byte, keyRegex, valueRegex string) (*Handle, error) {
incNbLiveCObjects()

// Decode the ruleset information returned by the WAF
errors, err := decodeMap((*wafObject)(&wafRInfo.errors))
errors, err := decodeErrors((*wafObject)(&wafRInfo.errors))
if err != nil {
C.ddwaf_destroy(handle)
decNbLiveCObjects()
Expand Down Expand Up @@ -203,8 +203,10 @@ func (waf *Handle) Close() {
// become available. Each request must have its own Context.
type Context struct {
waf *Handle
// Cumulated WAF runtime - in nanoseconds - for this context.
// Cumulated internal WAF run time - in nanoseconds - for this context.
totalRuntimeNs AtomicU64
// Cumulated overall run time - in nanoseconds - for this context.
totalOverallRuntimeNs AtomicU64
// Cumulated timeout count for this context.
timeoutCount AtomicU64

Expand Down Expand Up @@ -238,6 +240,15 @@ func NewContext(waf *Handle) *Context {

// Run the WAF with the given Go values and timeout. The wall-clock time spent
// in the call is added to the context's cumulated overall run time.
func (c *Context) Run(values map[string]interface{}, timeout time.Duration) (matches []byte, err error) {
	// The start time is captured when the defer statement is evaluated, and
	// the elapsed time is recorded when Run returns, whatever the outcome of
	// the underlying run.
	defer func(start time.Time) {
		elapsed := time.Since(start)
		c.totalOverallRuntimeNs.Add(uint64(elapsed.Nanoseconds()))
	}(time.Now())

	return c.run(values, timeout)
}

func (c *Context) run(values map[string]interface{}, timeout time.Duration) (matches []byte, err error) {
if len(values) == 0 {
return
}
Expand Down Expand Up @@ -275,13 +286,13 @@ func (c *Context) Close() {

// TotalRuntime returns the cumulated overall run time and the cumulated internal WAF run time across the
// various run calls within the same WAF context. Both returned times are in nanoseconds.
func (c *Context) TotalRuntime() uint64 {
return uint64(c.totalRuntimeNs)
func (c *Context) TotalRuntime() (overallRuntimeNs, internalRuntimeNs uint64) {
return c.totalOverallRuntimeNs.Load(), c.totalRuntimeNs.Load()
}

// TotalTimeouts returns the cumulated amount of WAF timeouts across various run calls within the same WAF context.
func (c *Context) TotalTimeouts() uint64 {
return uint64(c.timeoutCount)
return c.timeoutCount.Load()
}

// Translate libddwaf return values into return values suitable to a Go program.
Expand Down Expand Up @@ -584,6 +595,17 @@ func (e *encoder) encodeUint64(n uint64, wo *wafObject) error {
return e.encodeString(strconv.FormatUint(n, 10), wo)
}

// decodeErrors decodes the given WAF object as a map using decodeMap, and
// returns a nil map instead of an empty one when the decoded map has no
// entries. Any decoding error is returned as is, along with a nil map.
func decodeErrors(wo *wafObject) (map[string]interface{}, error) {
	decoded, err := decodeMap(wo)
	switch {
	case err != nil:
		return nil, err
	case len(decoded) == 0:
		// Enforce a nil map when the ddwaf map was empty.
		return nil, nil
	}
	return decoded, nil
}

func decodeObject(wo *wafObject) (v interface{}, err error) {
if wo == nil {
return nil, errNilObjectPtr
Expand Down
7 changes: 5 additions & 2 deletions internal/appsec/waf/waf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,8 +497,11 @@ func TestMetrics(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, matches)
// Make sure that WAF runtime was set
require.Greater(t, wafCtx.TotalRuntime(), uint64(0), "wafCtx runtime metric is not set")
require.LessOrEqual(t, wafCtx.TotalRuntime(), uint64(elapsedNS), "wafCtx runtime metric is incorrect")
overall, internal := wafCtx.TotalRuntime()
require.Greater(t, overall, uint64(0))
require.Greater(t, internal, uint64(0))
require.Greater(t, overall, internal)
require.LessOrEqual(t, overall, uint64(elapsedNS))
})

t.Run("Timeouts", func(t *testing.T) {
Expand Down
17 changes: 4 additions & 13 deletions internal/appsec/waf_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,6 @@ import (

// Test that internal functions used to set span tags use the correct types
func TestTagsTypes(t *testing.T) {
const (
eventRulesErrorsTag = "_dd.appsec.event_rules.errors"
eventRulesLoadedTag = "_dd.appsec.event_rules.loaded"
eventRulesFailedTag = "_dd.appsec.event_rules.error_count"
wafDurationTag = "_dd.appsec.waf.duration"
wafDurationExtTag = "_dd.appsec.waf.duration_ext"
)

th := instrumentation.NewTagsHolder()
rInfo := waf.RulesetInfo{
Version: "1.3.0",
Expand All @@ -35,15 +27,14 @@ func TestTagsTypes(t *testing.T) {
Errors: map[string]interface{}{"test": []string{"1", "2"}},
}

addRulesetInfoTags(&th, rInfo)
addWAFDurationTags(&th, 1.0, 2.0)
addRulesMonitoringTags(&th, rInfo)
addWAFMonitoringTags(&th, "1.2.3", 2, 1, 3)

tags := th.Tags()
_, ok := tags[eventRulesErrorsTag].(string)
require.True(t, ok)

for _, tag := range []string{eventRulesLoadedTag, eventRulesFailedTag, wafDurationTag, wafDurationExtTag} {
_, ok := tags[tag].(float64)
require.True(t, ok)
for _, tag := range []string{eventRulesLoadedTag, eventRulesFailedTag, wafDurationTag, wafDurationExtTag, wafVersionTag} {
require.Contains(t, tags, tag)
}
}

0 comments on commit 243d731

Please sign in to comment.