Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use sync/atomic#Pointer instead of our own wrapper #812

Merged
merged 3 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cmd/icingadb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func run() int {
// the heartbeat is not read while HA gets stuck when updating the instance table.
var heartbeat *icingaredis.Heartbeat
var ha *icingadb.HA
var telemetrySyncStats *atomic.Pointer[telemetry.SuccessfulSync]
{
rc, err := cmd.Redis(logs.GetChildLogger("redis"))
if err != nil {
Expand All @@ -116,7 +117,7 @@ func run() int {
ha = icingadb.NewHA(ctx, db, heartbeat, logs.GetChildLogger("high-availability"))

telemetryLogger := logs.GetChildLogger("telemetry")
telemetry.StartHeartbeat(ctx, rc, telemetryLogger, ha, heartbeat)
telemetrySyncStats = telemetry.StartHeartbeat(ctx, rc, telemetryLogger, ha, heartbeat)
telemetry.WriteStats(ctx, rc, telemetryLogger)
}
// Closing ha on exit ensures that this instance retracts its heartbeat
Expand Down Expand Up @@ -250,7 +251,7 @@ func run() int {
logger := logs.GetChildLogger("config-sync")

if synctx.Err() == nil {
telemetry.LastSuccessfulSync.Store(telemetry.SuccessfulSync{
telemetrySyncStats.Store(&telemetry.SuccessfulSync{
FinishMilli: syncEnd.UnixMilli(),
DurationMilli: elapsed.Milliseconds(),
})
Expand Down
22 changes: 14 additions & 8 deletions pkg/icingadb/ha.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"encoding/hex"
"github.com/google/uuid"
"github.com/icinga/icinga-go-library/backoff"
"github.com/icinga/icinga-go-library/com"
"github.com/icinga/icinga-go-library/database"
"github.com/icinga/icinga-go-library/logging"
"github.com/icinga/icinga-go-library/retry"
Expand All @@ -19,6 +18,7 @@ import (
"github.com/pkg/errors"
"go.uber.org/zap"
"sync"
"sync/atomic"
"time"
)

Expand All @@ -35,7 +35,7 @@ type haState struct {

// HA provides high availability and indicates whether a Takeover or Handover must be made.
type HA struct {
state com.Atomic[haState]
state atomic.Pointer[haState]
ctx context.Context
cancelCtx context.CancelFunc
instanceId types.Binary
Expand Down Expand Up @@ -71,6 +71,8 @@ func NewHA(ctx context.Context, db *database.DB, heartbeat *icingaredis.Heartbea
done: make(chan struct{}),
}

ha.state.Store(&haState{})

go ha.controller()

return ha
Expand Down Expand Up @@ -121,7 +123,8 @@ func (h *HA) Takeover() chan string {

// State returns the status quo.
func (h *HA) State() (responsibleTsMilli int64, responsible, otherResponsible bool) {
state, _ := h.state.Load()
state := h.state.Load()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously, this was a zero value if uninitialized; now it's a nil pointer in that case. We have to make sure HA#State() doesn't get a nil pointer.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

return state.responsibleTsMilli, state.responsible, state.otherResponsible
}

Expand Down Expand Up @@ -428,9 +431,12 @@ func (h *HA) realize(

h.signalTakeover(takeover)
} else if otherResponsible {
if state, _ := h.state.Load(); !state.otherResponsible {
state.otherResponsible = true
h.state.Store(state)
if state := h.state.Load(); !state.otherResponsible {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we're now loading by reference, the state.otherResponsible = true below affects all readers. But this shouldn't be too bad, as that's just a bool and we're storing it right away anyway.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

// Dereference pointer to create a copy of the value it points to.
// Ensures that any modifications do not directly affect the original data unless explicitly stored back.
newState := *state
newState.otherResponsible = true
h.state.Store(&newState)
}
}

Expand Down Expand Up @@ -496,7 +502,7 @@ func (h *HA) removeOldInstances(s *icingaredisv1.IcingaStatus, envId types.Binar
// signalHandover gives up HA.responsible and notifies the HA.Handover chan.
func (h *HA) signalHandover(reason string) {
if h.responsible {
h.state.Store(haState{
h.state.Store(&haState{
responsibleTsMilli: time.Now().UnixMilli(),
responsible: false,
otherResponsible: false,
Expand All @@ -514,7 +520,7 @@ func (h *HA) signalHandover(reason string) {
// signalTakeover claims HA.responsible and notifies the HA.Takeover chan.
func (h *HA) signalTakeover(reason string) {
if !h.responsible {
h.state.Store(haState{
h.state.Store(&haState{
Al2Klimov marked this conversation as resolved.
Show resolved Hide resolved
responsibleTsMilli: time.Now().UnixMilli(),
responsible: true,
otherResponsible: false,
Expand Down
18 changes: 10 additions & 8 deletions pkg/icingaredis/telemetry/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package telemetry
import (
"context"
"fmt"
"github.com/icinga/icinga-go-library/com"
"github.com/icinga/icinga-go-library/logging"
"github.com/icinga/icinga-go-library/periodic"
"github.com/icinga/icinga-go-library/redis"
Expand Down Expand Up @@ -80,16 +79,17 @@ func GetCurrentDbConnErr() (string, int64) {
// OngoingSyncStartMilli is to be updated by the main() function.
var OngoingSyncStartMilli int64

// LastSuccessfulSync is to be updated by the main() function.
var LastSuccessfulSync com.Atomic[SuccessfulSync]

var boolToStr = map[bool]string{false: "0", true: "1"}
var startTime = time.Now().UnixMilli()

// StartHeartbeat periodically writes heartbeats to Redis for being monitored by Icinga 2.
// It returns an atomic pointer to SuccessfulSync,
// which contains synchronisation statistics that the caller should update.
func StartHeartbeat(
ctx context.Context, client *redis.Client, logger *logging.Logger, ha ha, heartbeat *icingaredis.Heartbeat,
) {
) *atomic.Pointer[SuccessfulSync] {
var syncStats atomic.Pointer[SuccessfulSync]
syncStats.Store(&SuccessfulSync{})
goMetrics := NewGoMetrics()

const interval = time.Second
Expand All @@ -101,7 +101,7 @@ func StartHeartbeat(
heartbeat := heartbeat.LastReceived()
responsibleTsMilli, responsible, otherResponsible := ha.State()
ongoingSyncStart := atomic.LoadInt64(&OngoingSyncStartMilli)
sync, _ := LastSuccessfulSync.Load()
lastSync := syncStats.Load()
dbConnErr, dbConnErrSinceMilli := GetCurrentDbConnErr()
now := time.Now()

Expand All @@ -117,8 +117,8 @@ func StartHeartbeat(
"ha-responsible-ts": strconv.FormatInt(responsibleTsMilli, 10),
"ha-other-responsible": boolToStr[otherResponsible],
"sync-ongoing-since": strconv.FormatInt(ongoingSyncStart, 10),
"sync-success-finish": strconv.FormatInt(sync.FinishMilli, 10),
"sync-success-duration": strconv.FormatInt(sync.DurationMilli, 10),
"sync-success-finish": strconv.FormatInt(lastSync.FinishMilli, 10),
"sync-success-duration": strconv.FormatInt(lastSync.DurationMilli, 10),
}

ctx, cancel := context.WithDeadline(ctx, tick.Time.Add(interval))
Expand All @@ -145,6 +145,8 @@ func StartHeartbeat(
silenceUntil = time.Time{}
}
})

return &syncStats
}

type goMetrics struct {
Expand Down
Loading