Skip to content

Commit

Permalink
feat: add concurrency support for pull and push operations
Browse files Browse the repository at this point in the history
Signed-off-by: chlins <[email protected]>
  • Loading branch information
chlins committed Dec 17, 2024
1 parent 44c0c19 commit dc09343
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 33 deletions.
5 changes: 2 additions & 3 deletions cmd/login.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,8 @@ func runLogin(ctx context.Context, registry string) error {

fmt.Println("\nLogging In...")

opts := []backend.Option{}
if loginConfig.PlainHTTP {
opts = append(opts, backend.WithPlainHTTP())
opts := []backend.Option{
backend.WithPlainHTTP(loginConfig.PlainHTTP),
}

if err := b.Login(ctx, registry, loginConfig.Username, loginConfig.Password, opts...); err != nil {
Expand Down
12 changes: 9 additions & 3 deletions cmd/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,18 @@ var pullCmd = &cobra.Command{
SilenceUsage: true,
FParseErrWhitelist: cobra.FParseErrWhitelist{UnknownFlags: true},
RunE: func(cmd *cobra.Command, args []string) error {
if err := pullConfig.Validate(); err != nil {
return err
}

return runPull(context.Background(), args[0])
},
}

// init initializes pull command.
func init() {
flags := pullCmd.Flags()
flags.IntVar(&pullConfig.Concurrency, "concurrency", pullConfig.Concurrency, "specify the number of concurrent pull operations (default: 3)")
flags.BoolVar(&pullConfig.PlainHTTP, "plain-http", false, "use plain HTTP instead of HTTPS")
flags.BoolVar(&pullConfig.Insecure, "insecure", false, "use insecure connection for the pull operation and skip TLS verification")
flags.StringVar(&pullConfig.Proxy, "proxy", "", "use proxy for the pull operation")
Expand All @@ -66,9 +71,10 @@ func runPull(ctx context.Context, target string) error {
return fmt.Errorf("target is required")
}

opts := []backend.Option{backend.WithInsecure(pullConfig.Insecure)}
if pullConfig.PlainHTTP {
opts = append(opts, backend.WithPlainHTTP())
opts := []backend.Option{
backend.WithInsecure(pullConfig.Insecure),
backend.WithPlainHTTP(pullConfig.PlainHTTP),
backend.WithConcurrency(pullConfig.Concurrency),
}

if pullConfig.Proxy != "" {
Expand Down
11 changes: 8 additions & 3 deletions cmd/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,18 @@ var pushCmd = &cobra.Command{
SilenceUsage: true,
FParseErrWhitelist: cobra.FParseErrWhitelist{UnknownFlags: true},
RunE: func(cmd *cobra.Command, args []string) error {
if err := pushConfig.Validate(); err != nil {
return err
}

return runPush(context.Background(), args[0])
},
}

// init initializes push command.
func init() {
flags := pushCmd.Flags()
flags.IntVar(&pushConfig.Concurrency, "concurrency", pushConfig.Concurrency, "specify the number of concurrent push operations (default: 3)")
flags.BoolVar(&pushConfig.PlainHTTP, "plain-http", false, "use plain HTTP instead of HTTPS")

if err := viper.BindPFlags(flags); err != nil {
Expand All @@ -59,9 +64,9 @@ func runPush(ctx context.Context, target string) error {
return err
}

opts := []backend.Option{}
if pushConfig.PlainHTTP {
opts = append(opts, backend.WithPlainHTTP())
opts := []backend.Option{
backend.WithPlainHTTP(pushConfig.PlainHTTP),
backend.WithConcurrency(pushConfig.Concurrency),
}

if err := b.Push(ctx, target, opts...); err != nil {
Expand Down
20 changes: 14 additions & 6 deletions pkg/backend/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,24 @@ package backend
type Option func(*Options)

type Options struct {
plainHTTP bool
proxy string
insecure bool
output string
concurrency int
plainHTTP bool
proxy string
insecure bool
output string
}

// WithConcurrency sets the concurrency option.
func WithConcurrency(concurrency int) Option {
return func(opts *Options) {
opts.concurrency = concurrency
}
}

// WithPlainHTTP sets the plain HTTP option.
func WithPlainHTTP() Option {
func WithPlainHTTP(plainHTTP bool) Option {
return func(opts *Options) {
opts.plainHTTP = true
opts.plainHTTP = plainHTTP
}
}

Expand Down
12 changes: 8 additions & 4 deletions pkg/backend/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"net/url"

"github.com/CloudNativeAI/modctl/pkg/storage"
"golang.org/x/sync/errgroup"

ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"oras.land/oras-go/v2/registry/remote"
Expand Down Expand Up @@ -110,12 +111,15 @@ func (b *backend) Pull(ctx context.Context, target string, opts ...Option) error
// note: the order is important, manifest should be pushed at last.

// copy the layers.
// TODO: parallelize the layer copy.
dst := b.store
g := &errgroup.Group{}
g.SetLimit(options.concurrency)
for _, layer := range manifest.Layers {
if err := pullIfNotExist(ctx, pb, promptCopyingBlob, src, dst, layer, repo, tag); err != nil {
return fmt.Errorf("failed to pull blob to local: %w", err)
}
g.Go(func() error { return pullIfNotExist(ctx, pb, promptCopyingBlob, src, dst, layer, repo, tag) })
}

if err := g.Wait(); err != nil {
return fmt.Errorf("failed to pull blob to local: %w", err)
}

// copy the config.
Expand Down
12 changes: 8 additions & 4 deletions pkg/backend/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"

"github.com/CloudNativeAI/modctl/pkg/storage"
"golang.org/x/sync/errgroup"

godigest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
Expand Down Expand Up @@ -91,11 +92,14 @@ func (b *backend) Push(ctx context.Context, target string, opts ...Option) error
// note: the order is important, manifest should be pushed at last.

// copy the layers.
// TODO: parallelize the layer copy.
g := &errgroup.Group{}
g.SetLimit(options.concurrency)
for _, layer := range manifest.Layers {
if err := pushIfNotExist(ctx, pb, promptCopyingBlob, src, dst, layer, repo, tag); err != nil {
return fmt.Errorf("failed to push blob to remote: %w", err)
}
g.Go(func() error { return pushIfNotExist(ctx, pb, promptCopyingBlob, src, dst, layer, repo, tag) })
}

if err := g.Wait(); err != nil {
return fmt.Errorf("failed to push blob to remote: %w", err)
}

// copy the config.
Expand Down
33 changes: 25 additions & 8 deletions pkg/config/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,35 @@

package config

import "fmt"

const (
// defaultPullConcurrency is the default number of concurrent pull operations.
defaultPullConcurrency = 3
)

type Pull struct {
PlainHTTP bool
Proxy string
Insecure bool
ExtractDir string
Concurrency int
PlainHTTP bool
Proxy string
Insecure bool
ExtractDir string
}

func NewPull() *Pull {
return &Pull{
PlainHTTP: false,
Proxy: "",
Insecure: false,
ExtractDir: "",
Concurrency: defaultPullConcurrency,
PlainHTTP: false,
Proxy: "",
Insecure: false,
ExtractDir: "",
}
}

func (p *Pull) Validate() error {
if p.Concurrency < 1 {
return fmt.Errorf("invalid concurrency: %d", p.Concurrency)
}

return nil
}
21 changes: 19 additions & 2 deletions pkg/config/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,29 @@

package config

import "fmt"

const (
// defaultPushConcurrency is the default number of concurrent push operations.
defaultPushConcurrency = 3
)

type Push struct {
PlainHTTP bool
Concurrency int
PlainHTTP bool
}

func NewPush() *Pull {
return &Pull{
PlainHTTP: false,
Concurrency: defaultPushConcurrency,
PlainHTTP: false,
}
}

func (p *Push) Validate() error {
if p.Concurrency < 1 {
return fmt.Errorf("invalid concurrency: %d", p.Concurrency)
}

return nil
}

0 comments on commit dc09343

Please sign in to comment.