Use time of create MPU on backend storage as time of MPU (#8311)
* Use time of create MPU on backend storage as time of MPU

This time is either reported by the block adapter during creation, or it can
be read by stat'ting the underlying object after complete-MPU.  Use the
latter method on S3.
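
A minimal sketch of the latter method, following the pattern this commit adds to pkg/block/s3/adapter.go: complete the MPU, then HEAD the finished object for its Last-Modified. The function name `completeAndStat` is illustrative and error handling is simplified; the real code is in the diff below.

```go
package example

import (
	"context"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"
)

// completeAndStat completes a multipart upload, then HEADs the finished
// object. On S3 the object's Last-Modified is the time of create-MPU.
func completeAndStat(ctx context.Context, client *s3.Client, bucket, key, uploadID string, parts []types.CompletedPart) (*time.Time, error) {
	_, err := client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
		Bucket:          aws.String(bucket),
		Key:             aws.String(key),
		UploadId:        aws.String(uploadID),
		MultipartUpload: &types.CompletedMultipartUpload{Parts: parts},
	})
	if err != nil {
		return nil, err
	}
	head, err := client.HeadObject(ctx, &s3.HeadObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		return nil, err
	}
	return head.LastModified, nil // becomes CompleteMultiPartUploadResponse.MTime
}
```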

* [golangci-lint] fix repeated assignment

* [bug] S3 gateway protocol reports time with 1-second precision

Bump timeSlippage accordingly.
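
timeSlippage itself is defined elsewhere in the test suite and not shown in this diff. A standalone sketch of why a 1-second protocol needs at least a second of slack (names and values are illustrative):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// lakeFS may hold a sub-second mtime internally, but the S3 gateway
	// protocol (like the HTTP Last-Modified header) carries whole seconds.
	actual := time.Date(2024, time.October, 30, 12, 0, 0, 900_000_000, time.UTC)
	reported := actual.Truncate(time.Second) // loses up to a full second

	const slippage = time.Second // any tolerance below this can fail spuriously
	fmt.Println(actual.Sub(reported) <= slippage) // true: within tolerance
	fmt.Println(actual.Equal(reported))           // false: naive equality fails
}
```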

* [bug] In Esti check that mtimes match on lakeFS and underlying store

Azure and GCS do _not_ use the time of create-MPU.  The important thing is
that the two share the same time.

Anything else confuses not only people but also programs. For instance,
Databricks with presigned URLs can get confused by getting one time from
lakeFS and another from the underlying storage.

* [CR] Sleep only on S3 in multipart test, and fix some comments
arielshaqed authored Oct 30, 2024
1 parent 529ffc3 commit c2e44a0
Showing 10 changed files with 74 additions and 10 deletions.
38 changes: 38 additions & 0 deletions esti/multipart_test.go
@@ -6,6 +6,7 @@ import (
"net/http"
"sync"
"testing"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
@@ -14,6 +15,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/thanhpk/randstr"
"github.com/treeverse/lakefs/pkg/api/apigen"
"github.com/treeverse/lakefs/pkg/block"
"github.com/treeverse/lakefs/pkg/logging"
)

@@ -23,6 +25,13 @@ const (
)

func TestMultipartUpload(t *testing.T) {
// timeResolution is a duration greater than the timestamp resolution of the backing
// store. On S3 the mtime of a multipart object is the time of create-MPU, so waiting
// before completion ensures that lakeFS did not use the current time. Other
// blockstores use the MPU completion time, making it harder to detect whether the
// underlying and lakeFS objects share the same time.
const timeResolution = time.Second

ctx, logger, repo := setupTest(t)
defer tearDownTest(repo)
file := "multipart_file"
@@ -44,6 +53,13 @@ func TestMultipartUpload(t *testing.T) {
}

completedParts := uploadMultipartParts(t, ctx, logger, resp, parts, 0)

if isBlockstoreType(block.BlockstoreTypeS3) == nil {
// The object should have a Last-Modified time at around the time of MPU creation.
// Sleep to ensure the test fails if lakeFS fakes that time by using the current time.
time.Sleep(2 * timeResolution)
}

completeResponse, err := uploadMultipartComplete(ctx, svc, resp, completedParts)
require.NoError(t, err, "failed to complete multipart upload")

@@ -55,6 +71,28 @@ func TestMultipartUpload(t *testing.T) {
if !bytes.Equal(partsConcat, getResp.Body) {
t.Fatalf("uploaded object did not match")
}

statResp, err := client.StatObjectWithResponse(ctx, repo, mainBranch, &apigen.StatObjectParams{Path: file, Presign: aws.Bool(true)})
require.NoError(t, err, "failed to stat object")
require.Equal(t, http.StatusOK, statResp.StatusCode(), statResp.Status())

// Get last-modified from the underlying store.

presignedGetURL := statResp.JSON200.PhysicalAddress
res, err := http.Get(presignedGetURL)
require.NoError(t, err, "GET underlying")
// The presigned URL is only valid for GET (not HEAD), so issue a GET but ignore the body.
_ = res.Body.Close()
require.Equal(t, http.StatusOK, res.StatusCode, "%s: %s", presignedGetURL, res.Status)
lastModifiedString := res.Header.Get("Last-Modified")
underlyingLastModified, err := time.Parse(time.RFC1123, lastModifiedString)
require.NoError(t, err, "Last-Modified %s", lastModifiedString)
// The Last-Modified header includes a timezone, which is typically "GMT" on AWS. Now GMT
// _is equal to_ UTC! But Go is nothing if not cautious, and considers UTC and GMT to
// be different timezones. So we cannot compare with "==" and must use time.Time.Equal
// (see the standalone snippet below).
lakeFSMTime := time.Unix(statResp.JSON200.Mtime, 0)
require.True(t, lakeFSMTime.Equal(underlyingLastModified),
"lakeFS mtime %s should be same as on underlying object %s", lakeFSMTime, underlyingLastModified)
}

func TestMultipartUploadAbort(t *testing.T) {
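
The GMT-versus-UTC caveat from the test comment above is easy to demonstrate on its own. A minimal, self-contained snippet (not part of the commit):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// An AWS-style Last-Modified header names its zone "GMT".
	fromHeader, _ := time.Parse(time.RFC1123, "Wed, 30 Oct 2024 12:00:00 GMT")
	// The same instant built from a Unix timestamp, in UTC.
	fromUnix := time.Unix(fromHeader.Unix(), 0).UTC()

	fmt.Println(fromHeader == fromUnix)     // false: "GMT" and UTC are distinct Locations
	fmt.Println(fromHeader.Equal(fromUnix)) // true: they are the same instant
}
```
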
15 changes: 12 additions & 3 deletions esti/system_test.go
@@ -337,11 +337,20 @@ func listRepositories(t *testing.T, ctx context.Context) []apigen.Repository {
return listedRepos
}

// isBlockstoreType returns nil if the blockstore type is one of requiredTypes;
// otherwise it returns the actual blockstore type.
func isBlockstoreType(requiredTypes ...string) *string {
blockstoreType := viper.GetString(config.BlockstoreTypeKey)
if slices.Contains(requiredTypes, blockstoreType) {
return nil
}
return &blockstoreType
}

// requireBlockstoreType skips the test if the blockstore type doesn't match any of the required types
func requireBlockstoreType(t testing.TB, requiredTypes ...string) {
-blockstoreType := viper.GetString(config.BlockstoreTypeKey)
-if !slices.Contains(requiredTypes, blockstoreType) {
-t.Skipf("Required blockstore types: %v, got: %s", requiredTypes, blockstoreType)
+if blockstoreType := isBlockstoreType(requiredTypes...); blockstoreType != nil {
+t.Skipf("Required blockstore types: %v, got: %s", requiredTypes, *blockstoreType)
}
}

6 changes: 6 additions & 0 deletions pkg/api/controller.go
@@ -384,6 +384,12 @@ func (c *Controller) CompletePresignMultipartUpload(w http.ResponseWriter, r *ht

writeTime := time.Now()
checksum := httputil.StripQuotesAndSpaces(mpuResp.ETag)
// Prefer the time assigned by the storage layer: when it uses the time of MPU
// creation, anything else can be _really_ wrong. For instance, the S3 block
// adapter makes sure to return an MTime from headObject to ensure that we do
// have a time here.
if mpuResp.MTime != nil {
writeTime = *mpuResp.MTime
}
entryBuilder := catalog.NewDBEntryBuilder().
CommonLevel(false).
Path(params.Path).
9 changes: 6 additions & 3 deletions pkg/block/adapter.go
@@ -96,10 +96,13 @@ type CreateMultiPartUploadResponse struct {
ServerSideHeader http.Header
}

-// CompleteMultiPartUploadResponse complete multipart etag, content length and additional headers (implementation specific) currently it targets s3.
-// The ETag is a hex string value of the content checksum
+// CompleteMultiPartUploadResponse complete multipart etag, content length and additional headers (implementation specific).
type CompleteMultiPartUploadResponse struct {
-ETag string
+// ETag is a hex string value of the content checksum
+ETag string
+// MTime, if non-nil, is the creation time of the resulting object. Typically the
+// object store returns it in a Last-Modified header from some operations.
+MTime *time.Time
ContentLength int64
ServerSideHeader http.Header
}
1 change: 1 addition & 0 deletions pkg/block/azure/multipart_block_writer.go
@@ -106,6 +106,7 @@ func completeMultipart(ctx context.Context, parts []block.MultipartPart, contain
etag := string(*res.ETag)
return &block.CompleteMultiPartUploadResponse{
ETag: etag,
MTime: res.LastModified,
ContentLength: size,
}, nil
}
1 change: 1 addition & 0 deletions pkg/block/gs/adapter.go
@@ -524,6 +524,7 @@ func (a *Adapter) CompleteMultiPartUpload(ctx context.Context, obj block.ObjectP
lg.Debug("completed multipart upload")
return &block.CompleteMultiPartUploadResponse{
ETag: targetAttrs.Etag,
MTime: &targetAttrs.Created,
ContentLength: targetAttrs.Size,
}, nil
}
1 change: 1 addition & 0 deletions pkg/block/s3/adapter.go
@@ -779,6 +779,7 @@ func (a *Adapter) CompleteMultiPartUpload(ctx context.Context, obj block.ObjectP
etag := strings.Trim(aws.ToString(resp.ETag), `"`)
return &block.CompleteMultiPartUploadResponse{
ETag: etag,
MTime: headResp.LastModified,
ContentLength: aws.ToInt64(headResp.ContentLength),
ServerSideHeader: extractSSHeaderCompleteMultipartUpload(resp),
}, nil
9 changes: 7 additions & 2 deletions pkg/gateway/operations/operation_utils.go
@@ -40,9 +40,14 @@ func shouldReplaceMetadata(req *http.Request) bool {
return req.Header.Get(amzMetadataDirectiveHeaderPrefix) == "REPLACE"
}

-func (o *PathOperation) finishUpload(req *http.Request, checksum, physicalAddress string, size int64, relative bool, metadata map[string]string, contentType string) error {
+func (o *PathOperation) finishUpload(req *http.Request, mTime *time.Time, checksum, physicalAddress string, size int64, relative bool, metadata map[string]string, contentType string) error {
+var writeTime time.Time
+if mTime == nil {
+writeTime = time.Now()
+} else {
+writeTime = *mTime
+}
// write metadata
-writeTime := time.Now()
entry := catalog.NewDBEntryBuilder().
Path(o.Path).
RelativeAddress(relative).
2 changes: 1 addition & 1 deletion pkg/gateway/operations/postobject.go
@@ -124,7 +124,7 @@ func (controller *PostObject) HandleCompleteMultipartUpload(w http.ResponseWrite
return
}
checksum := strings.Split(resp.ETag, "-")[0]
-err = o.finishUpload(req, checksum, objName, resp.ContentLength, true, multiPart.Metadata, multiPart.ContentType)
+err = o.finishUpload(req, resp.MTime, checksum, objName, resp.ContentLength, true, multiPart.Metadata, multiPart.ContentType)
if errors.Is(err, graveler.ErrWriteToProtectedBranch) {
_ = o.EncodeError(w, req, err, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrWriteToProtectedBranch))
return
2 changes: 1 addition & 1 deletion pkg/gateway/operations/putobject.go
@@ -309,7 +309,7 @@ func handlePut(w http.ResponseWriter, req *http.Request, o *PathOperation) {
// write metadata
metadata := amzMetaAsMetadata(req)
contentType := req.Header.Get("Content-Type")
-err = o.finishUpload(req, blob.Checksum, blob.PhysicalAddress, blob.Size, true, metadata, contentType)
+err = o.finishUpload(req, nil, blob.Checksum, blob.PhysicalAddress, blob.Size, true, metadata, contentType)
if errors.Is(err, graveler.ErrWriteToProtectedBranch) {
_ = o.EncodeError(w, req, err, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrWriteToProtectedBranch))
return