Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Whitelist metrics scraped by self-monitor #982

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions internal/reconciler/telemetry/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/kyma-project/telemetry-manager/internal/k8sutils"
"github.com/kyma-project/telemetry-manager/internal/overrides"
"github.com/kyma-project/telemetry-manager/internal/resources/selfmonitor"
"github.com/kyma-project/telemetry-manager/internal/selfmonitor/alertrules"
"github.com/kyma-project/telemetry-manager/internal/selfmonitor/config"
"github.com/kyma-project/telemetry-manager/internal/webhookcert"
)
Expand Down Expand Up @@ -158,7 +157,7 @@ func (r *Reconciler) reconcileSelfMonitor(ctx context.Context, telemetry operato
return fmt.Errorf("failed to marshal selfmonitor config: %w", err)
}

rules := alertrules.MakeRules()
rules := config.MakeRules()
rulesYAML, err := yaml.Marshal(rules)
if err != nil {
return fmt.Errorf("failed to marshal rules: %w", err)
Expand Down
27 changes: 26 additions & 1 deletion internal/selfmonitor/config/config_builder.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package config

import (
"strings"
"time"
)

Expand Down Expand Up @@ -93,7 +94,7 @@ func makeScrapeConfig(scrapeNamespace string) []ScrapeConfig {
{
SourceLabels: []string{"__name__"},
Action: Keep,
Regex: "(otelcol_.*|fluentbit_.*|telemetry_.*)",
Regex: srapableMetricsRegex(),
},
// The following relabel configs add an artificial pipeline_name label to the Fluent Bit and OTel Collector metrics to simplify pipeline matching
// For Fluent Bit metrics, the pipeline_name is based on the name label. Note that a regex group matching Kubernetes resource names (alphanumerical chars and hyphens) is used to extract the pipeline name.
Expand All @@ -119,3 +120,27 @@ func makeScrapeConfig(scrapeNamespace string) []ScrapeConfig {
},
}
}

// srapableMetricsRegex assembles the anchored keep-regex used by the
// self-monitor scrape config (see makeScrapeConfig) to whitelist only the
// metrics that the alert rules rely on.
// NOTE(review): the name carries a typo ("srapable" -> "scrapable"); it is
// kept unchanged here because makeScrapeConfig calls it under this name.
func srapableMetricsRegex() string {
	// Fluent Bit metric names are matched verbatim.
	fluentBitMetrics := []string{
		metricFluentBitOutputProcBytesTotal,
		metricFluentBitOutputDroppedRecordsTotal,
		metricFluentBitInputBytesTotal,
		metricFluentBitBufferUsageBytes,
	}

	// These OTel Collector metrics are suffixed with the telemetry data type
	// at emission time (see formatMetricName), hence the trailing "_.*".
	suffixedOtelMetrics := []string{
		metricOtelCollectorExporterSent,
		metricOtelCollectorExporterSendFailed,
		metricOtelCollectorExporterEnqueueFailed,
		metricOtelCollectorReceiverRefused,
	}

	// The exporter queue metrics carry no data-type suffix (the
	// QueueAlmostFull rule queries them by their base names). A Prometheus
	// metric_relabel keep-regex is fully anchored, so matching
	// "otelcol_exporter_queue_size_.*" would never match
	// "otelcol_exporter_queue_size" and the metrics would be dropped,
	// silently disabling the QueueAlmostFull alert. Match them exactly.
	exactOtelMetrics := []string{
		metricOtelCollectorExporterQueueSize,
		metricOtelCollectorExporterQueueCapacity,
	}

	patterns := make([]string, 0, len(fluentBitMetrics)+len(suffixedOtelMetrics)+len(exactOtelMetrics))
	patterns = append(patterns, fluentBitMetrics...)
	for _, m := range suffixedOtelMetrics {
		patterns = append(patterns, m+"_.*")
	}
	patterns = append(patterns, exactOtelMetrics...)

	return strings.Join(patterns, "|")
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package alertrules
package config

import (
"fmt"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
package alertrules
package config

import (
"fmt"
)

const (
	// Service label value used to select Fluent Bit metrics in the alert
	// expressions (see selectService usage below).
	fluentBitMetricsServiceName = "telemetry-fluent-bit-metrics"

	// Fluent Bit metric names referenced by the alert rules below.
	metricFluentBitOutputProcBytesTotal = "fluentbit_output_proc_bytes_total"
	metricFluentBitInputBytesTotal = "fluentbit_input_bytes_total"
	metricFluentBitOutputDroppedRecordsTotal = "fluentbit_output_dropped_records_total"
	// Filesystem buffer usage in bytes; compared against absolute thresholds
	// in bufferInUseRule/bufferFullRule. Note it is telemetry_-prefixed, not
	// fluentbit_-prefixed.
	metricFluentBitBufferUsageBytes = "telemetry_fsbuffer_usage_bytes"
)

type fluentBitRuleBuilder struct {
Expand All @@ -20,7 +29,7 @@ func (rb fluentBitRuleBuilder) rules() []Rule {
func (rb fluentBitRuleBuilder) exporterSentRule() Rule {
return Rule{
Alert: rb.namePrefix() + RuleNameLogAgentExporterSentLogs,
Expr: rate("fluentbit_output_proc_bytes_total", selectService(fluentBitMetricsServiceName)).
Expr: rate(metricFluentBitOutputProcBytesTotal, selectService(fluentBitMetricsServiceName)).
sumBy(labelPipelineName).
greaterThan(0).
build(),
Expand All @@ -30,7 +39,7 @@ func (rb fluentBitRuleBuilder) exporterSentRule() Rule {
func (rb fluentBitRuleBuilder) receiverReadRule() Rule {
return Rule{
Alert: rb.namePrefix() + RuleNameLogAgentReceiverReadLogs,
Expr: rate("fluentbit_input_bytes_total", selectService(fluentBitMetricsServiceName)).
Expr: rate(metricFluentBitInputBytesTotal, selectService(fluentBitMetricsServiceName)).
sumBy(labelPipelineName).
greaterThan(0).
build(),
Expand All @@ -40,7 +49,7 @@ func (rb fluentBitRuleBuilder) receiverReadRule() Rule {
func (rb fluentBitRuleBuilder) exporterDroppedRule() Rule {
return Rule{
Alert: rb.namePrefix() + RuleNameLogAgentExporterDroppedLogs,
Expr: rate("fluentbit_output_dropped_records_total", selectService(fluentBitMetricsServiceName)).
Expr: rate(metricFluentBitOutputDroppedRecordsTotal, selectService(fluentBitMetricsServiceName)).
sumBy(labelPipelineName).
greaterThan(0).
build(),
Expand All @@ -50,14 +59,14 @@ func (rb fluentBitRuleBuilder) exporterDroppedRule() Rule {
// bufferInUseRule fires while the Fluent Bit filesystem buffer holds more
// than 300 MB, signaling that the buffer is filling up.
func (rb fluentBitRuleBuilder) bufferInUseRule() Rule {
	return Rule{
		Alert: rb.namePrefix() + RuleNameLogAgentBufferInUse,
		Expr:  fmt.Sprintf("%s > 300000000", metricFluentBitBufferUsageBytes),
	}
}

// bufferFullRule fires while the Fluent Bit filesystem buffer exceeds 900 MB,
// i.e. the buffer is (nearly) full and data loss is imminent.
func (rb fluentBitRuleBuilder) bufferFullRule() Rule {
	return Rule{
		Alert: rb.namePrefix() + RuleNameLogAgentBufferFull,
		Expr:  fmt.Sprintf("%s > 900000000", metricFluentBitBufferUsageBytes),
	}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
package alertrules
package config

import (
"fmt"
)

const (
	// Base names of the OTel Collector metrics used by the alert rules.
	// Most are suffixed with the telemetry data type via formatMetricName;
	// the queue size/capacity metrics are used as-is (see
	// exporterQueueAlmostFullRule).
	metricOtelCollectorExporterSent = "otelcol_exporter_sent"
	metricOtelCollectorExporterSendFailed = "otelcol_exporter_send_failed"
	metricOtelCollectorExporterQueueSize = "otelcol_exporter_queue_size"
	metricOtelCollectorExporterQueueCapacity = "otelcol_exporter_queue_capacity"
	metricOtelCollectorExporterEnqueueFailed = "otelcol_exporter_enqueue_failed"
	metricOtelCollectorReceiverRefused = "otelcol_receiver_refused"
)

type otelCollectorRuleBuilder struct {
serviceName string
dataType string
Expand All @@ -20,8 +29,12 @@ func (rb otelCollectorRuleBuilder) rules() []Rule {
}
}

// formatMetricName derives the full metric name by appending the builder's
// data type (e.g. "otelcol_exporter_sent" -> "otelcol_exporter_sent_<dataType>").
func (rb otelCollectorRuleBuilder) formatMetricName(baseMetricName string) string {
	return baseMetricName + "_" + rb.dataType
}

func (rb otelCollectorRuleBuilder) exporterSentRule() Rule {
metric := fmt.Sprintf("otelcol_exporter_sent_%s", rb.dataType)
metric := rb.formatMetricName(metricOtelCollectorExporterSent)
return Rule{
Alert: rb.namePrefix + RuleNameGatewayExporterSentData,
Expr: rate(metric, selectService(rb.serviceName)).
Expand All @@ -32,7 +45,7 @@ func (rb otelCollectorRuleBuilder) exporterSentRule() Rule {
}

func (rb otelCollectorRuleBuilder) exporterDroppedRule() Rule {
metric := fmt.Sprintf("otelcol_exporter_send_failed_%s", rb.dataType)
metric := rb.formatMetricName(metricOtelCollectorExporterSendFailed)
return Rule{
Alert: rb.namePrefix + RuleNameGatewayExporterDroppedData,
Expr: rate(metric, selectService(rb.serviceName)).
Expand All @@ -45,15 +58,15 @@ func (rb otelCollectorRuleBuilder) exporterDroppedRule() Rule {
func (rb otelCollectorRuleBuilder) exporterQueueAlmostFullRule() Rule {
return Rule{
Alert: rb.namePrefix + RuleNameGatewayExporterQueueAlmostFull,
Expr: div("otelcol_exporter_queue_size", "otelcol_exporter_queue_capacity", selectService(rb.serviceName)).
Expr: div(metricOtelCollectorExporterQueueSize, metricOtelCollectorExporterQueueCapacity, selectService(rb.serviceName)).
maxBy(labelPipelineName).
greaterThan(0.8).
build(),
}
}

func (rb otelCollectorRuleBuilder) exporterEnqueueFailedRule() Rule {
metric := fmt.Sprintf("otelcol_exporter_enqueue_failed_%s", rb.dataType)
metric := rb.formatMetricName(metricOtelCollectorExporterEnqueueFailed)
return Rule{
Alert: rb.namePrefix + RuleNameGatewayExporterEnqueueFailed,
Expr: rate(metric, selectService(rb.serviceName)).
Expand All @@ -64,7 +77,7 @@ func (rb otelCollectorRuleBuilder) exporterEnqueueFailedRule() Rule {
}

func (rb otelCollectorRuleBuilder) receiverRefusedRule() Rule {
metric := fmt.Sprintf("otelcol_receiver_refused_%s", rb.dataType)
metric := rb.formatMetricName(metricOtelCollectorReceiverRefused)
return Rule{
Alert: rb.namePrefix + RuleNameGatewayReceiverRefusedData,
Expr: rate(metric, selectService(rb.serviceName)).
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package alertrules
package config

import (
"strings"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package alertrules
package config

import (
"testing"
Expand Down
2 changes: 1 addition & 1 deletion internal/selfmonitor/config/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ scrape_configs:
action: replace
metric_relabel_configs:
- source_labels: [__name__]
regex: (otelcol_.*|fluentbit_.*|telemetry_.*)
regex: fluentbit_output_proc_bytes_total|fluentbit_output_dropped_records_total|fluentbit_input_bytes_total|telemetry_fsbuffer_usage_bytes|otelcol_exporter_sent_.*|otelcol_exporter_send_failed_.*|otelcol_exporter_enqueue_failed_.*|otelcol_receiver_refused_.*|otelcol_exporter_queue_size|otelcol_exporter_queue_capacity
action: keep
- source_labels: [__name__, name]
regex: fluentbit_.+;([a-zA-Z0-9-]+)
Expand Down
32 changes: 16 additions & 16 deletions internal/selfmonitor/prober/log_pipeline_prober.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
"k8s.io/apimachinery/pkg/types"

"github.com/kyma-project/telemetry-manager/internal/selfmonitor/alertrules"
"github.com/kyma-project/telemetry-manager/internal/selfmonitor/config"
)

type LogPipelineProber struct {
Expand Down Expand Up @@ -50,41 +50,41 @@ func (p *LogPipelineProber) Probe(ctx context.Context, pipelineName string) (Log
}

// allDataDropped reports whether the pipeline currently delivers nothing at
// all: the exporter-sent alert is not firing while logs are being dropped or
// the buffer is full.
func (p *LogPipelineProber) allDataDropped(alerts []promv1.Alert, pipelineName string) bool {
	exporterSentLogs := p.evaluateRule(alerts, config.RuleNameLogAgentExporterSentLogs, pipelineName)
	exporterDroppedLogs := p.evaluateRule(alerts, config.RuleNameLogAgentExporterDroppedLogs, pipelineName)
	bufferFull := p.evaluateRule(alerts, config.RuleNameLogAgentBufferFull, pipelineName)
	return !exporterSentLogs && (exporterDroppedLogs || bufferFull)
}

// someDataDropped reports whether the pipeline delivers only part of its
// data: logs are still being sent, but drops or a full buffer are observed
// at the same time.
func (p *LogPipelineProber) someDataDropped(alerts []promv1.Alert, pipelineName string) bool {
	exporterSentLogs := p.evaluateRule(alerts, config.RuleNameLogAgentExporterSentLogs, pipelineName)
	exporterDroppedLogs := p.evaluateRule(alerts, config.RuleNameLogAgentExporterDroppedLogs, pipelineName)
	bufferFull := p.evaluateRule(alerts, config.RuleNameLogAgentBufferFull, pipelineName)
	return exporterSentLogs && (exporterDroppedLogs || bufferFull)
}

// noLogsDelivered reports whether logs are read by the receiver but nothing
// is sent out by the exporter.
func (p *LogPipelineProber) noLogsDelivered(alerts []promv1.Alert, pipelineName string) bool {
	receiverReadLogs := p.evaluateRule(alerts, config.RuleNameLogAgentReceiverReadLogs, pipelineName)
	exporterSentLogs := p.evaluateRule(alerts, config.RuleNameLogAgentExporterSentLogs, pipelineName)
	return receiverReadLogs && !exporterSentLogs
}

// bufferFillingUp reports whether the buffer-in-use alert is firing for the
// pipeline, i.e. the Fluent Bit filesystem buffer is filling up.
func (p *LogPipelineProber) bufferFillingUp(alerts []promv1.Alert, pipelineName string) bool {
	return p.evaluateRule(alerts, config.RuleNameLogAgentBufferInUse, pipelineName)
}

// healthy reports whether the pipeline shows no sign of trouble: no buffer
// pressure, no drops, and either no logs are being read at all or the logs
// that are read are also being sent.
func (p *LogPipelineProber) healthy(alerts []promv1.Alert, pipelineName string) bool {
	// The pipeline is healthy if none of the following conditions are met:
	bufferInUse := p.evaluateRule(alerts, config.RuleNameLogAgentBufferInUse, pipelineName)
	bufferFull := p.evaluateRule(alerts, config.RuleNameLogAgentBufferFull, pipelineName)
	exporterDroppedLogs := p.evaluateRule(alerts, config.RuleNameLogAgentExporterDroppedLogs, pipelineName)

	// The pipeline is healthy if either no logs are being read or all logs are being sent
	receiverReadLogs := p.evaluateRule(alerts, config.RuleNameLogAgentReceiverReadLogs, pipelineName)
	exporterSentLogs := p.evaluateRule(alerts, config.RuleNameLogAgentExporterSentLogs, pipelineName)
	return !(bufferInUse || bufferFull || exporterDroppedLogs) && (!receiverReadLogs || exporterSentLogs)
}

// evaluateRule reports whether the named alert is firing for the given
// pipeline, using the log-pipeline-specific label matcher.
func (p *LogPipelineProber) evaluateRule(alerts []promv1.Alert, alertName, pipelineName string) bool {
	return evaluateRuleWithMatcher(alerts, alertName, pipelineName, config.MatchesLogPipelineRule)
}
30 changes: 15 additions & 15 deletions internal/selfmonitor/prober/otel_pipeline_prober.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
"k8s.io/apimachinery/pkg/types"

"github.com/kyma-project/telemetry-manager/internal/selfmonitor/alertrules"
"github.com/kyma-project/telemetry-manager/internal/selfmonitor/config"
)

// OTelPipelineProber is a prober for OTel Collector pipelines
Expand All @@ -24,11 +24,11 @@ type OTelPipelineProbeResult struct {
}

// NewMetricPipelineProber creates an OTelPipelineProber that evaluates alerts
// using the metric-pipeline rule matcher.
func NewMetricPipelineProber(selfMonitorName types.NamespacedName) (*OTelPipelineProber, error) {
	return newOTelPipelineProber(selfMonitorName, config.MatchesMetricPipelineRule)
}

// NewTracePipelineProber creates an OTelPipelineProber that evaluates alerts
// using the trace-pipeline rule matcher.
func NewTracePipelineProber(selfMonitorName types.NamespacedName) (*OTelPipelineProber, error) {
	return newOTelPipelineProber(selfMonitorName, config.MatchesTracePipelineRule)
}

func newOTelPipelineProber(selfMonitorName types.NamespacedName, matcher matcherFunc) (*OTelPipelineProber, error) {
Expand Down Expand Up @@ -61,34 +61,34 @@ func (p *OTelPipelineProber) Probe(ctx context.Context, pipelineName string) (OT
}

// allDataDropped reports whether the pipeline currently delivers nothing at
// all: nothing is sent while data is being dropped or fails to enqueue.
func (p *OTelPipelineProber) allDataDropped(alerts []promv1.Alert, pipelineName string) bool {
	exporterSentFiring := p.evaluateRule(alerts, config.RuleNameGatewayExporterSentData, pipelineName)
	exporterDroppedFiring := p.evaluateRule(alerts, config.RuleNameGatewayExporterDroppedData, pipelineName)
	exporterEnqueueFailedFiring := p.evaluateRule(alerts, config.RuleNameGatewayExporterEnqueueFailed, pipelineName)

	return !exporterSentFiring && (exporterDroppedFiring || exporterEnqueueFailedFiring)
}

// someDataDropped reports whether the pipeline delivers only part of its
// data: sending is still observed alongside drops or enqueue failures.
func (p *OTelPipelineProber) someDataDropped(alerts []promv1.Alert, pipelineName string) bool {
	exporterSentFiring := p.evaluateRule(alerts, config.RuleNameGatewayExporterSentData, pipelineName)
	exporterDroppedFiring := p.evaluateRule(alerts, config.RuleNameGatewayExporterDroppedData, pipelineName)
	exporterEnqueueFailedFiring := p.evaluateRule(alerts, config.RuleNameGatewayExporterEnqueueFailed, pipelineName)

	return exporterSentFiring && (exporterDroppedFiring || exporterEnqueueFailedFiring)
}

// queueAlmostFull reports whether the exporter-queue-almost-full alert is
// firing for the pipeline.
func (p *OTelPipelineProber) queueAlmostFull(alerts []promv1.Alert, pipelineName string) bool {
	return p.evaluateRule(alerts, config.RuleNameGatewayExporterQueueAlmostFull, pipelineName)
}

// throttling reports whether the receiver-refused alert is firing for the
// pipeline, i.e. the gateway is rejecting incoming data.
func (p *OTelPipelineProber) throttling(alerts []promv1.Alert, pipelineName string) bool {
	return p.evaluateRule(alerts, config.RuleNameGatewayReceiverRefusedData, pipelineName)
}

// healthy reports whether none of the problem alerts (drops, near-full
// queue, enqueue failures, refused data) is firing for the pipeline.
func (p *OTelPipelineProber) healthy(alerts []promv1.Alert, pipelineName string) bool {
	return !(p.evaluateRule(alerts, config.RuleNameGatewayExporterDroppedData, pipelineName) ||
		p.evaluateRule(alerts, config.RuleNameGatewayExporterQueueAlmostFull, pipelineName) ||
		p.evaluateRule(alerts, config.RuleNameGatewayExporterEnqueueFailed, pipelineName) ||
		p.evaluateRule(alerts, config.RuleNameGatewayReceiverRefusedData, pipelineName))
}

func (p *OTelPipelineProber) evaluateRule(alerts []promv1.Alert, alertName, pipelineName string) bool {
Expand Down
8 changes: 4 additions & 4 deletions internal/selfmonitor/webhook/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
logf "sigs.k8s.io/controller-runtime/pkg/log"

telemetryv1alpha1 "github.com/kyma-project/telemetry-manager/apis/telemetry/v1alpha1"
"github.com/kyma-project/telemetry-manager/internal/selfmonitor/alertrules"
"github.com/kyma-project/telemetry-manager/internal/selfmonitor/config"
)

type Handler struct {
Expand Down Expand Up @@ -137,7 +137,7 @@ func (h *Handler) toMetricPipelineReconcileEvents(ctx context.Context, alerts []
for i := range metricPipelines.Items {
pipelineName := metricPipelines.Items[i].GetName()
for _, alert := range alerts {
if alertrules.MatchesMetricPipelineRule(alert.Labels, alertrules.RulesAny, pipelineName) {
if config.MatchesMetricPipelineRule(alert.Labels, config.RulesAny, pipelineName) {
events = append(events, event.GenericEvent{Object: &metricPipelines.Items[i]})
}
}
Expand All @@ -156,7 +156,7 @@ func (h *Handler) toTracePipelineReconcileEvents(ctx context.Context, alerts []A
for i := range tracePipelines.Items {
pipelineName := tracePipelines.Items[i].GetName()
for _, alert := range alerts {
if alertrules.MatchesTracePipelineRule(alert.Labels, alertrules.RulesAny, pipelineName) {
if config.MatchesTracePipelineRule(alert.Labels, config.RulesAny, pipelineName) {
events = append(events, event.GenericEvent{Object: &tracePipelines.Items[i]})
}
}
Expand All @@ -175,7 +175,7 @@ func (h *Handler) toLogPipelineReconcileEvents(ctx context.Context, alerts []Ale
for i := range logPipelines.Items {
pipelineName := logPipelines.Items[i].GetName()
for _, alert := range alerts {
if alertrules.MatchesLogPipelineRule(alert.Labels, alertrules.RulesAny, pipelineName) {
if config.MatchesLogPipelineRule(alert.Labels, config.RulesAny, pipelineName) {
events = append(events, event.GenericEvent{Object: &logPipelines.Items[i]})
}
}
Expand Down
Loading