diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go
index d5fe38f1a..2b4f56a84 100644
--- a/cmd/icingadb/main.go
+++ b/cmd/icingadb/main.go
@@ -277,6 +277,18 @@ func run() int {
 			return nil
 		})
 
+		g.Go(func() error {
+			configInitSync.Wait()
+
+			if err := synctx.Err(); err != nil {
+				return err
+			}
+
+			logger.Info("Syncing Host and Service initial SLA lifecycle")
+
+			return icingadb.SyncCheckablesSlaLifecycle(synctx, db)
+		})
+
 		g.Go(func() error {
 			configInitSync.Wait()
 
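The new goroutine only starts its work once the initial config sync has finished and the sync context is still alive. For readers unfamiliar with this startup pattern, here is a minimal, self-contained sketch of the same gating idea — the names `configInitSync` and `synctx` and the SLA sync call are taken from the diff above; everything else is simplified stand-in code:

```go
package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
)

func main() {
	g, synctx := errgroup.WithContext(context.Background())

	// configInitSync gates workers that must not run before the initial
	// config dump has been synced (mirrors the pattern in main.go).
	var configInitSync sync.WaitGroup
	configInitSync.Add(1)

	g.Go(func() error {
		defer configInitSync.Done()
		fmt.Println("initial config sync done")
		return nil
	})

	g.Go(func() error {
		configInitSync.Wait() // block until the initial sync has finished

		// If the group was already canceled while waiting, bail out.
		if err := synctx.Err(); err != nil {
			return err
		}

		fmt.Println("syncing initial SLA lifecycle entries")
		return nil // placeholder for icingadb.SyncCheckablesSlaLifecycle(synctx, db)
	})

	if err := g.Wait(); err != nil {
		fmt.Println("error:", err)
	}
}
```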
diff --git a/pkg/icingadb/runtime_updates.go b/pkg/icingadb/runtime_updates.go
index d81fe1b11..2759a89f2 100644
--- a/pkg/icingadb/runtime_updates.go
+++ b/pkg/icingadb/runtime_updates.go
@@ -10,6 +10,7 @@ import (
 	"github.com/icinga/icinga-go-library/redis"
 	"github.com/icinga/icinga-go-library/strcase"
 	"github.com/icinga/icinga-go-library/structify"
+	"github.com/icinga/icinga-go-library/types"
 	"github.com/icinga/icingadb/pkg/common"
 	"github.com/icinga/icingadb/pkg/contracts"
 	v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
@@ -71,9 +72,21 @@ func (r *RuntimeUpdates) Sync(
 		s := common.NewSyncSubject(factoryFunc)
 		stat := getCounterForEntity(s.Entity())
 
+		// Multiplexer channels used to distribute the Redis entities to several consumers.
+		upsertEntitiesMultiplexer := make(chan database.Entity, 1)
+		deleteIdsMultiplexer := make(chan any, 1)
+
 		updateMessages := make(chan redis.XMessage, r.redis.Options.XReadCount)
 		upsertEntities := make(chan database.Entity, r.redis.Options.XReadCount)
-		deleteEntities := make(chan database.Entity, r.redis.Options.XReadCount)
+		deleteIds := make(chan interface{}, r.redis.Options.XReadCount)
+
+		var insertSlaEntities chan database.Entity
+		var updateSlaEntities chan database.Entity
+		switch s.Entity().(type) {
+		case *v1.Host, *v1.Service:
+			insertSlaEntities = make(chan database.Entity, r.redis.Options.XReadCount)
+			updateSlaEntities = make(chan database.Entity, r.redis.Options.XReadCount)
+		}
 
 		var upsertedFifo chan database.Entity
 		var deletedFifo chan interface{}
@@ -95,152 +108,164 @@ func (r *RuntimeUpdates) Sync(
 		r.logger.Debugf("Syncing runtime updates of %s", s.Name())
 
 		g.Go(structifyStream(
-			ctx, updateMessages, upsertEntities, upsertedFifo, deleteEntities, deletedFifo,
+			ctx, updateMessages, upsertEntitiesMultiplexer, upsertedFifo, deleteIdsMultiplexer, deletedFifo,
 			structify.MakeMapStructifier(
 				reflect.TypeOf(s.Entity()).Elem(),
 				"json",
 				contracts.SafeInit),
 		))
 
-		// upsertEntityFunc returns a closure that is used to upsert the regular Icinga DB entities.
-		// The returned func is used to directly start a separate goroutine that selects events
-		// sequentially (!allowParallel) from the given chan.
-		upsertEntityFunc := func(entities <-chan database.Entity) func() error {
-			return func() error {
-				var counter com.Counter
-				defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) {
-					if count := counter.Reset(); count > 0 {
-						r.logger.Infof("Upserted %d %s items", count, s.Name())
+		// This worker consumes the "upsert" events from Redis and redistributes the entities to the
+		// "upsertEntities" channel and, for Host/Service entities, also to the "insertSlaEntities" channel.
+		g.Go(func() error {
+			defer close(upsertEntities)
+			if insertSlaEntities != nil {
+				defer close(insertSlaEntities)
+			}
+
+			for {
+				select {
+				case <-ctx.Done():
+					return ctx.Err()
+				case entity, ok := <-upsertEntitiesMultiplexer:
+					if !ok {
+						return nil
 					}
-				}).Stop()
 
-				onSuccess := []database.OnSuccess[database.Entity]{
-					database.OnSuccessIncrement[database.Entity](&counter),
-					database.OnSuccessIncrement[database.Entity](stat),
+					select {
+					case upsertEntities <- entity:
+					case <-ctx.Done():
+						return ctx.Err()
+					}
+
+					if insertSlaEntities != nil {
+						select {
+						case insertSlaEntities <- entity:
+						case <-ctx.Done():
+							return ctx.Err()
+						}
+					}
 				}
-				if !allowParallel {
-					onSuccess = append(onSuccess, database.OnSuccessSendTo(upsertedFifo))
+			}
+		})
+
+		g.Go(func() error {
+			var counter com.Counter
+			defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) {
+				if count := counter.Reset(); count > 0 {
+					r.logger.Infof("Upserted %d %s items", count, s.Name())
 				}
+			}).Stop()
 
-				// Updates must be executed in order, ensure this by using a semaphore with maximum 1.
-				sem := semaphore.NewWeighted(1)
+			// Updates must be executed in order; ensure this by using a semaphore with a maximum of 1.
+			sem := semaphore.NewWeighted(1)
 
-				return r.db.NamedBulkExec(
-					ctx, upsertStmt, upsertCount, sem, entities, database.SplitOnDupId[database.Entity], onSuccess...,
-				)
+			onSuccess := []database.OnSuccess[database.Entity]{
+				database.OnSuccessIncrement[database.Entity](&counter), database.OnSuccessIncrement[database.Entity](stat),
+			}
+			if !allowParallel {
+				onSuccess = append(onSuccess, database.OnSuccessSendTo(upsertedFifo))
 			}
-		}
 
-		// deleteEntityFunc returns a closure that is used to delete the regular Icinga DB entities
-		// based on their ids. The returned func is used to directly start a separate goroutine that
-		// selects events sequentially (!allowParallel) from the given chan.
-		deleteEntityFunc := func(deleteIds <-chan any) func() error {
-			return func() error {
-				var counter com.Counter
-				defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) {
-					if count := counter.Reset(); count > 0 {
-						r.logger.Infof("Deleted %d %s items", count, s.Name())
-					}
-				}).Stop()
+			return r.db.NamedBulkExec(
+				ctx, upsertStmt, upsertCount, sem, upsertEntities, database.SplitOnDupId[database.Entity], onSuccess...,
+			)
+		})
 
-				onSuccess := []database.OnSuccess[any]{database.OnSuccessIncrement[any](&counter), database.OnSuccessIncrement[any](stat)}
-				if !allowParallel {
-					onSuccess = append(onSuccess, database.OnSuccessSendTo(deletedFifo))
+		// Consumes from the "insertSlaEntities" channel and bulk inserts into the "sla_lifecycle" table.
+		g.Go(func() error {
+			var counter com.Counter
+			defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) {
+				if count := counter.Reset(); count > 0 {
+					r.logger.Infof("Inserted %d %s sla lifecycles", count, s.Name())
 				}
+			}).Stop()
 
-				sem := r.db.GetSemaphoreForTable(database.TableName(s.Entity()))
+			stmt, _ := r.db.BuildInsertIgnoreStmt(v1.NewSlaLifecycle())
+			return r.db.NamedBulkExec(
+				ctx, stmt, upsertCount, r.db.GetSemaphoreForTable(slaLifecycleTable),
+				CreateSlaLifecyclesFromCheckables(ctx, s.Entity(), g, insertSlaEntities, false),
+				com.NeverSplit[database.Entity], database.OnSuccessIncrement[database.Entity](&counter))
+		})
 
-				return r.db.BulkExec(ctx, r.db.BuildDeleteStmt(s.Entity()), deleteCount, sem, deleteIds, onSuccess...)
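The multiplexer worker above is a plain fan-out: one producer, several consumers, with sends kept sequential so a slow consumer applies backpressure instead of dropping events. A minimal, self-contained sketch of the same pattern, generic over the element type — all names here are illustrative, not part of the Icinga DB API:

```go
package main

import (
	"context"
	"fmt"
)

// fanOut forwards every value from in to each of outs in order, closing all
// outs when in is drained or ctx is canceled. Sequential sends preserve the
// per-event ordering across consumers, mirroring the worker in the diff.
func fanOut[T any](ctx context.Context, in <-chan T, outs ...chan<- T) error {
	defer func() {
		for _, out := range outs {
			close(out)
		}
	}()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case v, ok := <-in:
			if !ok {
				return nil
			}
			for _, out := range outs {
				select {
				case out <- v:
				case <-ctx.Done():
					return ctx.Err()
				}
			}
		}
	}
}

func main() {
	in := make(chan int, 3)
	a, b := make(chan int, 3), make(chan int, 3)
	in <- 1
	in <- 2
	close(in)

	_ = fanOut(context.Background(), in, a, b)
	fmt.Println(<-a, <-a, <-b, <-b) // 1 2 1 2
}
```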
+		// This worker consumes the "delete" events from Redis and redistributes the IDs to the "deleteIds"
+		// channel and, for Host/Service entities, also to the "updateSlaEntities" channel.
+		g.Go(func() error {
+			defer close(deleteIds)
+			if updateSlaEntities != nil {
+				defer close(updateSlaEntities)
 			}
-		}
 
-		// In order to always get the sla entries written even in case of system errors, we need to process these
-		// first. Otherwise, Icinga DB may be stopped after the regular queries have been processed, and deleted
-		// from the Redis stream, thus we won't be able to generate sla lifecycle for these entities.
-		//
-		// The general event process flow looks as follows:
-		// structifyStream() -> Reads `upsert` & `delete` events from redis and streams the entities to the
-		//                      respective chans `upsertEntities`, `deleteEntities` and waits for `upserted`
-		//                      and `deleted` chans (!allowParallel) before consuming the next one from redis.
-		// - Starts a goroutine that consumes from `upsertEntities` (when the current sync subject is of type checkable,
-		//   this bulk inserts into the sla lifecycle table with semaphore 1 in a `INSERT ... IGNORE ON ERROR` fashion
-		//   and forwards the entities to the next one, which then inserts the checkables into the regular Icinga DB
-		//   tables). After successfully upserting the entities, (!allowParallel) they are passed sequentially to
-		//   the `upserted` stream.
-		//
-		// - Starts another goroutine that consumes from `deleteEntities` concurrently. When the current sync subject is
-		//   of type checkable, this performs sla lifecycle updates matching the checkables id and `delete_time` 0.
-		//   Since the upgrade script should've generated a sla entry for all exiting Checkables, it's unlikely to
-		//   happen, but when there is no tracked `created_at` event for a given checkable, this update is essentially
-		//   a no-op, but forwards the original entities IDs nonetheless to the next one. And finally, the original
-		//   checkables are deleted from the database and (!allowParallel) they are passed sequentially to the
-		//   `deleted` stream.
-		switch s.Entity().(type) {
-		case *v1.Host, *v1.Service:
-			entities := make(chan database.Entity, 1)
-			g.Go(func() error {
-				defer close(entities)
-
-				var counter com.Counter
-				defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) {
-					if count := counter.Reset(); count > 0 {
-						r.logger.Infof("Upserted %d %s sla lifecycles", count, s.Name())
+			for {
+				select {
+				case <-ctx.Done():
+					return ctx.Err()
+				case deleteId, ok := <-deleteIdsMultiplexer:
+					if !ok {
+						return nil
 					}
-				}).Stop()
-
-				stmt, _ := r.db.BuildInsertIgnoreStmt(v1.NewSlaLifecycle())
-
-				// Not to mess up the already existing FIFO mechanism, we have to perform only a single query
-				// (semaphore 1) at a time, even though, the sla queries could be executed concurrently.
-				// After successfully upserting a lifecycle entity, the original checkable entity is streamed to "entities".
-				fromSlaEntities := CreateSlaLifecyclesFromCheckables(ctx, s.Entity(), g, upsertEntities, false)
-				return r.db.NamedBulkExec(
-					ctx, stmt, upsertCount, semaphore.NewWeighted(1), fromSlaEntities,
-					com.NeverSplit[database.Entity], database.OnSuccessIncrement[database.Entity](&counter),
-					OnSuccessApplyAndSendTo(entities, GetCheckableFromSlaLifecycle))
-			})
-
-			// Start the regular Icinga DB checkables upsert stream.
-			g.Go(upsertEntityFunc(entities))
-			// Start the regular Icinga DB checkables delete stream.
-			g.Go(deleteEntityFunc(StreamIDsFromUpdatedSlaLifecycles(ctx, r.db, s.Entity(), g, r.logger, deleteEntities)))
-		default:
-			// For non-checkables runtime updates of upsert event
-			g.Go(upsertEntityFunc(upsertEntities))
-
-			// For non-checkables runtime updates of delete event
-			deleteIds := make(chan any, r.redis.Options.XReadCount)
-			g.Go(func() error {
-				defer close(deleteIds)
-
-				for {
 					select {
-					case <-ctx.Done():
-						return ctx.Err()
-					case entity, ok := <-deleteEntities:
-						if !ok {
-							return nil
-						}
+					case deleteIds <- deleteId:
+					case <-ctx.Done():
+						return ctx.Err()
+					}
+
+					if updateSlaEntities != nil {
+						entity := factoryFunc()
+						entity.SetID(deleteId.(types.Binary))
 
 						select {
-						case deleteIds <- entity.ID():
+						case updateSlaEntities <- entity:
 						case <-ctx.Done():
 							return ctx.Err()
 						}
 					}
 				}
-			})
+			}
+		})
 
-			g.Go(deleteEntityFunc(deleteIds))
-		}
+		g.Go(func() error {
+			var counter com.Counter
+			defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) {
+				if count := counter.Reset(); count > 0 {
+					r.logger.Infof("Deleted %d %s items", count, s.Name())
+				}
+			}).Stop()
+
+			sem := r.db.GetSemaphoreForTable(database.TableName(s.Entity()))
+
+			onSuccess := []database.OnSuccess[any]{database.OnSuccessIncrement[any](&counter), database.OnSuccessIncrement[any](stat)}
+			if !allowParallel {
+				onSuccess = append(onSuccess, database.OnSuccessSendTo(deletedFifo))
+			}
+
+			return r.db.BulkExec(ctx, r.db.BuildDeleteStmt(s.Entity()), deleteCount, sem, deleteIds, onSuccess...)
+		})
+
+		// Consumes from the "updateSlaEntities" channel and sets "delete_time" to now for each
+		// SLA lifecycle entry that still has "delete_time = 0".
+		g.Go(func() error {
+			var counter com.Counter
+			defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) {
+				if count := counter.Reset(); count > 0 {
+					r.logger.Infof("Updated %d %s sla lifecycles", count, s.Name())
+				}
+			}).Stop()
+
+			stmt := fmt.Sprintf(`UPDATE %s SET delete_time = :delete_time WHERE "id" = :id AND "delete_time" = 0`, slaLifecycleTable)
+			return r.db.NamedBulkExec(
+				ctx, stmt, deleteCount, r.db.GetSemaphoreForTable(slaLifecycleTable),
+				CreateSlaLifecyclesFromCheckables(ctx, s.Entity(), g, updateSlaEntities, true),
+				com.NeverSplit[database.Entity], database.OnSuccessIncrement[database.Entity](&counter))
+		})
 	}
 
 	// customvar and customvar_flat sync.
 	{
 		updateMessages := make(chan redis.XMessage, r.redis.Options.XReadCount)
 		upsertEntities := make(chan database.Entity, r.redis.Options.XReadCount)
-		deleteEntities := make(chan database.Entity, r.redis.Options.XReadCount)
+		deleteIds := make(chan interface{}, r.redis.Options.XReadCount)
 
 		cv := common.NewSyncSubject(v1.NewCustomvar)
 		cvFlat := common.NewSyncSubject(v1.NewCustomvarFlat)
@@ -250,7 +275,7 @@ func (r *RuntimeUpdates) Sync(
 		updateMessagesByKey["icinga:"+strcase.Delimited(cv.Name(), ':')] = updateMessages
 
 		g.Go(structifyStream(
-			ctx, updateMessages, upsertEntities, nil, deleteEntities, nil,
+			ctx, updateMessages, upsertEntities, nil, deleteIds, nil,
 			structify.MakeMapStructifier(
 				reflect.TypeOf(cv.Entity()).Elem(),
 				"json",
@@ -304,7 +329,7 @@ func (r *RuntimeUpdates) Sync(
 			var once sync.Once
 			for {
 				select {
-				case _, ok := <-deleteEntities:
+				case _, ok := <-deleteIds:
 					if !ok {
 						return nil
 					}
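One subtle point in the delete path above: the Redis stream only yields raw IDs for deletions, so the worker rebuilds a skeleton entity via `factoryFunc()` and `SetID` before handing it to `CreateSlaLifecyclesFromCheckables`, letting insert and delete events share one transformation path. A rough, self-contained sketch of that idea — the `ID`/`Entity` types below are illustrative stand-ins, not the icinga-go-library interfaces:

```go
package main

import "fmt"

// ID is a stand-in for types.Binary.
type ID []byte

// Entity is a minimal stand-in for database.Entity.
type Entity interface {
	SetID(ID)
	GetID() ID
}

type Host struct{ id ID }

func (h *Host) SetID(id ID) { h.id = id }
func (h *Host) GetID() ID   { return h.id }

func main() {
	factoryFunc := func() Entity { return &Host{} }

	// A deletion event carries only the ID; rebuild a skeleton entity
	// so downstream code can treat upserts and deletes uniformly.
	var deleteId interface{} = ID{0xbe, 0xef}

	entity := factoryFunc()
	entity.SetID(deleteId.(ID))

	fmt.Printf("%x\n", entity.GetID()) // beef
}
```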
@@ -394,7 +419,7 @@ func (r *RuntimeUpdates) xRead(ctx context.Context, updateMessagesByKey map[stri
 // Converted entities are inserted into the upsertEntities or deleteIds channel depending on the "runtime_type" message field.
 func structifyStream(
 	ctx context.Context, updateMessages <-chan redis.XMessage, upsertEntities, upserted chan database.Entity,
-	deleteEntities chan database.Entity, deleted chan interface{}, structifier structify.MapStructifier,
+	deleteIds, deleted chan interface{}, structifier structify.MapStructifier,
 ) func() error {
 	if upserted == nil {
 		upserted = make(chan database.Entity)
@@ -409,7 +434,7 @@ func structifyStream(
 	return func() error {
 		defer func() {
 			close(upsertEntities)
-			close(deleteEntities)
+			close(deleteIds)
 		}()
 
 		for {
@@ -445,7 +470,7 @@ func structifyStream(
 				}
 			} else if runtimeType == "delete" {
 				select {
-				case deleteEntities <- entity:
+				case deleteIds <- entity.ID():
 				case <-ctx.Done():
 					return ctx.Err()
 				}
diff --git a/pkg/icingadb/scoped_entity.go b/pkg/icingadb/scoped_entity.go
index 5589cc213..5a77e7c53 100644
--- a/pkg/icingadb/scoped_entity.go
+++ b/pkg/icingadb/scoped_entity.go
@@ -1,7 +1,6 @@
 package icingadb
 
 import (
-	"context"
 	"github.com/icinga/icinga-go-library/database"
 )
 
@@ -30,20 +29,3 @@ func NewScopedEntity(entity database.Entity, scope interface{}) *ScopedEntity {
 		scope: scope,
 	}
 }
-
-// OnSuccessApplyAndSendTo applies the provided callback to all the rows of type "T" and streams them to the
-// passed channel of type "U". The resulting closure is called with a context and stops as soon as this context
-// is canceled or when there are no more records to stream.
-func OnSuccessApplyAndSendTo[T any, U any](ch chan<- U, f func(T) U) database.OnSuccess[T] {
-	return func(ctx context.Context, rows []T) error {
-		for _, row := range rows {
-			select {
-			case ch <- f(row):
-			case <-ctx.Done():
-				return ctx.Err()
-			}
-		}
-
-		return nil
-	}
-}
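The removed `OnSuccessApplyAndSendTo` helper above also documents the shape of the `database.OnSuccess[T]` callbacks this PR keeps using (`OnSuccessIncrement`, `OnSuccessSendTo`): a function invoked with the rows a bulk operation just processed. A minimal sketch of writing a custom callback against that shape — stand-alone types, with only the callback signature taken from the diff:

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
)

// OnSuccess mirrors the callback shape visible in the removed helper:
// it receives the rows a bulk operation has successfully processed.
type OnSuccess[T any] func(ctx context.Context, rows []T) error

// onSuccessIncrement counts processed rows, analogous to
// database.OnSuccessIncrement in the diff.
func onSuccessIncrement[T any](counter *atomic.Int64) OnSuccess[T] {
	return func(_ context.Context, rows []T) error {
		counter.Add(int64(len(rows)))
		return nil
	}
}

func main() {
	var counter atomic.Int64
	cb := onSuccessIncrement[string](&counter)

	_ = cb(context.Background(), []string{"host-1", "host-2"})
	fmt.Println(counter.Load()) // 2
}
```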
diff --git a/pkg/icingadb/sla_lifecycle.go b/pkg/icingadb/sla_lifecycle.go
index 8a45c738d..25511d6d6 100644
--- a/pkg/icingadb/sla_lifecycle.go
+++ b/pkg/icingadb/sla_lifecycle.go
@@ -3,10 +3,9 @@ package icingadb
 import (
 	"context"
 	"fmt"
-	"github.com/icinga/icinga-go-library/com"
+	"github.com/icinga/icinga-go-library/backoff"
 	"github.com/icinga/icinga-go-library/database"
-	"github.com/icinga/icinga-go-library/logging"
-	"github.com/icinga/icinga-go-library/periodic"
+	"github.com/icinga/icinga-go-library/retry"
 	"github.com/icinga/icinga-go-library/types"
 	v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
 	"github.com/pkg/errors"
@@ -14,19 +13,8 @@ import (
 	"time"
 )
 
-// tableName defines the table name of v1.SlaLifecycle type.
-var tableName = database.TableName(v1.NewSlaLifecycle())
-
-// GetCheckableFromSlaLifecycle returns the original checkable from which the specified sla lifecycle were transformed.
-// When the passed entity is not of type *SlaLifecycle, it is returned as is.
-func GetCheckableFromSlaLifecycle(e database.Entity) database.Entity {
-	s, ok := e.(*v1.SlaLifecycle)
-	if !ok {
-		return e
-	}
-
-	return s.SourceEntity
-}
+// slaLifecycleTable defines the table name of v1.SlaLifecycle type.
+var slaLifecycleTable = database.TableName(v1.NewSlaLifecycle())
 
 // CreateSlaLifecyclesFromCheckables transforms the given checkables to sla lifecycle struct
 // and streams them into a returned channel.
@@ -52,13 +40,15 @@ func CreateSlaLifecyclesFromCheckables(
 				return nil
 			}
 
-			now := time.Now()
-
 			sl := &v1.SlaLifecycle{
 				EnvironmentMeta: v1.EnvironmentMeta{EnvironmentId: env.Id},
-				CreateTime:      types.UnixMilli(now),
+				CreateTime:      types.UnixMilli(time.Now()),
 				DeleteTime:      types.UnixMilli(time.Unix(0, 0)),
-				SourceEntity:    checkable,
+			}
+
+			if isDeleteEvent {
+				sl.DeleteTime = types.UnixMilli(time.Now())
+				sl.CreateTime = types.UnixMilli(time.Unix(0, 0))
 			}
 
 			switch subject.(type) {
@@ -68,20 +58,11 @@ func CreateSlaLifecyclesFromCheckables(
 			case *v1.Service:
 				sl.Id = checkable.ID().(types.Binary)
 				sl.ServiceId = sl.Id
-				if service, ok := checkable.(*v1.Service); ok {
-					// checkable may be of type v1.EntityWithChecksum if this is a deletion event triggered
-					// by the initial config sync as determined by the config delta calculation.
-					sl.HostId = service.HostId
-				}
+				sl.HostId = checkable.(*v1.Service).HostId
 			default:
 				return errors.Errorf("sla lifecycle for type %T is not supported", checkable)
 			}
 
-			if isDeleteEvent {
-				sl.DeleteTime = types.UnixMilli(now)
-				sl.CreateTime = types.UnixMilli(time.Unix(0, 0))
-			}
-
 			select {
 			case slaLifecycles <- sl:
 			case <-ctx.Done():
@@ -94,37 +75,49 @@ func CreateSlaLifecyclesFromCheckables(
 	return slaLifecycles
 }
 
-// StreamIDsFromUpdatedSlaLifecycles updates the `delete_time` of the sla lifecycle table for each of the Checkables
-// consumed from the provided "entities" chan and upon successful execution of the query streams the original IDs
-// of the entities into the returned channel.
+// SyncCheckablesSlaLifecycle inserts one `create_time` sla lifecycle entry for each of the checkables from
+// the `host` and `service` tables and updates the `delete_time` of each of the sla lifecycle entries whose
+// host/service IDs cannot be found in the `host/service` tables.
 //
 // It's unlikely, but when a given Checkable doesn't already have a `create_time` entry in the database, the update
-// query won't update anything. Either way the entities IDs are streamed into the returned chan.
-func StreamIDsFromUpdatedSlaLifecycles(
-	ctx context.Context, db *database.DB, subject database.Entity, g *errgroup.Group, logger *logging.Logger, entities <-chan database.Entity,
-) <-chan any {
-	deleteEntityIDs := make(chan any, 1)
-
-	g.Go(func() error {
-		defer close(deleteEntityIDs)
-
-		var counter com.Counter
-		defer periodic.Start(ctx, logger.Interval(), func(_ periodic.Tick) {
-			if count := counter.Reset(); count > 0 {
-				logger.Infof("Updated %d %s sla lifecycles", count, types.Name(subject))
+// query won't update anything. Likewise, the insert statements may also become a no-op if the Checkables already
+// have a `create_time` entry with `delete_time = 0`.
+//
+// This function retries any database errors for at least `5m` before giving up and failing with an error.
+func SyncCheckablesSlaLifecycle(ctx context.Context, db *database.DB) error {
+	hostInsertStmtFmt := `
+INSERT INTO %[1]s (id, environment_id, host_id, create_time)
+  SELECT id, environment_id, id, %[2]d AS create_time
+  FROM host WHERE NOT EXISTS(SELECT 1 FROM %[1]s WHERE service_id IS NULL AND delete_time = 0 AND host_id = host.id);`
+
+	hostUpdateStmtFmt := `
+UPDATE %[1]s SET delete_time = %[2]d
+  WHERE service_id IS NULL AND delete_time = 0 AND NOT EXISTS(SELECT 1 FROM host WHERE host.id = %[1]s.id)`
+
+	serviceInsertStmtFmt := `
+INSERT INTO %[1]s (id, environment_id, host_id, service_id, create_time)
+  SELECT id, environment_id, host_id, id, %[2]d AS create_time
+  FROM service WHERE NOT EXISTS(SELECT 1 FROM %[1]s WHERE delete_time = 0 AND service_id = service.id);`
+
+	serviceUpdateStmtFmt := `
+UPDATE %[1]s SET delete_time = %[2]d
+  WHERE delete_time = 0 AND service_id IS NOT NULL AND NOT EXISTS(SELECT 1 FROM service WHERE service.id = %[1]s.id)`
+
+	return retry.WithBackoff(
+		ctx,
+		func(context.Context) error {
+			eventTime := time.Now().UnixMilli()
+			for _, queryFmt := range []string{hostInsertStmtFmt, hostUpdateStmtFmt, serviceInsertStmtFmt, serviceUpdateStmtFmt} {
+				query := fmt.Sprintf(queryFmt, slaLifecycleTable, eventTime)
+				if _, err := db.ExecContext(ctx, query); err != nil {
+					return database.CantPerformQuery(err, query)
+				}
 			}
-		}).Stop()
-
-		sem := db.GetSemaphoreForTable(tableName)
-		stmt := fmt.Sprintf(`UPDATE %s SET delete_time = :delete_time WHERE "id" = :id AND "delete_time" = 0`, tableName)
-
-		// extractEntityId is used as a callback for the on success mechanism to extract the checkables id.
-		extractEntityId := func(e database.Entity) any { return e.(*v1.SlaLifecycle).SourceEntity.ID() }
-
-		return db.NamedBulkExec(
-			ctx, stmt, 1, sem, CreateSlaLifecyclesFromCheckables(ctx, subject, g, entities, true),
-			com.NeverSplit[database.Entity], OnSuccessApplyAndSendTo[database.Entity, any](deleteEntityIDs, extractEntityId))
-	})
-
-	return deleteEntityIDs
+
+			return nil
+		},
+		retry.Retryable,
+		backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second),
+		db.GetDefaultRetrySettings(),
+	)
 }
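Full-table reconciliation like `SyncCheckablesSlaLifecycle` is a natural candidate for retrying transient database errors, and the function reuses the icinga-go-library retry helpers exactly as shown above. As a rough, self-contained illustration of the same retry-with-exponential-backoff-and-jitter idea — all names below are stand-ins, not the icinga-go-library API:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"time"
)

// withBackoff retries op with exponentially growing, jittered sleeps until it
// succeeds, the timeout elapses, or ctx is canceled. It mirrors the shape of
// retry.WithBackoff in the diff without reproducing its exact API.
func withBackoff(ctx context.Context, timeout time.Duration, op func(context.Context) error) error {
	deadline := time.Now().Add(timeout)
	delay := time.Millisecond

	for attempt := 0; ; attempt++ {
		err := op(ctx)
		if err == nil {
			return nil
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("giving up after %d attempts: %w", attempt+1, err)
		}

		// Exponential backoff with jitter, capped at one second.
		sleep := delay + time.Duration(rand.Int63n(int64(delay)))
		if delay *= 2; delay > time.Second {
			delay = time.Second
		}

		select {
		case <-time.After(sleep):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func main() {
	calls := 0
	err := withBackoff(context.Background(), 5*time.Second, func(context.Context) error {
		if calls++; calls < 3 {
			return errors.New("transient database error")
		}
		return nil
	})
	fmt.Println(err, "after", calls, "calls") // <nil> after 3 calls
}
```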
diff --git a/pkg/icingadb/sync.go b/pkg/icingadb/sync.go
index e056ba7b5..6b39ee64f 100644
--- a/pkg/icingadb/sync.go
+++ b/pkg/icingadb/sync.go
@@ -111,12 +111,7 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
 	// Create
 	if len(delta.Create) > 0 {
 		s.logger.Infof("Inserting %d items of type %s", len(delta.Create), strcase.Delimited(types.Name(delta.Subject.Entity()), ' '))
-		createStreamedFunc := func(entities <-chan database.Entity) func() error {
-			return func() error {
-				return s.db.CreateStreamed(ctx, entities, database.OnSuccessIncrement[database.Entity](stat))
-			}
-		}
-
+		var entities <-chan database.Entity
 		if delta.Subject.WithChecksum() {
 			pairs, errs := s.redis.HMYield(
 				ctx,
@@ -128,30 +123,16 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
 			entitiesWithoutChecksum, errs := icingaredis.CreateEntities(ctx, delta.Subject.Factory(), pairs, runtime.NumCPU())
 			// Let errors from CreateEntities cancel our group.
 			com.ErrgroupReceive(g, errs)
-			entities, errs := icingaredis.SetChecksums(ctx, entitiesWithoutChecksum, delta.Create, runtime.NumCPU())
+			entities, errs = icingaredis.SetChecksums(ctx, entitiesWithoutChecksum, delta.Create, runtime.NumCPU())
 			// Let errors from SetChecksums cancel our group.
 			com.ErrgroupReceive(g, errs)
-
-			switch delta.Subject.Entity().(type) {
-			case *v1.Host, *v1.Service:
-				s.logger.Infof("Inserting %d %s sla lifecycle", len(delta.Create), delta.Subject.Name())
-
-				createdEntities := make(chan database.Entity, len(delta.Create))
-				g.Go(func() error {
-					defer close(createdEntities)
-
-					return s.db.CreateIgnoreStreamed(
-						ctx, CreateSlaLifecyclesFromCheckables(ctx, delta.Subject.Entity(), g, entities, false),
-						OnSuccessApplyAndSendTo(createdEntities, GetCheckableFromSlaLifecycle))
-				})
-
-				g.Go(createStreamedFunc(createdEntities))
-			default:
-				g.Go(createStreamedFunc(entities))
-			}
 		} else {
-			g.Go(createStreamedFunc(delta.Create.Entities(ctx)))
+			entities = delta.Create.Entities(ctx)
 		}
+
+		g.Go(func() error {
+			return s.db.CreateStreamed(ctx, entities, database.OnSuccessIncrement[database.Entity](stat))
+		})
 	}
 
 	// Update
@@ -181,19 +162,9 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
 	// Delete
 	if len(delta.Delete) > 0 {
 		s.logger.Infof("Deleting %d items of type %s", len(delta.Delete), strcase.Delimited(types.Name(delta.Subject.Entity()), ' '))
-		entity := delta.Subject.Entity()
-		switch entity.(type) {
-		case *v1.Host, *v1.Service:
-			g.Go(func() error {
-				return s.db.DeleteStreamed(
-					ctx, entity, StreamIDsFromUpdatedSlaLifecycles(ctx, s.db, entity, g, s.logger, delta.Delete.Entities(ctx)),
-					database.OnSuccessIncrement[any](stat))
-			})
-		default:
-			g.Go(func() error {
-				return s.db.Delete(ctx, delta.Subject.Entity(), delta.Delete.IDs(), database.OnSuccessIncrement[any](stat))
-			})
-		}
+		g.Go(func() error {
+			return s.db.Delete(ctx, delta.Subject.Entity(), delta.Delete.IDs(), database.OnSuccessIncrement[any](stat))
+		})
 	}
 
 	return g.Wait()
diff --git a/pkg/icingadb/v1/sla_lifecycle.go b/pkg/icingadb/v1/sla_lifecycle.go
index 5fa78ffe2..3e8853a42 100644
--- a/pkg/icingadb/v1/sla_lifecycle.go
+++ b/pkg/icingadb/v1/sla_lifecycle.go
@@ -12,9 +12,6 @@ type SlaLifecycle struct {
 	ServiceId  types.Binary    `json:"service_id"`
 	CreateTime types.UnixMilli `json:"create_time"`
 	DeleteTime types.UnixMilli `json:"delete_time"`
-
-	// The original checkable entity from which this sla lifecycle were transformed
-	SourceEntity database.Entity `json:"-" db:"-"`
 }
 
 func NewSlaLifecycle() database.Entity {
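With `SourceEntity` gone, `SlaLifecycle` is again a plain row type whose lifecycle semantics are fully encoded in its two timestamps: a create event carries `delete_time = 0`, a delete event carries `create_time = 0`. A small stand-alone sketch of that convention — stand-in types only; the real struct lives in pkg/icingadb/v1:

```go
package main

import (
	"fmt"
	"time"
)

// slaRow is a stand-in for v1.SlaLifecycle: the zero value of the "other"
// timestamp encodes whether the row records a creation or a deletion.
type slaRow struct {
	HostID     string
	CreateTime int64 // unix millis; 0 for delete events
	DeleteTime int64 // unix millis; 0 for create events
}

func newSlaRow(hostID string, isDeleteEvent bool, now time.Time) slaRow {
	row := slaRow{HostID: hostID, CreateTime: now.UnixMilli()}
	if isDeleteEvent {
		row.DeleteTime, row.CreateTime = row.CreateTime, 0
	}
	return row
}

func main() {
	now := time.Now()
	fmt.Printf("%+v\n", newSlaRow("host-1", false, now)) // create event
	fmt.Printf("%+v\n", newSlaRow("host-1", true, now))  // delete event
}
```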