diff --git a/internal/satellite/satellite.go b/internal/satellite/satellite.go index 7601a6b..ef6bdf1 100644 --- a/internal/satellite/satellite.go +++ b/internal/satellite/satellite.go @@ -42,7 +42,7 @@ func (s *Satellite) Run(ctx context.Context) error { // Creating a process to fetch and replicate the state fetchAndReplicateStateProcess := state.NewFetchAndReplicateStateProcess(scheduler.NextID(), cronExpr, s.stateArtifactFetcher, notifier) // Add the process to the scheduler - scheduler.Schedule(&fetchAndReplicateStateProcess) + scheduler.Schedule(fetchAndReplicateStateProcess) return nil } diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 552f7a0..f12c20e 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -46,14 +46,14 @@ type BasicScheduler struct { ctx context.Context } -func NewBasicScheduler(ctx *context.Context) Scheduler { +func NewBasicScheduler(ctx context.Context) Scheduler { return &BasicScheduler{ cron: cron.New(), processes: make(map[string]Process), locks: make(map[string]*sync.Mutex), mu: sync.Mutex{}, name: BasicSchedulerKey, - ctx: *ctx, + ctx: ctx, } } @@ -70,10 +70,8 @@ func (s *BasicScheduler) Schedule(process Process) error { log.Info().Msgf("Scheduling process %s", process.GetName()) s.mu.Lock() defer s.mu.Unlock() - for _, processes := range s.processes { - if process.GetName() == processes.GetName() { - return fmt.Errorf("process with Name %s already exists", process.GetName()) - } + if _, exists := s.processes[process.GetName()]; exists { + return fmt.Errorf("process %s already exists", process.GetName()) } // Add the process to the scheduler _, err := s.cron.AddFunc(process.GetCronExpr(), func() { diff --git a/internal/state/artifact.go b/internal/state/artifact.go index 40771a7..541db4d 100644 --- a/internal/state/artifact.go +++ b/internal/state/artifact.go @@ -19,8 +19,12 @@ type Artifact struct { Hash string `json:"hash"` } -func NewArtifact() ArtifactReader { - return &Artifact{} +func NewArtifact(repository, tag, hash string) ArtifactReader { + return &Artifact{ + Repository: repository, + Tag: tag, + Hash: hash, + } } func (a *Artifact) GetRepository() string { diff --git a/internal/state/fetcher.go b/internal/state/fetcher.go new file mode 100644 index 0000000..60a22f9 --- /dev/null +++ b/internal/state/fetcher.go @@ -0,0 +1,158 @@ +package state + +import ( + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + "strings" + + "container-registry.com/harbor-satellite/internal/config" + "oras.land/oras-go/v2" + "oras.land/oras-go/v2/content/file" + "oras.land/oras-go/v2/registry/remote" + "oras.land/oras-go/v2/registry/remote/auth" + "oras.land/oras-go/v2/registry/remote/retry" +) + + +type StateFetcher interface { + // Fetches the state artifact from the registry + FetchStateArtifact() (StateReader, error) +} + +type URLStateFetcher struct { + url string + group_name string + state_artifact_name string + state_artifact_reader StateReader +} + +func NewURLStateFetcher() StateFetcher { + url := config.GetRemoteRegistryURL() + // Trim the "https://" or "http://" prefix if present + url = strings.TrimPrefix(url, "https://") + url = strings.TrimPrefix(url, "http://") + state_artifact_reader := NewState() + return &URLStateFetcher{ + url: url, + group_name: config.GetGroupName(), + state_artifact_name: config.GetStateArtifactName(), + state_artifact_reader: state_artifact_reader, + } +} + +type FileStateArtifactFetcher struct { + filePath string + group_name string + state_artifact_name string + state_artifact_reader StateReader +} + +func NewFileStateFetcher() StateFetcher { + filePath := config.GetInput() + state_artifact_reader := NewState() + return &FileStateArtifactFetcher{ + filePath: filePath, + group_name: config.GetGroupName(), + state_artifact_name: config.GetStateArtifactName(), + state_artifact_reader: state_artifact_reader, + } +} + +func (f *FileStateArtifactFetcher) FetchStateArtifact() (StateReader, error) { + /// Read the state artifact file from the file path + content, err := os.ReadFile(f.filePath) + if err != nil { + return nil, fmt.Errorf("failed to read the state artifact file: %v", err) + } + state_reader, err := FromJSON(content, f.state_artifact_reader) + if err != nil { + return nil, fmt.Errorf("failed to parse the state artifact file: %v", err) + } + return state_reader, nil +} + +func (f *URLStateFetcher) FetchStateArtifact() (StateReader, error) { + cwd, err := os.Getwd() + if err != nil { + return nil, fmt.Errorf("failed to get current working directory: %v", err) + } + // Creating a file store in the current working directory will be deleted later after reading the state artifact + fs, err := file.New(fmt.Sprintf("%s/state-artifact", cwd)) + if err != nil { + return nil, fmt.Errorf("failed to create file store: %v", err) + } + defer fs.Close() + + ctx := context.Background() + + repo, err := remote.NewRepository(fmt.Sprintf("%s/%s/%s", f.url, f.group_name, f.state_artifact_name)) + if err != nil { + return nil, fmt.Errorf("failed to create remote repository: %v", err) + } + + // Setting up the authentication for the remote registry + repo.Client = &auth.Client{ + Client: retry.DefaultClient, + Cache: auth.NewCache(), + Credential: auth.StaticCredential( + f.url, + auth.Credential{ + Username: config.GetHarborUsername(), + Password: config.GetHarborPassword(), + }, + ), + } + // Copy from the remote repository to the file store + tag := "latest" + _, err = oras.Copy(ctx, repo, tag, fs, tag, oras.DefaultCopyOptions) + if err != nil { + return nil, fmt.Errorf("failed to copy from remote repository to file store: %v", err) + } + stateArtifactDir := filepath.Join(cwd, "state-artifact") + + var state_reader StateReader + // Find the state artifact file in the state-artifact directory that is created temporarily + err = filepath.Walk(stateArtifactDir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if filepath.Ext(info.Name()) == ".json" { + content, err := os.ReadFile(path) + if err != nil { + return err + } + state_reader, err = FromJSON(content, f.state_artifact_reader) + if err != nil { + return fmt.Errorf("failed to parse the state artifact file: %v", err) + } + return nil + } + return nil + }) + + if err != nil { + return nil, fmt.Errorf("failed to read the state artifact file: %v", err) + } + // Clean up everything inside the state-artifact folder + err = os.RemoveAll(stateArtifactDir) + if err != nil { + return nil, fmt.Errorf("failed to remove state-artifact directory: %v", err) + } + return state_reader, nil +} + +// FromJSON parses the input JSON data into a StateArtifactReader +func FromJSON(data []byte, reg StateReader) (StateReader, error) { + if err := json.Unmarshal(data, ®); err != nil { + fmt.Print("Error in unmarshalling") + return nil, err + } + // Validation + if reg.GetRegistryURL()== "" { + return nil, fmt.Errorf("registry URL is required") + } + return reg, nil +} diff --git a/internal/state/replicator.go b/internal/state/replicator.go index a0d33a1..6dec2f0 100644 --- a/internal/state/replicator.go +++ b/internal/state/replicator.go @@ -18,34 +18,20 @@ type Replicator interface { } type BasicReplicator struct { - username string - password string - use_unsecure bool - zot_url string - state_reader StateReader + username string + password string + useUnsecure bool + zotURL string + stateReader StateReader } -type ImageInfo struct { - Name string `json:"name"` -} - -type Repository struct { - Repository string `json:"repository"` - Images []ImageInfo `json:"images"` -} - -type RegistryInfo struct { - RegistryUrl string `json:"registryUrl"` - Repositories []Repository `json:"repositories"` -} - -func BasicNewReplicator(state_reader StateReader) Replicator { +func NewBasicReplicator(state_reader StateReader) Replicator { return &BasicReplicator{ - username: config.GetHarborUsername(), - password: config.GetHarborPassword(), - use_unsecure: config.UseUnsecure(), - zot_url: config.GetZotURL(), - state_reader: state_reader, + username: config.GetHarborUsername(), + password: config.GetHarborPassword(), + useUnsecure: config.UseUnsecure(), + zotURL: config.GetZotURL(), + stateReader: state_reader, } } @@ -57,28 +43,29 @@ func (r *BasicReplicator) Replicate(ctx context.Context) error { }) options := []crane.Option{crane.WithAuth(auth)} - if r.use_unsecure { + if r.useUnsecure { options = append(options, crane.Insecure) } - source_registry := r.state_reader.GetRegistryURL() - for _, artifact := range r.state_reader.GetArtifacts() { + sourceRegistry := r.stateReader.GetRegistryURL() + + for _, artifact := range r.stateReader.GetArtifacts() { // Extract the image name from the repository of the artifact repo, image, err := utils.GetRepositoryAndImageNameFromArtifact(artifact.GetRepository()) if err != nil { log.Error().Msgf("Error getting repository and image name: %v", err) return err } - log.Info().Msgf("Pulling image %s from repository %s at registry %s", image, repo, source_registry) + log.Info().Msgf("Pulling image %s from repository %s at registry %s", image, repo, sourceRegistry) // Pull the image at the given repository at the source registry - srcImage, err := crane.Pull(fmt.Sprintf("%s/%s/%s", source_registry, repo, image), options...) + srcImage, err := crane.Pull(fmt.Sprintf("%s/%s/%s", sourceRegistry, repo, image), options...) if err != nil { - logger.FromContext(ctx).Error().Msgf("Failed to pull image: %v", err) + log.Error().Msgf("Failed to pull image: %v", err) return err } // Push the image to the local registry - err = crane.Push(srcImage, fmt.Sprintf("%s/%s", r.zot_url, image), options...) + err = crane.Push(srcImage, fmt.Sprintf("%s/%s", r.zotURL, image), options...) if err != nil { - logger.FromContext(ctx).Error().Msgf("Failed to push image: %v", err) + log.Error().Msgf("Failed to push image: %v", err) return err } log.Info().Msgf("Image %s pushed successfully", image) diff --git a/internal/state/state.go b/internal/state/state.go index f8d063e..3d9bdeb 100644 --- a/internal/state/state.go +++ b/internal/state/state.go @@ -1,25 +1,15 @@ package state import ( - "context" - "encoding/json" "fmt" - "os" - "path/filepath" - - "container-registry.com/harbor-satellite/internal/config" - "oras.land/oras-go/v2" - "oras.land/oras-go/v2/content/file" - "oras.land/oras-go/v2/registry/remote" - "oras.land/oras-go/v2/registry/remote/auth" - "oras.land/oras-go/v2/registry/remote/retry" + "strings" ) // Registry defines an interface for registry operations type StateReader interface { // GetRegistryURL returns the URL of the registry after removing the "https://" or "http://" prefix if present and the trailing "/" GetRegistryURL() string - // GetRegistryType returns the list of artifacts that needs to be pulled + // GetArtifacts returns the list of artifacts that needs to be pulled GetArtifacts() []ArtifactReader // GetArtifactByRepository takes in the repository name and returns the artifact associated with it GetArtifactByRepository(repo string) (ArtifactReader, error) @@ -39,32 +29,27 @@ func NewState() StateReader { func (a *State) GetRegistryURL() string { registry := a.Registry - if len(registry) >= 8 && registry[:8] == "https://" { - registry = registry[8:] - } else if len(registry) >= 7 && registry[:7] == "http://" { - registry = registry[7:] - } - if len(registry) > 0 && registry[len(registry)-1] == '/' { - registry = registry[:len(registry)-1] - } + registry = strings.TrimPrefix(registry, "https://") + registry = strings.TrimPrefix(registry, "http://") + registry = strings.TrimSuffix(registry, "/") return registry } func (a *State) GetArtifacts() []ArtifactReader { - var artifact_readers []ArtifactReader - for _, artifact := range a.Artifacts { - artifact_readers = append(artifact_readers, &artifact) + var artifacts_reader []ArtifactReader + for i := range a.Artifacts { + artifacts_reader = append(artifacts_reader, &a.Artifacts[i]) } - return artifact_readers + return artifacts_reader } func (a *State) GetArtifactByRepository(repo string) (ArtifactReader, error) { - for _, artifact := range a.Artifacts { - if artifact.GetRepository() == repo { - return &artifact, nil + for i := range a.Artifacts { + if a.Artifacts[i].GetRepository() == repo { + return &a.Artifacts[i], nil } } - return &Artifact{}, fmt.Errorf("artifact not found in the list") + return nil, fmt.Errorf("artifact not found in the list") } func (a *State) HasStateChanged(newState StateReader) bool { @@ -83,151 +68,3 @@ func (a *State) HasStateChanged(newState StateReader) bool { } return false } - -type StateFetcher interface { - // Fetches the state artifact from the registry - FetchStateArtifact() (StateReader, error) -} - -type URLStateFetcher struct { - url string - group_name string - state_artifact_name string - state_artifact_reader StateReader -} - -func NewURLStateFetcher() StateFetcher { - url := config.GetRemoteRegistryURL() - // Trim the "https://" or "http://" prefix if present - if len(url) >= 8 && url[:8] == "https://" { - url = url[8:] - } else if len(url) >= 7 && url[:7] == "http://" { - url = url[7:] - } - state_artifact_reader := NewState() - return &URLStateFetcher{ - url: url, - group_name: config.GetGroupName(), - state_artifact_name: config.GetStateArtifactName(), - state_artifact_reader: state_artifact_reader, - } -} - -type FileStateArtifactFetcher struct { - filePath string - group_name string - state_artifact_name string - state_artifact_reader StateReader -} - -func NewFileStateFetcher() StateFetcher { - filePath := config.GetInput() - state_artifact_reader := NewState() - return &FileStateArtifactFetcher{ - filePath: filePath, - group_name: config.GetGroupName(), - state_artifact_name: config.GetStateArtifactName(), - state_artifact_reader: state_artifact_reader, - } -} - -func (f *FileStateArtifactFetcher) FetchStateArtifact() (StateReader, error) { - /// Read the state artifact file from the file path - content, err := os.ReadFile(f.filePath) - if err != nil { - return nil, fmt.Errorf("failed to read the state artifact file: %v", err) - } - state_reader, err := FromJSON(content, f.state_artifact_reader.(*State)) - if err != nil { - return nil, fmt.Errorf("failed to parse the state artifact file: %v", err) - } - return state_reader, nil -} - -func (f *URLStateFetcher) FetchStateArtifact() (StateReader, error) { - cwd, err := os.Getwd() - if err != nil { - return nil, fmt.Errorf("failed to get current working directory: %v", err) - } - // Creating a file store in the current working directory will be deleted later after reading the state artifact - fs, err := file.New(fmt.Sprintf("%s/state-artifact", cwd)) - if err != nil { - return nil, fmt.Errorf("failed to create file store: %v", err) - } - defer fs.Close() - - ctx := context.Background() - - repo, err := remote.NewRepository(fmt.Sprintf("%s/%s/%s", f.url, f.group_name, f.state_artifact_name)) - if err != nil { - return nil, fmt.Errorf("failed to create remote repository: %v", err) - } - - // Setting up the authentication for the remote registry - repo.Client = &auth.Client{ - Client: retry.DefaultClient, - Cache: auth.NewCache(), - Credential: auth.StaticCredential( - f.url, - auth.Credential{ - Username: config.GetHarborUsername(), - Password: config.GetHarborPassword(), - }, - ), - } - // Copy from the remote repository to the file store - tag := "latest" - _, err = oras.Copy(ctx, repo, tag, fs, tag, oras.DefaultCopyOptions) - if err != nil { - return nil, fmt.Errorf("failed to copy from remote repository to file store: %v", err) - } - stateArtifactDir := filepath.Join(cwd, "state-artifact") - - var state_reader StateReader - // Find the state artifact file in the state-artifact directory that is created temporarily - err = filepath.Walk(stateArtifactDir, func(path string, info os.FileInfo, err error) error { - if err != nil { - return err - } - if filepath.Ext(info.Name()) == ".json" { - content, err := os.ReadFile(path) - if err != nil { - return err - } - fmt.Printf("Contents of %s:\n", info.Name()) - fmt.Println(string(content)) - - state_reader, err = FromJSON(content, f.state_artifact_reader.(*State)) - if err != nil { - return fmt.Errorf("failed to parse the state artifact file: %v", err) - } - - } - return nil - }) - - if err != nil { - return nil, fmt.Errorf("failed to read the state artifact file: %v", err) - } - // Clean up everything inside the state-artifact folder - err = os.RemoveAll(stateArtifactDir) - if err != nil { - return nil, fmt.Errorf("failed to remove state-artifact directory: %v", err) - } - return state_reader, nil -} - -// FromJSON parses the input JSON data into a StateArtifactReader -func FromJSON(data []byte, reg *State) (StateReader, error) { - - if err := json.Unmarshal(data, ®); err != nil { - fmt.Print("Error in unmarshalling") - return nil, err - } - fmt.Print(reg) - // Validation - if reg.Registry == "" { - return nil, fmt.Errorf("registry URL is required") - } - return reg, nil -} diff --git a/internal/state/state_process.go b/internal/state/state_process.go index e9c4c44..ecc758d 100644 --- a/internal/state/state_process.go +++ b/internal/state/state_process.go @@ -3,6 +3,7 @@ package state import ( "context" "fmt" + "sync" "container-registry.com/harbor-satellite/internal/notifier" "container-registry.com/harbor-satellite/logger" @@ -20,29 +21,36 @@ type FetchAndReplicateStateProcess struct { isRunning bool stateReader StateReader notifier notifier.Notifier + mu *sync.Mutex } -func NewFetchAndReplicateStateProcess(id uint64, cronExpr string, stateFetcher StateFetcher, notifier notifier.Notifier) FetchAndReplicateStateProcess { - return FetchAndReplicateStateProcess{ +func NewFetchAndReplicateStateProcess(id uint64, cronExpr string, stateFetcher StateFetcher, notifier notifier.Notifier) *FetchAndReplicateStateProcess { + return &FetchAndReplicateStateProcess{ id: id, name: FetchAndReplicateStateProcessName, cronExpr: cronExpr, isRunning: false, stateArtifactFetcher: stateFetcher, notifier: notifier, + mu: &sync.Mutex{}, } } func (f *FetchAndReplicateStateProcess) Execute(ctx context.Context) error { log := logger.FromContext(ctx) + f.mu.Lock() if f.IsRunning() { + f.mu.Unlock() log.Warn().Msg("Process is already running") return fmt.Errorf("process %s is already running", f.GetName()) } log.Info().Msg("Starting process to fetch and replicate state") f.isRunning = true + f.mu.Unlock() defer func() { + f.mu.Lock() f.isRunning = false + f.mu.Unlock() }() newStateFetched, err := f.stateArtifactFetcher.FetchStateArtifact() @@ -58,11 +66,12 @@ func (f *FetchAndReplicateStateProcess) Execute(ctx context.Context) error { log.Error().Err(err).Msg("Error sending notification") } - replicator := BasicNewReplicator(newStateFetched) + replicator := NewBasicReplicator(newStateFetched) if err := replicator.Replicate(ctx); err != nil { log.Error().Err(err).Msg("Error replicating state") return err } + f.stateReader = newStateFetched return nil } diff --git a/internal/utils/utils.go b/internal/utils/utils.go index 9e0323d..b427564 100644 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -82,7 +82,7 @@ func HasInvalidPathChars(input string) bool { func GetRepositoryAndImageNameFromArtifact(repository string) (string, string, error) { parts := strings.Split(repository, "/") if len(parts) < 2 { - return "", "", fmt.Errorf("invalid repository format") + return "", "", fmt.Errorf("invalid repository format: %s. Expected format: repo/image", repository) } repo := parts[0] image := parts[1] diff --git a/logger/logger.go b/logger/logger.go index 69dfadb..55993e5 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -46,7 +46,11 @@ func AddLoggerToContext(ctx context.Context, logLevel string) context.Context { if i == nil { l = colorize("???", 37) // white } else { - l = strings.ToUpper(fmt.Sprintf("%s", i))[0:3] + lStr := strings.ToUpper(fmt.Sprintf("%s", i)) + if len(lStr) > 3 { + lStr = lStr[:3] + } + l = lStr } } return fmt.Sprintf("| %s |", l) diff --git a/main.go b/main.go index 95218e2..480a748 100644 --- a/main.go +++ b/main.go @@ -48,7 +48,7 @@ func run() error { if err := handleRegistrySetup(g, log, cancel); err != nil { return err } - scheduler := scheduler.NewBasicScheduler(&ctx) + scheduler := scheduler.NewBasicScheduler(ctx) ctx = context.WithValue(ctx, scheduler.GetSchedulerKey(), scheduler) err := scheduler.Start() if err != nil {