Skip to content

Commit

Permalink
generate names (#7822)
Browse files Browse the repository at this point in the history
  • Loading branch information
nadavsteindler committed Jul 8, 2024
1 parent 1e4d502 commit 9af961a
Showing 1 changed file with 7 additions and 81 deletions.
88 changes: 7 additions & 81 deletions pkg/block/gs/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"io"
"net/http"
"net/url"
"sort"
"strings"
"time"

Expand All @@ -26,10 +25,7 @@ const (
)

var (
ErrMismatchPartETag = errors.New("mismatch part ETag")
ErrMismatchPartName = errors.New("mismatch part name")
ErrMaxMultipartObjects = errors.New("maximum multipart object reached")
ErrPartListMismatch = errors.New("multipart part list mismatch")
ErrMissingTargetAttrs = errors.New("missing target attributes")
)

Expand Down Expand Up @@ -502,7 +498,7 @@ func (a *Adapter) CompleteMultiPartUpload(ctx context.Context, obj block.ObjectP
"key": obj.Identifier,
})

parts, err := a.getPartNames(ctx, bucketName, uploadID, multipartList)
parts, err := a.getPartNames(bucketName, uploadID, multipartList)
if err != nil {
return nil, err
}
Expand All @@ -527,88 +523,18 @@ func (a *Adapter) CompleteMultiPartUpload(ctx context.Context, obj block.ObjectP
}, nil
}

func (a *Adapter) getPartNames(ctx context.Context, bucketName, uploadID string, multipartList *block.MultipartUploadCompletion) ([]string, error) {
// list bucket parts because maybe this is a retry and some have already been concatenated
bucketParts, err := a.listMultipartUploadParts(ctx, bucketName, uploadID)
if err != nil {
return nil, err
}
// validate bucketParts match the request multipartList
err = a.validateMultipartUploadParts(uploadID, multipartList, bucketParts)
if err != nil {
return nil, err
func (a *Adapter) getPartNames(bucketName, uploadID string, multipartList *block.MultipartUploadCompletion) ([]string, error) {
if len(multipartList.Part) > MaxMultipartObjects {
return nil, fmt.Errorf("listing bucket '%s' upload '%s': %w", bucketName, uploadID, ErrMaxMultipartObjects)
}

// prepare names
parts := make([]string, len(bucketParts))
for i, part := range bucketParts {
parts[i] = part.Name
parts := make([]string, len(multipartList.Part))
for i := 0; i < len(parts); i++ {
parts[i] = formatMultipartFilename(uploadID, i+1)
}
return parts, nil
}

func (a *Adapter) validateMultipartUploadParts(uploadID string, multipartList *block.MultipartUploadCompletion, bucketParts []*storage.ObjectAttrs) error {
if len(multipartList.Part) < len(bucketParts) {
return fmt.Errorf("part list mismatch - expected %d parts, got %d: %w", len(bucketParts), len(multipartList.Part), ErrPartListMismatch)
}

// build validation set of Name to ETag
validationSet := make(map[string]string)
for _, p := range multipartList.Part {
objName := formatMultipartFilename(uploadID, p.PartNumber)
validationSet[objName] = p.ETag
}

// validate no found items that aren't in the original parts set
for i, p := range bucketParts {
bucketPartName := p.Name
if bucketPartName[len(bucketPartName)-2] == '_' {
// retrying a previously composed file, remove the _1 at the end to validate
bucketPartName = bucketPartName[:len(bucketPartName)-2]
etag := validationSet[bucketPartName]
if etag == "" {
return fmt.Errorf("invalid composed part name: %s, position %d: %w", p.Name, i, ErrMismatchPartName)
}
} else {
etag := validationSet[bucketPartName]
if etag == "" {
return fmt.Errorf("invalid part name: %s, position %d: %w", p.Name, i, ErrMismatchPartName)
}
if etag != p.Etag {
return fmt.Errorf("invalid part ETag: %s, name: %s, position %d: %w", p.Etag, p.Name, i, ErrMismatchPartETag)
}
}
}
return nil
}

func (a *Adapter) listMultipartUploadParts(ctx context.Context, bucketName string, uploadID string) ([]*storage.ObjectAttrs, error) {
bucket := a.client.Bucket(bucketName)
var bucketParts []*storage.ObjectAttrs
it := bucket.Objects(ctx, &storage.Query{
Delimiter: delimiter,
Prefix: uploadID + partSuffix,
})
for {
attrs, err := it.Next()
if errors.Is(err, iterator.Done) {
break
}
if err != nil {
return nil, fmt.Errorf("listing bucket '%s' upload '%s': %w", bucketName, uploadID, err)
}
bucketParts = append(bucketParts, attrs)
if len(bucketParts) > MaxMultipartObjects {
return nil, fmt.Errorf("listing bucket '%s' upload '%s': %w", bucketName, uploadID, ErrMaxMultipartObjects)
}
}
// sort by name - assume natual sort order
sort.Slice(bucketParts, func(i, j int) bool {
return bucketParts[i].Name < bucketParts[j].Name
})
return bucketParts, nil
}

func (a *Adapter) composeMultipartUploadParts(ctx context.Context, bucketName string, uploadID string, parts []string) (*storage.ObjectAttrs, error) {
// compose target from all parts
bucket := a.client.Bucket(bucketName)
Expand Down

0 comments on commit 9af961a

Please sign in to comment.