Skip to content

Commit

Permalink
concurrent implementation (#7822)
Browse files Browse the repository at this point in the history
  • Loading branch information
nadavsteindler committed Jul 7, 2024
1 parent 11b9351 commit 1e4d502
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 26 deletions.
16 changes: 5 additions & 11 deletions pkg/block/gs/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,8 +612,7 @@ func (a *Adapter) listMultipartUploadParts(ctx context.Context, bucketName strin
func (a *Adapter) composeMultipartUploadParts(ctx context.Context, bucketName string, uploadID string, parts []string) (*storage.ObjectAttrs, error) {
// compose target from all parts
bucket := a.client.Bucket(bucketName)
var targetAttrs *storage.ObjectAttrs
err := ComposeAll(uploadID, parts, func(target string, parts []string) error {
targetAttrs, err := ComposeAll(uploadID, parts, func(target string, parts []string) (*storage.ObjectAttrs, error) {
objs := make([]*storage.ObjectHandle, len(parts))
for i := range parts {
h := storageObjectHandle{bucket.Object(parts[i])}
Expand All @@ -624,11 +623,9 @@ func (a *Adapter) composeMultipartUploadParts(ctx context.Context, bucketName st
composer := h.withWriteHandle(a).newComposer(a, objs...)
attrs, err := composer.Run(ctx)
if err != nil {
return err
}
if target == uploadID {
targetAttrs = attrs
return nil, err
}

// delete parts
for _, o := range objs {
if err := o.Delete(ctx); err != nil {
Expand All @@ -638,15 +635,12 @@ func (a *Adapter) composeMultipartUploadParts(ctx context.Context, bucketName st
}).Warn("Failed to delete multipart upload part while compose")
}
}
return nil
return attrs, nil
})
if err == nil && targetAttrs == nil {
return nil, ErrMissingTargetAttrs
}
if err != nil {
return nil, err
}
return targetAttrs, nil
return targetAttrs, err
}

func (a *Adapter) Close() error {
Expand Down
25 changes: 20 additions & 5 deletions pkg/block/gs/compose.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@ package gs

import (
"fmt"
"sync"

"cloud.google.com/go/storage"
)

const MaxPartsInCompose = 32

type ComposeFunc func(target string, parts []string) error
type ComposeFunc func(target string, parts []string) (*storage.ObjectAttrs, error)

func ComposeAll(target string, parts []string, composeFunc ComposeFunc) error {
func ComposeAll(target string, parts []string, composeFunc ComposeFunc) (*storage.ObjectAttrs, error) {
var wg sync.WaitGroup
for layer := 1; len(parts) > MaxPartsInCompose; layer++ {
var nextParts []string
for i := 0; i < len(parts); i += MaxPartsInCompose {
Expand All @@ -21,13 +25,24 @@ func ComposeAll(target string, parts []string, composeFunc ComposeFunc) error {
nextParts = append(nextParts, chunk...)
} else {
targetName := fmt.Sprintf("%s_%d", chunk[0], layer)
if err := composeFunc(targetName, chunk); err != nil {
return err
}
wg.Add(1)
go composeChunkConcurrent(targetName, chunk, composeFunc, &wg)
nextParts = append(nextParts, targetName)
}
}
parts = nextParts
}
wg.Wait()

// no compose the chunks we made
return composeFunc(target, parts)
}

func composeChunkConcurrent(target string, parts []string, composeFunc ComposeFunc, wg *sync.WaitGroup) {
// ctx context.Context, a *Adapter, target string, parts []string, bucketName string, bucket *storage.BucketHandle, wg *sync.WaitGroup) {
_, err := composeFunc(target, parts)
if err != nil {
fmt.Println("Compose error: ", err)
}
wg.Done()
}
23 changes: 13 additions & 10 deletions pkg/block/gs/compose_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ package gs
import (
"fmt"
"strconv"
"sync"
"testing"

"cloud.google.com/go/storage"
)

func TestComposeAll(t *testing.T) {
Expand All @@ -18,32 +21,32 @@ func TestComposeAll(t *testing.T) {
}

// map to track
usedParts := make(map[string]struct{})
usedTargets := make(map[string]struct{})
var usedParts sync.Map
var usedTargets sync.Map

// compose
err := ComposeAll(targetFile, parts, func(target string, parts []string) error {
_, err := ComposeAll(targetFile, parts, func(target string, parts []string) (*storage.ObjectAttrs, error) {
for _, part := range parts {
if _, found := usedParts[part]; found {
if _, found := usedParts.Load(part); found {
t.Errorf("Part '%s' already composed", part)
}
usedParts[part] = struct{}{}
usedParts.Store(part, struct{}{})
}
if _, found := usedTargets[target]; found {
if _, found := usedTargets.Load(target); found {
t.Errorf("Target '%s' already composed with %s", target, parts)
}
usedTargets[target] = struct{}{}
return nil
usedTargets.Store(target, struct{}{})
return nil, nil
})
if err != nil {
t.Fatal(err)
}
for _, part := range parts {
if _, ok := usedParts[part]; !ok {
if _, ok := usedParts.Load(part); !ok {
t.Error("Missing part:", part)
}
}
if _, ok := usedTargets[targetFile]; !ok {
if _, ok := usedTargets.Load(targetFile); !ok {
t.Error("Missing target:", targetFile)
}
})
Expand Down

0 comments on commit 1e4d502

Please sign in to comment.