Skip to content

Commit

Permalink
[chore][pkg/stanza] Cleanup journald input operator files (open-telem…
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski authored Apr 3, 2024
1 parent ce28e75 commit d3d9b70
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 134 deletions.
152 changes: 152 additions & 0 deletions pkg/stanza/operator/input/journald/config_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:build linux

package journald // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/journald"

import (
"context"
"fmt"
"os/exec"
"regexp"
"sort"
"time"

jsoniter "github.com/json-iterator/go"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
)

const waitDuration = 1 * time.Second

func init() {
operator.Register(operatorType, func() operator.Builder { return NewConfig() })
}

// Build will build a journald input operator from the supplied configuration
func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
inputOperator, err := c.InputConfig.Build(logger)
if err != nil {
return nil, err
}

args, err := c.buildArgs()
if err != nil {
return nil, err
}

return &Input{
InputOperator: inputOperator,
newCmd: func(ctx context.Context, cursor []byte) cmd {
if cursor != nil {
args = append(args, "--after-cursor", string(cursor))
}
return exec.CommandContext(ctx, "journalctl", args...) // #nosec - ...
// journalctl is an executable that is required for this operator to function
},
json: jsoniter.ConfigFastest,
}, nil
}

func (c Config) buildArgs() ([]string, error) {
args := make([]string, 0, 10)

// Export logs in UTC time
args = append(args, "--utc")

// Export logs as JSON
args = append(args, "--output=json")

// Continue watching logs until cancelled
args = append(args, "--follow")

switch c.StartAt {
case "end":
case "beginning":
args = append(args, "--no-tail")
default:
return nil, fmt.Errorf("invalid value '%s' for parameter 'start_at'", c.StartAt)
}

for _, unit := range c.Units {
args = append(args, "--unit", unit)
}

for _, identifier := range c.Identifiers {
args = append(args, "--identifier", identifier)
}

args = append(args, "--priority", c.Priority)

if len(c.Grep) > 0 {
args = append(args, "--grep", c.Grep)
}

if c.Dmesg {
args = append(args, "--dmesg")
}

switch {
case c.Directory != nil:
args = append(args, "--directory", *c.Directory)
case len(c.Files) > 0:
for _, file := range c.Files {
args = append(args, "--file", file)
}
}

if len(c.Matches) > 0 {
matches, err := c.buildMatchesConfig()
if err != nil {
return nil, err
}
args = append(args, matches...)
}

if c.All {
args = append(args, "--all")
}

return args, nil
}

func buildMatchConfig(mc MatchConfig) ([]string, error) {
re := regexp.MustCompile("^[_A-Z]+$")

// Sort keys to be consistent with every run and to be predictable for tests
sortedKeys := make([]string, 0, len(mc))
for key := range mc {
if !re.MatchString(key) {
return []string{}, fmt.Errorf("'%s' is not a valid Systemd field name", key)
}
sortedKeys = append(sortedKeys, key)
}
sort.Strings(sortedKeys)

configs := []string{}
for _, key := range sortedKeys {
configs = append(configs, fmt.Sprintf("%s=%s", key, mc[key]))
}

return configs, nil
}

func (c Config) buildMatchesConfig() ([]string, error) {
matches := []string{}

for i, mc := range c.Matches {
if i > 0 {
matches = append(matches, "+")
}
mcs, err := buildMatchConfig(mc)
if err != nil {
return []string{}, err
}

matches = append(matches, mcs...)
}

return matches, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
"fmt"
"io"
"os/exec"
"regexp"
"sort"
"strconv"
"strings"
"sync"
Expand All @@ -27,138 +25,6 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
)

const waitDuration = 1 * time.Second

func init() {
operator.Register(operatorType, func() operator.Builder { return NewConfig() })
}

// Build will build a journald input operator from the supplied configuration
func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
inputOperator, err := c.InputConfig.Build(logger)
if err != nil {
return nil, err
}

args, err := c.buildArgs()
if err != nil {
return nil, err
}

return &Input{
InputOperator: inputOperator,
newCmd: func(ctx context.Context, cursor []byte) cmd {
if cursor != nil {
args = append(args, "--after-cursor", string(cursor))
}
return exec.CommandContext(ctx, "journalctl", args...) // #nosec - ...
// journalctl is an executable that is required for this operator to function
},
json: jsoniter.ConfigFastest,
}, nil
}

func (c Config) buildArgs() ([]string, error) {
args := make([]string, 0, 10)

// Export logs in UTC time
args = append(args, "--utc")

// Export logs as JSON
args = append(args, "--output=json")

// Continue watching logs until cancelled
args = append(args, "--follow")

switch c.StartAt {
case "end":
case "beginning":
args = append(args, "--no-tail")
default:
return nil, fmt.Errorf("invalid value '%s' for parameter 'start_at'", c.StartAt)
}

for _, unit := range c.Units {
args = append(args, "--unit", unit)
}

for _, identifier := range c.Identifiers {
args = append(args, "--identifier", identifier)
}

args = append(args, "--priority", c.Priority)

if len(c.Grep) > 0 {
args = append(args, "--grep", c.Grep)
}

if c.Dmesg {
args = append(args, "--dmesg")
}

switch {
case c.Directory != nil:
args = append(args, "--directory", *c.Directory)
case len(c.Files) > 0:
for _, file := range c.Files {
args = append(args, "--file", file)
}
}

if len(c.Matches) > 0 {
matches, err := c.buildMatchesConfig()
if err != nil {
return nil, err
}
args = append(args, matches...)
}

if c.All {
args = append(args, "--all")
}

return args, nil
}

func buildMatchConfig(mc MatchConfig) ([]string, error) {
re := regexp.MustCompile("^[_A-Z]+$")

// Sort keys to be consistent with every run and to be predictable for tests
sortedKeys := make([]string, 0, len(mc))
for key := range mc {
if !re.MatchString(key) {
return []string{}, fmt.Errorf("'%s' is not a valid Systemd field name", key)
}
sortedKeys = append(sortedKeys, key)
}
sort.Strings(sortedKeys)

configs := []string{}
for _, key := range sortedKeys {
configs = append(configs, fmt.Sprintf("%s=%s", key, mc[key]))
}

return configs, nil
}

func (c Config) buildMatchesConfig() ([]string, error) {
matches := []string{}

for i, mc := range c.Matches {
if i > 0 {
matches = append(matches, "+")
}
mcs, err := buildMatchConfig(mc)
if err != nil {
return []string{}, err
}

matches = append(matches, mcs...)
}

return matches, nil
}

// Input is an operator that process logs using journald
type Input struct {
helper.InputOperator
Expand Down

0 comments on commit d3d9b70

Please sign in to comment.