Skip to content

Commit

Permalink
Revert "concurent upload to R2"
Browse files Browse the repository at this point in the history
This reverts commit 3a9d41b.
  • Loading branch information
xtuc committed Feb 7, 2024
1 parent 44c4160 commit f3bdfb4
Showing 1 changed file with 4 additions and 23 deletions.
27 changes: 4 additions & 23 deletions functions/r2-pump/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"net/http"
"os"
"path/filepath"
"strconv"
"time"

"github.com/cdnjs/tools/audit"
Expand All @@ -39,19 +38,12 @@ var (
// be 50/50 for gz and br. Unknown for woff2.
// Example: FILE_EXTENSION=gz
FILE_EXTENSION = os.Getenv("FILE_EXTENSION")

UPLOAD_CONCURENCY = os.Getenv("UPLOAD_CONCURENCY")
)

func Invoke(ctx context.Context, e gcp.GCSEvent) error {
sentry.Init()
defer sentry.PanicHandler()

uploadConcurency, err := strconv.Atoi(UPLOAD_CONCURENCY)
if err != nil {
return fmt.Errorf("could not read UPLOAD_CONCURENCY: %v", err)
}

pkgName := e.Metadata["package"].(string)
version := e.Metadata["version"].(string)
log.Printf("Invoke %s %s\n", pkgName, version)
Expand Down Expand Up @@ -83,12 +75,8 @@ func Invoke(ctx context.Context, e gcp.GCSEvent) error {
}

s3Client := s3.NewFromConfig(cfg)
keys := make([]string, 0)
uploadQueue := make(chan s3.PutObjectInput)

for w := 1; w <= uploadConcurency; w++ {
go uploadWorker(ctx, s3Client, uploadQueue)
}
keys := make([]string, 0)

onFile := func(name string, r io.Reader) error {
ext := filepath.Ext(name)
Expand All @@ -112,14 +100,15 @@ func Invoke(ctx context.Context, e gcp.GCSEvent) error {
Key: aws.String(key),
Metadata: meta,
}
uploadQueue <- s3Object
if err := uploadFile(ctx, s3Client, &s3Object); err != nil {
return errors.Wrap(err, "failed to upload file")
}
}
return nil
}
if err := gcp.Inflate(bytes.NewReader(archive), onFile); err != nil {
return fmt.Errorf("could not inflate archive: %s", err)
}
close(uploadQueue)

if len(keys) == 0 {
log.Printf("%s: no files to publish\n", pkgName)
Expand Down Expand Up @@ -162,11 +151,3 @@ func uploadFile(ctx context.Context, s3Client *s3.Client, obj *s3.PutObjectInput

return nil
}

func uploadWorker(ctx context.Context, s3Client *s3.Client, queue <-chan s3.PutObjectInput) {
for item := range queue {
if err := uploadFile(ctx, s3Client, &item); err != nil {
panic(errors.Wrap(err, "failed to upload file"))
}
}
}

0 comments on commit f3bdfb4

Please sign in to comment.