Skip to content

Commit

Permalink
coderabbit fixes for fetcher and replicator
Browse files Browse the repository at this point in the history
  • Loading branch information
Mehul-Kumar-27 committed Sep 29, 2024
1 parent b6b5dea commit 7b35466
Show file tree
Hide file tree
Showing 10 changed files with 224 additions and 227 deletions.
2 changes: 1 addition & 1 deletion internal/satellite/satellite.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (s *Satellite) Run(ctx context.Context) error {
// Creating a process to fetch and replicate the state
fetchAndReplicateStateProcess := state.NewFetchAndReplicateStateProcess(scheduler.NextID(), cronExpr, s.stateArtifactFetcher, notifier)
// Add the process to the scheduler
scheduler.Schedule(&fetchAndReplicateStateProcess)
scheduler.Schedule(fetchAndReplicateStateProcess)

return nil
}
10 changes: 4 additions & 6 deletions internal/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ type BasicScheduler struct {
ctx context.Context
}

func NewBasicScheduler(ctx *context.Context) Scheduler {
func NewBasicScheduler(ctx context.Context) Scheduler {
return &BasicScheduler{
cron: cron.New(),
processes: make(map[string]Process),
locks: make(map[string]*sync.Mutex),
mu: sync.Mutex{},
name: BasicSchedulerKey,
ctx: *ctx,
ctx: ctx,
}
}

Expand All @@ -70,10 +70,8 @@ func (s *BasicScheduler) Schedule(process Process) error {
log.Info().Msgf("Scheduling process %s", process.GetName())
s.mu.Lock()
defer s.mu.Unlock()
for _, processes := range s.processes {
if process.GetName() == processes.GetName() {
return fmt.Errorf("process with Name %s already exists", process.GetName())
}
if _, exists := s.processes[process.GetName()]; exists {
return fmt.Errorf("process %s already exists", process.GetName())
}
// Add the process to the scheduler
_, err := s.cron.AddFunc(process.GetCronExpr(), func() {
Expand Down
8 changes: 6 additions & 2 deletions internal/state/artifact.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@ type Artifact struct {
Hash string `json:"hash"`
}

func NewArtifact() ArtifactReader {
return &Artifact{}
func NewArtifact(repository, tag, hash string) ArtifactReader {
return &Artifact{
Repository: repository,
Tag: tag,
Hash: hash,
}
}

func (a *Artifact) GetRepository() string {
Expand Down
158 changes: 158 additions & 0 deletions internal/state/fetcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package state

import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"strings"

"container-registry.com/harbor-satellite/internal/config"
"oras.land/oras-go/v2"
"oras.land/oras-go/v2/content/file"
"oras.land/oras-go/v2/registry/remote"
"oras.land/oras-go/v2/registry/remote/auth"
"oras.land/oras-go/v2/registry/remote/retry"
)


type StateFetcher interface {
// Fetches the state artifact from the registry
FetchStateArtifact() (StateReader, error)
}

type URLStateFetcher struct {
url string
group_name string
state_artifact_name string
state_artifact_reader StateReader
}

func NewURLStateFetcher() StateFetcher {
url := config.GetRemoteRegistryURL()
// Trim the "https://" or "http://" prefix if present
url = strings.TrimPrefix(url, "https://")
url = strings.TrimPrefix(url, "http://")
state_artifact_reader := NewState()
return &URLStateFetcher{
url: url,
group_name: config.GetGroupName(),
state_artifact_name: config.GetStateArtifactName(),
state_artifact_reader: state_artifact_reader,
}
}

type FileStateArtifactFetcher struct {
filePath string
group_name string
state_artifact_name string
state_artifact_reader StateReader
}

func NewFileStateFetcher() StateFetcher {
filePath := config.GetInput()
state_artifact_reader := NewState()
return &FileStateArtifactFetcher{
filePath: filePath,
group_name: config.GetGroupName(),
state_artifact_name: config.GetStateArtifactName(),
state_artifact_reader: state_artifact_reader,
}
}

func (f *FileStateArtifactFetcher) FetchStateArtifact() (StateReader, error) {
/// Read the state artifact file from the file path
content, err := os.ReadFile(f.filePath)
if err != nil {
return nil, fmt.Errorf("failed to read the state artifact file: %v", err)
}
state_reader, err := FromJSON(content, f.state_artifact_reader)
if err != nil {
return nil, fmt.Errorf("failed to parse the state artifact file: %v", err)
}
return state_reader, nil
}

func (f *URLStateFetcher) FetchStateArtifact() (StateReader, error) {
cwd, err := os.Getwd()
if err != nil {
return nil, fmt.Errorf("failed to get current working directory: %v", err)
}
// Creating a file store in the current working directory will be deleted later after reading the state artifact
fs, err := file.New(fmt.Sprintf("%s/state-artifact", cwd))
if err != nil {
return nil, fmt.Errorf("failed to create file store: %v", err)
}
defer fs.Close()

ctx := context.Background()

repo, err := remote.NewRepository(fmt.Sprintf("%s/%s/%s", f.url, f.group_name, f.state_artifact_name))
if err != nil {
return nil, fmt.Errorf("failed to create remote repository: %v", err)
}

// Setting up the authentication for the remote registry
repo.Client = &auth.Client{
Client: retry.DefaultClient,
Cache: auth.NewCache(),
Credential: auth.StaticCredential(
f.url,
auth.Credential{
Username: config.GetHarborUsername(),
Password: config.GetHarborPassword(),
},
),
}
// Copy from the remote repository to the file store
tag := "latest"
_, err = oras.Copy(ctx, repo, tag, fs, tag, oras.DefaultCopyOptions)
if err != nil {
return nil, fmt.Errorf("failed to copy from remote repository to file store: %v", err)
}
stateArtifactDir := filepath.Join(cwd, "state-artifact")

var state_reader StateReader
// Find the state artifact file in the state-artifact directory that is created temporarily
err = filepath.Walk(stateArtifactDir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if filepath.Ext(info.Name()) == ".json" {
content, err := os.ReadFile(path)
if err != nil {
return err
}
state_reader, err = FromJSON(content, f.state_artifact_reader)
if err != nil {
return fmt.Errorf("failed to parse the state artifact file: %v", err)
}
return nil
}
return nil
})

if err != nil {
return nil, fmt.Errorf("failed to read the state artifact file: %v", err)
}
// Clean up everything inside the state-artifact folder
err = os.RemoveAll(stateArtifactDir)
if err != nil {
return nil, fmt.Errorf("failed to remove state-artifact directory: %v", err)
}
return state_reader, nil
}

// FromJSON parses the input JSON data into a StateArtifactReader
func FromJSON(data []byte, reg StateReader) (StateReader, error) {
if err := json.Unmarshal(data, &reg); err != nil {
fmt.Print("Error in unmarshalling")
return nil, err
}
// Validation
if reg.GetRegistryURL()== "" {
return nil, fmt.Errorf("registry URL is required")
}
return reg, nil
}
59 changes: 23 additions & 36 deletions internal/state/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,20 @@ type Replicator interface {
}

type BasicReplicator struct {
username string
password string
use_unsecure bool
zot_url string
state_reader StateReader
username string
password string
useUnsecure bool
zotURL string
stateReader StateReader
}

type ImageInfo struct {
Name string `json:"name"`
}

type Repository struct {
Repository string `json:"repository"`
Images []ImageInfo `json:"images"`
}

type RegistryInfo struct {
RegistryUrl string `json:"registryUrl"`
Repositories []Repository `json:"repositories"`
}

func BasicNewReplicator(state_reader StateReader) Replicator {
func NewBasicReplicator(state_reader StateReader) Replicator {
return &BasicReplicator{
username: config.GetHarborUsername(),
password: config.GetHarborPassword(),
use_unsecure: config.UseUnsecure(),
zot_url: config.GetZotURL(),
state_reader: state_reader,
username: config.GetHarborUsername(),
password: config.GetHarborPassword(),
useUnsecure: config.UseUnsecure(),
zotURL: config.GetZotURL(),
stateReader: state_reader,
}
}

Expand All @@ -57,30 +43,31 @@ func (r *BasicReplicator) Replicate(ctx context.Context) error {
})

options := []crane.Option{crane.WithAuth(auth)}
if r.use_unsecure {
if r.useUnsecure {
options = append(options, crane.Insecure)
}
source_registry := r.state_reader.GetRegistryURL()
for _, artifact := range r.state_reader.GetArtifacts() {
sourceRegistry := r.stateReader.GetRegistryURL()

for _, artifact := range r.stateReader.GetArtifacts() {
// Extract the image name from the repository of the artifact
repo, image, err := utils.GetRepositoryAndImageNameFromArtifact(artifact.GetRepository())
if err != nil {
log.Error().Msgf("Error getting repository and image name: %v", err)
return err
}
log.Info().Msgf("Pulling image %s from repository %s at registry %s", image, repo, source_registry)
log.Info().Msgf("Pulling image %s from repository %s at registry %s", image, repo, sourceRegistry)
// Pull the image at the given repository at the source registry
srcImage, err := crane.Pull(fmt.Sprintf("%s/%s/%s", source_registry, repo, image), options...)
_, err = crane.Pull(fmt.Sprintf("%s/%s/%s", sourceRegistry, repo, image), options...)
if err != nil {
logger.FromContext(ctx).Error().Msgf("Failed to pull image: %v", err)
log.Error().Msgf("Failed to pull image: %v", err)
return err
}
// Push the image to the local registry
err = crane.Push(srcImage, fmt.Sprintf("%s/%s", r.zot_url, image), options...)
if err != nil {
logger.FromContext(ctx).Error().Msgf("Failed to push image: %v", err)
return err
}
// err = crane.Push(srcImage, fmt.Sprintf("%s/%s", r.zotURL, image), options...)
// if err != nil {
// log.Error().Msgf("Failed to push image: %v", err)
// return err
// }
log.Info().Msgf("Image %s pushed successfully", image)
}
// Delete ./local-oci-layout directory
Expand Down
Loading

0 comments on commit 7b35466

Please sign in to comment.