Skip to content

Commit

Permalink
Merge pull request #692 from Icinga/ha-logging-i688
Browse files Browse the repository at this point in the history
Enhance HA "Taking over", "Handing over" logging
  • Loading branch information
julianbrost authored Apr 4, 2024
2 parents 2826af4 + 779afd1 commit 80abf2b
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 57 deletions.
8 changes: 4 additions & 4 deletions cmd/icingadb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ func run() int {
hactx, cancelHactx := context.WithCancel(ctx)
for hactx.Err() == nil {
select {
case <-ha.Takeover():
logger.Info("Taking over")
case takeoverReason := <-ha.Takeover():
logger.Infow("Taking over", zap.String("reason", takeoverReason))

go func() {
for hactx.Err() == nil {
Expand Down Expand Up @@ -324,8 +324,8 @@ func run() int {
}
}
}()
case <-ha.Handover():
logger.Warn("Handing over")
case handoverReason := <-ha.Handover():
logger.Warnw("Handing over", zap.String("reason", handoverReason))

cancelHactx()
case <-hactx.Done():
Expand Down
131 changes: 83 additions & 48 deletions pkg/icingadb/ha.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ import (
"time"
)

var timeout = 60 * time.Second
// peerTimeout defines the timeout for HA heartbeats, being used to detect absent nodes.
//
// Because this timeout relies on icingaredis.Timeout, it is icingaredis.Timeout plus a short grace period.
const peerTimeout = icingaredis.Timeout + 5*time.Second

type haState struct {
responsibleTsMilli int64
Expand All @@ -43,8 +46,8 @@ type HA struct {
heartbeat *icingaredis.Heartbeat
logger *logging.Logger
responsible bool
handover chan struct{}
takeover chan struct{}
handover chan string
takeover chan string
done chan struct{}
errOnce sync.Once
errMu sync.Mutex
Expand All @@ -64,8 +67,8 @@ func NewHA(ctx context.Context, db *DB, heartbeat *icingaredis.Heartbeat, logger
db: db,
heartbeat: heartbeat,
logger: logger,
handover: make(chan struct{}),
takeover: make(chan struct{}),
handover: make(chan string),
takeover: make(chan string),
done: make(chan struct{}),
}

Expand Down Expand Up @@ -107,13 +110,13 @@ func (h *HA) Err() error {
return h.err
}

// Handover returns a channel with which handovers are signaled.
func (h *HA) Handover() chan struct{} {
// Handover returns a channel with which handovers and their reasons are signaled.
func (h *HA) Handover() chan string {
return h.handover
}

// Takeover returns a channel with which takeovers are signaled.
func (h *HA) Takeover() chan struct{} {
// Takeover returns a channel with which takeovers and their reasons are signaled.
func (h *HA) Takeover() chan string {
return h.takeover
}

Expand Down Expand Up @@ -141,9 +144,10 @@ func (h *HA) controller() {

oldInstancesRemoved := false

logTicker := time.NewTicker(time.Second * 60)
defer logTicker.Stop()
shouldLog := true
// Throttle recurring log messages in the realize method so that they are only logged this often.
routineLogTicker := time.NewTicker(5 * time.Minute)
defer routineLogTicker.Stop()
shouldLogRoutineEvents := true

for {
select {
Expand All @@ -158,9 +162,9 @@ func (h *HA) controller() {
if tt.After(now.Add(1 * time.Second)) {
h.logger.Debugw("Received heartbeat from the future", zap.Time("time", tt))
}
if tt.Before(now.Add(-1 * timeout)) {
if tt.Before(now.Add(-1 * peerTimeout)) {
h.logger.Errorw("Received heartbeat from the past", zap.Time("time", tt))
h.signalHandover()
h.signalHandover("received heartbeat from the past")
h.realizeLostHeartbeat()
continue
}
Expand Down Expand Up @@ -192,8 +196,8 @@ func (h *HA) controller() {
}

select {
case <-logTicker.C:
shouldLog = true
case <-routineLogTicker.C:
shouldLogRoutineEvents = true
default:
}

Expand All @@ -204,10 +208,10 @@ func (h *HA) controller() {
} else {
realizeCtx, cancelRealizeCtx = context.WithCancel(h.ctx)
}
err = h.realize(realizeCtx, s, t, envId, shouldLog)
err = h.realize(realizeCtx, s, t, envId, shouldLogRoutineEvents)
cancelRealizeCtx()
if errors.Is(err, context.DeadlineExceeded) {
h.signalHandover()
h.signalHandover("context deadline exceeded")
continue
}
if err != nil {
Expand All @@ -219,10 +223,10 @@ func (h *HA) controller() {
oldInstancesRemoved = true
}

shouldLog = false
shouldLogRoutineEvents = false
} else {
h.logger.Error("Lost heartbeat")
h.signalHandover()
h.signalHandover("lost heartbeat")
h.realizeLostHeartbeat()
}
case <-h.heartbeat.Done():
Expand All @@ -235,13 +239,25 @@ func (h *HA) controller() {
}
}

func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *types.UnixMilli, envId types.Binary, shouldLog bool) error {
var takeover, otherResponsible bool
// realize performs an HA cycle triggered by a heartbeat event.
//
// shouldLogRoutineEvents indicates if recurrent events should be logged.
func (h *HA) realize(
ctx context.Context,
s *icingaredisv1.IcingaStatus,
t *types.UnixMilli,
envId types.Binary,
shouldLogRoutineEvents bool,
) error {
var (
takeover string
otherResponsible bool
)

err := retry.WithBackoff(
ctx,
func(ctx context.Context) error {
takeover = false
takeover = ""
otherResponsible = false
isoLvl := sql.LevelSerializable
selectLock := ""
Expand All @@ -259,25 +275,41 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type
}

query := h.db.Rebind("SELECT id, heartbeat FROM icingadb_instance "+
"WHERE environment_id = ? AND responsible = ? AND id <> ? AND heartbeat > ?") + selectLock
"WHERE environment_id = ? AND responsible = ? AND id <> ?") + selectLock

instance := &v1.IcingadbInstance{}
errQuery := tx.QueryRowxContext(ctx, query, envId, "y", h.instanceId).StructScan(instance)

switch {
case errQuery == nil:
fields := []any{
zap.String("instance_id", instance.Id.String()),
zap.String("environment", envId.String()),
zap.Time("heartbeat", instance.Heartbeat.Time()),
zap.Duration("heartbeat_age", time.Since(instance.Heartbeat.Time())),
}

errQuery := tx.QueryRowxContext(
ctx, query, envId, "y", h.instanceId, time.Now().Add(-1*timeout).UnixMilli(),
).StructScan(instance)
switch errQuery {
case nil:
otherResponsible = true
if shouldLog {
h.logger.Infow("Another instance is active",
zap.String("instance_id", instance.Id.String()),
zap.String("environment", envId.String()),
"heartbeat", instance.Heartbeat,
zap.Duration("heartbeat_age", time.Since(instance.Heartbeat.Time())))
if instance.Heartbeat.Time().Before(time.Now().Add(-1 * peerTimeout)) {
takeover = "other instance's heartbeat has expired"
h.logger.Debugw("Preparing to take over HA as other instance's heartbeat has expired", fields...)
} else {
otherResponsible = true
if shouldLogRoutineEvents {
h.logger.Infow("Another instance is active", fields...)
}
}
case sql.ErrNoRows:
takeover = true

case errors.Is(errQuery, sql.ErrNoRows):
fields := []any{
zap.String("instance_id", h.instanceId.String()),
zap.String("environment", envId.String())}
if !h.responsible {
takeover = "no other instance is active"
h.logger.Debugw("Preparing to take over HA as no instance is active", fields...)
} else if h.responsible && shouldLogRoutineEvents {
h.logger.Debugw("Continuing being the active instance", fields...)
}

default:
return internal.CantPerformQuery(errQuery, query)
}
Expand All @@ -292,7 +324,7 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type
EnvironmentId: envId,
},
Heartbeat: *t,
Responsible: types.Bool{Bool: takeover || h.responsible, Valid: true},
Responsible: types.Bool{Bool: takeover != "" || h.responsible, Valid: true},
EndpointId: s.EndpointId,
Icinga2Version: s.Version,
Icinga2StartTime: s.ProgramStart,
Expand All @@ -309,7 +341,7 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type
return internal.CantPerformQuery(err, stmt)
}

if takeover {
if takeover != "" {
stmt := h.db.Rebind("UPDATE icingadb_instance SET responsible = ? WHERE environment_id = ? AND id <> ?")
_, err := tx.ExecContext(ctx, stmt, "n", envId, h.instanceId)

Expand Down Expand Up @@ -343,14 +375,14 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type
return err
}

if takeover {
if takeover != "" {
// Insert the environment after each heartbeat takeover if it does not already exist in the database
// as the environment may have changed, although this is likely to happen very rarely.
if err := h.insertEnvironment(); err != nil {
return errors.Wrap(err, "can't insert environment")
}

h.signalTakeover()
h.signalTakeover(takeover)
} else if otherResponsible {
if state, _ := h.state.Load(); !state.otherResponsible {
state.otherResponsible = true
Expand All @@ -361,6 +393,7 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type
return nil
}

// realizeLostHeartbeat sets "responsible = n" for this HA instance in the database.
func (h *HA) realizeLostHeartbeat() {
stmt := h.db.Rebind("UPDATE icingadb_instance SET responsible = ? WHERE id = ?")
if _, err := h.db.ExecContext(h.ctx, stmt, "n", h.instanceId); err != nil && !utils.IsContextCanceled(err) {
Expand Down Expand Up @@ -394,10 +427,10 @@ func (h *HA) removeOldInstances(s *icingaredisv1.IcingaStatus, envId types.Binar
select {
case <-h.ctx.Done():
return
case <-time.After(timeout):
case <-time.After(peerTimeout):
query := h.db.Rebind("DELETE FROM icingadb_instance " +
"WHERE id <> ? AND environment_id = ? AND endpoint_id = ? AND heartbeat < ?")
heartbeat := types.UnixMilli(time.Now().Add(-timeout))
heartbeat := types.UnixMilli(time.Now().Add(-1 * peerTimeout))
result, err := h.db.ExecContext(h.ctx, query, h.instanceId, envId,
s.EndpointId, heartbeat)
if err != nil {
Expand All @@ -416,7 +449,8 @@ func (h *HA) removeOldInstances(s *icingaredisv1.IcingaStatus, envId types.Binar
}
}

func (h *HA) signalHandover() {
// signalHandover gives up HA.responsible and notifies the HA.Handover chan.
func (h *HA) signalHandover(reason string) {
if h.responsible {
h.state.Store(haState{
responsibleTsMilli: time.Now().UnixMilli(),
Expand All @@ -425,15 +459,16 @@ func (h *HA) signalHandover() {
})

select {
case h.handover <- struct{}{}:
case h.handover <- reason:
h.responsible = false
case <-h.ctx.Done():
// Noop
}
}
}

func (h *HA) signalTakeover() {
// signalTakeover claims HA.responsible and notifies the HA.Takeover chan.
func (h *HA) signalTakeover(reason string) {
if !h.responsible {
h.state.Store(haState{
responsibleTsMilli: time.Now().UnixMilli(),
Expand All @@ -442,7 +477,7 @@ func (h *HA) signalTakeover() {
})

select {
case h.takeover <- struct{}{}:
case h.takeover <- reason:
h.responsible = true
case <-h.ctx.Done():
// Noop
Expand Down
10 changes: 5 additions & 5 deletions pkg/icingaredis/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ import (
"time"
)

// timeout defines how long a heartbeat may be absent if a heartbeat has already been received.
// Timeout defines how long a heartbeat may be absent if a heartbeat has already been received.
// After this time, a heartbeat loss is propagated.
var timeout = 60 * time.Second
const Timeout = time.Minute

// Heartbeat periodically reads heartbeats from a Redis stream and signals in Beat channels when they are received.
// Also signals if the heartbeat is Lost.
Expand Down Expand Up @@ -141,9 +141,9 @@ func (h *Heartbeat) controller(ctx context.Context) {

atomic.StoreInt64(&h.lastReceivedMs, m.received.UnixMilli())
h.sendEvent(m)
case <-time.After(timeout):
case <-time.After(Timeout):
if h.active {
h.logger.Warnw("Lost Icinga heartbeat", zap.Duration("timeout", timeout))
h.logger.Warnw("Lost Icinga heartbeat", zap.Duration("timeout", Timeout))
h.sendEvent(nil)
h.active = false
} else {
Expand Down Expand Up @@ -217,5 +217,5 @@ func (m *HeartbeatMessage) EnvironmentID() (types.Binary, error) {

// ExpiryTime returns the timestamp when the heartbeat expires.
func (m *HeartbeatMessage) ExpiryTime() time.Time {
return m.received.Add(timeout)
return m.received.Add(Timeout)
}

0 comments on commit 80abf2b

Please sign in to comment.