Skip to content

Commit

Permalink
feat: Optional application/runtime/otlp input (#1542)
Browse files Browse the repository at this point in the history
  • Loading branch information
k15r authored Oct 24, 2024
1 parent 01bb55a commit 30051c9
Show file tree
Hide file tree
Showing 48 changed files with 980 additions and 340 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
.env.overrides

# Finder .DS_Store file
.DS_Store

Expand Down
7 changes: 5 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
include .env
-include .env.overrides

# Environment Variables
IMG ?= $(ENV_IMG)
Expand Down Expand Up @@ -110,7 +111,9 @@ manifests-dev: $(CONTROLLER_GEN) ## Generate WebhookConfiguration, ClusterRole a
.PHONY: generate
generate: $(CONTROLLER_GEN) $(MOCKERY) $(STRINGER) ## Generate code containing DeepCopy, DeepCopyInto, and DeepCopyObject method implementations.
$(MOCKERY)
$(STRINGER) --type OutputType internal/reconciler/logpipeline/reconciler.go
$(STRINGER) --type Mode apis/telemetry/v1alpha1/logpipeline_types.go
$(STRINGER) --type Mode apis/telemetry/v1beta1/logpipeline_types.go
$(STRINGER) --type FeatureFlag internal/featureflags/featureflags.go
$(CONTROLLER_GEN) object:headerFile="hack/boilerplate.go.txt" paths="./..."

.PHONY: fmt
Expand Down Expand Up @@ -147,7 +150,7 @@ build: generate fmt vet tidy ## Build manager binary.

check-clean: ## Check if repo is clean up-to-date. Used after code generation
@echo "Checking if all generated files are up-to-date"
@git diff --name-only --exit-code || (echo "Generated files are not up-to-date. Please run 'make generate manifests manifests-dev' to update them." && exit 1)
@git diff --name-only --exit-code || (echo "Generated files are not up-to-date. Please run 'make generate manifests manifests-dev crd-docs-gen' to update them." && exit 1)


tls.key:
Expand Down
109 changes: 89 additions & 20 deletions apis/telemetry/v1alpha1/logpipeline_conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,9 @@ func (lp *LogPipeline) ConvertTo(dstRaw conversion.Hub) error {

dst.ObjectMeta = src.ObjectMeta

srcAppInput := src.Spec.Input.Application
dst.Spec.Input = telemetryv1beta1.LogPipelineInput{
Runtime: telemetryv1beta1.LogPipelineRuntimeInput{
Enabled: srcAppInput.Enabled,
Namespaces: telemetryv1beta1.LogPipelineInputNamespaces(srcAppInput.Namespaces),
Containers: telemetryv1beta1.LogPipelineInputContainers(srcAppInput.Containers),
KeepAnnotations: srcAppInput.KeepAnnotations,
DropLabels: srcAppInput.DropLabels,
KeepOriginalBody: srcAppInput.KeepOriginalBody,
},
}
dst.Spec.Input = telemetryv1beta1.LogPipelineInput{}
dst.Spec.Input.Runtime = v1Alpha1ApplicationToV1Beta1(src.Spec.Input.Application)
dst.Spec.Input.OTLP = v1Alpha1OTLPInputToV1Beta1(src.Spec.Input.OTLP)

for _, f := range src.Spec.Files {
dst.Spec.Files = append(dst.Spec.Files, telemetryv1beta1.LogPipelineFileMount(f))
Expand Down Expand Up @@ -76,6 +68,48 @@ func (lp *LogPipeline) ConvertTo(dstRaw conversion.Hub) error {
return nil
}

func v1Alpha1OTLPInputToV1Beta1(otlp *OTLPInput) *telemetryv1beta1.OTLPInput {
if otlp == nil {
return nil
}

input := &telemetryv1beta1.OTLPInput{
Disabled: otlp.Disabled,
}
if otlp.Namespaces != nil {
input.Namespaces = &telemetryv1beta1.NamespaceSelector{
Include: otlp.Namespaces.Include,
Exclude: otlp.Namespaces.Exclude,
}
}

return input
}

func v1Alpha1ApplicationToV1Beta1(application *ApplicationInput) *telemetryv1beta1.LogPipelineRuntimeInput {
if application == nil {
return nil
}

runtime := &telemetryv1beta1.LogPipelineRuntimeInput{
Enabled: application.Enabled,
Namespaces: telemetryv1beta1.LogPipelineInputNamespaces{
Include: application.Namespaces.Include,
Exclude: application.Namespaces.Exclude,
System: application.Namespaces.System,
},
Containers: telemetryv1beta1.LogPipelineInputContainers{
Include: application.Containers.Include,
Exclude: application.Containers.Exclude,
},
KeepAnnotations: application.KeepAnnotations,
DropLabels: application.DropLabels,
KeepOriginalBody: application.KeepOriginalBody,
}

return runtime
}

func v1Alpha1OtlpTLSToV1Beta1(tls *OtlpTLS) *telemetryv1beta1.OutputTLS {
if tls == nil {
return nil
Expand Down Expand Up @@ -191,15 +225,8 @@ func (lp *LogPipeline) ConvertFrom(srcRaw conversion.Hub) error {

dst.ObjectMeta = src.ObjectMeta

srcRuntimeInput := src.Spec.Input.Runtime
dst.Spec.Input.Application = ApplicationInput{
Enabled: srcRuntimeInput.Enabled,
Namespaces: InputNamespaces(srcRuntimeInput.Namespaces),
Containers: InputContainers(srcRuntimeInput.Containers),
KeepAnnotations: srcRuntimeInput.KeepAnnotations,
DropLabels: srcRuntimeInput.DropLabels,
KeepOriginalBody: srcRuntimeInput.KeepOriginalBody,
}
dst.Spec.Input.Application = v1Beta1RuntimeToV1Alpha1(src.Spec.Input.Runtime)
dst.Spec.Input.OTLP = v1Beta1OTLPInputToV1Alpha1(src.Spec.Input.OTLP)

for _, f := range src.Spec.Files {
dst.Spec.Files = append(dst.Spec.Files, FileMount(f))
Expand Down Expand Up @@ -243,6 +270,48 @@ func (lp *LogPipeline) ConvertFrom(srcRaw conversion.Hub) error {
return nil
}

func v1Beta1RuntimeToV1Alpha1(runtime *telemetryv1beta1.LogPipelineRuntimeInput) *ApplicationInput {
if runtime == nil {
return nil
}

application := &ApplicationInput{
Enabled: runtime.Enabled,
Namespaces: InputNamespaces{
Include: runtime.Namespaces.Include,
Exclude: runtime.Namespaces.Exclude,
System: runtime.Namespaces.System,
},
Containers: InputContainers{
Include: runtime.Containers.Include,
Exclude: runtime.Containers.Exclude,
},
KeepAnnotations: runtime.KeepAnnotations,
DropLabels: runtime.DropLabels,
KeepOriginalBody: runtime.KeepOriginalBody,
}

return application
}

func v1Beta1OTLPInputToV1Alpha1(otlp *telemetryv1beta1.OTLPInput) *OTLPInput {
if otlp == nil {
return nil
}

input := &OTLPInput{
Disabled: otlp.Disabled,
}
if otlp.Namespaces != nil {
input.Namespaces = &NamespaceSelector{
Include: otlp.Namespaces.Include,
Exclude: otlp.Namespaces.Exclude,
}
}

return input
}

func v1Beta1OtlpTLSToV1Alpha1(tls *telemetryv1beta1.OutputTLS) *OtlpTLS {
if tls == nil {
return nil
Expand Down
24 changes: 22 additions & 2 deletions apis/telemetry/v1alpha1/logpipeline_conversion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestConvertTo(t *testing.T) {
},
Spec: LogPipelineSpec{
Input: Input{
Application: ApplicationInput{
Application: &ApplicationInput{
Enabled: ptr.To(true),
Namespaces: InputNamespaces{
Include: []string{"default", "kube-system"},
Expand All @@ -33,6 +33,13 @@ func TestConvertTo(t *testing.T) {
DropLabels: true,
KeepOriginalBody: ptr.To(true),
},
OTLP: &OTLPInput{
Disabled: true,
Namespaces: &NamespaceSelector{
Include: []string{"include", "include2"},
Exclude: []string{"exclude", "exclude2"},
},
},
},
Files: []FileMount{
{Name: "file1", Content: "file1-content"},
Expand Down Expand Up @@ -157,7 +164,7 @@ func TestConvertFrom(t *testing.T) {
},
Spec: telemetryv1beta1.LogPipelineSpec{
Input: telemetryv1beta1.LogPipelineInput{
Runtime: telemetryv1beta1.LogPipelineRuntimeInput{
Runtime: &telemetryv1beta1.LogPipelineRuntimeInput{
Enabled: ptr.To(true),
Namespaces: telemetryv1beta1.LogPipelineInputNamespaces{
Include: []string{"default", "kube-system"},
Expand All @@ -172,6 +179,13 @@ func TestConvertFrom(t *testing.T) {
DropLabels: true,
KeepOriginalBody: ptr.To(true),
},
OTLP: &telemetryv1beta1.OTLPInput{
Disabled: true,
Namespaces: &telemetryv1beta1.NamespaceSelector{
Include: []string{"include", "include2"},
Exclude: []string{"exclude", "exclude2"},
},
},
},
Files: []telemetryv1beta1.LogPipelineFileMount{
{Name: "file1", Content: "file1-content"},
Expand Down Expand Up @@ -293,6 +307,12 @@ func requireLogPipelinesEquivalent(t *testing.T, x *LogPipeline, y *telemetryv1b
require.Equal(t, xAppInput.DropLabels, yRuntimeInput.DropLabels, "drop labels mismatch")
require.Equal(t, xAppInput.KeepOriginalBody, yRuntimeInput.KeepOriginalBody, "keep original body mismatch")

xOTLPInput := x.Spec.Input.OTLP
yOTLPInput := y.Spec.Input.OTLP
require.Equal(t, xOTLPInput.Disabled, yOTLPInput.Disabled, "OTLP input disabled mismatch")
require.Equal(t, xOTLPInput.Namespaces.Include, yOTLPInput.Namespaces.Include, "OTLP included namespaces mismatch")
require.Equal(t, xOTLPInput.Namespaces.Exclude, yOTLPInput.Namespaces.Exclude, "OTLP excluded namespaces mismatch")

require.Len(t, y.Spec.Files, 1, "expected one file")
require.Equal(t, x.Spec.Files[0].Name, y.Spec.Files[0].Name, "file name mismatch")

Expand Down
23 changes: 22 additions & 1 deletion apis/telemetry/v1alpha1/logpipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,19 @@ package v1alpha1

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/kyma-project/telemetry-manager/internal/featureflags"
)

type Mode int

const (
OTel Mode = iota
FluentBit
)

// LogPipelineSpec defines the desired state of LogPipeline
// +kubebuilder:validation:XValidation:rule="!((has(self.output.http) || has(self.output.custom)) && has(self.input.otlp))", message="otlp input is only supported with otlp output"
type LogPipelineSpec struct {
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
// Important: Run "make" to regenerate code after modifying this file
Expand All @@ -38,7 +48,10 @@ type LogPipelineSpec struct {
// Input describes a log input for a LogPipeline.
type Input struct {
// Configures in more detail from which containers application logs are enabled as input.
Application ApplicationInput `json:"application,omitempty"`
Application *ApplicationInput `json:"application,omitempty"`

// Configures an endpoint to receive logs from a OTLP source.
OTLP *OTLPInput `json:"otlp,omitempty"`
}

// ApplicationInput specifies the default type of Input that handles application logs from runtime containers. It configures in more detail from which containers logs are selected as input.
Expand Down Expand Up @@ -147,6 +160,10 @@ func (o *Output) IsHTTPDefined() bool {
return o.HTTP != nil && o.HTTP.Host.IsDefined()
}

func (o *Output) IsOTLPDefined() bool {
return o.Otlp != nil
}

func (o *Output) IsAnyDefined() bool {
return o.pluginCount() > 0
}
Expand All @@ -165,6 +182,10 @@ func (o *Output) pluginCount() int {
plugins++
}

if featureflags.IsEnabled(featureflags.LogPipelineOTLP) && o.IsOTLPDefined() {
plugins++
}

return plugins
}

Expand Down
35 changes: 33 additions & 2 deletions apis/telemetry/v1alpha1/logpipeline_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ var (
ErrInvalidPipelineDefinition = errors.New("invalid log pipeline definition")
)

func (lp *LogPipeline) PipelineMode() Mode {
if lp.Spec.Output.Otlp != nil {
return OTel
}

return FluentBit
}

func (lp *LogPipeline) Validate() error {
if err := lp.validateOutput(); err != nil {
return err
Expand Down Expand Up @@ -117,6 +125,7 @@ func secretRefAndValueIsPresent(v ValueType) bool {
}

func (lp *LogPipeline) validateFilters() error {
// TODO[k15r]: validate Filters in OTLP mode
for _, filterPlugin := range lp.Spec.Filters {
if err := validateCustomFilter(filterPlugin.Custom); err != nil {
return err
Expand Down Expand Up @@ -161,12 +170,34 @@ func (lp *LogPipeline) validateInput() error {
return nil
}

var containers = input.Application.Containers
switch lp.PipelineMode() {
case OTel:
return lp.validateApplication()
case FluentBit:
if lp.Spec.Input.OTLP != nil {
return fmt.Errorf("%w: cannot use OTLP input for pipeline in FluentBit mode", ErrInvalidPipelineDefinition)
}

return lp.validateApplication()
}

return nil
}

func (lp *LogPipeline) validateApplication() error {
application := lp.Spec.Input.Application
if application == nil {
return nil
}

containers := application.Containers

if len(containers.Include) > 0 && len(containers.Exclude) > 0 {
return fmt.Errorf("%w: Cannot define both 'input.application.containers.include' and 'input.application.containers.exclude'", ErrInvalidPipelineDefinition)
}

var namespaces = input.Application.Namespaces
namespaces := application.Namespaces

if (len(namespaces.Include) > 0 && len(namespaces.Exclude) > 0) ||
(len(namespaces.Include) > 0 && namespaces.System) ||
(len(namespaces.Exclude) > 0 && namespaces.System) {
Expand Down
Loading

0 comments on commit 30051c9

Please sign in to comment.