diff --git a/.env b/.env index 6a08828..f39fbfd 100644 --- a/.env +++ b/.env @@ -4,3 +4,5 @@ ZOT_URL="127.0.0.1:8585" TOKEN="" ENV=dev USE_UNSECURE=true +GROUP_NAME=satellite-test-group-state +STATE_ARTIFACT_NAME=state diff --git a/.gitignore b/.gitignore index 1f90f92..420e0d1 100644 --- a/.gitignore +++ b/.gitignore @@ -29,3 +29,6 @@ dist/ zot/cache.db secrets.txt __debug_bin1949266242 + +/zot +/runtime \ No newline at end of file diff --git a/ci/utils.go b/ci/utils.go index 20f4d76..954ec1a 100644 --- a/ci/utils.go +++ b/ci/utils.go @@ -60,7 +60,6 @@ func (m *HarborSatellite) Service( // builds given component from source - func (m *HarborSatellite) build(source *dagger.Directory, component string) *dagger.Directory { fmt.Printf("Building %s\n", component) diff --git a/config.json b/config.json new file mode 100644 index 0000000..807d2f9 --- /dev/null +++ b/config.json @@ -0,0 +1,16 @@ +{ + "auth": { + "name": "admin", + "registry": "https://registry.bupd.xyz", + "secret": "Harbor12345" + }, + "bring_own_registry": false, + "ground_control_url": "http://localhost:8080", + "log_level": "info", + "own_registry_adr": "127.0.0.1", + "own_registry_port": "8585", + "states": ["https://registry.bupd.xyz/satellite-test-group-state/state:latest"], + "url_or_file": "https://registry.bupd.xyz", + "zotconfigpath": "./registry/config.json", + "use_unsecure": true +} diff --git a/config.toml b/config.toml index 8e02820..30614fc 100644 --- a/config.toml +++ b/config.toml @@ -6,7 +6,10 @@ own_registry_adr = "127.0.0.1" own_registry_port = "8585" # URL of remote registry OR local file path -url_or_file = "https://demo.goharbor.io/v2/myproject/album-server" +# url_or_file = "https://demo.goharbor.io/v2/myproject/album-server" +url_or_file = "https://registry.bupd.xyz" +## for testing for local file +# url_or_file = "./image-list/images.json" # Default path for Zot registry config.json zotConfigPath = "./registry/config.json" diff --git a/go.mod b/go.mod index 666a5f7..8829be4 100644 --- a/go.mod +++ b/go.mod @@ -328,6 +328,7 @@ require ( github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/rivo/uniseg v0.4.7 // indirect + github.com/robfig/cron/v3 v3.0.1 github.com/rubenv/sql-migrate v1.5.2 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect @@ -398,8 +399,8 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.27.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.27.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.27.0 - go.opentelemetry.io/otel/log v0.3.0 - go.opentelemetry.io/otel/metric v1.27.0 + go.opentelemetry.io/otel/log v0.7.0 + go.opentelemetry.io/otel/metric v1.27.0 // indirect go.opentelemetry.io/otel/sdk v1.27.0 go.opentelemetry.io/otel/sdk/log v0.3.0 go.opentelemetry.io/otel/sdk/metric v1.27.0 diff --git a/go.sum b/go.sum index 7801ef3..3102b75 100644 --- a/go.sum +++ b/go.sum @@ -1289,6 +1289,8 @@ github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qq github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= diff --git a/image-list/images.json b/image-list/images.json index 271496a..86cf321 100644 --- a/image-list/images.json +++ b/image-list/images.json @@ -1,16 +1,25 @@ { - "registryUrl": "https://demo.goharbor.io/v2/", - "repositories": [ + "registry": "Satellite", + "artifacts": [ { - "repository": "myproject", - "images": [ - { - "name": "album-server@sha256:39879890008f12c25ea14125aa8e9ec8ef3e167f0b0ed88057e955a8fa32c430" - }, - { - "name": "album-server:busybox" - } - ] + "repository": "satellite-test-group-state/alpine", + "tag": [ + "latest" + ], + "labels": null, + "type": "IMAGE", + "digest": "sha256:9cee2b382fe2412cd77d5d437d15a93da8de373813621f2e4d406e3df0cf0e7c", + "deleted": false + }, + { + "repository": "satellite-test-group-state/postgres", + "tag": [ + "latest" + ], + "labels": null, + "type": "IMAGE", + "digest": "sha256:dde924f70bc972261013327c480adf402ea71487b5750e40569a0b74fa90c74a", + "deleted": false } ] -} \ No newline at end of file +} diff --git a/internal/config/config.go b/internal/config/config.go index 93ae830..4f6aa02 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1,164 +1,138 @@ package config import ( + "encoding/json" "fmt" "os" - - "github.com/joho/godotenv" - "github.com/spf13/viper" ) -var AppConfig *Config +var appConfig *Config + +const DefaultConfigPath string = "config.json" + +type Auth struct { + Name string `json:"name"` + Registry string `json:"registry"` + Secret string `json:"secret"` +} type Config struct { - log_level string - own_registry bool - own_registry_adr string - own_registry_port string - zot_config_path string - input string - zot_url string - registry string - repository string - user_input string - scheme string - api_version string - image string - harbor_password string - harbor_username string - env string - use_unsecure bool + Auth Auth `json:"auth"` + BringOwnRegistry bool `json:"bring_own_registry"` + GroundControlURL string `json:"ground_control_url"` + LogLevel string `json:"log_level"` + OwnRegistryAddress string `json:"own_registry_adr"` + OwnRegistryPort string `json:"own_registry_port"` + States []string `json:"states"` + URLOrFile string `json:"url_or_file"` + ZotConfigPath string `json:"zotconfigpath"` + UseUnsecure bool `json:"use_unsecure"` + ZotUrl string `json:"zot_url"` + StateFetchPeriod string `json:"state_fetch_period"` } func GetLogLevel() string { - return AppConfig.log_level + return appConfig.LogLevel } func GetOwnRegistry() bool { - return AppConfig.own_registry + return appConfig.BringOwnRegistry } func GetOwnRegistryAdr() string { - return AppConfig.own_registry_adr + return appConfig.OwnRegistryAddress } func GetOwnRegistryPort() string { - return AppConfig.own_registry_port + return appConfig.OwnRegistryPort } func GetZotConfigPath() string { - return AppConfig.zot_config_path + return appConfig.ZotConfigPath } func GetInput() string { - return AppConfig.input + return appConfig.URLOrFile } func SetZotURL(url string) { - AppConfig.zot_url = url + appConfig.ZotUrl = url } func GetZotURL() string { - return AppConfig.zot_url + return appConfig.ZotUrl } -func SetRegistry(registry string) { - AppConfig.registry = registry -} - -func GetRegistry() string { - return AppConfig.registry -} - -func SetRepository(repository string) { - AppConfig.repository = repository -} - -func GetRepository() string { - return AppConfig.repository -} - -func SetUserInput(user_input string) { - AppConfig.user_input = user_input -} - -func GetUserInput() string { - return AppConfig.user_input +func UseUnsecure() bool { + return appConfig.UseUnsecure } -func SetScheme(scheme string) { - AppConfig.scheme = scheme +func GetHarborPassword() string { + return appConfig.Auth.Secret } -func GetScheme() string { - return AppConfig.scheme +func GetHarborUsername() string { + return appConfig.Auth.Name } -func SetAPIVersion(api_version string) { - AppConfig.api_version = api_version +func SetRemoteRegistryURL(url string) { + appConfig.Auth.Registry = url } -func GetAPIVersion() string { - return AppConfig.api_version +func GetRemoteRegistryURL() string { + return appConfig.Auth.Registry } -func SetImage(image string) { - AppConfig.image = image +func GetStateFetchPeriod() string { + return appConfig.StateFetchPeriod } -func GetImage() string { - return AppConfig.image +func GetStates() []string { + return appConfig.States } -func UseUnsecure() bool { - return AppConfig.use_unsecure +func ParseConfigFromJson(jsonData string) (*Config, error) { + var config Config + err := json.Unmarshal([]byte(jsonData), &config) + if err != nil { + return nil, err + } + return &config, nil } -func GetHarborPassword() string { - return AppConfig.harbor_password -} +func ReadConfigData(configPath string) ([]byte, error) { -func GetHarborUsername() string { - return AppConfig.harbor_username + fileInfo, err := os.Stat(configPath) + if err != nil { + return nil, err + } + if fileInfo.IsDir() { + return nil, os.ErrNotExist + } + data, err := os.ReadFile(configPath) + if err != nil { + return nil, err + } + return data, nil } func LoadConfig() (*Config, error) { - viper.SetConfigName("config") - viper.SetConfigType("toml") - viper.AddConfigPath(".") - if err := viper.ReadInConfig(); err != nil { - return nil, fmt.Errorf("error reading config file at path '%s': %w", viper.ConfigFileUsed(), err) - } - - // Load environment and start satellite - if err := godotenv.Load(); err != nil { - return &Config{}, fmt.Errorf("error loading .env file: %w", err) + configData, err := ReadConfigData(DefaultConfigPath) + if err != nil { + fmt.Printf("Error reading config file: %v\n", err) + return nil, err } - var use_unsecure bool - if os.Getenv("USE_UNSECURE") == "true" { - use_unsecure = true - } else { - use_unsecure = false + config, err := ParseConfigFromJson(string(configData)) + if err != nil { + fmt.Printf("Error parsing config file: %v\n", err) + return nil, err } - - return &Config{ - log_level: viper.GetString("log_level"), - own_registry: viper.GetBool("bring_own_registry"), - own_registry_adr: viper.GetString("own_registry_adr"), - own_registry_port: viper.GetString("own_registry_port"), - zot_config_path: viper.GetString("zotConfigPath"), - input: viper.GetString("url_or_file"), - harbor_password: os.Getenv("HARBOR_PASSWORD"), - harbor_username: os.Getenv("HARBOR_USERNAME"), - env: os.Getenv("ENV"), - zot_url: os.Getenv("ZOT_URL"), - use_unsecure: use_unsecure, - }, nil + return config, nil } func InitConfig() error { var err error - AppConfig, err = LoadConfig() + appConfig, err = LoadConfig() if err != nil { return err } diff --git a/internal/notifier/email_notifier.go b/internal/notifier/email_notifier.go new file mode 100644 index 0000000..ed45f23 --- /dev/null +++ b/internal/notifier/email_notifier.go @@ -0,0 +1 @@ +package notifier diff --git a/internal/notifier/notifier.go b/internal/notifier/notifier.go new file mode 100644 index 0000000..cf4b2ff --- /dev/null +++ b/internal/notifier/notifier.go @@ -0,0 +1,28 @@ +package notifier + +import ( + "context" + + "container-registry.com/harbor-satellite/logger" +) + +type Notifier interface { + // Notify sends a notification + Notify() error +} + +type SimpleNotifier struct{ + ctx context.Context +} + +func NewSimpleNotifier(ctx context.Context) Notifier { + return &SimpleNotifier{ + ctx: ctx, + } +} + +func (n *SimpleNotifier) Notify() error { + log := logger.FromContext(n.ctx) + log.Info().Msg("This is a simple notifier") + return nil +} diff --git a/internal/replicate/replicate.go b/internal/replicate/replicate.go deleted file mode 100644 index 59bb805..0000000 --- a/internal/replicate/replicate.go +++ /dev/null @@ -1,226 +0,0 @@ -package replicate - -import ( - "context" - "encoding/json" - "fmt" - "os" - "path/filepath" - "strings" - - "container-registry.com/harbor-satellite/internal/config" - "container-registry.com/harbor-satellite/internal/store" - "container-registry.com/harbor-satellite/logger" - "github.com/google/go-containerregistry/pkg/authn" - "github.com/google/go-containerregistry/pkg/crane" -) - -type Replicator interface { - // Replicate copies images from the source registry to the local registry. - Replicate(ctx context.Context, image string) error - DeleteExtraImages(ctx context.Context, imgs []store.Image) error -} - -type BasicReplicator struct { - username string - password string - use_unsecure bool - zot_url string -} - -type ImageInfo struct { - Name string `json:"name"` -} - -type Repository struct { - Repository string `json:"repository"` - Images []ImageInfo `json:"images"` -} - -type RegistryInfo struct { - RegistryUrl string `json:"registryUrl"` - Repositories []Repository `json:"repositories"` -} - -func NewReplicator(context context.Context) Replicator { - return &BasicReplicator{ - username: config.GetHarborUsername(), - password: config.GetHarborPassword(), - use_unsecure: config.UseUnsecure(), - zot_url: config.GetZotURL(), - } -} - -func (r *BasicReplicator) Replicate(ctx context.Context, image string) error { - - source := getPullSource(ctx, image) - - if source != "" { - CopyImage(ctx, source) - } - return nil -} - -func stripPrefix(imageName string) string { - if idx := strings.Index(imageName, ":"); idx != -1 { - return imageName[idx+1:] - } - return imageName -} - -func (r *BasicReplicator) DeleteExtraImages(ctx context.Context, imgs []store.Image) error { - log := logger.FromContext(ctx) - zotUrl := os.Getenv("ZOT_URL") - registry := os.Getenv("REGISTRY") - repository := os.Getenv("REPOSITORY") - image := os.Getenv("IMAGE") - - localRegistry := fmt.Sprintf("%s/%s/%s/%s", zotUrl, registry, repository, image) - log.Info().Msgf("Local registry: %s", localRegistry) - - // Get the list of images from the local registry - localImages, err := crane.ListTags(localRegistry) - if err != nil { - log.Error().Msgf("failed to list tags: %v", err) - return err - } - - // Create a map for quick lookup of the provided image list - imageMap := make(map[string]struct{}) - for _, img := range imgs { - // Strip the "album-server:" prefix from the image name - strippedName := stripPrefix(img.Name) - imageMap[strippedName] = struct{}{} - } - - // Iterate over the local images and delete those not in the provided image list - for _, localImage := range localImages { - if _, exists := imageMap[localImage]; !exists { - // Image is not in the provided list, delete it - log.Info().Msgf("Deleting image: %s", localImage) - err := crane.Delete(fmt.Sprintf("%s:%s", localRegistry, localImage)) - if err != nil { - log.Error().Msgf("failed to delete image: %v", err) - return err - } - log.Info().Msgf("Image deleted: %s", localImage) - } - } - - return nil -} - -func getPullSource(ctx context.Context, image string) string { - log := logger.FromContext(ctx) - input := os.Getenv("USER_INPUT") - scheme := os.Getenv("SCHEME") - if strings.HasPrefix(scheme, "http://") || strings.HasPrefix(scheme, "https://") { - url := os.Getenv("REGISTRY") + "/" + os.Getenv("REPOSITORY") + "/" + image - return url - } else { - registryInfo, err := getFileInfo(ctx, input) - if err != nil { - log.Error().Msgf("Error getting file info: %v", err) - return "" - } - registryURL := registryInfo.RegistryUrl - registryURL = strings.TrimPrefix(registryURL, "https://") - registryURL = strings.TrimSuffix(registryURL, "/v2/") - - // TODO: Handle multiple repositories - repositoryName := registryInfo.Repositories[0].Repository - - return registryURL + "/" + repositoryName + "/" + image - } -} - -func getFileInfo(ctx context.Context, input string) (*RegistryInfo, error) { - log := logger.FromContext(ctx) - // Get the current working directory - workingDir, err := os.Getwd() - if err != nil { - log.Error().Msgf("Error getting current directory: %v", err) - return nil, err - } - - // Construct the full path by joining the working directory and the input path - fullPath := filepath.Join(workingDir, input) - - // Read the file - jsonData, err := os.ReadFile(fullPath) - if err != nil { - log.Error().Msgf("Error reading file: %v", err) - return nil, err - } - - var registryInfo RegistryInfo - err = json.Unmarshal(jsonData, ®istryInfo) - if err != nil { - log.Error().Msgf("Error unmarshalling JSON data: %v", err) - return nil, err - } - - return ®istryInfo, nil -} - -func CopyImage(ctx context.Context, imageName string) error { - log := logger.FromContext(ctx) - log.Info().Msgf("Copying image: %s", imageName) - zotUrl := os.Getenv("ZOT_URL") - if zotUrl == "" { - log.Error().Msg("ZOT_URL environment variable is not set") - return fmt.Errorf("ZOT_URL environment variable is not set") - } - - // Build the destination reference - destRef := fmt.Sprintf("%s/%s", zotUrl, imageName) - log.Info().Msgf("Destination reference: %s", destRef) - - // Get credentials from environment variables - username := os.Getenv("HARBOR_USERNAME") - password := os.Getenv("HARBOR_PASSWORD") - if username == "" || password == "" { - log.Error().Msg("HARBOR_USERNAME or HARBOR_PASSWORD environment variable is not set") - return fmt.Errorf("HARBOR_USERNAME or HARBOR_PASSWORD environment variable is not set") - } - - auth := authn.FromConfig(authn.AuthConfig{ - Username: username, - Password: password, - }) - options := []crane.Option{crane.WithAuth(auth)} - if config.UseUnsecure() { - options = append(options, crane.Insecure) - } - // Pull the image with authentication - srcImage, err := crane.Pull(imageName, options...) - if err != nil { - log.Error().Msgf("Failed to pull image: %v", err) - return fmt.Errorf("failed to pull image: %w", err) - } else { - log.Info().Msg("Image pulled successfully") - } - - // Push the image to the destination registry - push_options := []crane.Option{} - if config.UseUnsecure() { - push_options = append(push_options, crane.Insecure) - } - err = crane.Push(srcImage, destRef, push_options...) - if err != nil { - log.Error().Msgf("Failed to push image: %v", err) - return fmt.Errorf("failed to push image: %w", err) - } else { - log.Info().Msg("Image pushed successfully") - } - - // Delete ./local-oci-layout directory - // This is required because it is a temporary directory used by crane to pull and push images to and from - // And crane does not automatically clean it - if err := os.RemoveAll("./local-oci-layout"); err != nil { - log.Error().Msgf("Failed to remove directory: %v", err) - return fmt.Errorf("failed to remove directory: %w", err) - } - - return nil -} diff --git a/internal/satellite/satellite.go b/internal/satellite/satellite.go index b2f6b0a..1d0d038 100644 --- a/internal/satellite/satellite.go +++ b/internal/satellite/satellite.go @@ -2,75 +2,51 @@ package satellite import ( "context" - "time" - "container-registry.com/harbor-satellite/internal/replicate" - "container-registry.com/harbor-satellite/internal/store" + "container-registry.com/harbor-satellite/internal/config" + "container-registry.com/harbor-satellite/internal/notifier" + "container-registry.com/harbor-satellite/internal/scheduler" + "container-registry.com/harbor-satellite/internal/state" + "container-registry.com/harbor-satellite/internal/utils" "container-registry.com/harbor-satellite/logger" ) type Satellite struct { - storer store.Storer - replicator replicate.Replicator + stateReader state.StateReader + schedulerKey scheduler.SchedulerKey } -func NewSatellite(ctx context.Context, storer store.Storer, replicator replicate.Replicator) *Satellite { +func NewSatellite(ctx context.Context, schedulerKey scheduler.SchedulerKey) *Satellite { return &Satellite{ - storer: storer, - replicator: replicator, + schedulerKey: schedulerKey, } } func (s *Satellite) Run(ctx context.Context) error { log := logger.FromContext(ctx) - - // Execute the initial operation immediately without waiting for the ticker - imgs, err := s.storer.List(ctx) + log.Info().Msg("Starting Satellite") + var cronExpr string + state_fetch_period := config.GetStateFetchPeriod() + cronExpr, err := utils.FormatDuration(state_fetch_period) if err != nil { - log.Error().Err(err).Msg("Error listing images") - return err - } - if len(imgs) == 0 { - log.Info().Msg("No images to replicate") - } else { - for _, img := range imgs { - err = s.replicator.Replicate(ctx, img.Name) - if err != nil { - log.Error().Err(err).Msg("Error replicating image") - return err - } - } - s.replicator.DeleteExtraImages(ctx, imgs) - } - log.Info().Msg("--------------------------------\n") - - // Temporarily set to faster tick rate for testing purposes - ticker := time.NewTicker(3 * time.Second) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return nil - case <-ticker.C: - imgs, err := s.storer.List(ctx) - if err != nil { - log.Error().Err(err).Msg("Error listing images") - return err - } - if len(imgs) == 0 { - log.Info().Msg("No images to replicate") - } else { - for _, img := range imgs { - err = s.replicator.Replicate(ctx, img.Name) - if err != nil { - log.Error().Err(err).Msg("Error replicating image") - return err - } - } - s.replicator.DeleteExtraImages(ctx, imgs) - } - } - log.Info().Msg("--------------------------------\n") + log.Warn().Msgf("Error formatting duration in seconds: %v", err) + log.Warn().Msgf("Using default duration: %v", state.DefaultFetchAndReplicateStateTimePeriod) + cronExpr = state.DefaultFetchAndReplicateStateTimePeriod } + userName := config.GetHarborUsername() + password := config.GetHarborPassword() + zotURL := config.GetZotURL() + sourceRegistry := utils.FormatRegistryURL(config.GetRemoteRegistryURL()) + useUnsecure := config.UseUnsecure() + // Get the scheduler from the context + scheduler := ctx.Value(s.schedulerKey).(scheduler.Scheduler) + // Create a simple notifier and add it to the process + notifier := notifier.NewSimpleNotifier(ctx) + // Creating a process to fetch and replicate the state + states := config.GetStates() + fetchAndReplicateStateProcess := state.NewFetchAndReplicateStateProcess(scheduler.NextID(), cronExpr, notifier, userName, password, zotURL, sourceRegistry, useUnsecure, states) + // Add the process to the scheduler + scheduler.Schedule(fetchAndReplicateStateProcess) + + return nil } diff --git a/internal/scheduler/process.go b/internal/scheduler/process.go new file mode 100644 index 0000000..a306bc9 --- /dev/null +++ b/internal/scheduler/process.go @@ -0,0 +1,21 @@ +package scheduler + +import "context" + +// Process represents a process that can be scheduled +type Process interface { + // Execute runs the process + Execute(ctx context.Context) error + + // GetID returns the unique GetID of the process + GetID() uint64 + + // GetName returns the name of the process + GetName() string + + // GetCronExpr returns the cron expression for the process + GetCronExpr() string + + // IsRunning returns true if the process is running + IsRunning() bool +} diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go new file mode 100644 index 0000000..f12c20e --- /dev/null +++ b/internal/scheduler/scheduler.go @@ -0,0 +1,105 @@ +package scheduler + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + + "container-registry.com/harbor-satellite/logger" + "github.com/robfig/cron/v3" +) + +type SchedulerKey string + +const BasicSchedulerKey SchedulerKey = "basic-scheduler" + +type Scheduler interface { + // GetSchedulerKey would return the key of the scheduler which is unique and for a particular scheduler and is used to get the scheduler from the context + GetSchedulerKey() SchedulerKey + // Schedule would add a process to the scheduler + Schedule(process Process) error + // Start would start the scheduler + Start() error + // Stop would stop the scheduler + Stop() error + // NextID would return the next unique ID + NextID() uint64 +} + +type BasicScheduler struct { + // name is the key of the scheduler + name SchedulerKey + // cron is the cron scheduler + cron *cron.Cron + // processes is a map of processes which are attached to the scheduler + processes map[string]Process + // locks is a map of locks for each process which is used to schedule if the process are interdependent + locks map[string]*sync.Mutex + // stopped is a flag to check if the scheduler is stopped + stopped bool + // counter is the counter for the unique ID of the process + counter uint64 + // mu is the mutex for the scheduler + mu sync.Mutex + // ctx is the context of the scheduler + ctx context.Context +} + +func NewBasicScheduler(ctx context.Context) Scheduler { + return &BasicScheduler{ + cron: cron.New(), + processes: make(map[string]Process), + locks: make(map[string]*sync.Mutex), + mu: sync.Mutex{}, + name: BasicSchedulerKey, + ctx: ctx, + } +} + +func (s *BasicScheduler) GetSchedulerKey() SchedulerKey { + return s.name +} + +func (s *BasicScheduler) NextID() uint64 { + return atomic.AddUint64(&s.counter, 1) +} + +func (s *BasicScheduler) Schedule(process Process) error { + log := logger.FromContext(s.ctx) + log.Info().Msgf("Scheduling process %s", process.GetName()) + s.mu.Lock() + defer s.mu.Unlock() + if _, exists := s.processes[process.GetName()]; exists { + return fmt.Errorf("process %s already exists", process.GetName()) + } + // Add the process to the scheduler + _, err := s.cron.AddFunc(process.GetCronExpr(), func() { + s.executeProcess(process) + }) + if err != nil { + return fmt.Errorf("error adding process to scheduler: %w", err) + } + s.processes[process.GetName()] = process + log.Info().Msgf("Process %s scheduled with cron expression %s", process.GetName(), process.GetCronExpr()) + return nil +} + +func (s *BasicScheduler) Start() error { + s.cron.Start() + return nil +} + +func (s *BasicScheduler) Stop() error { + s.stopped = true + s.cron.Stop() + return nil +} + +func (s *BasicScheduler) executeProcess(process Process) error { + if s.stopped { + return fmt.Errorf("scheduler is stopped") + } + // Execute the process + return process.Execute(s.ctx) +} diff --git a/internal/server/server.go b/internal/server/server.go index 7da7785..661e98c 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -3,9 +3,9 @@ package server import ( "context" "errors" + "fmt" "net/http" - "container-registry.com/harbor-satellite/internal/config" "github.com/rs/zerolog" "golang.org/x/sync/errgroup" ) @@ -21,16 +21,14 @@ type App struct { server *http.Server ctx context.Context Logger *zerolog.Logger - config *config.Config } -func NewApp(router Router, ctx context.Context, logger *zerolog.Logger, config *config.Config, registrars ...RouteRegistrar) *App { +func NewApp(router Router, ctx context.Context, logger *zerolog.Logger, registrars ...RouteRegistrar) *App { return &App{ router: router, registrars: registrars, ctx: ctx, Logger: logger, - config: config, server: &http.Server{Addr: ":9090", Handler: router}, } } @@ -62,7 +60,11 @@ func (a *App) SetupServer(g *errgroup.Group) { }) g.Go(func() error { <-a.ctx.Done() - a.Logger.Info().Msg("Shutting down server") - return a.Shutdown(a.ctx) + a.Logger.Warn().Msg("Shutting down server") + err := a.Shutdown(a.ctx) + if err != nil { + return fmt.Errorf("error shutting down server: %w", err) + } + return fmt.Errorf("satellite shutting down") }) } diff --git a/internal/state/artifact.go b/internal/state/artifact.go new file mode 100644 index 0000000..81ff6c8 --- /dev/null +++ b/internal/state/artifact.go @@ -0,0 +1,107 @@ +package state + +import ( + "reflect" +) + +// ArtifactReader defines an interface for reading artifact data +type ArtifactReader interface { + GetRepository() string + GetTags() []string + GetDigest() string + GetType() string + IsDeleted() bool + HasChanged(newArtifact ArtifactReader) bool + SetRepository(repository string) + SetName(name string) + GetName() string +} + +// Artifact represents an artifact object in the registry +type Artifact struct { + Repository string `json:"repository,omitempty"` + Tags []string `json:"tag,omitempty"` + Labels []string `json:"labels"` + Type string `json:"type,omitempty"` + Digest string `json:"digest,omitempty"` + Deleted bool `json:"deleted"` + Name string `json:"name,omitempty"` +} + +// NewArtifact creates a new Artifact object +func NewArtifact(deleted bool, repository string, tags []string, digest, artifactType string) ArtifactReader { + return &Artifact{ + Deleted: deleted, + Repository: repository, + Tags: tags, + Digest: digest, + Type: artifactType, + } +} + +func (a *Artifact) GetRepository() string { + return a.Repository +} + +func (a *Artifact) GetTags() []string { + return a.Tags +} + +func (a *Artifact) GetDigest() string { + return a.Digest +} + +func (a *Artifact) GetType() string { + return a.Type +} + +func (a *Artifact) IsDeleted() bool { + return a.Deleted +} + +func (a *Artifact) GetName() string { + return a.Name +} + +// HasChanged compares the current artifact with another to determine if there are any changes +func (a *Artifact) HasChanged(newArtifact ArtifactReader) bool { + // Compare the digest (hash) + if a.GetDigest() != newArtifact.GetDigest() { + return true + } + + // Compare the repository + if a.GetRepository() != newArtifact.GetRepository() { + return true + } + + // Compare the tags (order-agnostic comparison) + if !reflect.DeepEqual(a.GetTags(), newArtifact.GetTags()) { + return true + } + + // Compare the deletion status + if a.IsDeleted() != newArtifact.IsDeleted() { + return true + } + + if a.GetType() != newArtifact.GetType() { + return true + } + + // Compare the tags (order-agnostic comparison using reflect.DeepEqual) + if !reflect.DeepEqual(a.GetTags(), newArtifact.GetTags()) { + return true + } + + // No changes detected + return false +} + +func (a *Artifact) SetRepository(repository string) { + a.Repository = repository +} + +func (a *Artifact) SetName(name string) { + a.Name = name +} diff --git a/internal/state/fetcher.go b/internal/state/fetcher.go new file mode 100644 index 0000000..476b2f4 --- /dev/null +++ b/internal/state/fetcher.go @@ -0,0 +1,129 @@ +package state + +import ( + "archive/tar" + "bytes" + "encoding/json" + "fmt" + "io" + "os" + + "container-registry.com/harbor-satellite/internal/config" + "container-registry.com/harbor-satellite/internal/utils" + "github.com/google/go-containerregistry/pkg/authn" + "github.com/google/go-containerregistry/pkg/crane" +) + +type StateFetcher interface { + FetchStateArtifact(state interface{}) error +} + +type baseStateFetcher struct { + username string + password string +} + +type URLStateFetcher struct { + baseStateFetcher + url string +} + +type FileStateArtifactFetcher struct { + baseStateFetcher + filePath string +} + +func NewURLStateFetcher(stateURL, userName, password string) StateFetcher { + url := utils.FormatRegistryURL(stateURL) + return &URLStateFetcher{ + baseStateFetcher: baseStateFetcher{ + username: userName, + password: password, + }, + url: url, + } +} + +func NewFileStateFetcher(filePath, userName, password string) StateFetcher { + return &FileStateArtifactFetcher{ + baseStateFetcher: baseStateFetcher{ + username: userName, + password: password, + }, + filePath: filePath, + } +} + +func (f *FileStateArtifactFetcher) FetchStateArtifact(state interface{}) error { + content, err := os.ReadFile(f.filePath) + if err != nil { + return fmt.Errorf("failed to read the state artifact file: %v", err) + } + err = json.Unmarshal(content, state) + if err != nil { + return fmt.Errorf("failed to parse the state artifact file: %v", err) + } + return nil +} + +func (f *URLStateFetcher) FetchStateArtifact(state interface{}) error { + auth := authn.FromConfig(authn.AuthConfig{ + Username: f.username, + Password: f.password, + }) + + options := []crane.Option{crane.WithAuth(auth)} + if config.UseUnsecure() { + options = append(options, crane.Insecure) + } + + img, err := crane.Pull(f.url, options...) + if err != nil { + return fmt.Errorf("failed to pull the state artifact: %v", err) + } + + tarContent := new(bytes.Buffer) + if err := crane.Export(img, tarContent); err != nil { + return fmt.Errorf("failed to export the state artifact: %v", err) + } + + tr := tar.NewReader(tarContent) + var artifactsJSON []byte + + for { + hdr, err := tr.Next() + if err == io.EOF { + break + } + if err != nil { + return fmt.Errorf("failed to read the tar archive: %v", err) + } + + if hdr.Name == "artifacts.json" { + artifactsJSON, err = io.ReadAll(tr) + if err != nil { + return fmt.Errorf("failed to read the artifacts.json file: %v", err) + } + break + } + } + if artifactsJSON == nil { + return fmt.Errorf("artifacts.json not found in the state artifact") + } + err = json.Unmarshal(artifactsJSON, &state) + if err != nil { + return fmt.Errorf("failed to parse the artifacts.json file: %v", err) + } + return nil +} + +func FromJSON(data []byte, reg StateReader) (StateReader, error) { + if err := json.Unmarshal(data, ®); err != nil { + fmt.Print("Error in unmarshalling") + return nil, err + } + if reg.GetRegistryURL() == "" { + return nil, fmt.Errorf("registry URL is required") + } + return reg, nil +} diff --git a/internal/state/helpers.go b/internal/state/helpers.go new file mode 100644 index 0000000..331216d --- /dev/null +++ b/internal/state/helpers.go @@ -0,0 +1,50 @@ +package state + +import ( + "fmt" + + "container-registry.com/harbor-satellite/internal/config" + "container-registry.com/harbor-satellite/internal/utils" + "github.com/rs/zerolog" +) + +func processInput(input, username, password string, log *zerolog.Logger) (StateFetcher, error) { + + if utils.IsValidURL(input) { + return processURLInput(utils.FormatRegistryURL(input), username, password, log) + } + + log.Info().Msg("Input is not a valid URL, checking if it is a file path") + if err := validateFilePath(input, log); err != nil { + return nil, err + } + + return processFileInput(input, username, password, log) +} + +func validateFilePath(path string, log *zerolog.Logger) error { + if utils.HasInvalidPathChars(path) { + log.Error().Msg("Path contains invalid characters") + return fmt.Errorf("invalid file path: %s", path) + } + if err := utils.GetAbsFilePath(path); err != nil { + log.Error().Err(err).Msg("No file found") + return fmt.Errorf("no file found: %s", path) + } + return nil +} + +func processURLInput(input, username, password string, log *zerolog.Logger) (StateFetcher, error) { + log.Info().Msg("Input is a valid URL") + config.SetRemoteRegistryURL(input) + + stateArtifactFetcher := NewURLStateFetcher(input, username, password) + + return stateArtifactFetcher, nil +} + +func processFileInput(input, username, password string, log *zerolog.Logger) (StateFetcher, error) { + log.Info().Msg("Input is a valid file path") + stateArtifactFetcher := NewFileStateFetcher(input, username, password) + return stateArtifactFetcher, nil +} diff --git a/internal/state/replicator.go b/internal/state/replicator.go new file mode 100644 index 0000000..cb72e40 --- /dev/null +++ b/internal/state/replicator.go @@ -0,0 +1,122 @@ +package state + +import ( + "context" + "fmt" + + "container-registry.com/harbor-satellite/logger" + "github.com/google/go-containerregistry/pkg/authn" + "github.com/google/go-containerregistry/pkg/crane" + "github.com/google/go-containerregistry/pkg/v1/mutate" + "github.com/google/go-containerregistry/pkg/v1/types" +) + +type Replicator interface { + // Replicate copies images from the source registry to the local registry. + Replicate(ctx context.Context, replicationEntities []Entity) error + // DeleteReplicationEntity deletes the image from the local registry. + DeleteReplicationEntity(ctx context.Context, replicationEntity []Entity) error +} + +type BasicReplicator struct { + username string + password string + useUnsecure bool + remoteRegistryURL string + sourceRegistry string +} + +func NewBasicReplicator(username, password, zotURL, sourceRegistry string, useUnsecure bool) Replicator { + return &BasicReplicator{ + username: username, + password: password, + useUnsecure: useUnsecure, + remoteRegistryURL: zotURL, + sourceRegistry: sourceRegistry, + } +} + +// Entity represents an image or artifact which needs to be handled by the replicator +type Entity struct { + Name string + Repository string + Tag string + Digest string +} + +func (e Entity) GetName() string { + return e.Name +} + +func (e Entity) GetRepository() string { + return e.Repository +} + +func (e Entity) GetTag() string { + return e.Tag +} + +// Replicate replicates images from the source registry to the Zot registry. +func (r *BasicReplicator) Replicate(ctx context.Context, replicationEntities []Entity) error { + log := logger.FromContext(ctx) + auth := authn.FromConfig(authn.AuthConfig{ + Username: r.username, + Password: r.password, + }) + + options := []crane.Option{crane.WithAuth(auth)} + if r.useUnsecure { + options = append(options, crane.Insecure) + } + + for _, replicationEntity := range replicationEntities { + + log.Info().Msgf("Pulling image %s from repository %s at registry %s with tag %s", replicationEntity.GetName(), replicationEntity.GetRepository(), r.sourceRegistry, replicationEntity.GetTag()) + + // Pull the image from the source registry + srcImage, err := crane.Pull(fmt.Sprintf("%s/%s/%s:%s", r.sourceRegistry, replicationEntity.GetRepository(), replicationEntity.GetName(), replicationEntity.GetTag()), options...) + if err != nil { + log.Error().Msgf("Failed to pull image: %v", err) + return err + } + + // Convert Docker manifest to OCI manifest + ociImage := mutate.MediaType(srcImage, types.OCIManifestSchema1) + + // Push the converted OCI image to the Zot registry + err = crane.Push(ociImage, fmt.Sprintf("%s/%s/%s:%s", r.remoteRegistryURL, replicationEntity.GetRepository(), replicationEntity.GetName(), replicationEntity.GetTag()), options...) + if err != nil { + log.Error().Msgf("Failed to push image: %v", err) + return err + } + log.Info().Msgf("Image %s pushed successfully", replicationEntity.GetName()) + + } + return nil +} + +func (r *BasicReplicator) DeleteReplicationEntity(ctx context.Context, replicationEntity []Entity) error { + log := logger.FromContext(ctx) + auth := authn.FromConfig(authn.AuthConfig{ + Username: r.username, + Password: r.password, + }) + + options := []crane.Option{crane.WithAuth(auth)} + if r.useUnsecure { + options = append(options, crane.Insecure) + } + + for _, entity := range replicationEntity { + log.Info().Msgf("Deleting image %s from repository %s at registry %s with tag %s", entity.GetName(), entity.GetRepository(), r.remoteRegistryURL, entity.GetTag()) + + err := crane.Delete(fmt.Sprintf("%s/%s/%s:%s", r.remoteRegistryURL, entity.GetRepository(), entity.GetName(), entity.GetTag()), options...) + if err != nil { + log.Error().Msgf("Failed to delete image: %v", err) + return err + } + log.Info().Msgf("Image %s deleted successfully", entity.GetName()) + } + + return nil +} diff --git a/internal/state/state.go b/internal/state/state.go new file mode 100644 index 0000000..b191785 --- /dev/null +++ b/internal/state/state.go @@ -0,0 +1,98 @@ +package state + +import ( + "fmt" + "strings" +) + +// Registry defines an interface for registry operations +type StateReader interface { + // GetRegistryURL returns the URL of the registry after removing the "https://" or "http://" prefix if present and the trailing "/" + GetRegistryURL() string + // GetArtifacts returns the list of artifacts that needs to be pulled + GetArtifacts() []ArtifactReader + // GetArtifactByRepository takes in the repository name and returns the artifact associated with it + GetArtifactByRepository(repo string) (ArtifactReader, error) + // Compare the state artifact with the new state artifact + HasStateChanged(newState StateReader) bool + // GetArtifactByName takes in the name of the artifact and returns the artifact associated with it + GetArtifactByNameAndTag(name, tag string) ArtifactReader + // SetArtifacts sets the artifacts in the state + SetArtifacts(artifacts []ArtifactReader) +} + +type State struct { + Registry string `json:"registry"` + Artifacts []Artifact `json:"artifacts"` +} + +func NewState() StateReader { + state := &State{} + return state +} + +func (a *State) GetRegistryURL() string { + registry := a.Registry + registry = strings.TrimPrefix(registry, "https://") + registry = strings.TrimPrefix(registry, "http://") + registry = strings.TrimSuffix(registry, "/") + return registry +} + +func (a *State) GetArtifacts() []ArtifactReader { + var artifacts_reader []ArtifactReader + for i := range a.Artifacts { + artifacts_reader = append(artifacts_reader, &a.Artifacts[i]) + } + return artifacts_reader +} + +func (a *State) GetArtifactByRepository(repo string) (ArtifactReader, error) { + for i := range a.Artifacts { + if a.Artifacts[i].GetRepository() == repo { + return &a.Artifacts[i], nil + } + } + return nil, fmt.Errorf("artifact not found in the list") +} + +func (a *State) HasStateChanged(newState StateReader) bool { + if a.GetRegistryURL() != newState.GetRegistryURL() { + return true + } + artifacts := a.GetArtifacts() + newArtifacts := newState.GetArtifacts() + if len(artifacts) != len(newArtifacts) { + return true + } + for i, artifact := range artifacts { + if artifact.HasChanged(newArtifacts[i]) { + return true + } + } + return false +} + +func (a *State) GetArtifactByNameAndTag(name, tag string) ArtifactReader { + for i := range a.Artifacts { + if a.Artifacts[i].GetName() == name { + for _, t := range a.Artifacts[i].GetTags() { + if t == tag { + return &a.Artifacts[i] + } + } + } + } + return nil +} + +func (a *State) SetArtifacts(artifacts []ArtifactReader) { + // Clear existing artifacts + a.Artifacts = []Artifact{} + + // Set new artifacts + a.Artifacts = make([]Artifact, len(artifacts)) + for i, artifact := range artifacts { + a.Artifacts[i] = *artifact.(*Artifact) + } +} diff --git a/internal/state/state_process.go b/internal/state/state_process.go new file mode 100644 index 0000000..4c10d76 --- /dev/null +++ b/internal/state/state_process.go @@ -0,0 +1,245 @@ +package state + +import ( + "context" + "fmt" + "sync" + + "container-registry.com/harbor-satellite/internal/notifier" + "container-registry.com/harbor-satellite/internal/utils" + "container-registry.com/harbor-satellite/logger" + "github.com/rs/zerolog" +) + +const FetchAndReplicateStateProcessName string = "fetch-replicate-state-process" + +const DefaultFetchAndReplicateStateTimePeriod string = "00h00m010s" + +type FetchAndReplicateAuthConfig struct { + Username string + Password string + UseUnsecure bool + RemoteRegistryURL string + SourceRegistry string +} + +type FetchAndReplicateStateProcess struct { + id uint64 + name string + cronExpr string + isRunning bool + stateMap []StateMap + notifier notifier.Notifier + mu *sync.Mutex + authConfig FetchAndReplicateAuthConfig +} + +type StateMap struct { + url string + State StateReader + Entities []Entity +} + +func NewStateMap(url []string) []StateMap { + var stateMap []StateMap + for _, u := range url { + stateMap = append(stateMap, StateMap{url: u, State: nil, Entities: nil}) + } + return stateMap +} + +func NewFetchAndReplicateStateProcess(id uint64, cronExpr string, notifier notifier.Notifier, username, password, remoteRegistryURL, sourceRegistryURL string, useUnsecure bool, states []string) *FetchAndReplicateStateProcess { + return &FetchAndReplicateStateProcess{ + id: id, + name: FetchAndReplicateStateProcessName, + cronExpr: cronExpr, + isRunning: false, + notifier: notifier, + mu: &sync.Mutex{}, + stateMap: NewStateMap(states), + authConfig: FetchAndReplicateAuthConfig{ + Username: username, + Password: password, + UseUnsecure: useUnsecure, + RemoteRegistryURL: remoteRegistryURL, + SourceRegistry: sourceRegistryURL, + }, + } +} + +func (f *FetchAndReplicateStateProcess) Execute(ctx context.Context) error { + log := logger.FromContext(ctx) + if !f.start() { + log.Warn().Msg("Process already running") + return fmt.Errorf("process %s already running", f.GetName()) + } + defer f.stop() + + for i := range f.stateMap { + log.Info().Msgf("Processing state for %s", f.stateMap[i].url) + stateFetcher, err := processInput(f.stateMap[i].url, f.authConfig.Username, f.authConfig.Password, log) + if err != nil { + log.Error().Err(err).Msg("Error processing input") + return err + } + newStateFetched, err := f.FetchAndProcessState(stateFetcher, log) + if err != nil { + log.Error().Err(err).Msg("Error fetching state") + return err + } + log.Info().Msgf("State fetched successfully for %s", f.stateMap[i].url) + deleteEntity, replicateEntity, newState := f.GetChanges(newStateFetched, log, f.stateMap[i].Entities) + f.LogChanges(deleteEntity, replicateEntity, log) + if err := f.notifier.Notify(); err != nil { + log.Error().Err(err).Msg("Error sending notification") + } + + replicator := NewBasicReplicator(f.authConfig.Username, f.authConfig.Password, f.authConfig.RemoteRegistryURL, f.authConfig.SourceRegistry, f.authConfig.UseUnsecure) + // Delete the entities from the remote registry + if err := replicator.DeleteReplicationEntity(ctx, deleteEntity); err != nil { + log.Error().Err(err).Msg("Error deleting entities") + return err + } + // Replicate the entities to the remote registry + if err := replicator.Replicate(ctx, replicateEntity); err != nil { + log.Error().Err(err).Msg("Error replicating state") + return err + } + // Update the state directly in the slice + f.stateMap[i].State = newState + f.stateMap[i].Entities = FetchEntitiesFromState(newState) + } + return nil +} + +func (f *FetchAndReplicateStateProcess) GetChanges(newState StateReader, log *zerolog.Logger, oldEntites []Entity) ([]Entity, []Entity, StateReader) { + log.Info().Msg("Getting changes") + // Remove artifacts with null tags from the new state + newState = f.RemoveNullTagArtifacts(newState) + newEntites := FetchEntitiesFromState(newState) + + var entityToDelete []Entity + var entityToReplicate []Entity + + if oldEntites == nil { + log.Warn().Msg("Old state has zero entites, replicating the complete state") + return entityToDelete, newEntites, newState + } + + // Create maps for quick lookups + oldEntityMap := make(map[string]Entity) + for _, oldEntity := range oldEntites { + oldEntityMap[oldEntity.Name+"|"+oldEntity.Tag] = oldEntity + } + + // Check new artifacts and update lists + for _, newEntity := range newEntites { + nameTagKey := newEntity.Name + "|" + newEntity.Tag + oldEntity, exists := oldEntityMap[nameTagKey] + + if !exists { + // New artifact doesn't exist in old state, add to replication list + entityToReplicate = append(entityToReplicate, newEntity) + } else if newEntity.Digest != oldEntity.Digest { + // Artifact exists but has changed, add to both lists + entityToReplicate = append(entityToReplicate, newEntity) + entityToDelete = append(entityToDelete, oldEntity) + } + + // Remove processed old artifact from map + delete(oldEntityMap, nameTagKey) + } + + // Remaining artifacts in oldArtifactsMap should be deleted + for _, oldEntity := range oldEntityMap { + entityToDelete = append(entityToDelete, oldEntity) + } + + return entityToDelete, entityToReplicate, newState +} +func (f *FetchAndReplicateStateProcess) GetID() uint64 { + return f.id +} + +func (f *FetchAndReplicateStateProcess) GetName() string { + return f.name +} + +func (f *FetchAndReplicateStateProcess) GetCronExpr() string { + return fmt.Sprintf("@every %s", f.cronExpr) +} + +func (f *FetchAndReplicateStateProcess) IsRunning() bool { + return f.isRunning +} + +func (f *FetchAndReplicateStateProcess) start() bool { + f.mu.Lock() + defer f.mu.Unlock() + if f.isRunning { + return false + } + f.isRunning = true + return true +} + +func (f *FetchAndReplicateStateProcess) stop() { + f.mu.Lock() + defer f.mu.Unlock() + f.isRunning = false +} + +func (f *FetchAndReplicateStateProcess) RemoveNullTagArtifacts(state StateReader) StateReader { + var artifactsWithoutNullTags []ArtifactReader + for _, artifact := range state.GetArtifacts() { + if artifact.GetTags() != nil && len(artifact.GetTags()) != 0 { + artifactsWithoutNullTags = append(artifactsWithoutNullTags, artifact) + } + } + state.SetArtifacts(artifactsWithoutNullTags) + return state +} + +func ProcessState(state *StateReader) (*StateReader, error) { + for _, artifact := range (*state).GetArtifacts() { + repo, image, err := utils.GetRepositoryAndImageNameFromArtifact(artifact.GetRepository()) + if err != nil { + fmt.Printf("Error in getting repository and image name: %v", err) + return nil, err + } + artifact.SetRepository(repo) + artifact.SetName(image) + } + return state, nil +} + +func (f *FetchAndReplicateStateProcess) FetchAndProcessState(fetcher StateFetcher, log *zerolog.Logger) (StateReader, error) { + state := NewState() + err := fetcher.FetchStateArtifact(&state) + if err != nil { + log.Error().Err(err).Msg("Error fetching state artifact") + return nil, err + } + ProcessState(&state) + return state, nil +} + +func (f *FetchAndReplicateStateProcess) LogChanges(deleteEntity, replicateEntity []Entity, log *zerolog.Logger) { + log.Warn().Msgf("Total artifacts to delete: %d", len(deleteEntity)) + log.Warn().Msgf("Total artifacts to replicate: %d", len(replicateEntity)) +} + +func FetchEntitiesFromState(state StateReader) []Entity { + var entities []Entity + for _, artifact := range state.GetArtifacts() { + for _, tag := range artifact.GetTags() { + entities = append(entities, Entity{ + Name: artifact.GetName(), + Repository: artifact.GetRepository(), + Tag: tag, + Digest: artifact.GetDigest(), + }) + } + } + return entities +} diff --git a/internal/utils/utils.go b/internal/utils/utils.go index bac9c69..0b2ce7d 100644 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -1,7 +1,6 @@ package utils import ( - "encoding/json" "errors" "fmt" "net" @@ -12,7 +11,6 @@ import ( "strings" "container-registry.com/harbor-satellite/internal/config" - "container-registry.com/harbor-satellite/internal/images" "container-registry.com/harbor-satellite/registry" ) @@ -46,6 +44,8 @@ func HandleOwnRegistry() error { // LaunchDefaultZotRegistry launches the default Zot registry using the Zot config path func LaunchDefaultZotRegistry() error { + defaultZotURL := fmt.Sprintf("%s:%s", "127.0.0.1", "8585") + config.SetZotURL(defaultZotURL) launch, err := registry.LaunchRegistry(config.GetZotConfigPath()) if !launch { return fmt.Errorf("error launching registry: %w", err) @@ -80,60 +80,48 @@ func HasInvalidPathChars(input string) bool { return strings.ContainsAny(input, "\\:*?\"<>|") } -// ParseImagesJsonFile parses the images.json file and decodes it into the ImageList struct -func ParseImagesJsonFile(absPath string, imagesList *images.ImageList) error { - file, err := os.Open(absPath) - if err != nil { - return err - } - defer file.Close() - - if err := json.NewDecoder(file).Decode(imagesList); err != nil { - return err +func GetRepositoryAndImageNameFromArtifact(repository string) (string, string, error) { + parts := strings.Split(repository, "/") + if len(parts) < 2 { + return "", "", fmt.Errorf("invalid repository format: %s. Expected format: repo/image", repository) } - return nil + repo := parts[0] + image := parts[1] + return repo, image, nil } -// Set registry environment variables -func SetRegistryEnvVars(imageList images.ImageList) error { - if !IsValidURL(imageList.RegistryURL) { - return fmt.Errorf("invalid registry url format in images.json") +func FormatDuration(input string) (string, error) { + seconds, err := strconv.Atoi(input) // Convert input string to an integer + if err != nil { + return "", errors.New("invalid input: not a valid number") } - registryURL := imageList.RegistryURL - registryParts := strings.Split(registryURL, "/") - if len(registryParts) < 3 { - return fmt.Errorf("invalid registryUrl format in images.json") + if seconds < 0 { + return "", errors.New("invalid input: seconds cannot be negative") } - os.Setenv("REGISTRY", registryParts[2]) - config.SetRegistry(registryParts[2]) + hours := seconds / 3600 + minutes := (seconds % 3600) / 60 + secondsRemaining := seconds % 60 - if len(imageList.Repositories) > 0 { - os.Setenv("REPOSITORY", imageList.Repositories[0].Repository) - config.SetRepository(imageList.Repositories[0].Repository) - } else { - return fmt.Errorf("no repositories found in images.json") + var result string + + if hours > 0 { + result += strconv.Itoa(hours) + "h" + } + if minutes > 0 { + result += strconv.Itoa(minutes) + "m" + } + if secondsRemaining > 0 || result == "" { + result += strconv.Itoa(secondsRemaining) + "s" } - return nil + return result, nil } -// SetUrlConfig sets the URL configuration for the input URL and sets the environment variables -func SetUrlConfig(input string) { - os.Setenv("USER_INPUT", input) - config.SetUserInput(input) - parts := strings.SplitN(input, "://", 2) - scheme := parts[0] + "://" - os.Setenv("SCHEME", scheme) - config.SetScheme(scheme) - registryAndPath := parts[1] - registryParts := strings.Split(registryAndPath, "/") - os.Setenv("REGISTRY", registryParts[0]) - config.SetRegistry(registryParts[0]) - os.Setenv("API_VERSION", registryParts[1]) - config.SetAPIVersion(registryParts[1]) - os.Setenv("REPOSITORY", registryParts[2]) - config.SetRepository(registryParts[2]) - os.Setenv("IMAGE", registryParts[3]) - config.SetImage(registryParts[3]) +// FormatRegistryURL formats the registry URL by trimming the "https://" or "http://" prefix if present +func FormatRegistryURL(url string) string { + // Trim the "https://" or "http://" prefix if present + url = strings.TrimPrefix(url, "https://") + url = strings.TrimPrefix(url, "http://") + return url } diff --git a/logger/logger.go b/logger/logger.go index 78664dc..55993e5 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -2,7 +2,9 @@ package logger import ( "context" + "fmt" "os" + "strings" "github.com/rs/zerolog" ) @@ -11,27 +13,50 @@ type contextKey string const loggerKey contextKey = "logger" -// AddLoggerToContext creates a new context with a zerolog logger for stdout adn stderr and sets the global log level. +// AddLoggerToContext creates a new context with a zerolog logger for stdout and stderr and sets the global log level. func AddLoggerToContext(ctx context.Context, logLevel string) context.Context { // Set log level to configured value - switch logLevel { - case "debug": - zerolog.SetGlobalLevel(zerolog.DebugLevel) - case "info": - zerolog.SetGlobalLevel(zerolog.InfoLevel) - case "warn": - zerolog.SetGlobalLevel(zerolog.WarnLevel) - case "error": - zerolog.SetGlobalLevel(zerolog.ErrorLevel) - case "fatal": - zerolog.SetGlobalLevel(zerolog.FatalLevel) - case "panic": - zerolog.SetGlobalLevel(zerolog.PanicLevel) - default: - zerolog.SetGlobalLevel(zerolog.InfoLevel) + level := getLogLevel(logLevel) + zerolog.SetGlobalLevel(level) + + // Create a custom console writer + output := zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: "2006-01-02 15:04:05"} + + // Customize the output for each log level + output.FormatLevel = func(i interface{}) string { + var l string + if ll, ok := i.(string); ok { + switch ll { + case "debug": + l = colorize(ll, 36) // cyan + case "info": + l = colorize(ll, 34) // blue + case "warn": + l = colorize(ll, 33) // yellow + case "error": + l = colorize(ll, 31) // red + case "fatal": + l = colorize(ll, 35) // magenta + case "panic": + l = colorize(ll, 41) // white on red background + default: + l = colorize(ll, 37) // white + } + } else { + if i == nil { + l = colorize("???", 37) // white + } else { + lStr := strings.ToUpper(fmt.Sprintf("%s", i)) + if len(lStr) > 3 { + lStr = lStr[:3] + } + l = lStr + } + } + return fmt.Sprintf("| %s |", l) } - logger := zerolog.New(os.Stderr).With().Timestamp().Logger() + logger := zerolog.New(output).With().Timestamp().Logger() ctx = context.WithValue(ctx, loggerKey, &logger) return ctx @@ -48,3 +73,28 @@ func FromContext(ctx context.Context) *zerolog.Logger { } return logger } + +// Helper function to get the log level +func getLogLevel(logLevel string) zerolog.Level { + switch logLevel { + case "debug": + return zerolog.DebugLevel + case "info": + return zerolog.InfoLevel + case "warn": + return zerolog.WarnLevel + case "error": + return zerolog.ErrorLevel + case "fatal": + return zerolog.FatalLevel + case "panic": + return zerolog.PanicLevel + default: + return zerolog.InfoLevel + } +} + +// Helper function to colorize text +func colorize(s string, color int) string { + return fmt.Sprintf("\x1b[%dm%s\x1b[0m", color, s) +} diff --git a/main.go b/main.go index 68d8d1f..2bff96c 100644 --- a/main.go +++ b/main.go @@ -8,11 +8,9 @@ import ( "syscall" "container-registry.com/harbor-satellite/internal/config" - "container-registry.com/harbor-satellite/internal/images" - "container-registry.com/harbor-satellite/internal/replicate" "container-registry.com/harbor-satellite/internal/satellite" + "container-registry.com/harbor-satellite/internal/scheduler" "container-registry.com/harbor-satellite/internal/server" - "container-registry.com/harbor-satellite/internal/store" "container-registry.com/harbor-satellite/internal/utils" "container-registry.com/harbor-satellite/logger" "golang.org/x/sync/errgroup" @@ -39,7 +37,6 @@ func run() error { g, ctx := errgroup.WithContext(ctx) ctx = logger.AddLoggerToContext(ctx, config.GetLogLevel()) log := logger.FromContext(ctx) - log.Info().Msg("Satellite starting") // Set up router and app app := setupServerApp(ctx, log) @@ -50,22 +47,21 @@ func run() error { if err := handleRegistrySetup(g, log, cancel); err != nil { return err } - - // Process Input (file or URL) - fetcher, err := processInput(ctx, log) + scheduler := scheduler.NewBasicScheduler(ctx) + ctx = context.WithValue(ctx, scheduler.GetSchedulerKey(), scheduler) + err := scheduler.Start() if err != nil { + log.Error().Err(err).Msg("Error starting scheduler") return err } - ctx, storer := store.NewInMemoryStore(ctx, fetcher) - replicator := replicate.NewReplicator(ctx) - satelliteService := satellite.NewSatellite(ctx, storer, replicator) + satelliteService := satellite.NewSatellite(ctx, scheduler.GetSchedulerKey()) g.Go(func() error { return satelliteService.Run(ctx) }) - log.Info().Msg("Satellite running") + log.Info().Msg("Startup complete 🚀") return g.Wait() } @@ -89,7 +85,6 @@ func setupServerApp(ctx context.Context, log *zerolog.Logger) *server.App { router, ctx, log, - config.AppConfig, &server.MetricsRegistrar{}, &server.DebugRegistrar{}, &satellite.SatelliteRegistrar{}, @@ -116,45 +111,3 @@ func handleRegistrySetup(g *errgroup.Group, log *zerolog.Logger, cancel context. } return nil } - -func processInput(ctx context.Context, log *zerolog.Logger) (store.ImageFetcher, error) { - input := config.GetInput() - if !utils.IsValidURL(input) { - log.Info().Msg("Input is not a valid URL, checking if it is a file path") - if err := validateFilePath(config.GetInput(), log); err != nil { - return nil, err - } - return setupFileFetcher(ctx, log) - } - - log.Info().Msg("Input is a valid URL") - fetcher := store.RemoteImageListFetcher(ctx, input) - utils.SetUrlConfig(input) - return fetcher, nil -} - -func validateFilePath(path string, log *zerolog.Logger) error { - if utils.HasInvalidPathChars(path) { - log.Error().Msg("Path contains invalid characters") - return fmt.Errorf("invalid file path: %s", path) - } - if err := utils.GetAbsFilePath(path); err != nil { - log.Error().Err(err).Msg("No file found") - return fmt.Errorf("no file found: %s", path) - } - return nil -} - -func setupFileFetcher(ctx context.Context, log *zerolog.Logger) (store.ImageFetcher, error) { - fetcher := store.FileImageListFetcher(ctx, config.GetInput()) - var imagesList images.ImageList - if err := utils.ParseImagesJsonFile(config.GetInput(), &imagesList); err != nil { - log.Error().Err(err).Msg("Error parsing images.json file") - return nil, err - } - if err := utils.SetRegistryEnvVars(imagesList); err != nil { - log.Error().Err(err).Msg("Error setting registry environment variables") - return nil, err - } - return fetcher, nil -}