feat: LogPipeline flow health status based on Fluent Bit alerts (#963)
Stanislav Khalash authored Apr 12, 2024
1 parent a30db4e commit 0d0a49a
Showing 30 changed files with 1,427 additions and 337 deletions.
20 changes: 13 additions & 7 deletions controllers/telemetry/logpipeline_controller.go
@@ -25,7 +25,9 @@ import (
     ctrl "sigs.k8s.io/controller-runtime"
     "sigs.k8s.io/controller-runtime/pkg/builder"
     "sigs.k8s.io/controller-runtime/pkg/client"
+    "sigs.k8s.io/controller-runtime/pkg/event"
     "sigs.k8s.io/controller-runtime/pkg/handler"
+    "sigs.k8s.io/controller-runtime/pkg/source"
 
     telemetryv1alpha1 "github.com/kyma-project/telemetry-manager/apis/telemetry/v1alpha1"
     "github.com/kyma-project/telemetry-manager/internal/predicate"
@@ -35,16 +37,15 @@
 // LogPipelineController reconciles a LogPipeline object
 type LogPipelineController struct {
     client.Client
-
-    reconciler *logpipeline.Reconciler
-    config     logpipeline.Config
+    reconcileTriggerChan <-chan event.GenericEvent
+    reconciler           *logpipeline.Reconciler
 }
 
-func NewLogPipelineController(client client.Client, reconciler *logpipeline.Reconciler, config logpipeline.Config) *LogPipelineController {
+func NewLogPipelineController(client client.Client, reconcileTriggerChan <-chan event.GenericEvent, reconciler *logpipeline.Reconciler) *LogPipelineController {
     return &LogPipelineController{
-        Client:     client,
-        reconciler: reconciler,
-        config:     config,
+        Client:               client,
+        reconcileTriggerChan: reconcileTriggerChan,
+        reconciler:           reconciler,
     }
 }

Expand All @@ -55,6 +56,11 @@ func (r *LogPipelineController) Reconcile(ctx context.Context, req ctrl.Request)
func (r *LogPipelineController) SetupWithManager(mgr ctrl.Manager) error {
b := ctrl.NewControllerManagedBy(mgr).For(&telemetryv1alpha1.LogPipeline{})

b.WatchesRawSource(
&source.Channel{Source: r.reconcileTriggerChan},
&handler.EnqueueRequestForObject{},
)

ownedResourceTypesToWatch := []client.Object{
&appsv1.DaemonSet{},
&corev1.ConfigMap{},
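The raw-source watch above is the consumer side of the new trigger channel. As a rough sketch (not part of this commit), a producer such as an alert handler fed by Fluent Bit metrics could enqueue a reconciliation for one specific pipeline like this; the package, function, and variable names are illustrative assumptions:

```go
package selfmonitor // hypothetical package, for illustration only

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/event"

	telemetryv1alpha1 "github.com/kyma-project/telemetry-manager/apis/telemetry/v1alpha1"
)

// NotifyPipelineAlert pushes a GenericEvent into the channel watched by
// LogPipelineController. handler.EnqueueRequestForObject turns the event
// into a reconcile.Request keyed by the object's name, so only the named
// pipeline is re-reconciled.
func NotifyPipelineAlert(triggerChan chan<- event.GenericEvent, pipelineName string) {
	triggerChan <- event.GenericEvent{
		Object: &telemetryv1alpha1.LogPipeline{
			ObjectMeta: metav1.ObjectMeta{Name: pipelineName},
		},
	}
}
```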
5 changes: 3 additions & 2 deletions controllers/telemetry/suite_test.go
@@ -129,8 +129,9 @@ var _ = BeforeSuite(func() {
 
     logPipelineController := NewLogPipelineController(
         client,
-        logpipeline.NewReconciler(client, testLogPipelineConfig, &k8sutils.DaemonSetProber{Client: client}, overridesHandler),
-        testLogPipelineConfig)
+        make(chan event.GenericEvent),
+        logpipeline.NewReconciler(client, testLogPipelineConfig, &k8sutils.DaemonSetProber{Client: client}, false, nil, overridesHandler))
 
     err = logPipelineController.SetupWithManager(mgr)
     Expect(err).ToNot(HaveOccurred())

6 changes: 6 additions & 0 deletions internal/conditions/conditions.go
@@ -31,6 +31,7 @@ const (
     ReasonSomeDataDropped   = "SomeTelemetryDataDropped"
     ReasonBufferFillingUp   = "BufferFillingUp"
     ReasonGatewayThrottling = "GatewayThrottling"
+    ReasonNoLogsDelivered   = "NoLogsDelivered"
     ReasonFlowHealthy       = "Healthy"
     ReasonTLSCertificateInvalid = "TLSCertificateInvalid"
     ReasonTLSPrivateKeyInvalid  = "TLSPrivateKeyInvalid"
@@ -93,6 +94,11 @@ var logPipelineMessages = map[string]string{
     ReasonFluentBitDSReady:      "Fluent Bit DaemonSet is ready",
     ReasonUnsupportedLokiOutput: "grafana-loki output is not supported anymore. For integration with a custom Loki installation, use the `custom` output and follow https://kyma-project.io/#/telemetry-manager/user/integration/loki/README",
     ReasonLogComponentsRunning:  "All log components are running",
+    ReasonAllDataDropped:        "All logs dropped: backend unreachable or rejecting",
+    ReasonSomeDataDropped:       "Some logs dropped: backend unreachable or rejecting",
+    ReasonBufferFillingUp:       "Buffer nearing capacity: incoming log rate exceeds export rate",
+    ReasonNoLogsDelivered:       "No logs delivered to backend",
+    ReasonFlowHealthy:           "Logs are flowing normally to backend",
     ReasonTLSCertificateInvalid: "TLS certificate invalid: %s",
     ReasonTLSPrivateKeyInvalid:  "TLS private key invalid: %s",
     ReasonTLSCertificateExpired: "TLS certificate expired on %s",
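These new reason and message entries surface through the pipeline's status conditions. As a small sketch of consumer code (not part of the commit), the resulting condition can be checked with the standard apimachinery helper; conditions.TypeFlowHealthy is the condition type used by status.go below:

```go
package main

import (
	"k8s.io/apimachinery/pkg/api/meta"

	telemetryv1alpha1 "github.com/kyma-project/telemetry-manager/apis/telemetry/v1alpha1"
	"github.com/kyma-project/telemetry-manager/internal/conditions"
)

// isFlowHealthy reports whether the FlowHealthy condition set by the
// reconciler (see status.go below) is currently True.
func isFlowHealthy(pipeline *telemetryv1alpha1.LogPipeline) bool {
	return meta.IsStatusConditionTrue(pipeline.Status.Conditions, conditions.TypeFlowHealthy)
}
```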
55 changes: 55 additions & 0 deletions internal/reconciler/logpipeline/mocks/flow_health_prober.go

Some generated files are not rendered by default.

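The generated file is a standard mockery stub for the FlowHealthProber interface declared in reconciler.go below. A sketch of how it might drive a unit test: mockery emits testify-style mocks, so the On/Return calls are the usual API, while the probe-result flags and fixture names are assumptions based on the switch in status.go:

```go
package logpipeline_test

import (
	"testing"

	"github.com/stretchr/testify/mock"

	"github.com/kyma-project/telemetry-manager/internal/reconciler/logpipeline/mocks"
	"github.com/kyma-project/telemetry-manager/internal/selfmonitor/prober"
)

func TestFlowHealthConditionOnNoLogsDelivered(t *testing.T) {
	proberStub := &mocks.FlowHealthProber{}

	// Field assignment (rather than a struct literal) stays valid even if
	// the flags are promoted from an embedded struct.
	var result prober.LogPipelineProbeResult
	result.NoLogsDelivered = true
	proberStub.On("Probe", mock.Anything, mock.Anything).Return(result, nil)

	// Pass proberStub as the FlowHealthProber argument of NewReconciler with
	// flowHealthProbingEnabled=true, reconcile, then assert the pipeline's
	// FlowHealthy condition has reason "NoLogsDelivered" (fixtures elided).
	_ = proberStub
}
```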
36 changes: 26 additions & 10 deletions internal/reconciler/logpipeline/reconciler.go
@@ -39,6 +39,7 @@ import (
     commonresources "github.com/kyma-project/telemetry-manager/internal/resources/common"
     "github.com/kyma-project/telemetry-manager/internal/resources/fluentbit"
     "github.com/kyma-project/telemetry-manager/internal/secretref"
+    "github.com/kyma-project/telemetry-manager/internal/selfmonitor/prober"
     "github.com/kyma-project/telemetry-manager/internal/tls/cert"
 )
 
@@ -72,23 +73,38 @@ type TLSCertValidator interface {
     ValidateCertificate(certPEM []byte, keyPEM []byte) cert.TLSCertValidationResult
 }
 
+//go:generate mockery --name FlowHealthProber --filename flow_health_prober.go
+type FlowHealthProber interface {
+    Probe(ctx context.Context, pipelineName string) (prober.LogPipelineProbeResult, error)
+}
+
 type Reconciler struct {
     client.Client
-    config                  Config
-    prober                  DaemonSetProber
-    allLogPipelines         prometheus.Gauge
-    unsupportedLogPipelines prometheus.Gauge
-    syncer                  syncer
-    overridesHandler        *overrides.Handler
-    istioStatusChecker      istiostatus.Checker
-    tlsCertValidator        TLSCertValidator
+    config                   Config
+    prober                   DaemonSetProber
+    flowHealthProbingEnabled bool
+    flowHealthProber         FlowHealthProber
+    allLogPipelines          prometheus.Gauge
+    unsupportedLogPipelines  prometheus.Gauge
+    syncer                   syncer
+    overridesHandler         *overrides.Handler
+    istioStatusChecker       istiostatus.Checker
+    tlsCertValidator         TLSCertValidator
 }
 
-func NewReconciler(client client.Client, config Config, prober DaemonSetProber, overridesHandler *overrides.Handler) *Reconciler {
+func NewReconciler(
+    client client.Client,
+    config Config,
+    agentProber DaemonSetProber,
+    flowHealthProbingEnabled bool,
+    flowHealthProber FlowHealthProber,
+    overridesHandler *overrides.Handler) *Reconciler {
     var r Reconciler
     r.Client = client
     r.config = config
-    r.prober = prober
+    r.prober = agentProber
+    r.flowHealthProbingEnabled = flowHealthProbingEnabled
+    r.flowHealthProber = flowHealthProber
     r.allLogPipelines = prometheus.NewGauge(prometheus.GaugeOpts{Name: "telemetry_all_logpipelines", Help: "Number of log pipelines."})
     r.unsupportedLogPipelines = prometheus.NewGauge(prometheus.GaugeOpts{Name: "telemetry_unsupported_logpipelines", Help: "Number of log pipelines with custom filters or outputs."})
     metrics.Registry.MustRegister(r.allLogPipelines, r.unsupportedLogPipelines)
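Because FlowHealthProber is a single-method interface, anything that can answer a probe fits behind the new flag. A minimal sketch of constructing the reconciler with probing enabled; the stub prober, the helper name, and the overrides/logpipeline import paths are assumptions, and the real manager would pass a prober backed by the self-monitor instead:

```go
package main

import (
	"context"

	"sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/kyma-project/telemetry-manager/internal/overrides"
	"github.com/kyma-project/telemetry-manager/internal/reconciler/logpipeline"
	"github.com/kyma-project/telemetry-manager/internal/selfmonitor/prober"
)

// alwaysHealthyProber satisfies FlowHealthProber with a canned result.
type alwaysHealthyProber struct{}

func (alwaysHealthyProber) Probe(ctx context.Context, pipelineName string) (prober.LogPipelineProbeResult, error) {
	var res prober.LogPipelineProbeResult
	res.Healthy = true // flat/promoted flag access assumed, as in status.go
	return res, nil
}

func newReconcilerWithFlowHealth(
	k8sClient client.Client,
	cfg logpipeline.Config,
	agentProber logpipeline.DaemonSetProber,
	overridesHandler *overrides.Handler,
) *logpipeline.Reconciler {
	return logpipeline.NewReconciler(
		k8sClient,
		cfg,
		agentProber,
		true, // flowHealthProbingEnabled
		alwaysHealthyProber{},
		overridesHandler,
	)
}
```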
53 changes: 53 additions & 0 deletions internal/reconciler/logpipeline/status.go
@@ -14,6 +14,7 @@ import (
     telemetryv1alpha1 "github.com/kyma-project/telemetry-manager/apis/telemetry/v1alpha1"
     "github.com/kyma-project/telemetry-manager/internal/conditions"
     "github.com/kyma-project/telemetry-manager/internal/secretref"
+    "github.com/kyma-project/telemetry-manager/internal/selfmonitor/prober"
 )
 
 const twoWeeks = time.Hour * 24 * 7 * 2
@@ -47,6 +48,11 @@ func (r *Reconciler) updateStatus(ctx context.Context, pipelineName string) error {
 
     r.setAgentHealthyCondition(ctx, &pipeline)
     r.setFluentBitConfigGeneratedCondition(ctx, &pipeline)
+
+    if r.flowHealthProbingEnabled {
+        r.setFlowHealthCondition(ctx, &pipeline)
+    }
+
     r.setLegacyConditions(ctx, &pipeline)
 
     if err := r.Status().Update(ctx, &pipeline); err != nil {
@@ -151,6 +157,53 @@ func (r *Reconciler) setFluentBitConfigGeneratedCondition(ctx context.Context, pipeline *telemetryv1alpha1.LogPipeline) {
     meta.SetStatusCondition(&pipeline.Status.Conditions, condition)
 }
 
+func (r *Reconciler) setFlowHealthCondition(ctx context.Context, pipeline *telemetryv1alpha1.LogPipeline) {
+    var reason string
+    var status metav1.ConditionStatus
+
+    probeResult, err := r.flowHealthProber.Probe(ctx, pipeline.Name)
+    if err == nil {
+        logf.FromContext(ctx).V(1).Info("Probed flow health", "result", probeResult)
+
+        reason = flowHealthReasonFor(probeResult)
+        if probeResult.Healthy {
+            status = metav1.ConditionTrue
+        } else {
+            status = metav1.ConditionFalse
+        }
+    } else {
+        logf.FromContext(ctx).Error(err, "Failed to probe flow health")
+
+        reason = conditions.ReasonFlowHealthy
+        status = metav1.ConditionUnknown
+    }
+
+    condition := metav1.Condition{
+        Type:               conditions.TypeFlowHealthy,
+        Status:             status,
+        Reason:             reason,
+        Message:            conditions.MessageForLogPipeline(reason),
+        ObservedGeneration: pipeline.Generation,
+    }
+
+    meta.SetStatusCondition(&pipeline.Status.Conditions, condition)
+}
+
+func flowHealthReasonFor(probeResult prober.LogPipelineProbeResult) string {
+    switch {
+    case probeResult.AllDataDropped:
+        return conditions.ReasonAllDataDropped
+    case probeResult.SomeDataDropped:
+        return conditions.ReasonSomeDataDropped
+    case probeResult.NoLogsDelivered:
+        return conditions.ReasonNoLogsDelivered
+    case probeResult.BufferFillingUp:
+        return conditions.ReasonBufferFillingUp
+    default:
+        return conditions.ReasonFlowHealthy
+    }
+}
+
 func (r *Reconciler) setLegacyConditions(ctx context.Context, pipeline *telemetryv1alpha1.LogPipeline) {
     if pipeline.Spec.Output.IsLokiDefined() {
         conditions.HandlePendingCondition(&pipeline.Status.Conditions, pipeline.Generation,
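One point worth calling out in flowHealthReasonFor: the switch order encodes severity, so when several alert-derived flags are set at once, total data loss wins over partial loss, which wins over stalled delivery and a filling buffer, with Healthy as the fallback. A tiny sketch of that precedence; it would have to live in the logpipeline package since the function is unexported, and flat flag access is assumed from the switch above:

```go
// demoFlowHealthPrecedence shows that the more severe flag wins when both
// are set at once.
func demoFlowHealthPrecedence() string {
	var res prober.LogPipelineProbeResult
	res.BufferFillingUp = true
	res.AllDataDropped = true // outranks BufferFillingUp in the switch

	// Returns conditions.ReasonAllDataDropped, so the FlowHealthy condition
	// becomes False with the message
	// "All logs dropped: backend unreachable or rejecting".
	return flowHealthReasonFor(res)
}
```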