Skip to content

Commit

Permalink
Revert "fix: Flaky self-monitoring conditions (#1735)"
Browse files Browse the repository at this point in the history
This reverts commit 4ac0a2b.
  • Loading branch information
TeodorSAP committed Jan 14, 2025
1 parent b185a2a commit dc8fbc0
Show file tree
Hide file tree
Showing 22 changed files with 469 additions and 348 deletions.
18 changes: 9 additions & 9 deletions internal/selfmonitor/config/config_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,25 +128,25 @@ func makeScrapeConfig(scrapeNamespace string) []ScrapeConfig {

func scrapableMetricsRegex() string {
fluentBitMetrics := []string{
fluentBitOutputProcBytesTotal,
fluentBitOutputDroppedRecordsTotal,
fluentBitInputBytesTotal,
fluentBitBufferUsageBytes,
metricFluentBitOutputProcBytesTotal,
metricFluentBitOutputDroppedRecordsTotal,
metricFluentBitInputBytesTotal,
metricFluentBitBufferUsageBytes,
}

otelCollectorMetrics := []string{
otelExporterSent,
otelExporterSendFailed,
otelExporterEnqueueFailed,
otelReceiverRefused,
metricOtelCollectorExporterSent,
metricOtelCollectorExporterSendFailed,
metricOtelCollectorExporterEnqueueFailed,
metricOtelCollectorReceiverRefused,
}

for i := range otelCollectorMetrics {
otelCollectorMetrics[i] += "_.*"
}

// exporter_queue_size and exporter_queue_capacity do not have a suffix
otelCollectorMetrics = append(otelCollectorMetrics, otelExporterQueueSize, otelExporterQueueCapacity)
otelCollectorMetrics = append(otelCollectorMetrics, metricOtelCollectorExporterQueueSize, metricOtelCollectorExporterQueueCapacity)

return strings.Join(append(fluentBitMetrics,
otelCollectorMetrics...), "|")
Expand Down
10 changes: 5 additions & 5 deletions internal/selfmonitor/config/config_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ func TestMakeConfigMarshalling(t *testing.T) {
ConfigPath: "/dummy-configpath/",
AlertRuleFileName: "dymma-alerts.yml",
})
configYaml, err := yaml.Marshal(config)
monitorConfigYaml, err := yaml.Marshal(config)
require.NoError(t, err)

goldenFilePath := filepath.Join("testdata", "config.yaml")
goldenFile, err := os.ReadFile(goldenFilePath)
require.NoError(t, err, "failed to load golden file")
require.Equal(t, string(goldenFile), string(configYaml))
goldenMonitoringConfigPath := filepath.Join("testdata", "config.yaml")
goldenMonitoringFile, err := os.ReadFile(goldenMonitoringConfigPath)
require.NoError(t, err, "failed to load golden monitoring file")
require.Equal(t, string(goldenMonitoringFile), string(monitorConfigYaml))
}
8 changes: 0 additions & 8 deletions internal/selfmonitor/config/expr_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,6 @@ func and(exprs ...string) string {
return strings.Join(wrapInParentheses(exprs), " and ")
}

// or passes exprs through wrapInParentheses and joins the resulting operands
// with the " or " operator into a single expression string.
func or(exprs ...string) string {
	operands := wrapInParentheses(exprs)

	return strings.Join(operands, " or ")
}

// unless passes exprs through wrapInParentheses and joins the resulting
// operands with the " unless " operator into a single expression string.
func unless(exprs ...string) string {
	operands := wrapInParentheses(exprs)

	return strings.Join(operands, " unless ")
}

func wrapInParentheses(input []string) []string {
wrapped := make([]string, len(input))
for i, str := range input {
Expand Down
106 changes: 48 additions & 58 deletions internal/selfmonitor/config/fluent_bit_rule_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ const (
fluentBitMetricsServiceName = "telemetry-fluent-bit-metrics"
fluentBitSidecarMetricsServiceName = "telemetry-fluent-bit-exporter-metrics"

// Fluent Bit metrics
fluentBitOutputProcBytesTotal = "fluentbit_output_proc_bytes_total"
fluentBitInputBytesTotal = "fluentbit_input_bytes_total"
fluentBitOutputDroppedRecordsTotal = "fluentbit_output_dropped_records_total"
fluentBitBufferUsageBytes = "telemetry_fsbuffer_usage_bytes"
metricFluentBitOutputProcBytesTotal = "fluentbit_output_proc_bytes_total"
metricFluentBitInputBytesTotal = "fluentbit_input_bytes_total"
metricFluentBitOutputDroppedRecordsTotal = "fluentbit_output_dropped_records_total"
metricFluentBitBufferUsageBytes = "telemetry_fsbuffer_usage_bytes"

bufferUsage300MB = 300000000
bufferUsage900MB = 900000000
Expand All @@ -26,79 +25,70 @@ type fluentBitRuleBuilder struct {

func (rb fluentBitRuleBuilder) rules() []Rule {
return []Rule{
rb.makeRule(RuleNameLogAgentAllDataDropped, rb.allDataDroppedExpr()),
rb.makeRule(RuleNameLogAgentSomeDataDropped, rb.someDataDroppedExpr()),
rb.makeRule(RuleNameLogAgentBufferInUse, rb.bufferInUseExpr()),
rb.makeRule(RuleNameLogAgentBufferFull, rb.bufferFullExpr()),
rb.makeRule(RuleNameLogAgentNoLogsDelivered, rb.noLogsDeliveredExpr()),
rb.exporterSentRule(),
rb.exporterDroppedRule(),
rb.bufferInUseRule(),
rb.bufferFullRule(),
rb.noLogsDeliveredRule(),
}
}

// allDataDroppedExpr builds the "all data dropped" expression: the buffer-full
// or exporter-dropped expression matches, unless the exporter-sent expression
// matches as well (that is, nothing is being sent successfully).
func (rb fluentBitRuleBuilder) allDataDroppedExpr() string {
	dropping := or(rb.bufferFullExpr(), rb.exporterDroppedExpr())
	sending := rb.exporterSentExpr()

	return unless(dropping, sending)
}

// someDataDroppedExpr builds the "some data dropped" expression: the
// buffer-full or exporter-dropped expression matches and, at the same time,
// the exporter-sent expression matches.
func (rb fluentBitRuleBuilder) someDataDroppedExpr() string {
	dropping := or(rb.bufferFullExpr(), rb.exporterDroppedExpr())
	sending := rb.exporterSentExpr()

	return and(dropping, sending)
}

// Checks if the exporter drop rate is greater than 0.
func (rb fluentBitRuleBuilder) exporterDroppedExpr() string {
return rate(fluentBitOutputDroppedRecordsTotal, selectService(fluentBitMetricsServiceName)).
sumBy(labelPipelineName).
greaterThan(0).
build()
func (rb fluentBitRuleBuilder) exporterSentRule() Rule {
return Rule{
Alert: rb.namePrefix() + RuleNameLogAgentExporterSentLogs,
Expr: rate(metricFluentBitOutputProcBytesTotal, selectService(fluentBitMetricsServiceName)).
sumBy(labelPipelineName).
greaterThan(0).
build(),
}
}

// Check if the exporter send rate is greater than 0.
func (rb fluentBitRuleBuilder) exporterSentExpr() string {
return rate(fluentBitOutputProcBytesTotal, selectService(fluentBitMetricsServiceName)).
sumBy(labelPipelineName).
greaterThan(0).
build()
func (rb fluentBitRuleBuilder) exporterDroppedRule() Rule {
return Rule{
Alert: rb.namePrefix() + RuleNameLogAgentExporterDroppedLogs,
Expr: rate(metricFluentBitOutputDroppedRecordsTotal, selectService(fluentBitMetricsServiceName)).
sumBy(labelPipelineName).
greaterThan(0).
build(),
}
}

// Check if the buffer usage is significant.
func (rb fluentBitRuleBuilder) bufferInUseExpr() string {
return instant(fluentBitBufferUsageBytes, selectService(fluentBitSidecarMetricsServiceName)).
greaterThan(bufferUsage300MB).
build()
func (rb fluentBitRuleBuilder) bufferInUseRule() Rule {
return Rule{
Alert: rb.namePrefix() + RuleNameLogAgentBufferInUse,
Expr: instant(metricFluentBitBufferUsageBytes, selectService(fluentBitSidecarMetricsServiceName)).
greaterThan(bufferUsage300MB).
build(),
}
}

// Check if the buffer usage is approaching the limit (1GB).
func (rb fluentBitRuleBuilder) bufferFullExpr() string {
return instant(fluentBitBufferUsageBytes, selectService(fluentBitSidecarMetricsServiceName)).
greaterThan(bufferUsage900MB).
build()
func (rb fluentBitRuleBuilder) bufferFullRule() Rule {
return Rule{
Alert: rb.namePrefix() + RuleNameLogAgentBufferFull,
Expr: instant(metricFluentBitBufferUsageBytes, selectService(fluentBitSidecarMetricsServiceName)).
greaterThan(bufferUsage900MB).
build(),
}
}

// Checks if logs are read but not sent by the exporter.
func (rb fluentBitRuleBuilder) noLogsDeliveredExpr() string {
receiverReadExpr := rate(fluentBitInputBytesTotal, selectService(fluentBitMetricsServiceName)).
func (rb fluentBitRuleBuilder) noLogsDeliveredRule() Rule {
receiverReadExpr := rate(metricFluentBitInputBytesTotal, selectService(fluentBitMetricsServiceName)).
sumBy(labelPipelineName).
greaterThan(0).
build()

exporterNotSentExpr := rate(fluentBitOutputProcBytesTotal, selectService(fluentBitMetricsServiceName)).
exporterNotSentExpr := rate(metricFluentBitOutputProcBytesTotal, selectService(fluentBitMetricsServiceName)).
sumBy(labelPipelineName).
equal(0).
build()

return and(receiverReadExpr, exporterNotSentExpr)
}

func (rb fluentBitRuleBuilder) makeRule(baseName, expr string) Rule {
return Rule{
Alert: ruleNamePrefix(typeLogPipeline) + baseName,
Expr: expr,
Alert: rb.namePrefix() + RuleNameLogAgentNoLogsDelivered,
Expr: and(receiverReadExpr, exporterNotSentExpr),
For: alertWaitTime,
}
}

// namePrefix returns the rule name prefix that ruleNamePrefix yields for
// typeLogPipeline.
func (rb fluentBitRuleBuilder) namePrefix() string {
	prefix := ruleNamePrefix(typeLogPipeline)

	return prefix
}
133 changes: 55 additions & 78 deletions internal/selfmonitor/config/otelcol_rule_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,12 @@ import (
)

const (
// OTel Collector metrics

// following metrics should be used with data type suffixes (metric points, spans, etc.)
otelExporterSent = "otelcol_exporter_sent"
otelExporterSendFailed = "otelcol_exporter_send_failed"
otelExporterEnqueueFailed = "otelcol_exporter_enqueue_failed"
otelReceiverRefused = "otelcol_receiver_refused"

// queue size/capacity metrics do not have data type suffixes, unlike the other metrics
otelExporterQueueSize = "otelcol_exporter_queue_size"
otelExporterQueueCapacity = "otelcol_exporter_queue_capacity"

thresholdQueueAlmostFull = 0.8
metricOtelCollectorExporterSent = "otelcol_exporter_sent"
metricOtelCollectorExporterSendFailed = "otelcol_exporter_send_failed"
metricOtelCollectorExporterQueueSize = "otelcol_exporter_queue_size"
metricOtelCollectorExporterQueueCapacity = "otelcol_exporter_queue_capacity"
metricOtelCollectorExporterEnqueueFailed = "otelcol_exporter_enqueue_failed"
metricOtelCollectorReceiverRefused = "otelcol_receiver_refused"
)

type otelCollectorRuleBuilder struct {
Expand All @@ -28,88 +21,72 @@ type otelCollectorRuleBuilder struct {

func (rb otelCollectorRuleBuilder) rules() []Rule {
return []Rule{
rb.makeRule(RuleNameGatewayAllDataDropped, rb.allDataDroppedExpr()),
rb.makeRule(RuleNameGatewaySomeDataDropped, rb.someDataDroppedExpr()),
rb.makeRule(RuleNameGatewayQueueAlmostFull, rb.queueAlmostFullExpr()),
rb.makeRule(RuleNameGatewayThrottling, rb.throttlingExpr()),
rb.exporterSentRule(),
rb.exporterDroppedRule(),
rb.exporterQueueAlmostFullRule(),
rb.exporterEnqueueFailedRule(),
rb.receiverRefusedRule(),
}
}

// allDataDroppedExpr builds the "all data dropped" expression: the
// enqueue-failed or exporter-dropped expression matches, unless the
// exporter-sent expression matches as well (that is, nothing is being sent
// successfully).
func (rb otelCollectorRuleBuilder) allDataDroppedExpr() string {
	dropping := or(rb.exporterEnqueueFailedExpr(), rb.exporterDroppedExpr())
	sending := rb.exporterSentExpr()

	return unless(dropping, sending)
}

// someDataDroppedExpr builds the "some data dropped" expression: the
// enqueue-failed or exporter-dropped expression matches and, at the same
// time, the exporter-sent expression matches.
func (rb otelCollectorRuleBuilder) someDataDroppedExpr() string {
	dropping := or(rb.exporterEnqueueFailedExpr(), rb.exporterDroppedExpr())
	sending := rb.exporterSentExpr()

	return and(dropping, sending)
}

// Check if the exporter send rate is greater than 0.
func (rb otelCollectorRuleBuilder) exporterSentExpr() string {
metricName := rb.appendDataType(otelExporterSent)

return rate(metricName, selectService(rb.serviceName)).
sumBy(labelPipelineName).
greaterThan(0).
build()
// formatMetricName returns baseMetricName followed by an underscore and the
// builder's dataType.
func (rb otelCollectorRuleBuilder) formatMetricName(baseMetricName string) string {
	const nameFormat = "%s_%s"

	return fmt.Sprintf(nameFormat, baseMetricName, rb.dataType)
}

// Check if the exporter drop (send-failed) rate is greater than 0.
func (rb otelCollectorRuleBuilder) exporterDroppedExpr() string {
metricName := rb.appendDataType(otelExporterSendFailed)
func (rb otelCollectorRuleBuilder) exporterSentRule() Rule {
metric := rb.formatMetricName(metricOtelCollectorExporterSent)

return rate(metricName, selectService(rb.serviceName)).
sumBy(labelPipelineName).
greaterThan(0).
build()
return Rule{
Alert: rb.namePrefix + RuleNameGatewayExporterSentData,
Expr: rate(metric, selectService(rb.serviceName)).
sumBy(labelPipelineName).
greaterThan(0).
build(),
}
}

// Check if the exporter enqueue failure rate is greater than 0.
func (rb otelCollectorRuleBuilder) exporterEnqueueFailedExpr() string {
metricName := rb.appendDataType(otelExporterEnqueueFailed)
func (rb otelCollectorRuleBuilder) exporterDroppedRule() Rule {
metric := rb.formatMetricName(metricOtelCollectorExporterSendFailed)

return rate(metricName, selectService(rb.serviceName)).
sumBy(labelPipelineName).
greaterThan(0).
build()
return Rule{
Alert: rb.namePrefix + RuleNameGatewayExporterDroppedData,
Expr: rate(metric, selectService(rb.serviceName)).
sumBy(labelPipelineName).
greaterThan(0).
build(),
}
}

// Check if the queue is almost full.
func (rb otelCollectorRuleBuilder) queueAlmostFullExpr() string {
numMetric := otelExporterQueueSize
denumMetric := otelExporterQueueCapacity

return div(numMetric, denumMetric, ignoringLabelsMatch("data_type"), selectService(rb.serviceName)).
maxBy(labelPipelineName).
greaterThan(thresholdQueueAlmostFull).
build()
func (rb otelCollectorRuleBuilder) exporterQueueAlmostFullRule() Rule {
return Rule{
Alert: rb.namePrefix + RuleNameGatewayExporterQueueAlmostFull,
Expr: div(metricOtelCollectorExporterQueueSize, metricOtelCollectorExporterQueueCapacity, ignoringLabelsMatch("data_type"), selectService(rb.serviceName)).
maxBy(labelPipelineName).
greaterThan(0.8). //nolint:mnd // alert on 80% full
build(),
}
}

// Check if the receiver data refusal rate is greater than 0.
func (rb otelCollectorRuleBuilder) throttlingExpr() string {
metricName := rb.appendDataType(otelReceiverRefused)
func (rb otelCollectorRuleBuilder) exporterEnqueueFailedRule() Rule {
metric := rb.formatMetricName(metricOtelCollectorExporterEnqueueFailed)

return rate(metricName, selectService(rb.serviceName)).
sumBy(labelReceiver).
greaterThan(0).
build()
return Rule{
Alert: rb.namePrefix + RuleNameGatewayExporterEnqueueFailed,
Expr: rate(metric, selectService(rb.serviceName)).
sumBy(labelPipelineName).
greaterThan(0).
build(),
}
}

// appendDataType returns baseMetricName followed by an underscore and the
// builder's dataType.
func (rb otelCollectorRuleBuilder) appendDataType(baseMetricName string) string {
	const nameFormat = "%s_%s"

	return fmt.Sprintf(nameFormat, baseMetricName, rb.dataType)
}
func (rb otelCollectorRuleBuilder) receiverRefusedRule() Rule {
metric := rb.formatMetricName(metricOtelCollectorReceiverRefused)

func (rb otelCollectorRuleBuilder) makeRule(baseName, expr string) Rule {
return Rule{
Alert: rb.namePrefix + baseName,
Expr: expr,
For: alertWaitTime,
Alert: rb.namePrefix + RuleNameGatewayReceiverRefusedData,
Expr: rate(metric, selectService(rb.serviceName)).
sumBy(labelReceiver).
greaterThan(0).
build(),
}
}
20 changes: 11 additions & 9 deletions internal/selfmonitor/config/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,19 @@ import (

const (
// OTEL Collector rule names. Note that the actual full names will be prefixed with Metric or Trace
RuleNameGatewayAllDataDropped = "GatewayAllDataDropped"
RuleNameGatewaySomeDataDropped = "GatewaySomeDataDropped"
RuleNameGatewayQueueAlmostFull = "GatewayQueueAlmostFull"
RuleNameGatewayThrottling = "GatewayThrottling"
RuleNameGatewayExporterSentData = "GatewayExporterSentData"
RuleNameGatewayExporterDroppedData = "GatewayExporterDroppedData"
RuleNameGatewayExporterQueueAlmostFull = "GatewayExporterQueueAlmostFull"
RuleNameGatewayExporterEnqueueFailed = "GatewayExporterEnqueueFailed"
RuleNameGatewayReceiverRefusedData = "GatewayReceiverRefusedData"

// Fluent Bit rule names. Note that the actual full names will be prefixed with Log
RuleNameLogAgentAllDataDropped = "AgentAllDataDropped"
RuleNameLogAgentSomeDataDropped = "AgentSomeDataDropped"
RuleNameLogAgentBufferInUse = "AgentBufferInUse"
RuleNameLogAgentBufferFull = "AgentBufferFull"
RuleNameLogAgentNoLogsDelivered = "AgentNoLogsDelivered"
RuleNameLogAgentExporterSentLogs = "AgentExporterSentLogs"
RuleNameLogAgentReceiverReadLogs = "AgentReceiverReadLogs"
RuleNameLogAgentExporterDroppedLogs = "AgentExporterDroppedLogs"
RuleNameLogAgentBufferInUse = "AgentBufferInUse"
RuleNameLogAgentBufferFull = "AgentBufferFull"
RuleNameLogAgentNoLogsDelivered = "AgentNoLogsDelivered"

// Common rule labels
labelService = "service"
Expand Down
Loading

0 comments on commit dc8fbc0

Please sign in to comment.