Skip to content

Commit

Permalink
feat: Detect fields based on per-tenant configuration and put them in…
Browse files Browse the repository at this point in the history
…to structured metadata at ingest time (#15188)

This PR introduces a new feature that allows for extraction of "fields" into structured metadata at ingest time.

Fields can either be regular labels, structured metadata keys, or keys from `logfmt` or `json` formatted log lines. 

The fields are defined in a per-tenant configuration as `map[string][]string`, where the key is the target key of the structured metadata, and the value is the list of source fields in given order and the order given above.

Example configuration:

```yaml
limits_config:
  discover_generic_fields:
    fields:
      trace_id:
        - "trace_id"
        - "TRACE_ID"
        - "traceID"
        - "TraceID"
      org_id:
        - "org_id"
        - "tenant_id"
        - "user_id"
```

While parsing of log lines comes with a certain penalty at ingest time (increased latency and CPU usage on distributors), the idea is to extract certain fields once to avoid parsing the log lines every single time at query time. This is mainly useful in combination with bloom filters.

**JSONpath support**

Should the value of the config map support jsonpath expression, such as

```
limits_config:
  discover_generic_fields:
    fields:
      ticket_id:
        - "message.ticket.id"
```

Where the log line looks like this:

```json
{"timestamp": 1733128051000, "message": {"ticket": {"id": "2024-d95f87018cdb1f10"}}}
```

---
Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum authored Jan 7, 2025
1 parent d8dc10f commit 7033091
Show file tree
Hide file tree
Showing 9 changed files with 327 additions and 73 deletions.
6 changes: 6 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -3400,6 +3400,12 @@ The `limits_config` block configures global and per-tenant limits in Loki. The v
# CLI flag: -validation.increment-duplicate-timestamps
[increment_duplicate_timestamp: <boolean> | default = false]

# Experimental: Detect fields from stream labels, structured metadata, or
# json/logfmt formatted log line and put them into structured metadata of the
# log entry.
discover_generic_fields:
[fields: <map of string to list of strings>]

# If no service_name label exists, Loki maps a single label from the configured
# list to service_name. If none of the configured labels exist in the stream,
# label is set to unknown_service. Empty list disables setting the label.
Expand Down
26 changes: 20 additions & 6 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"math"
"net/http"
"runtime/pprof"
"slices"
"sort"
"strconv"
Expand Down Expand Up @@ -460,8 +461,9 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log

now := time.Now()
validationContext := d.validator.getValidationContextForTime(now, tenantID)
levelDetector := newLevelDetector(validationContext)
shouldDiscoverLevels := levelDetector.shouldDiscoverLogLevels()
fieldDetector := newFieldDetector(validationContext)
shouldDiscoverLevels := fieldDetector.shouldDiscoverLogLevels()
shouldDiscoverGenericFields := fieldDetector.shouldDiscoverGenericFields()

shardStreamsCfg := d.validator.Limits.ShardStreams(tenantID)
maybeShardByRate := func(stream logproto.Stream, pushSize int) {
Expand Down Expand Up @@ -547,10 +549,22 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}
}
if shouldDiscoverLevels {
logLevel, ok := levelDetector.extractLogLevel(lbs, structuredMetadata, entry)
if ok {
entry.StructuredMetadata = append(entry.StructuredMetadata, logLevel)
}
pprof.Do(ctx, pprof.Labels("action", "discover_log_level"), func(_ context.Context) {
logLevel, ok := fieldDetector.extractLogLevel(lbs, structuredMetadata, entry)
if ok {
entry.StructuredMetadata = append(entry.StructuredMetadata, logLevel)
}
})
}
if shouldDiscoverGenericFields {
pprof.Do(ctx, pprof.Labels("action", "discover_generic_fields"), func(_ context.Context) {
for field, hints := range fieldDetector.validationContext.discoverGenericFields {
extracted, ok := fieldDetector.extractGenericField(field, hints, lbs, structuredMetadata, entry)
if ok {
entry.StructuredMetadata = append(entry.StructuredMetadata, extracted)
}
}
})
}
stream.Entries[n] = entry

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (

"github.com/grafana/loki/v3/pkg/loghttp/push"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/log"
"github.com/grafana/loki/v3/pkg/logql/log/jsonexpr"
"github.com/grafana/loki/v3/pkg/logql/log/logfmt"
"github.com/grafana/loki/v3/pkg/util/constants"
)
Expand All @@ -31,46 +33,43 @@ var (
errorAbbrv = []byte("err")
critical = []byte("critical")
fatal = []byte("fatal")

defaultAllowedLevelFields = []string{"level", "LEVEL", "Level", "severity", "SEVERITY", "Severity", "lvl", "LVL", "Lvl"}
)

func allowedLabelsForLevel(allowedFields []string) map[string]struct{} {
func allowedLabelsForLevel(allowedFields []string) []string {
if len(allowedFields) == 0 {
return map[string]struct{}{
"level": {}, "LEVEL": {}, "Level": {},
"severity": {}, "SEVERITY": {}, "Severity": {},
"lvl": {}, "LVL": {}, "Lvl": {},
}
}
allowedFieldsMap := make(map[string]struct{}, len(allowedFields))
for _, field := range allowedFields {
allowedFieldsMap[field] = struct{}{}
return defaultAllowedLevelFields
}
return allowedFieldsMap
return allowedFields
}

type LevelDetector struct {
validationContext validationContext
allowedLabels map[string]struct{}
type FieldDetector struct {
validationContext validationContext
allowedLevelLabels []string
}

func newLevelDetector(validationContext validationContext) *LevelDetector {
logLevelFields := validationContext.logLevelFields
return &LevelDetector{
validationContext: validationContext,
allowedLabels: allowedLabelsForLevel(logLevelFields),
func newFieldDetector(validationContext validationContext) *FieldDetector {
return &FieldDetector{
validationContext: validationContext,
allowedLevelLabels: allowedLabelsForLevel(validationContext.logLevelFields),
}
}

func (l *LevelDetector) shouldDiscoverLogLevels() bool {
func (l *FieldDetector) shouldDiscoverLogLevels() bool {
return l.validationContext.allowStructuredMetadata && l.validationContext.discoverLogLevels
}

func (l *LevelDetector) extractLogLevel(labels labels.Labels, structuredMetadata labels.Labels, entry logproto.Entry) (logproto.LabelAdapter, bool) {
levelFromLabel, hasLevelLabel := l.hasAnyLevelLabels(labels)
func (l *FieldDetector) shouldDiscoverGenericFields() bool {
return l.validationContext.allowStructuredMetadata && len(l.validationContext.discoverGenericFields) > 0
}

func (l *FieldDetector) extractLogLevel(labels labels.Labels, structuredMetadata labels.Labels, entry logproto.Entry) (logproto.LabelAdapter, bool) {
levelFromLabel, hasLevelLabel := labelsContainAny(labels, l.allowedLevelLabels)
var logLevel string
if hasLevelLabel {
logLevel = levelFromLabel
} else if levelFromMetadata, ok := l.hasAnyLevelLabels(structuredMetadata); ok {
} else if levelFromMetadata, ok := labelsContainAny(structuredMetadata, l.allowedLevelLabels); ok {
logLevel = levelFromMetadata
} else {
logLevel = l.detectLogLevelFromLogEntry(entry, structuredMetadata)
Expand All @@ -85,16 +84,33 @@ func (l *LevelDetector) extractLogLevel(labels labels.Labels, structuredMetadata
}, true
}

func (l *LevelDetector) hasAnyLevelLabels(labels labels.Labels) (string, bool) {
for lbl := range l.allowedLabels {
if labels.Has(lbl) {
return labels.Get(lbl), true
func (l *FieldDetector) extractGenericField(name string, hints []string, labels labels.Labels, structuredMetadata labels.Labels, entry logproto.Entry) (logproto.LabelAdapter, bool) {

var value string
if v, ok := labelsContainAny(labels, hints); ok {
value = v
} else if v, ok := labelsContainAny(structuredMetadata, hints); ok {
value = v
} else {
value = l.detectGenericFieldFromLogEntry(entry, hints)
}

if value == "" {
return logproto.LabelAdapter{}, false
}
return logproto.LabelAdapter{Name: name, Value: value}, true
}

func labelsContainAny(labels labels.Labels, names []string) (string, bool) {
for _, name := range names {
if labels.Has(name) {
return labels.Get(name), true
}
}
return "", false
}

func (l *LevelDetector) detectLogLevelFromLogEntry(entry logproto.Entry, structuredMetadata labels.Labels) string {
func (l *FieldDetector) detectLogLevelFromLogEntry(entry logproto.Entry, structuredMetadata labels.Labels) string {
// otlp logs have a severity number, using which we are defining the log levels.
// Significance of severity number is explained in otel docs here https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitynumber
if otlpSeverityNumberTxt := structuredMetadata.Get(push.OTLPSeverityNumber); otlpSeverityNumberTxt != "" {
Expand Down Expand Up @@ -123,13 +139,24 @@ func (l *LevelDetector) detectLogLevelFromLogEntry(entry logproto.Entry, structu
return l.extractLogLevelFromLogLine(entry.Line)
}

func (l *LevelDetector) extractLogLevelFromLogLine(log string) string {
logSlice := unsafe.Slice(unsafe.StringData(log), len(log))
func (l *FieldDetector) detectGenericFieldFromLogEntry(entry logproto.Entry, hints []string) string {
lineBytes := unsafe.Slice(unsafe.StringData(entry.Line), len(entry.Line))
var v []byte
if isJSON(entry.Line) {
v = getValueUsingJSONParser(lineBytes, hints)
} else if isLogFmt(lineBytes) {
v = getValueUsingLogfmtParser(lineBytes, hints)
}
return string(v)
}

func (l *FieldDetector) extractLogLevelFromLogLine(log string) string {
lineBytes := unsafe.Slice(unsafe.StringData(log), len(log))
var v []byte
if isJSON(log) {
v = l.getValueUsingJSONParser(logSlice)
} else if isLogFmt(logSlice) {
v = l.getValueUsingLogfmtParser(logSlice)
v = getValueUsingJSONParser(lineBytes, l.allowedLevelLabels)
} else if isLogFmt(lineBytes) {
v = getValueUsingLogfmtParser(lineBytes, l.allowedLevelLabels)
} else {
return detectLevelFromLogLine(log)
}
Expand All @@ -154,24 +181,42 @@ func (l *LevelDetector) extractLogLevelFromLogLine(log string) string {
}
}

func (l *LevelDetector) getValueUsingLogfmtParser(line []byte) []byte {
func getValueUsingLogfmtParser(line []byte, hints []string) []byte {
d := logfmt.NewDecoder(line)
// In order to have the same behaviour as the JSON field extraction,
// the full line needs to be parsed to extract all possible matching fields.
pos := len(hints) // the index of the hint that matches
var res []byte
for !d.EOL() && d.ScanKeyval() {
if _, ok := l.allowedLabels[string(d.Key())]; ok {
return d.Value()
k := unsafe.String(unsafe.SliceData(d.Key()), len(d.Key()))
for x, hint := range hints {
if strings.EqualFold(k, hint) && x < pos {
res, pos = d.Value(), x
// If there is only a single hint, or the matching hint is the first one,
// we can stop parsing the rest of the line and return early.
if x == 0 {
return res
}
}
}
}
return nil
return res
}

func (l *LevelDetector) getValueUsingJSONParser(log []byte) []byte {
for allowedLabel := range l.allowedLabels {
l, _, _, err := jsonparser.Get(log, allowedLabel)
if err == nil {
return l
func getValueUsingJSONParser(line []byte, hints []string) []byte {
var res []byte
for _, allowedLabel := range hints {
parsed, err := jsonexpr.Parse(allowedLabel, false)
if err != nil {
continue
}
l, _, _, err := jsonparser.Get(line, log.JSONPathToStrings(parsed)...)
if err != nil {
continue
}
return l
}
return nil
return res
}

func isLogFmt(line []byte) bool {
Expand Down
Loading

0 comments on commit 7033091

Please sign in to comment.