Skip to content

Commit

Permalink
Merge pull request #276 from cdnjs/sven/concurent
Browse files Browse the repository at this point in the history
concurent upload to R2
  • Loading branch information
xtuc authored Feb 5, 2024
2 parents a7abca0 + 3a9d41b commit f72ff3b
Showing 1 changed file with 23 additions and 4 deletions.
27 changes: 23 additions & 4 deletions functions/r2-pump/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"net/http"
"os"
"path/filepath"
"strconv"
"time"

"github.com/cdnjs/tools/audit"
Expand All @@ -38,12 +39,19 @@ var (
// be 50/50 for gz and br. Unknown for woff2.
// Example: FILE_EXTENSION=gz
FILE_EXTENSION = os.Getenv("FILE_EXTENSION")

UPLOAD_CONCURENCY = os.Getenv("UPLOAD_CONCURENCY")
)

func Invoke(ctx context.Context, e gcp.GCSEvent) error {
sentry.Init()
defer sentry.PanicHandler()

uploadConcurency, err := strconv.Atoi(UPLOAD_CONCURENCY)
if err != nil {
return fmt.Errorf("could not read UPLOAD_CONCURENCY: %v", err)
}

pkgName := e.Metadata["package"].(string)
version := e.Metadata["version"].(string)
log.Printf("Invoke %s %s\n", pkgName, version)
Expand Down Expand Up @@ -75,8 +83,12 @@ func Invoke(ctx context.Context, e gcp.GCSEvent) error {
}

s3Client := s3.NewFromConfig(cfg)

keys := make([]string, 0)
uploadQueue := make(chan s3.PutObjectInput)

for w := 1; w <= uploadConcurency; w++ {
go uploadWorker(ctx, s3Client, uploadQueue)
}

onFile := func(name string, r io.Reader) error {
ext := filepath.Ext(name)
Expand All @@ -100,15 +112,14 @@ func Invoke(ctx context.Context, e gcp.GCSEvent) error {
Key: aws.String(key),
Metadata: meta,
}
if err := uploadFile(ctx, s3Client, &s3Object); err != nil {
return errors.Wrap(err, "failed to upload file")
}
uploadQueue <- s3Object
}
return nil
}
if err := gcp.Inflate(bytes.NewReader(archive), onFile); err != nil {
return fmt.Errorf("could not inflate archive: %s", err)
}
close(uploadQueue)

if len(keys) == 0 {
log.Printf("%s: no files to publish\n", pkgName)
Expand Down Expand Up @@ -151,3 +162,11 @@ func uploadFile(ctx context.Context, s3Client *s3.Client, obj *s3.PutObjectInput

return nil
}

func uploadWorker(ctx context.Context, s3Client *s3.Client, queue <-chan s3.PutObjectInput) {
for item := range queue {
if err := uploadFile(ctx, s3Client, &item); err != nil {
panic(errors.Wrap(err, "failed to upload file"))
}
}
}

0 comments on commit f72ff3b

Please sign in to comment.