Skip to content

Commit

Permalink
Merge branch 'main' into enhance-regions-azure
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrmiskiewicz authored Apr 15, 2024
2 parents 5caf69a + 1f00e2d commit 7605a32
Show file tree
Hide file tree
Showing 23 changed files with 2,879 additions and 41 deletions.
2 changes: 1 addition & 1 deletion cmd/expirator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func main() {

fatalOnError(err)

slog.Info("Trial cleanup job finished successfully!")
slog.Info("Expirator job finished successfully!")

err = conn.Close()
if err != nil {
Expand Down
129 changes: 123 additions & 6 deletions cmd/subaccountsync/main.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,135 @@
package main

import (
"context"
"fmt"
"log/slog"
"os"
)
"os/signal"
"time"

"github.com/kyma-project/kyma-environment-broker/internal/events"
"github.com/kyma-project/kyma-environment-broker/internal/storage"
"github.com/sirupsen/logrus"

"github.com/kyma-project/kyma-environment-broker/internal/kymacustomresource"
"k8s.io/apimachinery/pkg/runtime/schema"

const (
AppPrefix = "subaccount-sync"
kebConfig "github.com/kyma-project/kyma-environment-broker/internal/config"
"github.com/vrischmann/envconfig"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/client/config"

subsync "github.com/kyma-project/kyma-environment-broker/internal/subaccountsync"
)

const AppPrefix = "subaccount_sync"

func main() {
// create slog logger
logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
logger.Info(fmt.Sprintf("%s app stub started", AppPrefix))
// create context
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()

cli := getK8sClient()

// create and fill config
var cfg subsync.Config
err := envconfig.InitWithPrefix(&cfg, AppPrefix)
if err != nil {
fatalOnError(err)
}

logLevel := new(slog.LevelVar)
logLevel.Set(cfg.GetLogLevel())
slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: logLevel,
})).With("service", "subaccount-sync"))

slog.Info(fmt.Sprintf("Configuration: event window size:%s, event sync interval:%s, accounts sync interval: %s, storage sync interval: %s, queue sleep interval: %s",
cfg.EventsWindowSize, cfg.EventsSyncInterval, cfg.AccountsSyncInterval, cfg.StorageSyncInterval, cfg.SyncQueueSleepInterval))

// create config provider - provider still uses logrus logger
configProvider := kebConfig.NewConfigProvider(
kebConfig.NewConfigMapReader(ctx, cli, logrus.WithField("service", "storage"), cfg.KymaVersion),
kebConfig.NewConfigMapKeysValidator(),
kebConfig.NewConfigMapConverter())

// create Kyma GVR
kymaGVR := getResourceKindProvider(cfg.KymaVersion, configProvider)

// create DB connection
cipher := storage.NewEncrypter(cfg.Database.SecretKey)
db, dbConn, err := storage.NewFromConfig(cfg.Database, events.Config{}, cipher, logrus.WithField("service", "storage"))
if err != nil {
fatalOnError(err)
}
defer func() {
if r := recover(); r != nil {
slog.Error("Recovered from panic. Error:\n", r)
}
err = dbConn.Close()
if err != nil {
slog.Warn(fmt.Sprintf("failed to close database connection: %s", err.Error()))
}
}()

// create dynamic K8s client
dynamicK8sClient := createDynamicK8sClient()

// create service
syncService := subsync.NewSyncService(AppPrefix, ctx, cfg, kymaGVR, db, dynamicK8sClient)
syncService.Run()
}

func getK8sClient() client.Client {
k8sCfg, err := config.GetConfig()
fatalOnError(err)
cli, err := createK8sClient(k8sCfg)
fatalOnError(err)
return cli
}

func createK8sClient(cfg *rest.Config) (client.Client, error) {
mapper, err := apiutil.NewDiscoveryRESTMapper(cfg)
if err != nil {
err = wait.Poll(time.Second, time.Minute, func() (bool, error) {
mapper, err = apiutil.NewDiscoveryRESTMapper(cfg)
if err != nil {
return false, nil
}
return true, nil
})
if err != nil {
return nil, fmt.Errorf("while waiting for client mapper: %w", err)
}
}
cli, err := client.New(cfg, client.Options{Mapper: mapper})
if err != nil {
return nil, fmt.Errorf("while creating a client: %w", err)
}
return cli, nil
}

func createDynamicK8sClient() dynamic.Interface {
kcpK8sConfig := config.GetConfigOrDie()
clusterClient, err := dynamic.NewForConfig(kcpK8sConfig)
fatalOnError(err)
return clusterClient
}

func getResourceKindProvider(kymaVersion string, configProvider *kebConfig.ConfigProvider) schema.GroupVersionResource {
resourceKindProvider := kymacustomresource.NewResourceKindProvider(kymaVersion, configProvider)
kymaGVR, err := resourceKindProvider.DefaultGvr()
fatalOnError(err)
return kymaGVR
}

func fatalOnError(err error) {
if err != nil {
panic(err)
}
}
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ require (
golang.org/x/exp v0.0.0-20240213143201-ec583247a57a
golang.org/x/mod v0.17.0
golang.org/x/oauth2 v0.19.0
golang.org/x/time v0.5.0
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1
k8s.io/api v0.29.2
Expand Down Expand Up @@ -79,6 +80,7 @@ require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/gnostic v0.7.0 // indirect
github.com/google/gnostic-models v0.6.9-0.20230804172637-c7be7c783f49 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
Expand Down Expand Up @@ -128,7 +130,6 @@ require (
golang.org/x/sys v0.18.0 // indirect
golang.org/x/term v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.18.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
Expand Down
68 changes: 68 additions & 0 deletions internal/subaccountsync/cis_accounts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package subaccountsync

import (
"encoding/json"
"fmt"
"io"
"net/http"
)

func (c *RateLimitedCisClient) buildSubaccountRequest(subaccountID string) (*http.Request, error) {
request, err := http.NewRequest(http.MethodGet, fmt.Sprintf(subaccountServicePath, c.config.ServiceURL, subaccountID), nil)
if err != nil {
return nil, fmt.Errorf("while creating request: %w", err)
}
q := request.URL.Query()
request.URL.RawQuery = q.Encode()
return request, nil
}

func (c *RateLimitedCisClient) fetchDataForSetOfSubaccounts(subaccounts subaccountsSetType) (statesFromCisType, error) {

subaccountsDataFromAccounts := make(statesFromCisType)
for subaccount := range subaccounts {
accountData, err := c.GetSubaccountData(string(subaccount))
if err != nil {
c.log.Error(fmt.Sprintf("while getting subaccount data: %s", err))
} else {
c.log.Debug(fmt.Sprintf("getting for subaccount %s", subaccount))
subaccountsDataFromAccounts[subaccount] = accountData
}
}
return subaccountsDataFromAccounts, nil
}

func (c *RateLimitedCisClient) GetSubaccountData(subaccountID string) (CisStateType, error) {
request, err := c.buildSubaccountRequest(subaccountID)
if err != nil {
return CisStateType{}, fmt.Errorf("while building request for accounts technical service: %w", err)
}

response, err := c.httpClient.Do(request)
if err != nil {
return CisStateType{}, fmt.Errorf("while executing request to accounts technical service: %w", err)
}

defer func(Body io.ReadCloser) {
err := Body.Close()
if err != nil {
c.log.Warn(fmt.Sprintf("failed to close response body: %s", err.Error()))
}
}(response.Body)

if response.StatusCode == http.StatusNotFound {
return CisStateType{}, nil
}

if response.StatusCode != http.StatusOK {
return CisStateType{}, fmt.Errorf("while processing response: %s", c.handleErrorStatusCode(response))
}

var cisResponse CisStateType
err = json.NewDecoder(response.Body).Decode(&cisResponse)
if err != nil {
return CisStateType{}, fmt.Errorf("while decoding CIS account response: %w", err)
}

return cisResponse, nil
}
101 changes: 101 additions & 0 deletions internal/subaccountsync/cis_events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package subaccountsync

import (
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"strconv"
)

func (c *RateLimitedCisClient) buildEventRequest(page int, fromActionTime int64) (*http.Request, error) {
request, err := http.NewRequest(http.MethodGet, fmt.Sprintf(eventServicePath, c.config.ServiceURL), nil)
if err != nil {
return nil, fmt.Errorf("while creating request: %v", err)
}
q := request.URL.Query()
q.Add("eventType", eventType)
q.Add("pageSize", c.config.PageSize)
q.Add("pageNum", strconv.Itoa(page))
q.Add("fromActionTime", strconv.FormatInt(fromActionTime, 10))
q.Add("sortField", "actionTime")
q.Add("sortOrder", "ASC")

request.URL.RawQuery = q.Encode()

return request, nil
}

func (c *RateLimitedCisClient) FetchEventsWindow(fromActionTime int64) ([]Event, error) {
var events []Event
var currentPage int
for {
cisResponse, err := c.fetchEventsPage(currentPage, fromActionTime)
if err != nil {
c.log.Error(fmt.Sprintf("while getting subaccount events for %d page: %v", currentPage, err))
return nil, err
}
events = append(events, cisResponse.Events...)
currentPage++
if cisResponse.TotalPages == currentPage {
break
}
}
c.log.Debug(fmt.Sprintf("Fetched event window - pages: %d, events: %d, from epoch: %d", currentPage, len(events), fromActionTime))
return events, nil
}

func (c *RateLimitedCisClient) fetchEventsPage(page int, fromActionTime int64) (CisEventsResponse, error) {
request, err := c.buildEventRequest(page, fromActionTime)
if err != nil {
return CisEventsResponse{}, fmt.Errorf("while building request for event service: %v", err)
}

response, err := c.httpClient.Do(request)
if err != nil {
return CisEventsResponse{}, fmt.Errorf("while executing request to event service: %v", err)
}
defer func(Body io.ReadCloser) {
err := Body.Close()
if err != nil {
c.log.Warn(fmt.Sprintf("failed to close response body: %s", err.Error()))
}
}(response.Body)

if response.StatusCode != http.StatusOK {
return CisEventsResponse{}, fmt.Errorf("while processing response: %s", c.handleErrorStatusCode(response))
}

var cisResponse CisEventsResponse
err = json.NewDecoder(response.Body).Decode(&cisResponse)
if err != nil {
return CisEventsResponse{}, fmt.Errorf("while decoding CIS events response: %v", err)
}

return cisResponse, nil
}

func (c *RateLimitedCisClient) getEventsForSubaccounts(fromActionTime int64, logs slog.Logger, subaccountsMap subaccountsSetType) ([]Event, error) {
rawEvents, err := c.FetchEventsWindow(fromActionTime)
if err != nil {
logs.Error(fmt.Sprintf("while getting subaccount delete events: %s", err))
return nil, err
}

// filter events to get only the ones in subaccounts map
filteredEventsFromCis := filterEvents(rawEvents, subaccountsMap)
logs.Info(fmt.Sprintf("Raw events: %d, filtered: %d", len(rawEvents), len(filteredEventsFromCis)))

return filteredEventsFromCis, nil
}

func filterEvents(rawEvents []Event, subaccounts subaccountsSetType) []Event {
var filteredEvents []Event
for _, event := range rawEvents {
if _, ok := subaccounts[subaccountIDType(event.SubaccountID)]; ok {
filteredEvents = append(filteredEvents, event)
}
}
return filteredEvents
}
37 changes: 37 additions & 0 deletions internal/subaccountsync/cis_model.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package subaccountsync

type (
EventDetails struct {
BetaEnabled bool `json:"betaEnabled"`
UsedForProduction string `json:"usedForProduction"`
}

Event struct {
ActionTime int64 `json:"actionTime"`
SubaccountID string `json:"entityId"`
Type string `json:"eventType"`
Details EventDetails
}

CisEventsResponse struct {
Total int `json:"total"`
TotalPages int `json:"totalPages"`
PageNum int `json:"pageNum"`
Events []Event `json:"events"`
}

CisStateType struct {
BetaEnabled bool `json:"betaEnabled"`
UsedForProduction string `json:"usedForProduction"`
ModifiedDate int64 `json:"modifiedDate"`
}

CisErrorResponseType struct {
Code int `json:"code"`
Message string `json:"message"`
}

CisAccountErrorResponseType struct {
Error CisErrorResponseType `json:"error"`
}
)
Loading

0 comments on commit 7605a32

Please sign in to comment.