Skip to content

Add windowsevent stage loki process #2545

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Feb 4, 2025
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ Main (unreleased)

### Features

- Add the possibility to export span events as logs in `otelcol.connector.spanlogs`. (@steve-hb)
- (_Experimental_) Add a `stage.windowsevent` block in the `loki.process` component. This aims to replace the existing `stage.eventlogmessage`. (@wildum)

### Enhancements

- Add the possibility to export span events as logs in `otelcol.connector.spanlogs`. (@steve-hb)

- (_Experimental_) Log instance label key in `database_observability.mysql` (@cristiangreco)

- (_Experimental_) Improve parsing of truncated queries in `database_observability.mysql` (@cristiangreco)
Expand Down
90 changes: 90 additions & 0 deletions docs/sources/reference/components/loki/loki.process.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ The following blocks are supported inside the definition of `loki.process`:
| stage.template | [stage.template][] | Configures a `template` processing stage. | no |
| stage.tenant | [stage.tenant][] | Configures a `tenant` processing stage. | no |
| stage.timestamp | [stage.timestamp][] | Configures a `timestamp` processing stage. | no |
| stage.windowsevent | [stage.windowsevent][] | Configures a `windowsevent` processing stage. | no |

A user can provide any number of these stage blocks nested inside `loki.process`; these will run in order of appearance in the configuration file.

Expand Down Expand Up @@ -98,6 +99,7 @@ A user can provide any number of these stage blocks nested inside `loki.process`
[stage.template]: #stagetemplate-block
[stage.tenant]: #stagetenant-block
[stage.timestamp]: #stagetimestamp-block
[stage.windowsevent]: #stagewindowsevent-block


### stage.cri block
Expand Down Expand Up @@ -242,6 +244,8 @@ stage.drop {

### stage.eventlogmessage block

Deprecated in favor of the [stage.windowsevent block][stage.windowsevent].

The `eventlogmessage` stage extracts data from the Message string that appears in the Windows Event Log.

The following arguments are supported:
Expand Down Expand Up @@ -1694,6 +1698,92 @@ loki.process "example" {
The `json` stage extracts the IP address from the `client_ip` key in the log line.
Then the extracted `ip` value is given as source to geoip stage. The geoip stage performs a lookup on the IP and populates the shared map with the data from the city database results in addition to the custom lookups. Lastly, the custom lookup fields from the shared map are added as labels.

### stage.windowsevent block

The `windowsevent` stage extracts data from the message string in the Windows Event Log.

The following arguments are supported:

| Name | Type | Description | Default | Required |
|-----------------------|----------|--------------------------------------------------------|-----------|----------|
| `source` | `string` | Name of the field in the extracted data to parse. | `message` | no |
| `overwrite_existing` | `bool` | Whether to overwrite existing extracted data fields. | `false` | no |
| `drop_invalid_labels` | `bool` | Whether to drop fields that are not valid label names. | `false` | no |

When `overwrite_existing` is set to `true`, the stage overwrites existing extracted data fields with the same name.
If set to `false`, the `_extracted` suffix is appended to an existing field name.

When `drop_invalid_labels` is set to `true`, the stage drops fields that aren't valid label names.
If set to `false`, the stage will automatically convert them into valid labels, replacing invalid characters with underscores.

The `windowsevent` stage expects the message to be structured in sections that are split by empty lines.

The first section of the input is treated as a whole block and stored in the extracted map with the key `Description`.

Sections following the Description are expected to contain key-value pairs in the format key:value.

If the first line of a section has no value, for example "Subject:", the key will act as a prefix for subsequent keys in the same section.

If a line within a section does not include the `:` symbol, it is considered part of the previous entry's value. The line is appended to the previous value, separated by a comma.

Lines in a section without a preceding valid entry (key-value pair) are ignored and discarded.

#### Example with `loki.source.windowsevent`

```alloy
loki.source.windowsevent "security" {
eventlog_name = "Security"
forward_to = [loki.process.default.receiver]
}

loki.process "default" {
forward_to = [loki.write.default.receiver]

stage.json {
expressions = {
message = "",
Overwritten = "",
}
}

stage.windowsevent {
source = "message"
overwrite_existing = true
}

stage.labels {
values = {
Description = "",
Subject_SecurityID = "",
ReadOP = "Subject_ReadOperation",
}
}
}
```

The `loki.source.windowsevent` component forwards Windows security events to the `loki.process` component.

Given the following event:
```
{"event_id": 1, "Overwritten": "old", "message": ""Special privileges assigned to new logon.\r\n\r\nSubject:\r\n\tSecurity ID:\t\tS-1-1-1\r\n\tAccount Name:\t\tSYSTEM\r\n\tAccount Domain:\t\tNT AUTHORITY\r\n\tLogon ID:\t\t0xAAA\r\n\r\nPrivileges:\t\tSeAssignPrimaryTokenPrivilege\r\n\t\t\tSeTcbPrivilege\r\n\t\t\tSeSecurityPrivilege\r\n\t\t\tSeTakeOwnershipPrivilege\r\n\t\t\tSeLoadDriverPrivilege\r\n\t\t\tSeBackupPrivilege\r\n\t\t\tSeRestorePrivilege\r\n\t\t\tSeDebugPrivilege\r\n\t\t\tSeAuditPrivilege\r\n\t\t\tSeSystemEnvironmentPrivilege\r\n\t\t\tSeImpersonatePrivilege\r\n\t\t\tSeDelegateSessionUserImpersonatePrivilege""}
```

The `json` stage would create the following key-value pairs in the set of extracted data:

- `message`: `"Special privileges assigned to new logon.\r\n\r\nSubject:\r\n\tSecurity ID:\t\tS-1-1-1\r\n\tAccount Name:\t\tSYSTEM\r\n\tAccount Domain:\t\tNT AUTHORITY\r\n\tLogon ID:\t\t0xAAA\r\n\r\nPrivileges:\t\tSeAssignPrimaryTokenPrivilege\r\n\t\t\tSeTcbPrivilege\r\n\t\t\tSeSecurityPrivilege"`
- `Overwritten`: `old`

The `windowsevent` stage will parse the value of `message` from the extracted data and append/overwrite the following key-value pairs to the set of extracted data:

- `Description`: "Special privileges assigned to new logon.",
- `Subject_SecurityID`: "S-1-1-1",
- `Subject_AccountName`: "SYSTEM",
- `Subject_AccountDomain`: "NT AUTHORITY",
- `Subject_LogonID`: "0xAAA",
- `Privileges`: "SeAssignPrimaryTokenPrivilege,SeTcbPrivilege,SeSecurityPrivilege",

Finally the `labels` stage will use the extracted values `Description`, `Subject_SecurityID` and `Subject_ReadOperation` to add them as labels of the log entry before forwarding it to a `loki.write` component.

## Exported fields

The following fields are exported and can be referenced by other components:
Expand Down
2 changes: 1 addition & 1 deletion internal/component/loki/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (c *Component) Update(args component.Arguments) error {
c.entryHandler.Stop()
}

pipeline, err := stages.NewPipeline(c.opts.Logger, newArgs.Stages, &c.opts.ID, c.opts.Registerer)
pipeline, err := stages.NewPipeline(c.opts.Logger, newArgs.Stages, &c.opts.ID, c.opts.Registerer, c.opts.MinStability)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion internal/component/loki/process/stages/decolorize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"

"github.com/grafana/alloy/internal/featuregate"
util_log "github.com/grafana/loki/v3/pkg/util/log"
)

Expand Down Expand Up @@ -40,7 +41,7 @@ func TestPipeline_Decolorize(t *testing.T) {
t.Run(testName, func(t *testing.T) {
t.Parallel()

pl, err := NewPipeline(util_log.Logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer)
pl, err := NewPipeline(util_log.Logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable)
if err != nil {
t.Fatal(err)
}
Expand Down
3 changes: 2 additions & 1 deletion internal/component/loki/process/stages/drop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/util"
)

Expand Down Expand Up @@ -433,7 +434,7 @@ func TestDropPipeline(t *testing.T) {
registry := prometheus.NewRegistry()
plName := "test_drop_pipeline"
logger := util.TestAlloyLogger(t)
pl, err := NewPipeline(logger, loadConfig(testDropAlloy), &plName, registry)
pl, err := NewPipeline(logger, loadConfig(testDropAlloy), &plName, registry, featuregate.StabilityGenerallyAvailable)
require.NoError(t, err)
out := processEntries(pl,
newEntry(nil, nil, testMatchLogLineApp1, time.Now()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/syntax"
)

Expand Down Expand Up @@ -107,7 +108,7 @@ func TestEventLogMessage_simple(t *testing.T) {
t.Run(testName, func(t *testing.T) {
t.Parallel()

pl, err := NewPipeline(util_log.Logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer)
pl, err := NewPipeline(util_log.Logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable)
assert.NoError(t, err, "Expected pipeline creation to not result in error")
out := processEntries(pl,
newEntry(map[string]interface{}{
Expand Down Expand Up @@ -267,7 +268,7 @@ func TestEventLogMessage_Real(t *testing.T) {
t.Run(testName, func(t *testing.T) {
t.Parallel()

pl, err := NewPipeline(util_log.Logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer)
pl, err := NewPipeline(util_log.Logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable)
assert.NoError(t, err, "Expected pipeline creation to not result in error")
out := processEntries(pl,
newEntry(map[string]interface{}{testData.sourcekey: testData.msgdata}, nil, testData.msgdata, time.Now()))[0]
Expand Down Expand Up @@ -323,7 +324,7 @@ func TestEventLogMessage_invalid(t *testing.T) {
t.Run(testName, func(t *testing.T) {
t.Parallel()

pl, err := NewPipeline(util_log.Logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer)
pl, err := NewPipeline(util_log.Logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable)
assert.NoError(t, err, "Expected pipeline creation to not result in error")
out := processEntries(pl,
newEntry(map[string]interface{}{testData.sourcekey: testData.msgdata}, nil, testData.msgdata, time.Now()))[0]
Expand All @@ -335,7 +336,7 @@ func TestEventLogMessage_invalid(t *testing.T) {
func TestEventLogMessage_invalidString(t *testing.T) {
t.Parallel()

pl, err := NewPipeline(util_log.Logger, loadConfig(testEvtLogMsgYamlDefaults), nil, prometheus.DefaultRegisterer)
pl, err := NewPipeline(util_log.Logger, loadConfig(testEvtLogMsgYamlDefaults), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable)
assert.NoError(t, err, "Expected pipeline creation to not result in error")
out := processEntries(pl,
newEntry(map[string]interface{}{"message": nil}, nil, "", time.Now()))
Expand Down
9 changes: 5 additions & 4 deletions internal/component/loki/process/stages/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"strings"

"github.com/go-kit/log"
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/runtime/logging/level"
"github.com/grafana/alloy/syntax"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -55,7 +56,7 @@ func (args *CRIConfig) Validate() error {

// NewDocker creates a predefined pipeline for parsing entries in the Docker
// json log format.
func NewDocker(logger log.Logger, registerer prometheus.Registerer) (Stage, error) {
func NewDocker(logger log.Logger, registerer prometheus.Registerer, minStability featuregate.Stability) (Stage, error) {
stages := []StageConfig{
{
JSONConfig: &JSONConfig{
Expand Down Expand Up @@ -83,7 +84,7 @@ func NewDocker(logger log.Logger, registerer prometheus.Registerer) (Stage, erro
},
},
}
return NewPipeline(logger, stages, nil, registerer)
return NewPipeline(logger, stages, nil, registerer, minStability)
}

type cri struct {
Expand Down Expand Up @@ -169,7 +170,7 @@ func (c *cri) ensureTruncateIfRequired(e *Entry) {

// NewCRI creates a predefined pipeline for parsing entries in the CRI log
// format.
func NewCRI(logger log.Logger, config CRIConfig, registerer prometheus.Registerer) (Stage, error) {
func NewCRI(logger log.Logger, config CRIConfig, registerer prometheus.Registerer, minStability featuregate.Stability) (Stage, error) {
base := []StageConfig{
{
RegexConfig: &RegexConfig{
Expand Down Expand Up @@ -199,7 +200,7 @@ func NewCRI(logger log.Logger, config CRIConfig, registerer prometheus.Registere
},
}

p, err := NewPipeline(logger, base, nil, registerer)
p, err := NewPipeline(logger, base, nil, registerer, minStability)
if err != nil {
return nil, err
}
Expand Down
7 changes: 4 additions & 3 deletions internal/component/loki/process/stages/extensions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/alloy/internal/featuregate"
util_log "github.com/grafana/loki/v3/pkg/util/log"
)

Expand Down Expand Up @@ -68,7 +69,7 @@ func TestNewDocker(t *testing.T) {
tt := tt
t.Run(tName, func(t *testing.T) {
t.Parallel()
p, err := NewDocker(util_log.Logger, prometheus.DefaultRegisterer)
p, err := NewDocker(util_log.Logger, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable)
if err != nil {
t.Fatalf("failed to create Docker parser: %s", err)
}
Expand Down Expand Up @@ -192,7 +193,7 @@ func TestCRI_tags(t *testing.T) {
MaxPartialLineSize: tt.maxPartialLineSize,
MaxPartialLineSizeTruncate: tt.maxPartialLineSizeTruncate,
}
p, err := NewCRI(util_log.Logger, cfg, prometheus.DefaultRegisterer)
p, err := NewCRI(util_log.Logger, cfg, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable)
require.NoError(t, err)

got := make([]string, 0)
Expand Down Expand Up @@ -275,7 +276,7 @@ func TestNewCri(t *testing.T) {
t.Run(tName, func(t *testing.T) {
t.Parallel()
cfg := DefaultCRIConfig
p, err := NewCRI(util_log.Logger, cfg, prometheus.DefaultRegisterer)
p, err := NewCRI(util_log.Logger, cfg, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable)
if err != nil {
t.Fatalf("failed to create CRI parser: %s", err)
}
Expand Down
5 changes: 3 additions & 2 deletions internal/component/loki/process/stages/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"

"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/util"
"github.com/grafana/alloy/syntax"
)
Expand Down Expand Up @@ -79,7 +80,7 @@ func TestPipeline_JSON(t *testing.T) {
t.Run(testName, func(t *testing.T) {
t.Parallel()

pl, err := NewPipeline(logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer)
pl, err := NewPipeline(logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable)
assert.NoError(t, err, "Expected pipeline creation to not result in error")
out := processEntries(pl, newEntry(nil, nil, testData.entry, time.Now()))[0]
assert.Equal(t, testData.expectedExtract, out.Extracted)
Expand Down Expand Up @@ -341,7 +342,7 @@ func TestJSONParser_Parse(t *testing.T) {
tt := tt
t.Run(tName, func(t *testing.T) {
t.Parallel()
p, err := New(logger, nil, tt.config, nil)
p, err := New(logger, nil, tt.config, nil, featuregate.StabilityGenerallyAvailable)
assert.NoError(t, err, "failed to create json parser: %s", err)
out := processEntries(p, newEntry(tt.extracted, nil, tt.entry, time.Now()))[0]

Expand Down
5 changes: 3 additions & 2 deletions internal/component/loki/process/stages/labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"

"github.com/grafana/alloy/internal/featuregate"
util_log "github.com/grafana/loki/v3/pkg/util/log"
)

Expand Down Expand Up @@ -40,7 +41,7 @@ var testLabelsLogLineWithMissingKey = `
`

func TestLabelsPipeline_Labels(t *testing.T) {
pl, err := NewPipeline(util_log.Logger, loadConfig(testLabelsYaml), nil, prometheus.DefaultRegisterer)
pl, err := NewPipeline(util_log.Logger, loadConfig(testLabelsYaml), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable)
if err != nil {
t.Fatal(err)
}
Expand All @@ -57,7 +58,7 @@ func TestLabelsPipelineWithMissingKey_Labels(t *testing.T) {
var buf bytes.Buffer
w := log.NewSyncWriter(&buf)
logger := log.NewLogfmtLogger(w)
pl, err := NewPipeline(logger, loadConfig(testLabelsYaml), nil, prometheus.DefaultRegisterer)
pl, err := NewPipeline(logger, loadConfig(testLabelsYaml), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable)
if err != nil {
t.Fatal(err)
}
Expand Down
7 changes: 4 additions & 3 deletions internal/component/loki/process/stages/limit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/alloy/internal/featuregate"
util_log "github.com/grafana/loki/v3/pkg/util/log"
)

Expand Down Expand Up @@ -58,7 +59,7 @@ var plName = "testPipeline"
// TestLimitPipeline is used to verify we properly parse the yaml config and create a working pipeline
func TestLimitWaitPipeline(t *testing.T) {
registry := prometheus.NewRegistry()
pl, err := NewPipeline(util_log.Logger, loadConfig(testLimitWaitAlloy), &plName, registry)
pl, err := NewPipeline(util_log.Logger, loadConfig(testLimitWaitAlloy), &plName, registry, featuregate.StabilityGenerallyAvailable)
logs := make([]Entry, 0)
logCount := 5
for i := 0; i < logCount; i++ {
Expand All @@ -76,7 +77,7 @@ func TestLimitWaitPipeline(t *testing.T) {
// TestLimitPipeline is used to verify we properly parse the yaml config and create a working pipeline
func TestLimitDropPipeline(t *testing.T) {
registry := prometheus.NewRegistry()
pl, err := NewPipeline(util_log.Logger, loadConfig(testLimitDropAlloy), &plName, registry)
pl, err := NewPipeline(util_log.Logger, loadConfig(testLimitDropAlloy), &plName, registry, featuregate.StabilityGenerallyAvailable)
logs := make([]Entry, 0)
logCount := 10
for i := 0; i < logCount; i++ {
Expand All @@ -94,7 +95,7 @@ func TestLimitDropPipeline(t *testing.T) {
// TestLimitByLabelPipeline is used to verify we properly parse the yaml config and create a working pipeline
func TestLimitByLabelPipeline(t *testing.T) {
registry := prometheus.NewRegistry()
pl, err := NewPipeline(util_log.Logger, loadConfig(testLimitByLabelAlloy), &plName, registry)
pl, err := NewPipeline(util_log.Logger, loadConfig(testLimitByLabelAlloy), &plName, registry, featuregate.StabilityGenerallyAvailable)
logs := make([]Entry, 0)
logCount := 5
for i := 0; i < logCount; i++ {
Expand Down
Loading
Loading