Skip to content

Commit

Permalink
Upsert initial SLA lifecycle unconditionally & drop execution order f…
Browse files Browse the repository at this point in the history
…rom runtime updates
  • Loading branch information
yhabteab committed Aug 14, 2024
1 parent 43e7c38 commit 80a2088
Show file tree
Hide file tree
Showing 6 changed files with 207 additions and 229 deletions.
12 changes: 12 additions & 0 deletions cmd/icingadb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,18 @@ func run() int {
return nil
})

g.Go(func() error {
configInitSync.Wait()

if err := synctx.Err(); err != nil {
return err
}

logger.Info("Syncing Host and Service initial SLA lifecycle")

return icingadb.SyncCheckablesSlaLifecycle(synctx, db)
})

g.Go(func() error {
configInitSync.Wait()

Expand Down
243 changes: 130 additions & 113 deletions pkg/icingadb/runtime_updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/icinga/icinga-go-library/redis"
"github.com/icinga/icinga-go-library/strcase"
"github.com/icinga/icinga-go-library/structify"
"github.com/icinga/icinga-go-library/types"
"github.com/icinga/icingadb/pkg/common"
"github.com/icinga/icingadb/pkg/contracts"
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
Expand Down Expand Up @@ -73,7 +74,18 @@ func (r *RuntimeUpdates) Sync(

updateMessages := make(chan redis.XMessage, r.redis.Options.XReadCount)
upsertEntities := make(chan database.Entity, r.redis.Options.XReadCount)
deleteEntities := make(chan database.Entity, r.redis.Options.XReadCount)
deleteIds := make(chan interface{}, r.redis.Options.XReadCount)

upsertEntitiesMultiplexer := make(chan database.Entity, 1)
deleteIdsMultiplexer := make(chan any, 1)

var insertSlaEntities chan database.Entity
var updateSlaEntities chan database.Entity
switch s.Entity().(type) {
case *v1.Host, *v1.Service:
insertSlaEntities = make(chan database.Entity, r.redis.Options.XReadCount)
updateSlaEntities = make(chan database.Entity, r.redis.Options.XReadCount)
}

var upsertedFifo chan database.Entity
var deletedFifo chan interface{}
Expand All @@ -95,152 +107,157 @@ func (r *RuntimeUpdates) Sync(
r.logger.Debugf("Syncing runtime updates of %s", s.Name())

g.Go(structifyStream(
ctx, updateMessages, upsertEntities, upsertedFifo, deleteEntities, deletedFifo,
ctx, updateMessages, upsertEntitiesMultiplexer, upsertedFifo, deleteIdsMultiplexer, deletedFifo,
structify.MakeMapStructifier(
reflect.TypeOf(s.Entity()).Elem(),
"json",
contracts.SafeInit),
))

// upsertEntityFunc returns a closure that is used to upsert the regular Icinga DB entities.
// The returned func is used to directly start a separate goroutine that selects events
// sequentially (!allowParallel) from the given chan.
upsertEntityFunc := func(entities <-chan database.Entity) func() error {
return func() error {
var counter com.Counter
defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) {
if count := counter.Reset(); count > 0 {
r.logger.Infof("Upserted %d %s items", count, s.Name())
g.Go(func() error {
defer close(upsertEntities)
if insertSlaEntities != nil {
defer close(insertSlaEntities)
}

for {
select {
case <-ctx.Done():
return ctx.Err()
case entity, ok := <-upsertEntitiesMultiplexer:
if !ok {
return nil
}
}).Stop()

onSuccess := []database.OnSuccess[database.Entity]{
database.OnSuccessIncrement[database.Entity](&counter),
database.OnSuccessIncrement[database.Entity](stat),
select {
case upsertEntities <- entity:
case <-ctx.Done():
return ctx.Err()
}

if insertSlaEntities != nil {
select {
case insertSlaEntities <- entity:
case <-ctx.Done():
return ctx.Err()
}
}
}
if !allowParallel {
onSuccess = append(onSuccess, database.OnSuccessSendTo(upsertedFifo))
}
})

g.Go(func() error {
var counter com.Counter
defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) {
if count := counter.Reset(); count > 0 {
r.logger.Infof("Upserted %d %s items", count, s.Name())
}
}).Stop()

// Updates must be executed in order, ensure this by using a semaphore with maximum 1.
sem := semaphore.NewWeighted(1)
// Updates must be executed in order, ensure this by using a semaphore with maximum 1.
sem := semaphore.NewWeighted(1)

return r.db.NamedBulkExec(
ctx, upsertStmt, upsertCount, sem, entities, database.SplitOnDupId[database.Entity], onSuccess...,
)
onSuccess := []database.OnSuccess[database.Entity]{
database.OnSuccessIncrement[database.Entity](&counter), database.OnSuccessIncrement[database.Entity](stat),
}
if !allowParallel {
onSuccess = append(onSuccess, database.OnSuccessSendTo(upsertedFifo))
}
}

// deleteEntityFunc returns a closure that is used to delete the regular Icinga DB entities
// based on their ids. The returned func is used to directly start a separate goroutine that
// selects events sequentially (!allowParallel) from the given chan.
deleteEntityFunc := func(deleteIds <-chan any) func() error {
return func() error {
var counter com.Counter
defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) {
if count := counter.Reset(); count > 0 {
r.logger.Infof("Deleted %d %s items", count, s.Name())
}
}).Stop()
return r.db.NamedBulkExec(
ctx, upsertStmt, upsertCount, sem, upsertEntities, database.SplitOnDupId[database.Entity], onSuccess...,
)
})

onSuccess := []database.OnSuccess[any]{database.OnSuccessIncrement[any](&counter), database.OnSuccessIncrement[any](stat)}
if !allowParallel {
onSuccess = append(onSuccess, database.OnSuccessSendTo(deletedFifo))
g.Go(func() error {
var counter com.Counter
defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) {
if count := counter.Reset(); count > 0 {
r.logger.Infof("Inserted %d %s sla lifecycles", count, s.Name())
}
}).Stop()

sem := r.db.GetSemaphoreForTable(database.TableName(s.Entity()))
stmt, _ := r.db.BuildInsertIgnoreStmt(v1.NewSlaLifecycle())
return r.db.NamedBulkExec(
ctx, stmt, upsertCount, r.db.GetSemaphoreForTable(slaLifecycleTable),
CreateSlaLifecyclesFromCheckables(ctx, s.Entity(), g, insertSlaEntities, false),
com.NeverSplit[database.Entity], database.OnSuccessIncrement[database.Entity](&counter))
})

return r.db.BulkExec(ctx, r.db.BuildDeleteStmt(s.Entity()), deleteCount, sem, deleteIds, onSuccess...)
g.Go(func() error {
defer close(deleteIds)
if updateSlaEntities != nil {
defer close(updateSlaEntities)
}
}

// In order to always get the sla entries written even in case of system errors, we need to process these
// first. Otherwise, Icinga DB may be stopped after the regular queries have been processed, and deleted
// from the Redis stream, thus we won't be able to generate sla lifecycle for these entities.
//
// The general event process flow looks as follows:
// structifyStream() -> Reads `upsert` & `delete` events from redis and streams the entities to the
// respective chans `upsertEntities`, `deleteEntities` and waits for `upserted`
// and `deleted` chans (!allowParallel) before consuming the next one from redis.
// - Starts a goroutine that consumes from `upsertEntities` (when the current sync subject is of type checkable,
// this bulk inserts into the sla lifecycle table with semaphore 1 in a `INSERT ... IGNORE ON ERROR` fashion
// and forwards the entities to the next one, which then inserts the checkables into the regular Icinga DB
// tables). After successfully upserting the entities, (!allowParallel) they are passed sequentially to
// the `upserted` stream.
//
// - Starts another goroutine that consumes from `deleteEntities` concurrently. When the current sync subject is
// of type checkable, this performs sla lifecycle updates matching the checkables id and `delete_time` 0.
// Since the upgrade script should've generated a sla entry for all exiting Checkables, it's unlikely to
// happen, but when there is no tracked `created_at` event for a given checkable, this update is essentially
// a no-op, but forwards the original entities IDs nonetheless to the next one. And finally, the original
// checkables are deleted from the database and (!allowParallel) they are passed sequentially to the
// `deleted` stream.
switch s.Entity().(type) {
case *v1.Host, *v1.Service:
entities := make(chan database.Entity, 1)
g.Go(func() error {
defer close(entities)

var counter com.Counter
defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) {
if count := counter.Reset(); count > 0 {
r.logger.Infof("Upserted %d %s sla lifecycles", count, s.Name())
for {
select {
case <-ctx.Done():
return ctx.Err()
case deleteId, ok := <-deleteIdsMultiplexer:
if !ok {
return nil
}
}).Stop()

stmt, _ := r.db.BuildInsertIgnoreStmt(v1.NewSlaLifecycle())

// Not to mess up the already existing FIFO mechanism, we have to perform only a single query
// (semaphore 1) at a time, even though, the sla queries could be executed concurrently.
// After successfully upserting a lifecycle entity, the original checkable entity is streamed to "entities".
fromSlaEntities := CreateSlaLifecyclesFromCheckables(ctx, s.Entity(), g, upsertEntities, false)
return r.db.NamedBulkExec(
ctx, stmt, upsertCount, semaphore.NewWeighted(1), fromSlaEntities,
com.NeverSplit[database.Entity], database.OnSuccessIncrement[database.Entity](&counter),
OnSuccessApplyAndSendTo(entities, GetCheckableFromSlaLifecycle))
})

// Start the regular Icinga DB checkables upsert stream.
g.Go(upsertEntityFunc(entities))

// Start the regular Icinga DB checkables delete stream.
g.Go(deleteEntityFunc(StreamIDsFromUpdatedSlaLifecycles(ctx, r.db, s.Entity(), g, r.logger, deleteEntities)))
default:
// For non-checkables runtime updates of upsert event
g.Go(upsertEntityFunc(upsertEntities))

// For non-checkables runtime updates of delete event
deleteIds := make(chan any, r.redis.Options.XReadCount)
g.Go(func() error {
defer close(deleteIds)

for {
select {
case deleteIds <- deleteId:
case <-ctx.Done():
return ctx.Err()
case entity, ok := <-deleteEntities:
if !ok {
return nil
}
}

if updateSlaEntities != nil {
entity := factoryFunc()
entity.SetID(deleteId.(types.Binary))

select {
case deleteIds <- entity.ID():
case updateSlaEntities <- entity:
case <-ctx.Done():
return ctx.Err()
}
}
}
})
}
})

g.Go(deleteEntityFunc(deleteIds))
}
g.Go(func() error {
var counter com.Counter
defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) {
if count := counter.Reset(); count > 0 {
r.logger.Infof("Deleted %d %s items", count, s.Name())
}
}).Stop()

sem := r.db.GetSemaphoreForTable(database.TableName(s.Entity()))

onSuccess := []database.OnSuccess[any]{database.OnSuccessIncrement[any](&counter), database.OnSuccessIncrement[any](stat)}
if !allowParallel {
onSuccess = append(onSuccess, database.OnSuccessSendTo(deletedFifo))
}

return r.db.BulkExec(ctx, r.db.BuildDeleteStmt(s.Entity()), deleteCount, sem, deleteIds, onSuccess...)
})

g.Go(func() error {
var counter com.Counter
defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) {
if count := counter.Reset(); count > 0 {
r.logger.Infof("Updated %d %s sla lifecycles", count, s.Name())
}
}).Stop()

stmt := fmt.Sprintf(`UPDATE %s SET delete_time = :delete_time WHERE "id" = :id AND "delete_time" = 0`, slaLifecycleTable)
return r.db.NamedBulkExec(
ctx, stmt, deleteCount, r.db.GetSemaphoreForTable(slaLifecycleTable),
CreateSlaLifecyclesFromCheckables(ctx, s.Entity(), g, updateSlaEntities, true),
com.NeverSplit[database.Entity], database.OnSuccessIncrement[database.Entity](&counter))
})
}

// customvar and customvar_flat sync.
{
updateMessages := make(chan redis.XMessage, r.redis.Options.XReadCount)
upsertEntities := make(chan database.Entity, r.redis.Options.XReadCount)
deleteEntities := make(chan database.Entity, r.redis.Options.XReadCount)
deleteIds := make(chan interface{}, r.redis.Options.XReadCount)

cv := common.NewSyncSubject(v1.NewCustomvar)
cvFlat := common.NewSyncSubject(v1.NewCustomvarFlat)
Expand All @@ -250,7 +267,7 @@ func (r *RuntimeUpdates) Sync(

updateMessagesByKey["icinga:"+strcase.Delimited(cv.Name(), ':')] = updateMessages
g.Go(structifyStream(
ctx, updateMessages, upsertEntities, nil, deleteEntities, nil,
ctx, updateMessages, upsertEntities, nil, deleteIds, nil,
structify.MakeMapStructifier(
reflect.TypeOf(cv.Entity()).Elem(),
"json",
Expand Down Expand Up @@ -304,7 +321,7 @@ func (r *RuntimeUpdates) Sync(
var once sync.Once
for {
select {
case _, ok := <-deleteEntities:
case _, ok := <-deleteIds:
if !ok {
return nil
}
Expand Down Expand Up @@ -394,7 +411,7 @@ func (r *RuntimeUpdates) xRead(ctx context.Context, updateMessagesByKey map[stri
// Converted entities are inserted into the upsertEntities or deleteIds channel depending on the "runtime_type" message field.
func structifyStream(
ctx context.Context, updateMessages <-chan redis.XMessage, upsertEntities, upserted chan database.Entity,
deleteEntities chan database.Entity, deleted chan interface{}, structifier structify.MapStructifier,
deleteIds, deleted chan interface{}, structifier structify.MapStructifier,
) func() error {
if upserted == nil {
upserted = make(chan database.Entity)
Expand All @@ -409,7 +426,7 @@ func structifyStream(
return func() error {
defer func() {
close(upsertEntities)
close(deleteEntities)
close(deleteIds)
}()

for {
Expand Down Expand Up @@ -445,7 +462,7 @@ func structifyStream(
}
} else if runtimeType == "delete" {
select {
case deleteEntities <- entity:
case deleteIds <- entity.ID():
case <-ctx.Done():
return ctx.Err()
}
Expand Down
18 changes: 0 additions & 18 deletions pkg/icingadb/scoped_entity.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package icingadb

import (
"context"
"github.com/icinga/icinga-go-library/database"
)

Expand Down Expand Up @@ -30,20 +29,3 @@ func NewScopedEntity(entity database.Entity, scope interface{}) *ScopedEntity {
scope: scope,
}
}

// OnSuccessApplyAndSendTo applies the provided callback to all the rows of type "T" and streams them to the
// passed channel of type "U". The resulting closure is called with a context and stops as soon as this context
// is canceled or when there are no more records to stream.
func OnSuccessApplyAndSendTo[T any, U any](ch chan<- U, f func(T) U) database.OnSuccess[T] {
return func(ctx context.Context, rows []T) error {
for _, row := range rows {
select {
case ch <- f(row):
case <-ctx.Done():
return ctx.Err()
}
}

return nil
}
}
Loading

0 comments on commit 80a2088

Please sign in to comment.