diff --git a/cmd/expirator/main.go b/cmd/expirator/main.go index 1a9abac0a2..3d3d353401 100644 --- a/cmd/expirator/main.go +++ b/cmd/expirator/main.go @@ -82,7 +82,7 @@ func main() { fatalOnError(err) - slog.Info("Trial cleanup job finished successfully!") + slog.Info("Expirator job finished successfully!") err = conn.Close() if err != nil { diff --git a/cmd/subaccountsync/main.go b/cmd/subaccountsync/main.go index 3676a86f6b..51db9a758a 100644 --- a/cmd/subaccountsync/main.go +++ b/cmd/subaccountsync/main.go @@ -1,18 +1,135 @@ package main import ( + "context" "fmt" "log/slog" "os" -) + "os/signal" + "time" + + "github.com/kyma-project/kyma-environment-broker/internal/events" + "github.com/kyma-project/kyma-environment-broker/internal/storage" + "github.com/sirupsen/logrus" + + "github.com/kyma-project/kyma-environment-broker/internal/kymacustomresource" + "k8s.io/apimachinery/pkg/runtime/schema" -const ( - AppPrefix = "subaccount-sync" + kebConfig "github.com/kyma-project/kyma-environment-broker/internal/config" + "github.com/vrischmann/envconfig" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" + "sigs.k8s.io/controller-runtime/pkg/client/config" + + subsync "github.com/kyma-project/kyma-environment-broker/internal/subaccountsync" ) +const AppPrefix = "subaccount_sync" + func main() { - // create slog logger - logger := slog.New(slog.NewJSONHandler(os.Stdout, nil)) - logger.Info(fmt.Sprintf("%s app stub started", AppPrefix)) + // create context + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) + defer cancel() + + cli := getK8sClient() + + // create and fill config + var cfg subsync.Config + err := envconfig.InitWithPrefix(&cfg, AppPrefix) + if err != nil { + fatalOnError(err) + } + + logLevel := new(slog.LevelVar) + logLevel.Set(cfg.GetLogLevel()) + slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ + Level: logLevel, + })).With("service", "subaccount-sync")) + + slog.Info(fmt.Sprintf("Configuration: event window size:%s, event sync interval:%s, accounts sync interval: %s, storage sync interval: %s, queue sleep interval: %s", + cfg.EventsWindowSize, cfg.EventsSyncInterval, cfg.AccountsSyncInterval, cfg.StorageSyncInterval, cfg.SyncQueueSleepInterval)) + + // create config provider - provider still uses logrus logger + configProvider := kebConfig.NewConfigProvider( + kebConfig.NewConfigMapReader(ctx, cli, logrus.WithField("service", "storage"), cfg.KymaVersion), + kebConfig.NewConfigMapKeysValidator(), + kebConfig.NewConfigMapConverter()) + + // create Kyma GVR + kymaGVR := getResourceKindProvider(cfg.KymaVersion, configProvider) + + // create DB connection + cipher := storage.NewEncrypter(cfg.Database.SecretKey) + db, dbConn, err := storage.NewFromConfig(cfg.Database, events.Config{}, cipher, logrus.WithField("service", "storage")) + if err != nil { + fatalOnError(err) + } + defer func() { + if r := recover(); r != nil { + slog.Error("Recovered from panic. 
Error:\n", r) + } + err = dbConn.Close() + if err != nil { + slog.Warn(fmt.Sprintf("failed to close database connection: %s", err.Error())) + } + }() + + // create dynamic K8s client + dynamicK8sClient := createDynamicK8sClient() + + // create service + syncService := subsync.NewSyncService(AppPrefix, ctx, cfg, kymaGVR, db, dynamicK8sClient) + syncService.Run() +} + +func getK8sClient() client.Client { + k8sCfg, err := config.GetConfig() + fatalOnError(err) + cli, err := createK8sClient(k8sCfg) + fatalOnError(err) + return cli +} + +func createK8sClient(cfg *rest.Config) (client.Client, error) { + mapper, err := apiutil.NewDiscoveryRESTMapper(cfg) + if err != nil { + err = wait.Poll(time.Second, time.Minute, func() (bool, error) { + mapper, err = apiutil.NewDiscoveryRESTMapper(cfg) + if err != nil { + return false, nil + } + return true, nil + }) + if err != nil { + return nil, fmt.Errorf("while waiting for client mapper: %w", err) + } + } + cli, err := client.New(cfg, client.Options{Mapper: mapper}) + if err != nil { + return nil, fmt.Errorf("while creating a client: %w", err) + } + return cli, nil +} + +func createDynamicK8sClient() dynamic.Interface { + kcpK8sConfig := config.GetConfigOrDie() + clusterClient, err := dynamic.NewForConfig(kcpK8sConfig) + fatalOnError(err) + return clusterClient +} + +func getResourceKindProvider(kymaVersion string, configProvider *kebConfig.ConfigProvider) schema.GroupVersionResource { + resourceKindProvider := kymacustomresource.NewResourceKindProvider(kymaVersion, configProvider) + kymaGVR, err := resourceKindProvider.DefaultGvr() + fatalOnError(err) + return kymaGVR +} +func fatalOnError(err error) { + if err != nil { + panic(err) + } } diff --git a/go.mod b/go.mod index 6233ad4dae..3298a97c1b 100644 --- a/go.mod +++ b/go.mod @@ -37,6 +37,7 @@ require ( golang.org/x/exp v0.0.0-20240213143201-ec583247a57a golang.org/x/mod v0.17.0 golang.org/x/oauth2 v0.19.0 + golang.org/x/time v0.5.0 gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v3 v3.0.1 k8s.io/api v0.29.2 @@ -79,6 +80,7 @@ require ( github.com/golang/protobuf v1.5.3 // indirect github.com/google/gnostic v0.7.0 // indirect github.com/google/gnostic-models v0.6.9-0.20230804172637-c7be7c783f49 // indirect + github.com/google/go-cmp v0.6.0 // indirect github.com/google/go-querystring v1.1.0 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/gorilla/websocket v1.5.0 // indirect @@ -128,7 +130,6 @@ require ( golang.org/x/sys v0.18.0 // indirect golang.org/x/term v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect - golang.org/x/time v0.5.0 // indirect golang.org/x/tools v0.18.0 // indirect google.golang.org/protobuf v1.33.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect diff --git a/internal/subaccountsync/cis_accounts.go b/internal/subaccountsync/cis_accounts.go new file mode 100644 index 0000000000..0f8668181a --- /dev/null +++ b/internal/subaccountsync/cis_accounts.go @@ -0,0 +1,68 @@ +package subaccountsync + +import ( + "encoding/json" + "fmt" + "io" + "net/http" +) + +func (c *RateLimitedCisClient) buildSubaccountRequest(subaccountID string) (*http.Request, error) { + request, err := http.NewRequest(http.MethodGet, fmt.Sprintf(subaccountServicePath, c.config.ServiceURL, subaccountID), nil) + if err != nil { + return nil, fmt.Errorf("while creating request: %w", err) + } + q := request.URL.Query() + request.URL.RawQuery = q.Encode() + return request, nil +} + +func (c *RateLimitedCisClient) fetchDataForSetOfSubaccounts(subaccounts subaccountsSetType) (statesFromCisType, error) { + + 
subaccountsDataFromAccounts := make(statesFromCisType) + for subaccount := range subaccounts { + accountData, err := c.GetSubaccountData(string(subaccount)) + if err != nil { + c.log.Error(fmt.Sprintf("while getting subaccount data: %s", err)) + } else { + c.log.Debug(fmt.Sprintf("getting for subaccount %s", subaccount)) + subaccountsDataFromAccounts[subaccount] = accountData + } + } + return subaccountsDataFromAccounts, nil +} + +func (c *RateLimitedCisClient) GetSubaccountData(subaccountID string) (CisStateType, error) { + request, err := c.buildSubaccountRequest(subaccountID) + if err != nil { + return CisStateType{}, fmt.Errorf("while building request for accounts technical service: %w", err) + } + + response, err := c.httpClient.Do(request) + if err != nil { + return CisStateType{}, fmt.Errorf("while executing request to accounts technical service: %w", err) + } + + defer func(Body io.ReadCloser) { + err := Body.Close() + if err != nil { + c.log.Warn(fmt.Sprintf("failed to close response body: %s", err.Error())) + } + }(response.Body) + + if response.StatusCode == http.StatusNotFound { + return CisStateType{}, nil + } + + if response.StatusCode != http.StatusOK { + return CisStateType{}, fmt.Errorf("while processing response: %s", c.handleErrorStatusCode(response)) + } + + var cisResponse CisStateType + err = json.NewDecoder(response.Body).Decode(&cisResponse) + if err != nil { + return CisStateType{}, fmt.Errorf("while decoding CIS account response: %w", err) + } + + return cisResponse, nil +} diff --git a/internal/subaccountsync/cis_events.go b/internal/subaccountsync/cis_events.go new file mode 100644 index 0000000000..9925908349 --- /dev/null +++ b/internal/subaccountsync/cis_events.go @@ -0,0 +1,101 @@ +package subaccountsync + +import ( + "encoding/json" + "fmt" + "io" + "log/slog" + "net/http" + "strconv" +) + +func (c *RateLimitedCisClient) buildEventRequest(page int, fromActionTime int64) (*http.Request, error) { + request, err := http.NewRequest(http.MethodGet, fmt.Sprintf(eventServicePath, c.config.ServiceURL), nil) + if err != nil { + return nil, fmt.Errorf("while creating request: %v", err) + } + q := request.URL.Query() + q.Add("eventType", eventType) + q.Add("pageSize", c.config.PageSize) + q.Add("pageNum", strconv.Itoa(page)) + q.Add("fromActionTime", strconv.FormatInt(fromActionTime, 10)) + q.Add("sortField", "actionTime") + q.Add("sortOrder", "ASC") + + request.URL.RawQuery = q.Encode() + + return request, nil +} + +func (c *RateLimitedCisClient) FetchEventsWindow(fromActionTime int64) ([]Event, error) { + var events []Event + var currentPage int + for { + cisResponse, err := c.fetchEventsPage(currentPage, fromActionTime) + if err != nil { + c.log.Error(fmt.Sprintf("while getting subaccount events for %d page: %v", currentPage, err)) + return nil, err + } + events = append(events, cisResponse.Events...) 
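+ // advance to the next page; CIS reports TotalPages in each response, so the check below stops the loop once the last page has been fetched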
+ currentPage++ + if cisResponse.TotalPages == currentPage { + break + } + } + c.log.Debug(fmt.Sprintf("Fetched event window - pages: %d, events: %d, from epoch: %d", currentPage, len(events), fromActionTime)) + return events, nil +} + +func (c *RateLimitedCisClient) fetchEventsPage(page int, fromActionTime int64) (CisEventsResponse, error) { + request, err := c.buildEventRequest(page, fromActionTime) + if err != nil { + return CisEventsResponse{}, fmt.Errorf("while building request for event service: %v", err) + } + + response, err := c.httpClient.Do(request) + if err != nil { + return CisEventsResponse{}, fmt.Errorf("while executing request to event service: %v", err) + } + defer func(Body io.ReadCloser) { + err := Body.Close() + if err != nil { + c.log.Warn(fmt.Sprintf("failed to close response body: %s", err.Error())) + } + }(response.Body) + + if response.StatusCode != http.StatusOK { + return CisEventsResponse{}, fmt.Errorf("while processing response: %s", c.handleErrorStatusCode(response)) + } + + var cisResponse CisEventsResponse + err = json.NewDecoder(response.Body).Decode(&cisResponse) + if err != nil { + return CisEventsResponse{}, fmt.Errorf("while decoding CIS events response: %v", err) + } + + return cisResponse, nil +} + +func (c *RateLimitedCisClient) getEventsForSubaccounts(fromActionTime int64, logs slog.Logger, subaccountsMap subaccountsSetType) ([]Event, error) { + rawEvents, err := c.FetchEventsWindow(fromActionTime) + if err != nil { + logs.Error(fmt.Sprintf("while getting subaccount delete events: %s", err)) + return nil, err + } + + // filter events to get only the ones in subaccounts map + filteredEventsFromCis := filterEvents(rawEvents, subaccountsMap) + logs.Info(fmt.Sprintf("Raw events: %d, filtered: %d", len(rawEvents), len(filteredEventsFromCis))) + + return filteredEventsFromCis, nil +} + +func filterEvents(rawEvents []Event, subaccounts subaccountsSetType) []Event { + var filteredEvents []Event + for _, event := range rawEvents { + if _, ok := subaccounts[subaccountIDType(event.SubaccountID)]; ok { + filteredEvents = append(filteredEvents, event) + } + } + return filteredEvents +} diff --git a/internal/subaccountsync/cis_model.go b/internal/subaccountsync/cis_model.go new file mode 100644 index 0000000000..d7a095ed93 --- /dev/null +++ b/internal/subaccountsync/cis_model.go @@ -0,0 +1,37 @@ +package subaccountsync + +type ( + EventDetails struct { + BetaEnabled bool `json:"betaEnabled"` + UsedForProduction string `json:"usedForProduction"` + } + + Event struct { + ActionTime int64 `json:"actionTime"` + SubaccountID string `json:"entityId"` + Type string `json:"eventType"` + Details EventDetails + } + + CisEventsResponse struct { + Total int `json:"total"` + TotalPages int `json:"totalPages"` + PageNum int `json:"pageNum"` + Events []Event `json:"events"` + } + + CisStateType struct { + BetaEnabled bool `json:"betaEnabled"` + UsedForProduction string `json:"usedForProduction"` + ModifiedDate int64 `json:"modifiedDate"` + } + + CisErrorResponseType struct { + Code int `json:"code"` + Message string `json:"message"` + } + + CisAccountErrorResponseType struct { + Error CisErrorResponseType `json:"error"` + } +) diff --git a/internal/subaccountsync/config.go b/internal/subaccountsync/config.go new file mode 100644 index 0000000000..972e4f4c1b --- /dev/null +++ b/internal/subaccountsync/config.go @@ -0,0 +1,51 @@ +package subaccountsync + +import ( + "log/slog" + "strings" + "time" + + "github.com/kyma-project/kyma-environment-broker/internal/storage" +) + +type ( 
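+ // Config holds the subaccount-sync settings; values are read from environment variables via envconfig (see cmd/subaccountsync/main.go), with defaults taken from the struct tags below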
+ Config struct { + CisAccounts CisEndpointConfig + CisEvents CisEndpointConfig + Database storage.Config + UpdateResources bool `envconfig:"default=false"` + EventsWindowSize time.Duration `envconfig:"default=20m"` + EventsSyncInterval time.Duration `envconfig:"default=15m"` + AccountsSyncInterval time.Duration `envconfig:"default=24h"` + StorageSyncInterval time.Duration `envconfig:"default=10m"` + SyncQueueSleepInterval time.Duration `envconfig:"default=30s"` + MetricsPort string `envconfig:"default=8081"` + LogLevel string `envconfig:"default=info"` + KymaVersion string + } + + CisEndpointConfig struct { + ClientID string + ClientSecret string + AuthURL string + ServiceURL string + PageSize string `envconfig:"default=150,optional"` + RateLimitingInterval time.Duration `envconfig:"default=2s,optional"` + MaxRequestsPerInterval int `envconfig:"default=5,optional"` + } +) + +func (c Config) GetLogLevel() slog.Level { + switch strings.ToUpper(c.LogLevel) { + case "DEBUG": + return slog.LevelDebug + case "INFO": + return slog.LevelInfo + case "WARN": + return slog.LevelWarn + case "ERROR": + return slog.LevelError + default: + return slog.LevelInfo + } +} diff --git a/internal/subaccountsync/event_window.go b/internal/subaccountsync/event_window.go new file mode 100644 index 0000000000..bdf1d7dc57 --- /dev/null +++ b/internal/subaccountsync/event_window.go @@ -0,0 +1,36 @@ +package subaccountsync + +type EventWindow struct { + lastFromTime int64 + lastToTime int64 + windowSize int64 + nowMillisFunc func() int64 +} + +func NewEventWindow(windowSize int64, nowFunc func() int64) *EventWindow { + return &EventWindow{ + nowMillisFunc: nowFunc, + windowSize: windowSize, + } +} + +func (ew *EventWindow) GetNextFromTime() int64 { + eventsFrom := ew.nowMillisFunc() - ew.windowSize + if eventsFrom > ew.lastToTime { + eventsFrom = ew.lastToTime + } + if eventsFrom < 0 { + eventsFrom = 0 + } + return eventsFrom +} + +func (ew *EventWindow) UpdateFromTime(fromTime int64) { + ew.lastFromTime = fromTime +} + +func (ew *EventWindow) UpdateToTime(eventTime int64) { + if eventTime > ew.lastToTime { + ew.lastToTime = eventTime + } +} diff --git a/internal/subaccountsync/event_window_test.go b/internal/subaccountsync/event_window_test.go new file mode 100644 index 0000000000..2af5d22176 --- /dev/null +++ b/internal/subaccountsync/event_window_test.go @@ -0,0 +1,36 @@ +package subaccountsync + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +const windowSize = 20 * 60 * 1000 + +func TestEventWindow(t *testing.T) { + now := int64(0) + ew := NewEventWindow(windowSize, func() int64 { return now }) + + t.Run("should not return negative value", func(t *testing.T) { + from := ew.GetNextFromTime() + assert.Equal(t, int64(0), from) + }) + + t.Run("should return beginning of time when called first time, and sized window when called second time", func(t *testing.T) { + now = 1709164800000 // 2024-12-28 00:00:00 + from := ew.GetNextFromTime() + assert.Equal(t, int64(0), from) + ew.UpdateToTime(now - 1000) // second to midnight + ew.UpdateFromTime(from) + assert.Equal(t, now-1000, ew.lastToTime) + assert.Equal(t, from, ew.lastFromTime) + now = 1709164800000 + 15*60*1000 // 2024-12-28 00:15:00 + from = ew.GetNextFromTime() + assert.Equal(t, now-windowSize, from) + ew.UpdateToTime(now - 1000) // 2024-12-28 00:14:59 + ew.UpdateFromTime(from) + assert.Equal(t, now-1000, ew.lastToTime) + assert.Equal(t, from, ew.lastFromTime) + }) +} diff --git a/internal/subaccountsync/informer.go 
b/internal/subaccountsync/informer.go new file mode 100644 index 0000000000..b669aa281a --- /dev/null +++ b/internal/subaccountsync/informer.go @@ -0,0 +1,68 @@ +package subaccountsync + +import ( + "fmt" + "log/slog" + "reflect" + + "github.com/prometheus/client_golang/prometheus" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/client-go/tools/cache" +) + +func configureInformer(informer *cache.SharedIndexInformer, stateReconciler *stateReconcilerType, logger *slog.Logger, metrics *Metrics) { + _, err := (*informer).AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + metrics.informer.With(prometheus.Labels{"event": "add"}).Inc() + u, ok := obj.(*unstructured.Unstructured) + if !ok { + logger.Error(fmt.Sprintf("added Kyma resource is not an Unstructured: %s", obj)) + return + } + subaccountID, runtimeID, betaEnabled := getDataFromLabels(u) + if subaccountID == "" { + logger.Error(fmt.Sprintf("added Kyma resource has no subaccount label: %s", u.GetName())) + return + } + stateReconciler.reconcileResourceUpdate(subaccountIDType(subaccountID), runtimeIDType(runtimeID), runtimeStateType{betaEnabled: betaEnabled}) + data, err := stateReconciler.accountsClient.GetSubaccountData(subaccountID) + if err != nil { + logger.Warn(fmt.Sprintf("while getting data for subaccount:%s", err)) + } else { + stateReconciler.reconcileCisAccount(subaccountIDType(subaccountID), data) + } + }, + UpdateFunc: func(oldObj, newObj interface{}) { + metrics.informer.With(prometheus.Labels{"event": "update"}).Inc() + u, ok := newObj.(*unstructured.Unstructured) + if !ok { + logger.Error(fmt.Sprintf("updated Kyma resource is not an Unstructured: %s", newObj)) + return + } + subaccountID, runtimeID, betaEnabled := getDataFromLabels(u) + if subaccountID == "" { + logger.Error(fmt.Sprintf("updated Kyma resource has no subaccount label: %s", u.GetName())) + return + } + if !reflect.DeepEqual(oldObj.(*unstructured.Unstructured).GetLabels(), u.GetLabels()) { + stateReconciler.reconcileResourceUpdate(subaccountIDType(subaccountID), runtimeIDType(runtimeID), runtimeStateType{betaEnabled: betaEnabled}) + } + }, + DeleteFunc: func(obj interface{}) { + metrics.informer.With(prometheus.Labels{"event": "delete"}).Inc() + u, ok := obj.(*unstructured.Unstructured) + if !ok { + logger.Error(fmt.Sprintf("deleted Kyma resource is not an Unstructured: %s", obj)) + return + } + logger.Info(fmt.Sprintf("Kyma resource deleted: %s", u.GetName())) + subaccountID, runtimeID, _ := getDataFromLabels(u) + if subaccountID == "" || runtimeID == "" { + // deleted kyma resource without subaccount label or runtime label - no need to make fuss, silently ignore + return + } + stateReconciler.deleteRuntimeFromState(subaccountIDType(subaccountID), runtimeIDType(runtimeID)) + }, + }) + fatalOnError(err) +} diff --git a/internal/subaccountsync/metrics.go b/internal/subaccountsync/metrics.go new file mode 100644 index 0000000000..890900eb6e --- /dev/null +++ b/internal/subaccountsync/metrics.go @@ -0,0 +1,43 @@ +package subaccountsync + +import "github.com/prometheus/client_golang/prometheus" + +type Metrics struct { + queue prometheus.Gauge + queueOps *prometheus.CounterVec + cisRequests *prometheus.CounterVec + states *prometheus.GaugeVec + informer *prometheus.CounterVec +} + +func NewMetrics(reg prometheus.Registerer, namespace string) *Metrics { + m := &Metrics{ + states: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "in_memory_states", + Help: "Information about in-memory 
states.", + }, []string{"type"}), + queueOps: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Name: "priority_queue_ops", + Help: "Priority queue operations.", + }, []string{"operation"}), + cisRequests: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Name: "cis_requests", + Help: "CIS requests.", + }, []string{"endpoint"}), + informer: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Name: "informer", + Help: "Informer stats.", + }, []string{"event"}), + queue: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "priority_queue_size", + Help: "Queue size.", + }), + } + reg.MustRegister(m.queue, m.queueOps, m.states, m.informer, m.cisRequests) + return m +} diff --git a/internal/subaccountsync/rate_limited_client.go b/internal/subaccountsync/rate_limited_client.go new file mode 100644 index 0000000000..c8afb33fdc --- /dev/null +++ b/internal/subaccountsync/rate_limited_client.go @@ -0,0 +1,64 @@ +package subaccountsync + +import ( + "context" + "fmt" + "io" + "log/slog" + "net/http" + + "golang.org/x/oauth2/clientcredentials" + "golang.org/x/time/rate" +) + +type RateLimitedCisClient struct { + ctx context.Context + httpClient *http.Client + config CisEndpointConfig + log *slog.Logger + RateLimiter *rate.Limiter +} + +type RateLimiter interface { + Do(req *http.Request) (*http.Response, error) +} + +func NewRateLimitedCisClient(ctx context.Context, config CisEndpointConfig, log *slog.Logger) *RateLimitedCisClient { + cfg := clientcredentials.Config{ + ClientID: config.ClientID, + ClientSecret: config.ClientSecret, + TokenURL: config.AuthURL, + } + httpClientOAuth := cfg.Client(ctx) + + rl := rate.NewLimiter(rate.Every(config.RateLimitingInterval), config.MaxRequestsPerInterval) + + return &RateLimitedCisClient{ + ctx: ctx, + httpClient: httpClientOAuth, + config: config, + log: log, + RateLimiter: rl, + } +} + +func (c *RateLimitedCisClient) Do(req *http.Request) (*http.Response, error) { + err := c.RateLimiter.Wait(c.ctx) + if err != nil { + return nil, err + } + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, err + } + return resp, nil +} + +func (c *RateLimitedCisClient) handleErrorStatusCode(response *http.Response) string { + body, err := io.ReadAll(response.Body) + if err != nil { + return fmt.Sprintf("server returned %d status code, response body is unreadable", response.StatusCode) + } + + return fmt.Sprintf("server returned %d status code, body: %s", response.StatusCode, string(body)) +} diff --git a/internal/subaccountsync/state_reconciler.go b/internal/subaccountsync/state_reconciler.go new file mode 100644 index 0000000000..eedacea1e8 --- /dev/null +++ b/internal/subaccountsync/state_reconciler.go @@ -0,0 +1,355 @@ +package subaccountsync + +import ( + "context" + "fmt" + "time" + + "github.com/prometheus/client_golang/prometheus" + + "github.com/go-co-op/gocron" + "github.com/kyma-project/kyma-environment-broker/internal" + "github.com/kyma-project/kyma-environment-broker/internal/syncqueues" +) + +func (reconciler *stateReconcilerType) recreateStateFromDB() { + logs := reconciler.logger + dbStates, err := reconciler.db.SubaccountStates().ListStates() + if err != nil { + logs.Error(fmt.Sprintf("while getting states from db: %s", err)) + return + } + + for _, subaccount := range dbStates { + //create subaccount state in inMemoryState + reconciler.inMemoryState[subaccountIDType(subaccount.ID)] = subaccountStateType{ + cisState: 
CisStateType{subaccount.BetaEnabled == "true", subaccount.UsedForProduction, subaccount.ModifiedAt}, + } + } + + subaccountsMap, err := reconciler.getDistinctSubaccountsFromInstances() + if err != nil { + logs.Warn(fmt.Sprintf("while getting subaccounts from db: %s", err)) + return + } + + for subaccount := range reconciler.inMemoryState { + _, ok := subaccountsMap[subaccount] + if !ok { + logs.Warn(fmt.Sprintf("subaccount %s found in previous state but not found in current instances, will be deleted", subaccount)) + reconciler.setPendingDelete(subaccount) + } + } + + for subaccount := range subaccountsMap { + _, ok := reconciler.inMemoryState[subaccount] + if !ok { + logs.Warn(fmt.Sprintf("subaccount %s not found in previous state but found in current instances", subaccount)) + reconciler.inMemoryState[subaccount] = subaccountStateType{} + } + } + reconciler.setMetrics() +} + +func (reconciler *stateReconcilerType) setPendingDelete(subaccount subaccountIDType) { + reconciler.mutex.Lock() + defer reconciler.mutex.Unlock() + + state := reconciler.inMemoryState[subaccount] + state.pendingDelete = true + reconciler.inMemoryState[subaccount] = state +} + +func (reconciler *stateReconcilerType) setMetrics() { + if reconciler.metrics == nil { + return + } + reconciler.metrics.states.With(prometheus.Labels{"type": "total"}).Set(float64(len(reconciler.inMemoryState))) + //count subaccounts with beta enabled + betaEnabled := 0 + //create map for UsedForProduction + usedForProduction := make(map[string]int) + for _, state := range reconciler.inMemoryState { + if state.cisState != (CisStateType{}) { + if state.cisState.BetaEnabled { + betaEnabled++ + } + //increment counter for UsedForProduction + usedForProduction[state.cisState.UsedForProduction]++ + } + } + reconciler.metrics.states.With(prometheus.Labels{"type": "betaEnabled"}).Set(float64(betaEnabled)) + // for each UsedForProduction value set the counter + for key, value := range usedForProduction { + reconciler.metrics.states.With(prometheus.Labels{"type": key}).Set(float64(value)) + } +} + +func (reconciler *stateReconcilerType) periodicAccountsSync() { + logs := reconciler.logger + + // get distinct subaccounts from inMemoryState + subaccountsSet := reconciler.getAllSubaccountIDsFromState() + logs.Info(fmt.Sprintf("Running CIS accounts synchronization for %d subaccounts", len(subaccountsSet))) + + for subaccountID := range subaccountsSet { + subaccountDataFromCis, err := reconciler.accountsClient.GetSubaccountData(string(subaccountID)) + if subaccountDataFromCis == (CisStateType{}) { + logs.Warn(fmt.Sprintf("subaccount %s not found in CIS", subaccountID)) + continue + } + if err != nil { + logs.Error(fmt.Sprintf("while getting data for subaccount:%s", err)) + } else { + reconciler.reconcileCisAccount(subaccountID, subaccountDataFromCis) + } + } +} + +func (reconciler *stateReconcilerType) periodicEventsSync(fromActionTime int64) { + + logs := reconciler.logger + eventsClient := reconciler.eventsClient + subaccountsSet := reconciler.getAllSubaccountIDsFromState() + + logs.Info(fmt.Sprintf("Running CIS events synchronization from epoch: %d for %d subaccounts", fromActionTime, len(subaccountsSet))) + + eventsOfInterest, err := eventsClient.getEventsForSubaccounts(fromActionTime, *logs, subaccountsSet) + if err != nil { + logs.Error(fmt.Sprintf("while getting subaccount events: %s", err)) + // we will retry in the next run + } + + ew := reconciler.eventWindow + for _, event := range eventsOfInterest { + reconciler.reconcileCisEvent(event) + 
ew.UpdateToTime(event.ActionTime) + } +} + +func (reconciler *stateReconcilerType) getAllSubaccountIDsFromState() subaccountsSetType { + subaccountsMap := make(subaccountsSetType) + for subaccount := range reconciler.inMemoryState { + subaccountsMap[subaccount] = struct{}{} + } + return subaccountsMap +} + +func (reconciler *stateReconcilerType) runCronJobs(cfg Config, ctx context.Context) { + s := gocron.NewScheduler(time.UTC) + + logs := reconciler.logger + ew := reconciler.eventWindow + + _, err := s.Every(cfg.EventsSyncInterval).Do(func() { + // establish actual time window + eventsFrom := ew.GetNextFromTime() + + reconciler.periodicEventsSync(eventsFrom) + reconciler.metrics.cisRequests.With(prometheus.Labels{"endpoint": "events"}).Inc() + + ew.UpdateFromTime(eventsFrom) + }) + if err != nil { + logs.Error(fmt.Sprintf("while scheduling events sync job: %s", err)) + } + + _, err = s.Every(cfg.AccountsSyncInterval).Do(func() { + reconciler.periodicAccountsSync() + reconciler.metrics.cisRequests.With(prometheus.Labels{"endpoint": "accounts"}).Inc() + }) + if err != nil { + logs.Error(fmt.Sprintf("while scheduling accounts sync job: %s", err)) + } + + _, err = s.Every(cfg.StorageSyncInterval).Do(func() { + logs.Info(fmt.Sprintf("Running state storage synchronization")) + reconciler.storeStateInDb() + }) + if err != nil { + logs.Error(fmt.Sprintf("while scheduling storage sync job: %s", err)) + } + + s.StartBlocking() // blocks the current goroutine - we do not reach the end of the runCronJobs function +} + +func (reconciler *stateReconcilerType) reconcileCisAccount(subaccountID subaccountIDType, newCisState CisStateType) { + reconciler.mutex.Lock() + defer reconciler.mutex.Unlock() + + logs := reconciler.logger + + state, ok := reconciler.inMemoryState[subaccountID] + if !ok { + logs.Warn(fmt.Sprintf("subaccount %s for which we called accounts not found in in-memory state - should not happen", subaccountID)) + return + } + if newCisState.ModifiedDate >= state.cisState.ModifiedDate { + state.cisState = newCisState + reconciler.inMemoryState[subaccountID] = state + reconciler.enqueueSubaccountIfOutdated(subaccountID, state) + } + reconciler.setMetrics() +} + +func (reconciler *stateReconcilerType) reconcileCisEvent(event Event) { + reconciler.mutex.Lock() + defer reconciler.mutex.Unlock() + + logs := reconciler.logger + + subaccount := subaccountIDType(event.SubaccountID) + state, ok := reconciler.inMemoryState[subaccount] + if !ok { + // possible case when subaccount was deleted from the state and then created after the last full sync, we will sync it next time + logs.Warn(fmt.Sprintf("subaccount %s for event not found in state", subaccount)) + } + if event.ActionTime >= state.cisState.ModifiedDate { + cisState := CisStateType{ + BetaEnabled: event.Details.BetaEnabled, + UsedForProduction: event.Details.UsedForProduction, + ModifiedDate: event.ActionTime, + } + state.cisState = cisState + reconciler.inMemoryState[subaccount] = state + reconciler.enqueueSubaccountIfOutdated(subaccount, state) + } + reconciler.setMetrics() +} + +func (reconciler *stateReconcilerType) reconcileResourceUpdate(subaccountID subaccountIDType, runtimeID runtimeIDType, runtimeState runtimeStateType) { + reconciler.mutex.Lock() + defer reconciler.mutex.Unlock() + + state, ok := reconciler.inMemoryState[subaccountID] + if !ok { + // we create new state, there is no state for this subaccount yet (no data form CIS to compare + //log + reconciler.logger.Debug(fmt.Sprintf("subaccount %s not found in state, creating new 
state", subaccountID)) + reconciler.inMemoryState[subaccountID] = subaccountStateType{ + resourcesState: subaccountRuntimesType{runtimeID: runtimeState}, + } + } else { + if state.resourcesState == nil { + state.resourcesState = make(subaccountRuntimesType) + } + state.resourcesState[runtimeID] = runtimeState + reconciler.inMemoryState[subaccountID] = state + reconciler.logger.Debug(fmt.Sprintf("subaccount %s found in state, check if outdated", subaccountID)) + reconciler.enqueueSubaccountIfOutdated(subaccountID, state) + } + reconciler.setMetrics() +} + +// mark state pending delete and remove runtime from subaccount state +func (reconciler *stateReconcilerType) deleteRuntimeFromState(subaccountID subaccountIDType, runtimeID runtimeIDType) { + reconciler.mutex.Lock() + defer reconciler.mutex.Unlock() + + logs := reconciler.logger + state, ok := reconciler.inMemoryState[subaccountID] + if !ok { + logs.Warn(fmt.Sprintf("subaccount %s not found in state", subaccountID)) + return + } + _, ok = state.resourcesState[runtimeID] + if !ok { + logs.Warn(fmt.Sprintf("runtime %s not found in subaccount %s", runtimeID, subaccountID)) + return + } + delete(state.resourcesState, runtimeID) + state.pendingDelete = len(state.resourcesState) == 0 + reconciler.inMemoryState[subaccountID] = state + reconciler.setMetrics() +} + +// Requests for change are queued in priority queue, the queue is consumed by the updater. +// Since there are multiple sources of changes (events, accounts, resources) and the changes can appear not chronologically, we use priority queue ordered by action time. +// By definition updater (single instance) processes the queue in order of action time and assures that the state is updated in the correct order. + +// E.g. consider following scenario (t1 0 { + runtimes := state.resourcesState + cisState := state.cisState + for _, runtimeState := range runtimes { + outdated = outdated || runtimeState.betaEnabled == "" // label not set at all + outdated = outdated || (cisState.BetaEnabled && runtimeState.betaEnabled == "false") + outdated = outdated || (!cisState.BetaEnabled && runtimeState.betaEnabled == "true") // label set to different value + } + } + return outdated +} + +func (reconciler *stateReconcilerType) storeStateInDb() { + reconciler.mutex.Lock() + defer reconciler.mutex.Unlock() + + var upsertCnt, deleteCnt, failureCnt int + logs := reconciler.logger + + logs.Info(fmt.Sprintf("Syncing state to persistent storage")) + + for subaccount, state := range reconciler.inMemoryState { + if state.pendingDelete { // no runtimes left, we can delete the state from the storage + err := reconciler.db.SubaccountStates().DeleteState(string(subaccount)) + if err != nil { + logs.Error(fmt.Sprintf("while deleting subaccount:%s state from persistent storage: %s", subaccount, err)) + failureCnt++ + continue + } + deleteCnt++ + delete(reconciler.inMemoryState, subaccount) + } else { + err := reconciler.db.SubaccountStates().UpsertState(internal.SubaccountState{ + ID: string(subaccount), + BetaEnabled: fmt.Sprintf("%t", state.cisState.BetaEnabled), + UsedForProduction: state.cisState.UsedForProduction, + ModifiedAt: state.cisState.ModifiedDate, + }) + if err != nil { + failureCnt++ + logs.Error(fmt.Sprintf("while deleting subaccount:%s state from persistent storage: %s", subaccount, err)) + continue + } + upsertCnt++ + } + } + logs.Info(fmt.Sprintf("State synced to persistent storage: %d upserts, %d deletes, %d failures", upsertCnt, deleteCnt, failureCnt)) +} + +func (reconciler *stateReconcilerType) 
getDistinctSubaccountsFromInstances() (subaccountsSetType, error) { + reconciler.mutex.Lock() + defer reconciler.mutex.Unlock() + + subaccounts, err := reconciler.db.Instances().GetDistinctSubAccounts() + + subaccountsSet := make(subaccountsSetType) + for _, subaccount := range subaccounts { + subaccountsSet[subaccountIDType(subaccount)] = struct{}{} + } + return subaccountsSet, err +} + +func epochInMillis() int64 { + return time.Now().UnixNano() / int64(time.Millisecond) +} diff --git a/internal/subaccountsync/state_reconciler_test.go b/internal/subaccountsync/state_reconciler_test.go new file mode 100644 index 0000000000..cf1c04165e --- /dev/null +++ b/internal/subaccountsync/state_reconciler_test.go @@ -0,0 +1,1320 @@ +package subaccountsync + +import ( + "context" + "fmt" + "log/slog" + "net/http" + "os" + "strconv" + "sync" + "testing" + "time" + + "github.com/kyma-project/kyma-environment-broker/internal/cis" + "golang.org/x/time/rate" + + "github.com/kyma-project/kyma-environment-broker/internal" + "github.com/kyma-project/kyma-environment-broker/internal/storage" + queues "github.com/kyma-project/kyma-environment-broker/internal/syncqueues" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ( + subaccountId1 = "subaccount-1" + subaccountId2 = "subaccount-2" + subaccountId3 = "subaccount-3" + subaccountId4 = "subaccount-4" + runtimeId11 = "runtime-1-1" + runtimeId12 = "runtime-1-2" + runtimeId21 = "runtime-2-1" + runtimeId31 = "runtime-3-1" + veryOldTime = 1 + oldTime = 10 + notSoOldTime = 20 + recent = 40 +) + +var logger = slog.New(slog.NewTextHandler(os.Stderr, nil)) + +var useInMemoryStorage, _ = strconv.ParseBool(os.Getenv("DB_IN_MEMORY_FOR_E2E_TESTS")) + +func setupSuite(t testing.TB) func(t testing.TB) { + logger.Info("setup suite") + var tearDownFunc func() + if useInMemoryStorage { + logger.Info("using in-memory storage") + } else { + logger.Info("using real storage") + tearDownFunc = setupStorageContainer() + } + + return func(t testing.TB) { + logger.Info("teardown suite") + if tearDownFunc != nil { + tearDownFunc() + } + } +} + +func setupTestNilStorage(t testing.TB) (func(t testing.TB), storage.BrokerStorage) { + logger.Info("setup test - no storage needed") + + return func(t testing.TB) { + logger.Info("teardown test") + }, nil +} + +func setupTestWithStorage(t testing.TB) (func(t testing.TB), storage.BrokerStorage) { + logger.Info("setup test - create storage") + storageCleanup, brokerStorage, err := getStorageForTests() + require.NoError(t, err) + require.NotNil(t, brokerStorage) + + return func(t testing.TB) { + if storageCleanup != nil { + logger.Info("teardown test - cleanup storage") + err := storageCleanup() + assert.NoError(t, err) + } + }, brokerStorage +} + +func TestStateReconcilerWithFakeCisServer(t *testing.T) { + teardownSuite := setupSuite(t) + + srv, err := cis.NewFakeServer() + defer srv.Close() + require.NoError(t, err) + + defer teardownSuite(t) + + cisClient := srv.Client() + cisConfig := CisEndpointConfig{ + ServiceURL: srv.URL, + RateLimitingInterval: time.Minute * 10, + MaxRequestsPerInterval: 1000, + } + + t.Run("should schedule update of one resource after getting account data from faked CIS, then new subaccount comes in", func(t *testing.T) { + teardownTest, brokerStorage := setupTestNilStorage(t) + defer teardownTest(t) + + reconciler := createNewReconcilerWithFakeCisServer(brokerStorage, cisClient, cisConfig) + + // given + // initial event from a kyma resource, first runtime, no label + 
reconciler.reconcileResourceUpdate(cis.FakeSubaccountID1, runtimeId11, runtimeStateType{betaEnabled: ""}) + assert.Equal(t, 1, len(reconciler.inMemoryState)) + + // queue should be empty since we have not got state from CIS + assert.True(t, reconciler.syncQueue.IsEmpty()) + + // then the same subaccount, second runtime, with false label + reconciler.reconcileResourceUpdate(cis.FakeSubaccountID1, runtimeId12, runtimeStateType{betaEnabled: "false"}) + assert.Equal(t, 1, len(reconciler.inMemoryState)) + assert.True(t, reconciler.syncQueue.IsEmpty()) + + // then we get state from CIS (accounts) + reconciler.periodicAccountsSync() + + // then queue should contain one element + assert.False(t, reconciler.syncQueue.IsEmpty()) + element, ok := reconciler.syncQueue.Extract() + assert.True(t, ok) + assert.Equal(t, cis.FakeSubaccountID1, element.SubaccountID) + assert.Equal(t, "false", element.BetaEnabled) + assert.Equal(t, "NOT_USED_FOR_PRODUCTION", reconciler.inMemoryState[cis.FakeSubaccountID1].cisState.UsedForProduction) + assert.True(t, reconciler.syncQueue.IsEmpty()) + + //then we got update from the plane (updater updated resources) + reconciler.reconcileResourceUpdate(cis.FakeSubaccountID1, runtimeId11, runtimeStateType{betaEnabled: "false"}) + reconciler.reconcileResourceUpdate(cis.FakeSubaccountID1, runtimeId12, runtimeStateType{betaEnabled: "false"}) + + // then we add kyma resource, so we got update from informer + reconciler.reconcileResourceUpdate(cis.FakeSubaccountID2, runtimeId21, runtimeStateType{betaEnabled: "false"}) + assert.Equal(t, 2, len(reconciler.inMemoryState)) + assert.True(t, reconciler.syncQueue.IsEmpty()) + + // then we get state from CIS (accounts) + reconciler.periodicAccountsSync() + + // then queue should contain one element + assert.False(t, reconciler.syncQueue.IsEmpty()) + element, ok = reconciler.syncQueue.Extract() + assert.True(t, ok) + assert.Equal(t, cis.FakeSubaccountID2, element.SubaccountID) + assert.Equal(t, "true", element.BetaEnabled) + assert.Equal(t, "USED_FOR_PRODUCTION", reconciler.inMemoryState[cis.FakeSubaccountID2].cisState.UsedForProduction) + assert.True(t, reconciler.syncQueue.IsEmpty()) + }) + + t.Run("should not schedule any change when event comes about one subaccount and betaEnabled has the same value", func(t *testing.T) { + teardownTest, brokerStorage := setupTestNilStorage(t) + defer teardownTest(t) + + reconciler := createNewReconcilerWithFakeCisServer(brokerStorage, cisClient, cisConfig) + + // given + // initial event from a kyma resources, all true + reconciler.reconcileResourceUpdate(cis.FakeSubaccountID1, runtimeId11, runtimeStateType{betaEnabled: "true"}) + reconciler.reconcileResourceUpdate(cis.FakeSubaccountID2, runtimeId21, runtimeStateType{betaEnabled: "true"}) + assert.Equal(t, 2, len(reconciler.inMemoryState)) + + // queue should be empty since we have not got state from CIS + assert.True(t, reconciler.syncQueue.IsEmpty()) + + // then we get state from CIS (accounts) + reconciler.periodicAccountsSync() + + // then queue should contain one element + assert.False(t, reconciler.syncQueue.IsEmpty()) + element, ok := reconciler.syncQueue.Extract() + assert.True(t, ok) + assert.Equal(t, cis.FakeSubaccountID1, element.SubaccountID) + assert.Equal(t, "false", element.BetaEnabled) + assert.Equal(t, "NOT_USED_FOR_PRODUCTION", reconciler.inMemoryState[cis.FakeSubaccountID1].cisState.UsedForProduction) + assert.True(t, reconciler.syncQueue.IsEmpty()) + + //then we got update from the plane (updater updated resources) + 
reconciler.reconcileResourceUpdate(cis.FakeSubaccountID1, runtimeId11, runtimeStateType{betaEnabled: "false"}) + + assert.True(t, reconciler.syncQueue.IsEmpty()) + + // then we get events from CIS but betaEnabled is not changed + reconciler.periodicEventsSync(1710770000000) + + // then queue should contain be empty + assert.True(t, reconciler.syncQueue.IsEmpty()) + }) + + t.Run("should schedule update after event comes about change of one subaccount and betaEnabled has different value", func(t *testing.T) { + teardownTest, brokerStorage := setupTestNilStorage(t) + defer teardownTest(t) + + reconciler := createNewReconcilerWithFakeCisServer(brokerStorage, cisClient, cisConfig) + + // given + // initial event from a kyma resources, all true + reconciler.reconcileResourceUpdate(cis.FakeSubaccountID1, runtimeId11, runtimeStateType{betaEnabled: "true"}) + reconciler.reconcileResourceUpdate(cis.FakeSubaccountID2, runtimeId21, runtimeStateType{betaEnabled: "true"}) + assert.Equal(t, 2, len(reconciler.inMemoryState)) + + // queue should be empty since we have not got state from CIS + assert.True(t, reconciler.syncQueue.IsEmpty()) + + // then we get state from CIS (accounts) + reconciler.periodicAccountsSync() + + // then queue should contain one element + assert.False(t, reconciler.syncQueue.IsEmpty()) + element, ok := reconciler.syncQueue.Extract() + assert.True(t, ok) + assert.Equal(t, cis.FakeSubaccountID1, element.SubaccountID) + assert.Equal(t, "false", element.BetaEnabled) + assert.Equal(t, "NOT_USED_FOR_PRODUCTION", reconciler.inMemoryState[cis.FakeSubaccountID1].cisState.UsedForProduction) + assert.True(t, reconciler.syncQueue.IsEmpty()) + + //then we got update from the plane (updater updated resources) + reconciler.reconcileResourceUpdate(cis.FakeSubaccountID1, runtimeId11, runtimeStateType{betaEnabled: "false"}) + + assert.True(t, reconciler.syncQueue.IsEmpty()) + + // then we get events from CIS but betaEnabled is changed + reconciler.periodicEventsSync(1710761400000) + + // then queue should contain one element + assert.False(t, reconciler.syncQueue.IsEmpty()) + element, ok = reconciler.syncQueue.Extract() + assert.True(t, ok) + assert.Equal(t, cis.FakeSubaccountID1, element.SubaccountID) + assert.Equal(t, "true", element.BetaEnabled) + assert.Equal(t, "UNSET", reconciler.inMemoryState[cis.FakeSubaccountID1].cisState.UsedForProduction) + assert.True(t, reconciler.syncQueue.IsEmpty()) + + //then we got update from the plane (updater updated resources) + reconciler.reconcileResourceUpdate(cis.FakeSubaccountID1, runtimeId11, runtimeStateType{betaEnabled: "true"}) + + assert.True(t, reconciler.syncQueue.IsEmpty()) + }) + + t.Run("should handle properly change for many runtimes repeating requests", func(t *testing.T) { + teardownTest, brokerStorage := setupTestNilStorage(t) + defer teardownTest(t) + + reconciler := createNewReconcilerWithFakeCisServer(brokerStorage, cisClient, cisConfig) + + // given + // initial event from a kyma resources, all true + reconciler.reconcileResourceUpdate(cis.FakeSubaccountID1, runtimeId11, runtimeStateType{betaEnabled: "true"}) + reconciler.reconcileResourceUpdate(cis.FakeSubaccountID1, runtimeId12, runtimeStateType{betaEnabled: "true"}) + assert.Equal(t, 1, len(reconciler.inMemoryState)) + + // queue should be empty since we have not got state from CIS + assert.True(t, reconciler.syncQueue.IsEmpty()) + + // then we get state from CIS (accounts) + reconciler.periodicAccountsSync() + + // then queue should contain one element + assert.False(t, 
reconciler.syncQueue.IsEmpty()) + element, ok := reconciler.syncQueue.Extract() + assert.True(t, ok) + assert.Equal(t, cis.FakeSubaccountID1, element.SubaccountID) + assert.Equal(t, "false", element.BetaEnabled) + assert.Equal(t, "NOT_USED_FOR_PRODUCTION", reconciler.inMemoryState[cis.FakeSubaccountID1].cisState.UsedForProduction) + assert.True(t, reconciler.syncQueue.IsEmpty()) + + //then we got update from the plane (updater updated first resource) + reconciler.reconcileResourceUpdate(cis.FakeSubaccountID1, runtimeId11, runtimeStateType{betaEnabled: "false"}) + + // but we still have one resource with true label so we enqueue the update request again + assert.False(t, reconciler.syncQueue.IsEmpty()) + element, ok = reconciler.syncQueue.Extract() + assert.True(t, ok) + assert.Equal(t, cis.FakeSubaccountID1, element.SubaccountID) + assert.Equal(t, "false", element.BetaEnabled) + assert.Equal(t, "NOT_USED_FOR_PRODUCTION", reconciler.inMemoryState[cis.FakeSubaccountID1].cisState.UsedForProduction) + assert.True(t, reconciler.syncQueue.IsEmpty()) + + //then we got update from the plane (updater updated second resource) + reconciler.reconcileResourceUpdate(cis.FakeSubaccountID1, runtimeId12, runtimeStateType{betaEnabled: "false"}) + + assert.True(t, reconciler.syncQueue.IsEmpty()) + }) + + t.Run("should handle properly change for many runtimes with event coming", func(t *testing.T) { + teardownTest, brokerStorage := setupTestNilStorage(t) + defer teardownTest(t) + + reconciler := createNewReconcilerWithFakeCisServer(brokerStorage, cisClient, cisConfig) + + // given + // initial event from a kyma resources, all true + reconciler.reconcileResourceUpdate(cis.FakeSubaccountID1, runtimeId11, runtimeStateType{betaEnabled: "true"}) + reconciler.reconcileResourceUpdate(cis.FakeSubaccountID1, runtimeId12, runtimeStateType{betaEnabled: "true"}) + assert.Equal(t, 1, len(reconciler.inMemoryState)) + + // queue should be empty since we have not got state from CIS + assert.True(t, reconciler.syncQueue.IsEmpty()) + + // then we get state from CIS (accounts) + reconciler.periodicAccountsSync() + + // then queue should contain one element + assert.False(t, reconciler.syncQueue.IsEmpty()) + element, ok := reconciler.syncQueue.Extract() + assert.True(t, ok) + assert.Equal(t, cis.FakeSubaccountID1, element.SubaccountID) + assert.Equal(t, "false", element.BetaEnabled) + assert.Equal(t, "NOT_USED_FOR_PRODUCTION", reconciler.inMemoryState[cis.FakeSubaccountID1].cisState.UsedForProduction) + assert.True(t, reconciler.syncQueue.IsEmpty()) + + //then we got update from the plane (updater updated first resource) + reconciler.reconcileResourceUpdate(cis.FakeSubaccountID1, runtimeId11, runtimeStateType{betaEnabled: "false"}) + + // but we still have one resource with true label so we enqueue the update request again + assert.False(t, reconciler.syncQueue.IsEmpty()) + + // then we get events from CIS and betaEnabled is changed + reconciler.periodicEventsSync(1710761400000) + + // then queue should contain one element but with beteEnabled true + assert.False(t, reconciler.syncQueue.IsEmpty()) + + element, ok = reconciler.syncQueue.Extract() + assert.True(t, ok) + assert.Equal(t, cis.FakeSubaccountID1, element.SubaccountID) + assert.Equal(t, "true", element.BetaEnabled) + assert.Equal(t, "UNSET", reconciler.inMemoryState[cis.FakeSubaccountID1].cisState.UsedForProduction) + assert.True(t, reconciler.syncQueue.IsEmpty()) + + //then we got outstanding update from the plane (updater updated second resource with false label) + 
reconciler.reconcileResourceUpdate(cis.FakeSubaccountID1, runtimeId12, runtimeStateType{betaEnabled: "false"}) + + // so we have not reached the stable state so we enqueue the update request again + assert.False(t, reconciler.syncQueue.IsEmpty()) + element, ok = reconciler.syncQueue.Extract() + assert.True(t, ok) + assert.Equal(t, cis.FakeSubaccountID1, element.SubaccountID) + assert.Equal(t, "true", element.BetaEnabled) + assert.True(t, reconciler.syncQueue.IsEmpty()) + + //then we got update from the plane (updater updated second resource with true label) + reconciler.reconcileResourceUpdate(cis.FakeSubaccountID1, runtimeId12, runtimeStateType{betaEnabled: "true"}) + + assert.False(t, reconciler.syncQueue.IsEmpty()) + element, ok = reconciler.syncQueue.Extract() + assert.True(t, ok) + assert.Equal(t, cis.FakeSubaccountID1, element.SubaccountID) + assert.Equal(t, "true", element.BetaEnabled) + assert.True(t, reconciler.syncQueue.IsEmpty()) + + //then we got update from the plane (updater updated second resource with true label) + reconciler.reconcileResourceUpdate(cis.FakeSubaccountID1, runtimeId11, runtimeStateType{betaEnabled: "true"}) + assert.True(t, reconciler.syncQueue.IsEmpty()) + }) +} + +func TestStateReconciler(t *testing.T) { + teardownSuite := setupSuite(t) + defer teardownSuite(t) + + t.Run("should schedule update of one resource after getting account data from CIS", func(t *testing.T) { + teardownTest, brokerStorage := setupTestNilStorage(t) + defer teardownTest(t) + + reconciler := createNewReconciler(brokerStorage) + + // given + // initial event from a kyma resource, first runtime, no label + reconciler.reconcileResourceUpdate(subaccountId1, runtimeId11, runtimeStateType{betaEnabled: ""}) + assert.Equal(t, 1, len(reconciler.inMemoryState)) + + // queue should be empty since we have not got state from CIS + assert.True(t, reconciler.syncQueue.IsEmpty()) + + // then the same subaccount, second runtime, with false label + reconciler.reconcileResourceUpdate(subaccountId1, runtimeId12, runtimeStateType{betaEnabled: "false"}) + assert.Equal(t, 1, len(reconciler.inMemoryState)) + assert.True(t, reconciler.syncQueue.IsEmpty()) + + // then we get state from CIS (accounts) + reconciler.reconcileCisAccount(subaccountId1, CisStateType{BetaEnabled: true, UsedForProduction: "NOT_USED_FOR_PRODUCTION", ModifiedDate: veryOldTime}) + + // queue should contain one element + assert.False(t, reconciler.syncQueue.IsEmpty()) + element, ok := reconciler.syncQueue.Extract() + assert.True(t, ok) + assert.Equal(t, subaccountId1, element.SubaccountID) + assert.Equal(t, "true", element.BetaEnabled) + + assert.True(t, reconciler.syncQueue.IsEmpty()) + }) + + t.Run("should schedule update of one resource after getting account and event from CIS", func(t *testing.T) { + teardownTest, brokerStorage := setupTestNilStorage(t) + defer teardownTest(t) + + reconciler := createNewReconciler(brokerStorage) + + // given + // initial event from a kyma resource, first runtime, no label + reconciler.reconcileResourceUpdate(subaccountId1, runtimeId11, runtimeStateType{betaEnabled: ""}) + assert.Equal(t, 1, len(reconciler.inMemoryState)) + + // queue should be empty since we have not got state from CIS + assert.True(t, reconciler.syncQueue.IsEmpty()) + + // when the same subaccount, second runtime, with false label + reconciler.reconcileResourceUpdate(subaccountId1, runtimeId12, runtimeStateType{betaEnabled: "false"}) + // then + assert.Equal(t, 1, len(reconciler.inMemoryState)) + assert.True(t, 
reconciler.syncQueue.IsEmpty()) + + // when we get state from CIS (accounts) + reconciler.reconcileCisAccount(subaccountId1, CisStateType{BetaEnabled: false, UsedForProduction: "NOT_USED_FOR_PRODUCTION", ModifiedDate: 1}) + // then queue should contain one element + assert.False(t, reconciler.syncQueue.IsEmpty()) + + // when we get recent event from CIS with true label + + reconciler.reconcileCisEvent(fixCisUpdateEvent(subaccountId1, "true", oldTime)) + + // then queue should contain one element + assert.False(t, reconciler.syncQueue.IsEmpty()) + + element, ok := reconciler.syncQueue.Extract() + assert.True(t, ok) + assert.Equal(t, subaccountId1, element.SubaccountID) + assert.Equal(t, "true", element.BetaEnabled) + + assert.True(t, reconciler.syncQueue.IsEmpty()) + + }) + + t.Run("should schedule update of one resource after getting event and account data from CIS", func(t *testing.T) { + teardownTest, brokerStorage := setupTestNilStorage(t) + defer teardownTest(t) + + reconciler := createNewReconciler(brokerStorage) + + // given + // initial event from a kyma resource, first runtime, no label + reconciler.reconcileResourceUpdate(subaccountId1, runtimeId11, runtimeStateType{betaEnabled: ""}) + assert.Equal(t, 1, len(reconciler.inMemoryState)) + + // queue should be empty since we have not got state from CIS + assert.True(t, reconciler.syncQueue.IsEmpty()) + + // when the same subaccount, second runtime, with false label + reconciler.reconcileResourceUpdate(subaccountId1, runtimeId12, runtimeStateType{betaEnabled: "false"}) + // then + assert.Equal(t, 1, len(reconciler.inMemoryState)) + assert.True(t, reconciler.syncQueue.IsEmpty()) + + // when we get recent event from CIS with true label + reconciler.reconcileCisEvent(fixCisUpdateEvent(subaccountId1, "true", oldTime)) + + // then queue should contain one element + assert.False(t, reconciler.syncQueue.IsEmpty()) + + // when we get very old state from CIS (accounts) + cisState := CisStateType{BetaEnabled: false, UsedForProduction: "false", ModifiedDate: veryOldTime} + reconciler.reconcileCisAccount(subaccountId1, cisState) + // then queue should still contain one element + assert.False(t, reconciler.syncQueue.IsEmpty()) + + element, ok := reconciler.syncQueue.Extract() + assert.True(t, ok) + assert.Equal(t, subaccountId1, element.SubaccountID) + assert.Equal(t, "true", element.BetaEnabled) + + assert.True(t, reconciler.syncQueue.IsEmpty()) + }) + t.Run("should schedule update after getting event and then gets account data", func(t *testing.T) { + teardownTest, brokerStorage := setupTestNilStorage(t) + defer teardownTest(t) + + reconciler := createNewReconciler(brokerStorage) + + // given + // initial event from a kyma resource, first runtime, no label + reconciler.reconcileResourceUpdate(subaccountId1, runtimeId11, runtimeStateType{betaEnabled: ""}) + assert.Equal(t, 1, len(reconciler.inMemoryState)) + + // queue should be empty since we have not got state from CIS + assert.True(t, reconciler.syncQueue.IsEmpty()) + + // when the same subaccount, second runtime, with false label + reconciler.reconcileResourceUpdate(subaccountId1, runtimeId12, runtimeStateType{betaEnabled: "false"}) + // then + assert.Equal(t, 1, len(reconciler.inMemoryState)) + assert.True(t, reconciler.syncQueue.IsEmpty()) + + // when we get recent event from CIS with true label + reconciler.reconcileCisEvent(fixCisUpdateEvent(subaccountId1, "true", oldTime)) + + // then queue should contain one element + assert.False(t, reconciler.syncQueue.IsEmpty()) + + element, ok := 
reconciler.syncQueue.Extract() + assert.True(t, ok) + assert.Equal(t, subaccountId1, element.SubaccountID) + assert.Equal(t, "true", element.BetaEnabled) + assert.True(t, reconciler.syncQueue.IsEmpty()) + + // when we get older state from CIS (accounts) + reconciler.reconcileCisAccount(subaccountId1, CisStateType{BetaEnabled: false, UsedForProduction: "NOT_USED_FOR_PRODUCTION", ModifiedDate: 1}) + + // then queue should still be empty since we used more recent event to update the resource + assert.True(t, reconciler.syncQueue.IsEmpty()) + + }) + t.Run("should schedule update after each event", func(t *testing.T) { + teardownTest, brokerStorage := setupTestNilStorage(t) + defer teardownTest(t) + + reconciler := createNewReconciler(brokerStorage) + + // given + // initial event from a kyma resource, first runtime, no label + reconciler.reconcileResourceUpdate(subaccountId1, runtimeId11, runtimeStateType{betaEnabled: ""}) + assert.Equal(t, 1, len(reconciler.inMemoryState)) + + // queue should be empty since we have not got state from CIS + assert.True(t, reconciler.syncQueue.IsEmpty()) + + // then the same subaccount, second runtime, with false label + reconciler.reconcileResourceUpdate(subaccountId1, runtimeId12, runtimeStateType{betaEnabled: "false"}) + assert.Equal(t, 1, len(reconciler.inMemoryState)) + assert.True(t, reconciler.syncQueue.IsEmpty()) + + // then we get event from CIS with false label + reconciler.reconcileCisEvent(fixCisUpdateEvent(subaccountId1, "false", oldTime)) + + // queue should contain one element + assert.False(t, reconciler.syncQueue.IsEmpty()) + element, ok := reconciler.syncQueue.Extract() + assert.True(t, ok) + assert.Equal(t, subaccountId1, element.SubaccountID) + assert.Equal(t, "false", element.BetaEnabled) + + assert.True(t, reconciler.syncQueue.IsEmpty()) + + // then we get event from CIS with false label + + reconciler.reconcileCisEvent(fixCisUpdateEvent(subaccountId1, "true", notSoOldTime)) + + // queue should contain one element + assert.False(t, reconciler.syncQueue.IsEmpty()) + element, ok = reconciler.syncQueue.Extract() + assert.True(t, ok) + assert.Equal(t, subaccountId1, element.SubaccountID) + assert.Equal(t, "true", element.BetaEnabled) + + assert.True(t, reconciler.syncQueue.IsEmpty()) + }) + t.Run("should schedule update after two consecutive events", func(t *testing.T) { + teardownTest, brokerStorage := setupTestNilStorage(t) + defer teardownTest(t) + + reconciler := createNewReconciler(brokerStorage) + + // given + // initial event from a kyma resource, first runtime, no label + reconciler.reconcileResourceUpdate(subaccountId1, runtimeId11, runtimeStateType{betaEnabled: ""}) + assert.Equal(t, 1, len(reconciler.inMemoryState)) + + // queue should be empty since we have not got state from CIS + assert.True(t, reconciler.syncQueue.IsEmpty()) + + // then the same subaccount, second runtime, with false label + reconciler.reconcileResourceUpdate(subaccountId1, runtimeId12, runtimeStateType{betaEnabled: "false"}) + assert.Equal(t, 1, len(reconciler.inMemoryState)) + assert.True(t, reconciler.syncQueue.IsEmpty()) + + // then we get event from CIS with false label + reconciler.reconcileCisEvent(fixCisUpdateEvent(subaccountId1, "false", oldTime)) + + // queue should contain one element + assert.False(t, reconciler.syncQueue.IsEmpty()) + + // then we get event from CIS with false label + reconciler.reconcileCisEvent(fixCisUpdateEvent(subaccountId1, "true", notSoOldTime)) + + // queue should contain one element + assert.False(t, 
reconciler.syncQueue.IsEmpty()) + element, ok := reconciler.syncQueue.Extract() + assert.True(t, ok) + assert.Equal(t, subaccountId1, element.SubaccountID) + assert.Equal(t, "true", element.BetaEnabled) + + assert.True(t, reconciler.syncQueue.IsEmpty()) + }) + t.Run("should schedule update after two consecutive events in reversed order", func(t *testing.T) { + teardownTest, brokerStorage := setupTestNilStorage(t) + defer teardownTest(t) + + reconciler := createNewReconciler(brokerStorage) + + // given + // initial event from a kyma resource, first runtime, no label + reconciler.reconcileResourceUpdate(subaccountId1, runtimeId11, runtimeStateType{betaEnabled: ""}) + assert.Equal(t, 1, len(reconciler.inMemoryState)) + + // queue should be empty since we have not got state from CIS + assert.True(t, reconciler.syncQueue.IsEmpty()) + + // then the same subaccount, second runtime, with false label + reconciler.reconcileResourceUpdate(subaccountId1, runtimeId12, runtimeStateType{betaEnabled: "false"}) + assert.Equal(t, 1, len(reconciler.inMemoryState)) + assert.True(t, reconciler.syncQueue.IsEmpty()) + + // then we get recent event from CIS with true label + reconciler.reconcileCisEvent(fixCisUpdateEvent(subaccountId1, "true", notSoOldTime)) + + // then we get older event from CIS with false label + reconciler.reconcileCisEvent(fixCisUpdateEvent(subaccountId1, "false", oldTime)) + + // queue should contain one element + assert.False(t, reconciler.syncQueue.IsEmpty()) + + // queue should contain one element + assert.False(t, reconciler.syncQueue.IsEmpty()) + element, ok := reconciler.syncQueue.Extract() + assert.True(t, ok) + assert.Equal(t, subaccountId1, element.SubaccountID) + assert.Equal(t, "true", element.BetaEnabled) + + assert.True(t, reconciler.syncQueue.IsEmpty()) + }) + t.Run("should ignore outdated event after more recent one - before update", func(t *testing.T) { + teardownTest, brokerStorage := setupTestNilStorage(t) + defer teardownTest(t) + + reconciler := createNewReconciler(brokerStorage) + + // given + // initial event from a kyma resource, first runtime, no label + reconciler.reconcileResourceUpdate(subaccountId1, runtimeId11, runtimeStateType{betaEnabled: ""}) + assert.Equal(t, 1, len(reconciler.inMemoryState)) + + // queue should be empty since we have not got state from CIS + assert.True(t, reconciler.syncQueue.IsEmpty()) + + // then we get recent event from CIS with true label + reconciler.reconcileCisEvent(fixCisUpdateEvent(subaccountId1, "true", notSoOldTime)) + + // queue should contain one element + assert.False(t, reconciler.syncQueue.IsEmpty()) + element, ok := reconciler.syncQueue.Extract() + assert.True(t, ok) + assert.Equal(t, subaccountId1, element.SubaccountID) + assert.Equal(t, "true", element.BetaEnabled) + + // then we get older event from CIS with false label, we do not know if real update already happened + reconciler.reconcileCisEvent(fixCisUpdateEvent(subaccountId1, "false", oldTime)) + + // queue should be empty since we used more recent event to update the resource + assert.True(t, reconciler.syncQueue.IsEmpty()) + }) + t.Run("should ignore outdated event after more recent was applied", func(t *testing.T) { + teardownTest, brokerStorage := setupTestNilStorage(t) + defer teardownTest(t) + + reconciler := createNewReconciler(brokerStorage) + + // given + // initial event from a kyma resource, first runtime, no label + reconciler.reconcileResourceUpdate(subaccountId1, runtimeId11, runtimeStateType{betaEnabled: ""}) + assert.Equal(t, 1, 
len(reconciler.inMemoryState)) + + // queue should be empty since we have not got state from CIS + assert.True(t, reconciler.syncQueue.IsEmpty()) + + // then we get recent event from CIS with true label + reconciler.reconcileCisEvent(fixCisUpdateEvent(subaccountId1, "true", notSoOldTime)) + + // queue should contain one element + assert.False(t, reconciler.syncQueue.IsEmpty()) + element, ok := reconciler.syncQueue.Extract() + assert.True(t, ok) + assert.Equal(t, subaccountId1, element.SubaccountID) + assert.Equal(t, "true", element.BetaEnabled) + + // then we got confirmation that the update was applied + reconciler.reconcileResourceUpdate(subaccountId1, runtimeId11, runtimeStateType{betaEnabled: "true"}) + + // then we get older event from CIS with false label + reconciler.reconcileCisEvent(fixCisUpdateEvent(subaccountId1, "false", oldTime)) + + // queue should be empty since we used more recent event to update the resource + assert.True(t, reconciler.syncQueue.IsEmpty()) + }) + t.Run("should update resource after restart regardless of lost event", func(t *testing.T) { + teardownTest, brokerStorage := setupTestWithStorage(t) + defer teardownTest(t) + + reconciler := createNewReconciler(brokerStorage) + + // given + reconciler.recreateStateFromDB() + assert.True(t, reconciler.syncQueue.IsEmpty()) + assert.Equal(t, 0, len(reconciler.inMemoryState)) + + // when + // initial add event from a kyma resource, first runtime, no label + reconciler.reconcileResourceUpdate(subaccountId1, runtimeId11, runtimeStateType{betaEnabled: ""}) + assert.Equal(t, 1, len(reconciler.inMemoryState)) + + // then we fetch state from accounts endpoint + reconciler.reconcileCisAccount(subaccountId1, CisStateType{BetaEnabled: false, UsedForProduction: "NOT_USED_FOR_PRODUCTION", ModifiedDate: veryOldTime}) + + // queue should contain one element + assert.False(t, reconciler.syncQueue.IsEmpty()) + + // then comes app restart + newReconciler := createNewReconciler(brokerStorage) + // we lost the event, lost the state, lost the queue + newReconciler.recreateStateFromDB() + // when + // initial add event from a kyma resource, first runtime, no label + reconciler.reconcileResourceUpdate(subaccountId1, runtimeId11, runtimeStateType{betaEnabled: ""}) + reconciler.reconcileCisAccount(subaccountId1, CisStateType{BetaEnabled: false, UsedForProduction: "NOT_USED_FOR_PRODUCTION", ModifiedDate: veryOldTime}) + // queue should contain one element again + + assert.True(t, newReconciler.syncQueue.IsEmpty()) + }) + t.Run("should update resource after restart", func(t *testing.T) { + teardownTest, brokerStorage := setupTestWithStorage(t) + defer teardownTest(t) + + reconciler := createNewReconciler(brokerStorage) + + reconciler.recreateStateFromDB() + assert.True(t, reconciler.syncQueue.IsEmpty()) + assert.Equal(t, 0, len(reconciler.inMemoryState)) + + // when + // initial add event from a kyma resource, first runtime, no label + reconciler.reconcileResourceUpdate(subaccountId1, runtimeId11, runtimeStateType{betaEnabled: ""}) + assert.Equal(t, 1, len(reconciler.inMemoryState)) + + // then we fetch state from accounts endpoint + reconciler.reconcileCisAccount(subaccountId1, CisStateType{BetaEnabled: false, UsedForProduction: "NOT_USED_FOR_PRODUCTION", ModifiedDate: veryOldTime}) + + // queue should contain one element + assert.False(t, reconciler.syncQueue.IsEmpty()) + + // then comes app restart + newReconciler := createNewReconciler(brokerStorage) + // we lost the state, lost the queue + newReconciler.recreateStateFromDB() + // when + 
// initial add event from the kyma resource, first runtime, no label + reconciler.reconcileResourceUpdate(subaccountId1, runtimeId11, runtimeStateType{betaEnabled: ""}) + reconciler.reconcileCisAccount(subaccountId1, CisStateType{BetaEnabled: false, UsedForProduction: "NOT_USED_FOR_PRODUCTION", ModifiedDate: veryOldTime}) + // queue should contain one element again + assert.False(t, reconciler.syncQueue.IsEmpty()) + + element, ok := reconciler.syncQueue.Extract() + assert.True(t, ok) + assert.Equal(t, subaccountId1, element.SubaccountID) + assert.True(t, reconciler.syncQueue.IsEmpty()) + + // then we get update event from informer + reconciler.reconcileResourceUpdate(subaccountId1, runtimeId11, runtimeStateType{betaEnabled: "false"}) + + // then there is nothing to do + assert.True(t, newReconciler.syncQueue.IsEmpty()) + }) + t.Run("should update resource before restart", func(t *testing.T) { + teardownTest, brokerStorage := setupTestWithStorage(t) + defer teardownTest(t) + + reconciler := createNewReconciler(brokerStorage) + + // given + reconciler.recreateStateFromDB() + assert.True(t, reconciler.syncQueue.IsEmpty()) + assert.Equal(t, 0, len(reconciler.inMemoryState)) + + // when + // initial add event from a kyma resource, first runtime, no label + reconciler.reconcileResourceUpdate(subaccountId1, runtimeId11, runtimeStateType{betaEnabled: ""}) + assert.Equal(t, 1, len(reconciler.inMemoryState)) + + // then we fetch state from accounts endpoint + reconciler.reconcileCisAccount(subaccountId1, CisStateType{BetaEnabled: false, UsedForProduction: "NOT_USED_FOR_PRODUCTION", ModifiedDate: veryOldTime}) + + // queue should contain one element + assert.False(t, reconciler.syncQueue.IsEmpty()) + + // then we get old event from CIS with true label + reconciler.reconcileCisEvent(fixCisUpdateEvent(subaccountId1, "true", oldTime)) + + // then queue should contain one element + assert.False(t, reconciler.syncQueue.IsEmpty()) + + // we extract the element + element, ok := reconciler.syncQueue.Extract() + assert.True(t, ok) + assert.Equal(t, subaccountId1, element.SubaccountID) + assert.Equal(t, "true", element.BetaEnabled) + + // and update resource so update event comes from informer + reconciler.reconcileResourceUpdate(subaccountId1, runtimeId11, runtimeStateType{betaEnabled: "true"}) + + // then comes app restart + newReconciler := createNewReconciler(brokerStorage) + // we lost the event, lost the state, lost the queue + newReconciler.recreateStateFromDB() + // when + // initial add event from the kyma resource, first runtime, no label + reconciler.reconcileResourceUpdate(subaccountId1, runtimeId11, runtimeStateType{betaEnabled: "true"}) + reconciler.reconcileCisAccount(subaccountId1, CisStateType{BetaEnabled: true, UsedForProduction: "NOT_USED_FOR_PRODUCTION", ModifiedDate: oldTime}) + // queue should be empty - no difference, resource is up-to-date + assert.True(t, reconciler.syncQueue.IsEmpty()) + }) + t.Run("should schedule resource update", func(t *testing.T) { + teardownTest, brokerStorage := setupTestWithStorage(t) + defer teardownTest(t) + + reconciler := createNewReconciler(brokerStorage) + + // given + // initial event from a kyma resource, first runtime, no label + reconciler.reconcileResourceUpdate(subaccountId1, runtimeId11, runtimeStateType{betaEnabled: ""}) + assert.Equal(t, 1, len(reconciler.inMemoryState)) + + // queue should be empty since we have not got state from CIS + assert.True(t, reconciler.syncQueue.IsEmpty()) + + // then the same subaccount, second runtime, with false 
label + reconciler.reconcileResourceUpdate(subaccountId1, runtimeId12, runtimeStateType{betaEnabled: "false"}) + assert.Equal(t, 1, len(reconciler.inMemoryState)) + assert.True(t, reconciler.syncQueue.IsEmpty()) + + // then we get state from CIS (accounts) + reconciler.reconcileCisAccount(subaccountId1, CisStateType{BetaEnabled: false, UsedForProduction: "NOT_USED_FOR_PRODUCTION", ModifiedDate: 1}) + // queue should contain one element + assert.False(t, reconciler.syncQueue.IsEmpty()) + element, ok := reconciler.syncQueue.Extract() + assert.True(t, ok) + assert.Equal(t, subaccountId1, element.SubaccountID) + assert.Equal(t, "false", element.BetaEnabled) + + assert.True(t, reconciler.syncQueue.IsEmpty()) + }) + t.Run("should schedule two resource updates in proper order", func(t *testing.T) { + // informer sends two events + // then we get accounts + // then we get account state from CIS for the second subaccount + + teardownTest, brokerStorage := setupTestWithStorage(t) + defer teardownTest(t) + + reconciler := createNewReconciler(brokerStorage) + + // given + // initial event from a kyma resource, first runtime, no label + reconciler.reconcileResourceUpdate(subaccountId1, runtimeId11, runtimeStateType{betaEnabled: ""}) + assert.Equal(t, 1, len(reconciler.inMemoryState)) + + // initial event from a second kyma resource, first runtime, no label + reconciler.reconcileResourceUpdate(subaccountId2, runtimeId21, runtimeStateType{betaEnabled: ""}) + assert.Equal(t, 2, len(reconciler.inMemoryState)) + + // queue should be empty since we have not got state from CIS + assert.True(t, reconciler.syncQueue.IsEmpty()) + + // then the same subaccount, second runtime, with false label + reconciler.reconcileResourceUpdate(subaccountId1, runtimeId12, runtimeStateType{betaEnabled: "false"}) + assert.Equal(t, 2, len(reconciler.inMemoryState)) + assert.True(t, reconciler.syncQueue.IsEmpty()) + + // then we get state from CIS (accounts) + reconciler.reconcileCisAccount(subaccountId1, CisStateType{BetaEnabled: false, UsedForProduction: "NOT_USED_FOR_PRODUCTION", ModifiedDate: veryOldTime}) + + // queue should contain only one element + assert.False(t, reconciler.syncQueue.IsEmpty()) + + // then we get account state from CIS for the second subaccount + reconciler.reconcileCisAccount(subaccountId2, CisStateType{BetaEnabled: true, UsedForProduction: "NOT_SET", ModifiedDate: oldTime}) + + element1, ok := reconciler.syncQueue.Extract() + assert.True(t, ok) + + assert.Equal(t, subaccountId1, element1.SubaccountID) + assert.Equal(t, "false", element1.BetaEnabled) + + element2, ok := reconciler.syncQueue.Extract() + assert.True(t, ok) + assert.Equal(t, subaccountId2, element2.SubaccountID) + assert.Equal(t, "true", element2.BetaEnabled) + + assert.True(t, reconciler.syncQueue.IsEmpty()) + }) + + t.Run("should recreate state from DB", func(t *testing.T) { + teardownTest, brokerStorage := setupTestWithStorage(t) + defer teardownTest(t) + + reconciler := createNewReconciler(brokerStorage) + + // when there are three subaccounts, each with a CIS state + reconciler.reconcileResourceUpdate(subaccountId1, runtimeId11, runtimeStateType{betaEnabled: ""}) + reconciler.reconcileResourceUpdate(subaccountId2, runtimeId21, runtimeStateType{betaEnabled: ""}) + reconciler.reconcileResourceUpdate(subaccountId3, runtimeId31, runtimeStateType{betaEnabled: ""}) + reconciler.reconcileCisAccount(subaccountId1, CisStateType{BetaEnabled: false, UsedForProduction: "NOT_SET", ModifiedDate: 1}) + 
reconciler.reconcileCisAccount(subaccountId2, CisStateType{BetaEnabled: true, UsedForProduction: "NOT_SET", ModifiedDate: 2}) + reconciler.reconcileCisAccount(subaccountId3, CisStateType{BetaEnabled: true, UsedForProduction: "USED_FOR_PRODUCTION", ModifiedDate: 2}) + // then + + knownSubaccounts := reconciler.getAllSubaccountIDsFromState() + assert.Equal(t, 3, len(knownSubaccounts)) + assert.Contains(t, knownSubaccounts, subaccountIDType(subaccountId1)) + assert.Contains(t, knownSubaccounts, subaccountIDType(subaccountId2)) + assert.Contains(t, knownSubaccounts, subaccountIDType(subaccountId3)) + + // then + reconciler.storeStateInDb() + + reconcilerAfterReset := createNewReconciler(brokerStorage) + + reconcilerAfterReset.recreateStateFromDB() + knownSubaccounts = reconciler.getAllSubaccountIDsFromState() + + // then + assert.Equal(t, 3, len(knownSubaccounts)) + assert.Contains(t, knownSubaccounts, subaccountIDType(subaccountId1)) + assert.Contains(t, knownSubaccounts, subaccountIDType(subaccountId2)) + assert.Contains(t, knownSubaccounts, subaccountIDType(subaccountId3)) + assert.Equal(t, 3, len(reconciler.inMemoryState)) + }) + + t.Run("should recreate state from DB despite inconsistency between states and instances", func(t *testing.T) { + // three subaccounts in the instances table + // three subaccounts in the subaccount_states, but different set + + teardownTest, brokerStorage := setupTestWithStorage(t) + defer teardownTest(t) + + reconciler := createNewReconciler(brokerStorage) + + // given + fixInstancesTableWithFourInstancesAndThreeRuntimes(t, reconciler.db) + fixSubaccountStatesTable(t, reconciler.db, + internal.SubaccountState{ID: subaccountId1, BetaEnabled: "true", UsedForProduction: "NOT_SET", ModifiedAt: oldTime}, + internal.SubaccountState{ID: subaccountId2, BetaEnabled: "false", UsedForProduction: "NOT_SET", ModifiedAt: notSoOldTime}, + internal.SubaccountState{ID: subaccountId4, BetaEnabled: "true", UsedForProduction: "USED_FOR_PRODUCTION", ModifiedAt: recent}) + + // when + reconciler.recreateStateFromDB() + + // then + knownSubaccounts := reconciler.getAllSubaccountIDsFromState() + assert.Equal(t, 4, len(knownSubaccounts)) + assert.Contains(t, knownSubaccounts, subaccountIDType(subaccountId1)) + assert.Contains(t, knownSubaccounts, subaccountIDType(subaccountId2)) + assert.Contains(t, knownSubaccounts, subaccountIDType(subaccountId3)) + assert.Contains(t, knownSubaccounts, subaccountIDType(subaccountId4)) + assert.Equal(t, 4, len(reconciler.inMemoryState)) + + // and when + reconciler.storeStateInDb() + + // and app restarts + + reconcilerAfterReset := createNewReconciler(brokerStorage) + + reconcilerAfterReset.recreateStateFromDB() + + // then we remove state which is not in the instances table + knownSubaccounts = reconciler.getAllSubaccountIDsFromState() + assert.Equal(t, 3, len(knownSubaccounts)) + assert.Contains(t, knownSubaccounts, subaccountIDType(subaccountId1)) + assert.Contains(t, knownSubaccounts, subaccountIDType(subaccountId2)) + assert.Contains(t, knownSubaccounts, subaccountIDType(subaccountId3)) + assert.Equal(t, 3, len(reconciler.inMemoryState)) + }) + t.Run("should handle resource deletion and remove the state", func(t *testing.T) { + teardownTest, brokerStorage := setupTestWithStorage(t) + defer teardownTest(t) + + reconciler := createNewReconciler(brokerStorage) + + // given + // initial event from a kyma resource, first runtime, no label + reconciler.reconcileResourceUpdate(subaccountId1, runtimeId11, runtimeStateType{betaEnabled: ""}) + 
assert.Equal(t, 1, len(reconciler.inMemoryState)) + + // queue should be empty since we have not got state from CIS + assert.True(t, reconciler.syncQueue.IsEmpty()) + + // when the same subaccount, second runtime, with false label + reconciler.reconcileResourceUpdate(subaccountId1, runtimeId12, runtimeStateType{betaEnabled: "false"}) + // then + assert.Equal(t, 1, len(reconciler.inMemoryState)) + assert.True(t, reconciler.syncQueue.IsEmpty()) + + // then we get a delete event from the informer + reconciler.deleteRuntimeFromState(subaccountId1, runtimeId11) + + // then we should still keep the state + assert.Equal(t, 1, len(reconciler.inMemoryState)) + + // when we get the last runtime deleted + reconciler.deleteRuntimeFromState(subaccountId1, runtimeId12) + + // then we should still keep the state + assert.Equal(t, 1, len(reconciler.inMemoryState)) + + // then we store the state in the db; as a side effect we remove the state from the memory + reconciler.storeStateInDb() + + // then we should have no state in the memory + assert.Equal(t, 0, len(reconciler.inMemoryState)) + assert.True(t, reconciler.syncQueue.IsEmpty()) + + // then the runtime is added again and we recreate the state + reconciler.reconcileResourceUpdate(subaccountId1, runtimeId11, runtimeStateType{betaEnabled: ""}) + assert.Equal(t, 1, len(reconciler.inMemoryState)) + }) + t.Run("should handle resource update and update the state", func(t *testing.T) { + teardownTest, brokerStorage := setupTestWithStorage(t) + defer teardownTest(t) + + reconciler := createNewReconciler(brokerStorage) + + // given + // initial event from a kyma resource, first runtime, no label + reconciler.reconcileResourceUpdate(subaccountId1, runtimeId11, runtimeStateType{betaEnabled: ""}) + assert.Equal(t, 1, len(reconciler.inMemoryState)) + + // queue should be empty since we have not got state from CIS + assert.True(t, reconciler.syncQueue.IsEmpty()) + + // when the same subaccount, first runtime, with false label + reconciler.reconcileResourceUpdate(subaccountId1, runtimeId11, runtimeStateType{betaEnabled: "false"}) + // then + assert.Equal(t, 1, len(reconciler.inMemoryState)) + assert.True(t, reconciler.syncQueue.IsEmpty()) + + // and when the same subaccount, first runtime, with true label + reconciler.reconcileResourceUpdate(subaccountId1, runtimeId11, runtimeStateType{betaEnabled: "true"}) + + // then + assert.Equal(t, 1, len(reconciler.inMemoryState)) + assert.True(t, reconciler.syncQueue.IsEmpty()) + + // but then account state comes from CIS + reconciler.reconcileCisAccount(subaccountId1, CisStateType{BetaEnabled: false, UsedForProduction: "NOT_SET", ModifiedDate: veryOldTime}) + assert.False(t, reconciler.syncQueue.IsEmpty()) + + // then someone changes the label + reconciler.reconcileResourceUpdate(subaccountId1, runtimeId11, runtimeStateType{betaEnabled: "false"}) + + // but we have the update in the queue already even if it is futile (false to false) - this is the current behavior + assert.False(t, reconciler.syncQueue.IsEmpty()) + }) + t.Run("should handle subaccount creation - resource already with proper label value", func(t *testing.T) { + // caveat - if the subaccount was neither listed in the instances nor in the previous state, we won't notice the event + // the other approach would be to watch all subaccounts coming in the events, possible but impractical + teardownTest, brokerStorage := setupTestWithStorage(t) + defer teardownTest(t) + + reconciler := createNewReconciler(brokerStorage) + + // given + fixInstancesTableWithOneInstance(t, reconciler.db) + + // and given previous state empty + // when + reconciler.recreateStateFromDB() + + // then assuming someone added subaccountId3 to the instances table while the app was down - hence it is not in the previous state + assert.Equal(t, 1, len(reconciler.inMemoryState)) + + // and the resource is already created, with the desired label + reconciler.reconcileResourceUpdate(subaccountId1, runtimeId11, runtimeStateType{betaEnabled: "true"}) + + // then we query the accounts + reconciler.reconcileCisAccount(subaccountId3, CisStateType{BetaEnabled: true, UsedForProduction: "NOT_SET", ModifiedDate: veryOldTime}) + assert.True(t, reconciler.syncQueue.IsEmpty()) + + reconciler.reconcileCisEvent(fixCisCreateEvent(subaccountId3, "true", veryOldTime)) + + // then + // queue should be empty + assert.True(t, reconciler.syncQueue.IsEmpty()) + }) + t.Run("should handle subaccount creation - resource without label", func(t *testing.T) { + teardownTest, brokerStorage := setupTestWithStorage(t) + defer teardownTest(t) + + reconciler := createNewReconciler(brokerStorage) + + // given + fixInstancesTableWithOneInstance(t, reconciler.db) + + // and given previous state empty + // when + reconciler.recreateStateFromDB() + + // then assuming someone added subaccountId3 to the instances table while the app was down - hence it is not in the previous state + assert.Equal(t, 1, len(reconciler.inMemoryState)) + + // and the resource is already created, but without the desired label + reconciler.reconcileResourceUpdate(subaccountId3, runtimeId31, runtimeStateType{betaEnabled: ""}) + + // then we query the accounts + reconciler.reconcileCisAccount(subaccountId3, CisStateType{BetaEnabled: true, UsedForProduction: "NOT_SET", ModifiedDate: veryOldTime}) + assert.False(t, reconciler.syncQueue.IsEmpty()) + + // then we get the creation event + reconciler.reconcileCisEvent(fixCisCreateEvent(subaccountId3, "true", veryOldTime)) + + // then + // queue should contain one element + assert.False(t, reconciler.syncQueue.IsEmpty()) + element, ok := reconciler.syncQueue.Extract() + assert.True(t, ok) + assert.Equal(t, subaccountId3, element.SubaccountID) + assert.Equal(t, "true", element.BetaEnabled) + }) + t.Run("should handle subaccount creation - no resource yet", func(t *testing.T) { + teardownTest, brokerStorage := setupTestWithStorage(t) + defer teardownTest(t) + + reconciler := createNewReconciler(brokerStorage) + + // given + fixInstancesTableWithOneInstance(t, reconciler.db) + + // and given previous state empty + // when + reconciler.recreateStateFromDB() + + // then assuming someone added subaccountId3 to the instances table while the app was down - hence it is not in the previous state + assert.Equal(t, 1, len(reconciler.inMemoryState)) + + // and the resource is not there + // then we query the accounts + reconciler.reconcileCisAccount(subaccountId3, CisStateType{BetaEnabled: true, UsedForProduction: "NOT_SET", ModifiedDate: veryOldTime}) + assert.True(t, reconciler.syncQueue.IsEmpty()) + + reconciler.reconcileCisEvent(fixCisCreateEvent(subaccountId3, "true", veryOldTime)) + + // then + // queue should be empty since there is nothing to be updated + assert.True(t, reconciler.syncQueue.IsEmpty()) + + // but then the resource is created, but without a label + reconciler.reconcileResourceUpdate(subaccountId3, runtimeId31, runtimeStateType{betaEnabled: ""}) + assert.False(t, reconciler.syncQueue.IsEmpty()) + + element, ok := reconciler.syncQueue.Extract() + assert.True(t, ok) + assert.Equal(t, subaccountId3, 
element.SubaccountID) + assert.Equal(t, "true", element.BetaEnabled) + }) + +} + +// test fixtures + +func createNewReconciler(storage storage.BrokerStorage) stateReconcilerType { + return stateReconcilerType{ + inMemoryState: make(inMemoryStateType), + mutex: sync.Mutex{}, + logger: logger, + syncQueue: queues.NewPriorityQueueWithCallbacks(logger, &queues.EventHandler{}), + db: storage, + } +} + +func createFakeRateLimitedCisClient(ctx context.Context, httpClient *http.Client, config CisEndpointConfig, log *slog.Logger) *RateLimitedCisClient { + + rl := rate.NewLimiter(rate.Every(config.RateLimitingInterval), config.MaxRequestsPerInterval) + + return &RateLimitedCisClient{ + ctx: ctx, + httpClient: httpClient, + config: config, + RateLimiter: rl, + log: log, + } +} + +func createNewReconcilerWithFakeCisServer(brokerStorage storage.BrokerStorage, client *http.Client, cisEndpointConfig CisEndpointConfig) stateReconcilerType { + rtlClient := createFakeRateLimitedCisClient(context.Background(), client, cisEndpointConfig, logger) + var epochInStubs = int64(1710748500000) + return stateReconcilerType{ + inMemoryState: make(inMemoryStateType), + mutex: sync.Mutex{}, + logger: logger, + syncQueue: queues.NewPriorityQueueWithCallbacks(logger, &queues.EventHandler{}), + db: brokerStorage, + accountsClient: rtlClient, + eventsClient: rtlClient, + eventWindow: NewEventWindow(60*1000, func() int64 { + return epochInStubs + }), + } +} + +func fixInstancesTableWithFourInstancesAndThreeRuntimes(t *testing.T, brokerStorage storage.BrokerStorage) { + require.NoError(t, brokerStorage.Instances().Insert(fixInstance("1", subaccountId1, runtimeId11))) + require.NoError(t, brokerStorage.Instances().Insert(fixInstance("2", subaccountId1, runtimeId12))) + require.NoError(t, brokerStorage.Instances().Insert(fixInstance("3", subaccountId2, runtimeId21))) + require.NoError(t, brokerStorage.Instances().Insert(fixInstance("4", subaccountId3, runtimeId31))) +} + +func fixInstancesTableWithOneInstance(t *testing.T, brokerStorage storage.BrokerStorage) { + require.NoError(t, brokerStorage.Instances().Insert(fixInstance("1", subaccountId3, runtimeId31))) +} + +func fixSubaccountStatesTable(t *testing.T, brokerStorage storage.BrokerStorage, subaccountStates ...internal.SubaccountState) { + + for _, state := range subaccountStates { + require.NoError(t, brokerStorage.SubaccountStates().UpsertState(state)) + } +} + +func fixInstance(id string, subaccountID string, runtimeID string) internal.Instance { + return internal.Instance{ + InstanceID: id, + RuntimeID: runtimeID, + GlobalAccountID: fmt.Sprintf("GlobalAccountID field for SubAccountID: %s", subaccountID), + SubAccountID: subaccountID, + ServiceID: fmt.Sprintf("ServiceID field. IDX: %s", id), + ServiceName: fmt.Sprintf("ServiceName field. IDX: %s", id), + ServicePlanID: fmt.Sprintf("ServicePlanID field. IDX: %s", id), + ServicePlanName: fmt.Sprintf("ServicePlanName field. 
IDX: %s", id), + Parameters: internal.ProvisioningParameters{ + PlatformRegion: fmt.Sprintf("region-value-%s", id), + }, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } +} + +func fixCisUpdateEvent(subaccountID string, betaEnabled string, actionTime int64) Event { + return fixCisEvent(subaccountID, "Subaccount_Update", betaEnabled, actionTime) +} + +func fixCisCreateEvent(subaccountID string, betaEnabled string, actionTime int64) Event { + return fixCisEvent(subaccountID, "Subaccount_Create", betaEnabled, actionTime) +} + +func fixCisEvent(subaccountID string, eventType string, betaEnabled string, actionTime int64) Event { + return Event{ + ActionTime: actionTime, + SubaccountID: subaccountID, + Type: eventType, + Details: EventDetails{ + BetaEnabled: betaEnabled == "true", + }, + } +} + +func brokerStorageTestConfig() storage.Config { + return storage.Config{ + Host: "localhost", + User: "test", + Password: "test", + Port: "5432", + Name: "test-sync", + SSLMode: "disable", + SecretKey: "################################", + MaxOpenConns: 1, + MaxIdleConns: 1, + ConnMaxLifetime: time.Minute, + } +} + +func getStorageForTests() (func() error, storage.BrokerStorage, error) { + if useInMemoryStorage { + return nil, storage.NewMemoryStorage(), nil + } + return storage.GetStorageForTest(brokerStorageTestConfig()) +} + +func setupStorageContainer() func() { + config := brokerStorageTestConfig() + + docker, err := internal.NewDockerHandler() + if err != nil { + logger.Error(fmt.Sprintf("Error creating docker handler: %s, exiting...", err)) + os.Exit(1) + } + defer func(docker *internal.DockerHelper) { + err := docker.CloseDockerClient() + if err != nil { + logger.Error(fmt.Sprintf("Error creating docker client: %s", err)) + os.Exit(1) + } + }(docker) + + cleanupContainer, err := docker.CreateDBContainer(internal.ContainerCreateRequest{ + Port: config.Port, + User: config.User, + Password: config.Password, + Name: config.Name, + Host: config.Host, + ContainerName: "subaccount-sync-tests", + Image: "postgres:11", + }) + return func() { + if cleanupContainer != nil { + err := cleanupContainer() + if err != nil { + logger.Error(fmt.Sprintf("Error cleaning container: %s", err)) + os.Exit(1) + } + } + } +} diff --git a/internal/subaccountsync/subaccount_sync_service.go b/internal/subaccountsync/subaccount_sync_service.go new file mode 100644 index 0000000000..cf3fcc21d1 --- /dev/null +++ b/internal/subaccountsync/subaccount_sync_service.go @@ -0,0 +1,184 @@ +package subaccountsync + +import ( + "context" + "fmt" + "log/slog" + "net/http" + "sync" + "time" + + "github.com/kyma-project/kyma-environment-broker/internal/kymacustomresource" + "github.com/kyma-project/kyma-environment-broker/internal/storage" + queues "github.com/kyma-project/kyma-environment-broker/internal/syncqueues" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + subaccountIDLabel = "kyma-project.io/subaccount-id" + runtimeIDLabel = "kyma-project.io/runtime-id" + betaEnabledLabel = "operator.kyma-project.io/beta" + eventServicePath = "%s/events/v1/events/central" + subaccountServicePath = "%s/accounts/v1/technical/subaccounts/%s" + eventType = "Subaccount_Creation,Subaccount_Update" +) + +type ( + subaccountIDType string + runtimeIDType 
string + runtimeStateType struct { + betaEnabled string + } + subaccountRuntimesType map[runtimeIDType]runtimeStateType + statesFromCisType map[subaccountIDType]CisStateType + subaccountsSetType map[subaccountIDType]struct{} + subaccountStateType struct { + cisState CisStateType + resourcesState subaccountRuntimesType + pendingDelete bool + } + inMemoryStateType map[subaccountIDType]subaccountStateType + stateReconcilerType struct { + inMemoryState inMemoryStateType + mutex sync.Mutex + eventsClient *RateLimitedCisClient + accountsClient *RateLimitedCisClient + kcpK8sClient *client.Client + dynamicK8sClient *dynamic.Interface + db storage.BrokerStorage + syncQueue queues.MultiConsumerPriorityQueue + logger *slog.Logger + updater *kymacustomresource.Updater + metrics *Metrics + eventWindow *EventWindow + } +) + +type SyncService struct { + appName string + ctx context.Context + cfg Config + kymaGVR schema.GroupVersionResource + db storage.BrokerStorage + k8sClient dynamic.Interface +} + +func NewSyncService(appName string, ctx context.Context, cfg Config, kymaGVR schema.GroupVersionResource, db storage.BrokerStorage, dynamicClient dynamic.Interface) *SyncService { + return &SyncService{ + appName: appName, + ctx: ctx, + cfg: cfg, + kymaGVR: kymaGVR, + db: db, + k8sClient: dynamicClient, + } +} + +func (s *SyncService) Run() { + logger := slog.Default() + logger.Info(fmt.Sprintf("%s service started", s.appName)) + + // create CIS clients + eventsClient := CreateEventsClient(s.ctx, s.cfg.CisEvents, logger) + accountsClient := CreateAccountsClient(s.ctx, s.cfg.CisAccounts, logger) + + // create and register metrics + metricsRegistry := prometheus.NewRegistry() + metrics := NewMetrics(metricsRegistry, s.appName) + promHandler := promhttp.HandlerFor(metricsRegistry, promhttp.HandlerOpts{}) + http.Handle("/metrics", promHandler) + + go func() { + err := http.ListenAndServe(s.cfg.MetricsPort, nil) + if err != nil { + logger.Error(fmt.Sprintf("while serving metrics: %s", err)) + } + }() + + // create priority queue + priorityQueue := queues.NewPriorityQueueWithCallbacks(logger, &queues.EventHandler{ + OnInsert: func(queueSize int) { + metrics.queue.Set(float64(queueSize)) + metrics.queueOps.With(prometheus.Labels{"operation": "insert"}).Inc() + }, + OnExtract: func(queueSize int, timeEnqueued int64) { + metrics.queue.Set(float64(queueSize)) + metrics.queueOps.With(prometheus.Labels{"operation": "extract"}).Inc() + }, + }) + + // create updater if needed + var updater *kymacustomresource.Updater + var err error + if s.cfg.UpdateResources { + updater, err = kymacustomresource.NewUpdater(s.k8sClient, priorityQueue, s.kymaGVR, s.cfg.SyncQueueSleepInterval, betaEnabledLabel) + fatalOnError(err) + } + + // create state reconciler + stateReconciler := stateReconcilerType{ + inMemoryState: make(inMemoryStateType), + mutex: sync.Mutex{}, + eventsClient: eventsClient, + accountsClient: accountsClient, + dynamicK8sClient: &s.k8sClient, + logger: logger.With("component", "state-reconciler"), + db: s.db, + updater: updater, + syncQueue: priorityQueue, + metrics: metrics, + eventWindow: NewEventWindow(s.cfg.EventsWindowSize.Milliseconds(), epochInMillis), + } + + stateReconciler.recreateStateFromDB() + + factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(s.k8sClient, time.Minute, "kcp-system", nil) + informer := factory.ForResource(s.kymaGVR).Informer() + + configureInformer(&informer, &stateReconciler, logger.With("component", "informer"), metrics) + + go stateReconciler.runCronJobs(s.cfg, 
s.ctx) + + if s.cfg.UpdateResources && stateReconciler.updater != nil { + go func() { + err := stateReconciler.updater.Run() + if err != nil { + logger.Warn(fmt.Sprintf("while running updater: %s, cannot update", err)) + } + }() + } else { + logger.Info("Resource update is disabled") + } + + informer.Run(s.ctx.Done()) +} + +func CreateAccountsClient(ctx context.Context, accountsConfig CisEndpointConfig, logger *slog.Logger) *RateLimitedCisClient { + accountsClient := NewRateLimitedCisClient(ctx, accountsConfig, logger.With("component", "CIS-Accounts-client")) + return accountsClient +} + +func CreateEventsClient(ctx context.Context, eventsConfig CisEndpointConfig, logger *slog.Logger) *RateLimitedCisClient { + eventsClient := NewRateLimitedCisClient(ctx, eventsConfig, logger.With("component", "CIS-Events-client")) + return eventsClient +} + +func getDataFromLabels(u *unstructured.Unstructured) (subaccountID string, runtimeID string, betaEnabled string) { + labels := u.GetLabels() + subaccountID = labels[subaccountIDLabel] + runtimeID = labels[runtimeIDLabel] + betaEnabled = labels[betaEnabledLabel] + return +} + +func fatalOnError(err error) { + if err != nil { + panic(err) + } +} diff --git a/internal/subaccountsync/testdata/events.json b/internal/subaccountsync/testdata/events.json new file mode 100644 index 0000000000..ae6f546152 --- /dev/null +++ b/internal/subaccountsync/testdata/events.json @@ -0,0 +1,146 @@ +[ + { + "id": 6, + "actionTime": 1710770600000, + "creationTime": 1710770900000, + "details": { + "description": "Subaccount updated.", + "guid": "17b8dcc2-3de1-4884-bcd3-b1c4657d81be", + "technicalName": "17b8dcc2-3de1-4884-bcd3-b1c4657d81be", + "parentGuid": "54130280-9eef-4fab-94b6-be2bea4fa1d0", + "displayName": "subaccount-2", + "subaccountDescription": "subaccount-2 for testing", + "region": "eu10-canary", + "jobLocation": null, + "subdomain": "subaccount-2-subdomain", + "betaEnabled": true, + "usedForProduction": "NOT_USED_FOR_PRODUCTION", + "labels": {} + }, + "globalAccountGUID": "54130280-9eef-4fab-94b6-be2bea4fa1d0", + "entityId": "17b8dcc2-3de1-4884-bcd3-b1c4657d81be", + "entityType": "Subaccount", + "eventOrigin": "accounts-service", + "eventType": "Subaccount_Update" + }, + { + "id": 5, + "actionTime": 1710761400000, + "creationTime": 1710761700000, + "details": { + "description": "Subaccount updated.", + "guid": "cad2806a-3545-4aa0-8a7c-4fc246dba684", + "technicalName": "cad2806a-3545-4aa0-8a7c-4fc246dba684", + "parentGuid": "bbecf441-f9bd-4d09-8a3d-877a7519b949", + "displayName": "subaccount-1", + "subaccountDescription": "subaccount-1 for testing", + "region": "eu10-canary", + "jobLocation": null, + "subdomain": "subaccount-1-subdomain", + "betaEnabled": true, + "usedForProduction": "UNSET", + "labels": {} + }, + "globalAccountGUID": "bbecf441-f9bd-4d09-8a3d-877a7519b949", + "entityId": "cad2806a-3545-4aa0-8a7c-4fc246dba684", + "entityType": "Subaccount", + "eventOrigin": "accounts-service", + "eventType": "Subaccount_Update" + }, + { + "id": 4, + "actionTime": 1710760200000, + "creationTime": 1710760500000, + "details": { + "description": "Subaccount updated.", + "guid": "17b8dcc2-3de1-4884-bcd3-b1c4657d81be", + "technicalName": "17b8dcc2-3de1-4884-bcd3-b1c4657d81be", + "parentGuid": "54130280-9eef-4fab-94b6-be2bea4fa1d0", + "displayName": "subaccount-2", + "subaccountDescription": "subaccount-2 for testing", + "region": "eu10-canary", + "jobLocation": null, + "subdomain": "subaccount-2-subdomain", + "betaEnabled": true, + "usedForProduction": 
"USED_FOR_PRODUCTION", + "labels": {} + }, + "globalAccountGUID": "54130280-9eef-4fab-94b6-be2bea4fa1d0", + "entityId": "17b8dcc2-3de1-4884-bcd3-b1c4657d81be", + "entityType": "Subaccount", + "eventOrigin": "accounts-service", + "eventType": "Subaccount_Update" + }, + { + "id": 3, + "actionTime": 1710759300000, + "creationTime": 1710759600000, + "details": { + "description": "Subaccount created.", + "guid": "17b8dcc2-3de1-4884-bcd3-b1c4657d81be", + "technicalName": "17b8dcc2-3de1-4884-bcd3-b1c4657d81be", + "parentGuid": "54130280-9eef-4fab-94b6-be2bea4fa1d0", + "displayName": "subaccount-2", + "subaccountDescription": "subaccount-2 for testing", + "region": "eu10-canary", + "jobLocation": null, + "subdomain": "subaccount-2-subdomain", + "betaEnabled": false, + "usedForProduction": "USED_FOR_PRODUCTION", + "labels": {} + }, + "globalAccountGUID": "54130280-9eef-4fab-94b6-be2bea4fa1d0", + "entityId": "17b8dcc2-3de1-4884-bcd3-b1c4657d81be", + "entityType": "Subaccount", + "eventOrigin": "accounts-service", + "eventType": "Subaccount_Creation" + }, + { + "id": 2, + "actionTime": 1710749400000, + "creationTime": 1710749700000, + "details": { + "description": "Subaccount updated.", + "guid": "cad2806a-3545-4aa0-8a7c-4fc246dba684", + "technicalName": "cad2806a-3545-4aa0-8a7c-4fc246dba684", + "parentGuid": "bbecf441-f9bd-4d09-8a3d-877a7519b949", + "displayName": "subaccount-1", + "subaccountDescription": "subaccount-1 for testing", + "region": "eu10-canary", + "jobLocation": null, + "subdomain": "subaccount-1-subdomain", + "betaEnabled": false, + "usedForProduction": null, + "labels": {} + }, + "globalAccountGUID": "bbecf441-f9bd-4d09-8a3d-877a7519b949", + "entityId": "cad2806a-3545-4aa0-8a7c-4fc246dba684", + "entityType": "Subaccount", + "eventOrigin": "accounts-service", + "eventType": "Subaccount_Update" + }, + { + "id": 1, + "actionTime": 1710748500000, + "creationTime": 1710748800000, + "details": { + "description": "Subaccount created.", + "guid": "cad2806a-3545-4aa0-8a7c-4fc246dba684", + "technicalName": "cad2806a-3545-4aa0-8a7c-4fc246dba684", + "parentGuid": "bbecf441-f9bd-4d09-8a3d-877a7519b949", + "displayName": "subaccount-1", + "subaccountDescription": "subaccount-1 for testing", + "region": "eu10-canary", + "jobLocation": null, + "subdomain": "subaccount-1-subdomain", + "betaEnabled": false, + "usedForProduction": "USED_FOR_PRODUCTION", + "labels": {} + }, + "globalAccountGUID": "bbecf441-f9bd-4d09-8a3d-877a7519b949", + "entityId": "cad2806a-3545-4aa0-8a7c-4fc246dba684", + "entityType": "Subaccount", + "eventOrigin": "accounts-service", + "eventType": "Subaccount_Creation" + } +] \ No newline at end of file diff --git a/internal/subaccountsync/testdata/subaccounts.json b/internal/subaccountsync/testdata/subaccounts.json new file mode 100644 index 0000000000..e1880cd227 --- /dev/null +++ b/internal/subaccountsync/testdata/subaccounts.json @@ -0,0 +1,44 @@ +[ + { + "guid": "cad2806a-3545-4aa0-8a7c-4fc246dba684", + "technicalName": "cad2806a-3545-4aa0-8a7c-4fc246dba684", + "displayName": "subaccount-1", + "globalAccountGUID": "bbecf441-f9bd-4d09-8a3d-877a7519b949", + "parentGUID": "bbecf441-f9bd-4d09-8a3d-877a7519b949", + "parentType": "ROOT", + "closestEntitlementManagedParentGUID": "bbecf441-f9bd-4d09-8a3d-877a7519b949", + "region": "eu10-canary", + "subdomain": "subaccount-1-abcd", + "betaEnabled": false, + "usedForProduction": "NOT_USED_FOR_PRODUCTION", + "description": null, + "state": "OK", + "stateMessage": "Subaccount created.", + "contentAutomationState": null, + 
"contentAutomationStateDetails": null, + "createdDate": 1710748800000, + "createdBy": "test.user@fake.com", + "modifiedDate": 1710749700000 + }, + { + "guid": "17b8dcc2-3de1-4884-bcd3-b1c4657d81be", + "technicalName": "17b8dcc2-3de1-4884-bcd3-b1c4657d81be", + "displayName": "subaccount-2", + "globalAccountGUID": "54130280-9eef-4fab-94b6-be2bea4fa1d0", + "parentGUID": "54130280-9eef-4fab-94b6-be2bea4fa1d0", + "parentType": "ROOT", + "closestEntitlementManagedParentGUID": "54130280-9eef-4fab-94b6-be2bea4fa1d0", + "region": "eu10-canary", + "subdomain": "subaccount-2-efgh", + "betaEnabled": true, + "usedForProduction": "USED_FOR_PRODUCTION", + "description": null, + "state": "OK", + "stateMessage": "Subaccount created.", + "contentAutomationState": null, + "contentAutomationStateDetails": null, + "createdDate": 1710759600000, + "createdBy": "test.user@fake.com", + "modifiedDate": 1710760500000 + } +] \ No newline at end of file diff --git a/resources/keb/Chart.yaml b/resources/keb/Chart.yaml index a6f9b86043..96896ebb23 100644 --- a/resources/keb/Chart.yaml +++ b/resources/keb/Chart.yaml @@ -1,7 +1,7 @@ apiVersion: v2 -appVersion: "1.3.7" +appVersion: "1.4.1" name: keb description: description: Kyma Environment Broker Helm chart for Kubernetes -version: 1.3.7 +version: 1.4.1 type: application diff --git a/resources/keb/templates/subaccount-sync-deployment.yaml b/resources/keb/templates/subaccount-sync-deployment.yaml new file mode 100644 index 0000000000..fe4880774e --- /dev/null +++ b/resources/keb/templates/subaccount-sync-deployment.yaml @@ -0,0 +1,167 @@ +{{ if .Values.subaccountSync.enabled }} +apiVersion: apps/v1 +kind: Deployment +metadata: + name: subaccount-sync + namespace: kcp-system + labels: + app: subaccount-sync + annotations: + argocd.argoproj.io/sync-options: Prune=false +spec: + replicas: 1 + selector: + matchLabels: + app: subaccount-sync + template: + metadata: + labels: + app: subaccount-sync + spec: + serviceAccountName: {{ .Values.global.kyma_environment_broker.serviceAccountName }} + {{- with .Values.deployment.securityContext }} + securityContext: + {{ toYaml . 
| nindent 12 }} + {{- end }} + containers: + - name: subaccount-sync + image: "{{ .Values.global.images.container_registry.path }}/{{ .Values.global.images.kyma_environment_broker.dir }}kyma-environment-subaccount-sync:{{ .Values.global.images.kyma_environment_subaccount_sync.version }}" + imagePullPolicy: Always + ports: + - containerPort: 80 + env: + - name: SUBACCOUNT_SYNC_KYMA_VERSION + value: {{ .Values.kymaVersion | required "please specify Values.kymaVersion" | quote}} + - name: SUBACCOUNT_SYNC_METRICS_PORT + value: {{ .Values.subaccountSync.metricsPort | quote }} + - name: SUBACCOUNT_SYNC_LOG_LEVEL + value: {{ .Values.subaccountSync.logLevel | quote }} + - name: SUBACCOUNT_SYNC_UPDATE_RESOURCES + value: {{ .Values.subaccountSync.updateResources | quote }} + - name: SUBACCOUNT_SYNC_ACCOUNTS_SYNC_INTERVAL + value: {{ .Values.subaccountSync.accountSyncInterval | quote }} + - name: SUBACCOUNT_SYNC_STORAGE_SYNC_INTERVAL + value: {{ .Values.subaccountSync.storageSyncInterval | quote }} + - name: SUBACCOUNT_SYNC_EVENTS_WINDOW_SIZE + value: {{ .Values.subaccountSync.eventsWindowSize | quote }} + - name: SUBACCOUNT_SYNC_EVENTS_WINDOW_INTERVAL + value: {{ .Values.subaccountSync.eventsWindowInterval | quote }} + - name: SUBACCOUNT_SYNC_QUEUE_SLEEP_INTERVAL + value: {{ .Values.subaccountSync.queueSleepInterval | quote }} + - name: SUBACCOUNT_SYNC_CIS_EVENTS_CLIENT_ID + valueFrom: + secretKeyRef: + name: {{ .Values.cis.v2.secretName | required "please specify .Values.cis.v2.secretName" | quote }} + key: id + - name: SUBACCOUNT_SYNC_CIS_EVENTS_CLIENT_SECRET + valueFrom: + secretKeyRef: + name: {{ .Values.cis.v2.secretName | required "please specify .Values.cis.v2.secretName" | quote }} + key: secret + - name: SUBACCOUNT_SYNC_CIS_EVENTS_AUTH_URL + value: {{ .Values.cis.accounts.authURL | required "please specify .Values.cis.accounts.authURL" | quote }} + - name: SUBACCOUNT_SYNC_CIS_EVENTS_SERVICE_URL + value: {{ .Values.cis.accounts.serviceURL | required "please specify .Values.cis.accounts.serviceURL" | quote }} + - name: SUBACCOUNT_SYNC_CIS_EVENTS_RATE_LIMITING_INTERVAL + value: {{ .Values.subaccountSync.cisRateLimits.events.rateLimitingInterval | quote }} + - name: SUBACCOUNT_SYNC_CIS_EVENTS_MAX_REQUESTS_PER_INTERVAL + value: {{ .Values.subaccountSync.cisRateLimits.events.maxRequestsPerInterval | quote }} + - name: SUBACCOUNT_SYNC_CIS_ACCOUNTS_CLIENT_ID + valueFrom: + secretKeyRef: + name: {{ .Values.cis.accounts.secretName | required "please specify .Values.cis.accounts.secretName" | quote }} + key: id + - name: SUBACCOUNT_SYNC_CIS_ACCOUNTS_CLIENT_SECRET + valueFrom: + secretKeyRef: + name: {{ .Values.cis.accounts.secretName | required "please specify .Values.cis.accounts.secretName" | quote }} + key: secret + - name: SUBACCOUNT_SYNC_CIS_ACCOUNTS_AUTH_URL + value: {{ .Values.cis.accounts.authURL | required "please specify .Values.cis.accounts.authURL" | quote }} + - name: SUBACCOUNT_SYNC_CIS_ACCOUNTS_SERVICE_URL + value: {{ .Values.cis.accounts.serviceURL | required "please specify .Values.cis.accounts.serviceURL" | quote }} + - name: SUBACCOUNT_SYNC_CIS_ACCOUNTS_RATE_LIMITING_INTERVAL + value: {{ .Values.subaccountSync.cisRateLimits.accounts.rateLimitingInterval | quote }} + - name: SUBACCOUNT_SYNC_CIS_ACCOUNTS_MAX_REQUESTS_PER_INTERVAL + value: {{ .Values.subaccountSync.cisRateLimits.accounts.maxRequestsPerInterval | quote }} + - name: SUBACCOUNT_SYNC_DATABASE_SECRET_KEY + valueFrom: + secretKeyRef: + name: kcp-storage-client-secret + key: secretKey + optional: true + - name: 
SUBACCOUNT_SYNC_DATABASE_USER + valueFrom: + secretKeyRef: + name: kcp-postgresql + key: postgresql-broker-username + - name: SUBACCOUNT_SYNC_DATABASE_PASSWORD + valueFrom: + secretKeyRef: + name: kcp-postgresql + key: postgresql-broker-password + - name: SUBACCOUNT_SYNC_DATABASE_HOST + valueFrom: + secretKeyRef: + name: kcp-postgresql + key: postgresql-serviceName + - name: SUBACCOUNT_SYNC_DATABASE_PORT + valueFrom: + secretKeyRef: + name: kcp-postgresql + key: postgresql-servicePort + - name: SUBACCOUNT_SYNC_DATABASE_NAME + valueFrom: + secretKeyRef: + name: kcp-postgresql + key: postgresql-broker-db-name + - name: SUBACCOUNT_SYNC_DATABASE_SSLMODE + valueFrom: + secretKeyRef: + name: kcp-postgresql + key: postgresql-sslMode + - name: SUBACCOUNT_SYNC_DATABASE_SSLROOTCERT + value: /secrets/cloudsql-sslrootcert/server-ca.pem + {{- if and (eq .Values.global.database.embedded.enabled false) (eq .Values.global.database.cloudsqlproxy.enabled false)}} + volumeMounts: + - name: cloudsql-sslrootcert + mountPath: /secrets/cloudsql-sslrootcert + readOnly: true + {{- end}} + {{- if and (eq .Values.global.database.embedded.enabled false) (eq .Values.global.database.cloudsqlproxy.enabled true)}} + - name: cloudsql-proxy + image: {{ .Values.global.images.cloudsql_proxy_image }} + {{- if .Values.global.database.cloudsqlproxy.workloadIdentity.enabled }} + command: ["/cloud_sql_proxy", + "-instances={{ .Values.global.database.managedGCP.instanceConnectionName }}=tcp:5432"] + {{- else }} + command: ["/cloud_sql_proxy", + "-instances={{ .Values.global.database.managedGCP.instanceConnectionName }}=tcp:5432", + "-credential_file=/secrets/cloudsql-instance-credentials/credentials.json"] + volumeMounts: + - name: cloudsql-instance-credentials + mountPath: /secrets/cloudsql-instance-credentials + readOnly: true + {{- end }} + {{- with .Values.deployment.securityContext }} + securityContext: + {{ toYaml . 
| nindent 16 }} + {{- end }} + {{- end}} + {{- if and (eq .Values.global.database.embedded.enabled false) (eq .Values.global.database.cloudsqlproxy.enabled true) (eq .Values.global.database.cloudsqlproxy.workloadIdentity.enabled false)}} + volumes: + - name: cloudsql-instance-credentials + secret: + secretName: cloudsql-instance-credentials + {{- end}} + {{- if and (eq .Values.global.database.embedded.enabled false) (eq .Values.global.database.cloudsqlproxy.enabled false)}} + volumes: + - name: cloudsql-sslrootcert + secret: + secretName: kcp-postgresql + items: + - key: postgresql-sslRootCert + path: server-ca.pem + optional: true + {{- end}} +{{ end }} \ No newline at end of file diff --git a/resources/keb/values.yaml b/resources/keb/values.yaml index 8ff2ada9f8..36b0a9c576 100644 --- a/resources/keb/values.yaml +++ b/resources/keb/values.yaml @@ -11,25 +11,25 @@ global: version: "v20240307-f250dc83" # do not update along with the other images kyma_environment_broker: dir: - version: "1.3.7" + version: "1.4.1" kyma_environments_subaccount_cleanup_job: dir: - version: "1.3.7" + version: "1.4.1" kyma_environment_trial_cleanup_job: dir: - version: "1.3.7" + version: "1.4.1" kyma_environment_expirator_job: dir: - version: "1.3.7" + version: "1.4.1" kyma_environment_deprovision_retrigger_job: dir: - version: "1.3.7" + version: "1.4.1" kyma_environment_runtime_reconciler: dir: - version: "1.3.7" + version: "1.4.1" kyma_environment_subaccount_sync: dir: - version: "1.3.7" + version: "1.4.1" kyma_environment_broker: enabled: false serviceAccountName: "kcp-kyma-environment-broker" @@ -385,21 +385,22 @@ deprovisionRetrigger: dryRun: true subaccountSync: - updateResources: "false" - accountSyncInterval: "24h" - storageSyncInterval: "5m" - eventsWindowSize: "20m" - eventsWindowInterval: "15m" - queueSleepInterval: "30s" - metricsPort: "8081" - logLevel: "info" + enabled: true + updateResources: false + accountSyncInterval: 24h + storageSyncInterval: 5m + eventsWindowSize: 20m + eventsWindowInterval: 15m + queueSleepInterval: 30s + metricsPort: 8081 + logLevel: info cisRateLimits: events: - rateLimitingInterval: "2s" - maxRequestsPerInterval: "5" + rateLimitingInterval: 2s + maxRequestsPerInterval: 5 accounts: - rateLimitingInterval: "2s" - maxRequestsPerInterval: "5" + rateLimitingInterval: 2s + maxRequestsPerInterval: 5 serviceMonitor: enabled: true diff --git a/sec-scanners-config.yaml b/sec-scanners-config.yaml index 97080a2721..673b2ca1db 100644 --- a/sec-scanners-config.yaml +++ b/sec-scanners-config.yaml @@ -1,15 +1,14 @@ module-name: kyma-environment-broker -rc-tag: 1.3.7 +rc-tag: 1.4.1 protecode: - - europe-docker.pkg.dev/kyma-project/prod/kyma-environment-broker:1.3.7 - - europe-docker.pkg.dev/kyma-project/prod/kyma-environment-deprovision-retrigger-job:1.3.7 - - europe-docker.pkg.dev/kyma-project/prod/kyma-environments-cleanup-job:1.3.7 - - europe-docker.pkg.dev/kyma-project/prod/kyma-environment-runtime-reconciler:1.3.7 - - europe-docker.pkg.dev/kyma-project/prod/kyma-environment-trial-cleanup-job:1.3.7 - - europe-docker.pkg.dev/kyma-project/prod/kyma-environment-archiver-job:1.3.7 - - europe-docker.pkg.dev/kyma-project/prod/kyma-environment-expirator-job:1.3.7 - - europe-docker.pkg.dev/kyma-project/prod/kyma-environment-subaccount-cleanup-job:1.3.7 - - europe-docker.pkg.dev/kyma-project/prod/kyma-environment-subaccount-sync:1.3.7 + - europe-docker.pkg.dev/kyma-project/prod/kyma-environment-broker:1.4.1 + - 
europe-docker.pkg.dev/kyma-project/prod/kyma-environment-deprovision-retrigger-job:1.4.1 + - europe-docker.pkg.dev/kyma-project/prod/kyma-environments-cleanup-job:1.4.1 + - europe-docker.pkg.dev/kyma-project/prod/kyma-environment-runtime-reconciler:1.4.1 + - europe-docker.pkg.dev/kyma-project/prod/kyma-environment-archiver-job:1.4.1 + - europe-docker.pkg.dev/kyma-project/prod/kyma-environment-expirator-job:1.4.1 + - europe-docker.pkg.dev/kyma-project/prod/kyma-environment-subaccount-cleanup-job:1.4.1 + - europe-docker.pkg.dev/kyma-project/prod/kyma-environment-subaccount-sync:1.4.1 whitesource: language: golang-mod subprojects: false diff --git a/utils/archiver/kyma-environment-broker-archiver.yaml b/utils/archiver/kyma-environment-broker-archiver.yaml index 3535846d4d..13d4909c14 100644 --- a/utils/archiver/kyma-environment-broker-archiver.yaml +++ b/utils/archiver/kyma-environment-broker-archiver.yaml @@ -74,4 +74,4 @@ spec: template: spec: containers: - - image: europe-docker.pkg.dev/kyma-project/prod/kyma-environment-archiver-job:1.3.7 + - image: europe-docker.pkg.dev/kyma-project/prod/kyma-environment-archiver-job:1.4.1 diff --git a/utils/kyma-environments-cleanup-job/kyma-environments-cleanup-job.yaml b/utils/kyma-environments-cleanup-job/kyma-environments-cleanup-job.yaml index 991470a7d7..5649e78922 100644 --- a/utils/kyma-environments-cleanup-job/kyma-environments-cleanup-job.yaml +++ b/utils/kyma-environments-cleanup-job/kyma-environments-cleanup-job.yaml @@ -28,7 +28,7 @@ spec: containers: - name: kyma-environments-cleanup command: ["/bin/main"] - image: europe-docker.pkg.dev/kyma-project/prod/kyma-environments-cleanup-job:1.3.7 + image: europe-docker.pkg.dev/kyma-project/prod/kyma-environments-cleanup-job:1.4.1 imagePullPolicy: IfNotPresent env: - name: DATABASE_EMBEDDED