From c2e44a0431020007c9c70843d4eb011606399f91 Mon Sep 17 00:00:00 2001 From: "Ariel Shaqed (Scolnicov)" Date: Wed, 30 Oct 2024 17:31:49 +0200 Subject: [PATCH] Use time of create MPU on backend storage as time of MPU (#8311) * Use time of create MPU on backend storage as time of MPU This time is either reported by the block adapter during creation, or it can be read by stat'ting the underlying object after complete-MPU. Use the latter method on S3. * [golangci-lint] fix repeated assignment * [bug] S3 gateway protocol reports time with 1-second precision Bump timeSlippage accordingly. * [bug] In Esti check that mtimes match on lakeFS and underlying store Azure and GCS do _not_ use the time of create-MPU. The important thing is that the two share the same time. Anything else confuses not only people but also programs. For instance DataBricks with presigned URLs can get confused by getting one time from lakeFS and another from underlying storage. * [CR] Sleep only on S3 in multipart test, and fix some comments --- esti/multipart_test.go | 38 +++++++++++++++++++++++ esti/system_test.go | 15 +++++++-- pkg/api/controller.go | 6 ++++ pkg/block/adapter.go | 9 ++++-- pkg/block/azure/multipart_block_writer.go | 1 + pkg/block/gs/adapter.go | 1 + pkg/block/s3/adapter.go | 1 + pkg/gateway/operations/operation_utils.go | 9 ++++-- pkg/gateway/operations/postobject.go | 2 +- pkg/gateway/operations/putobject.go | 2 +- 10 files changed, 74 insertions(+), 10 deletions(-) diff --git a/esti/multipart_test.go b/esti/multipart_test.go index d4cfbde85b8..62a375f5975 100644 --- a/esti/multipart_test.go +++ b/esti/multipart_test.go @@ -6,6 +6,7 @@ import ( "net/http" "sync" "testing" + "time" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" @@ -14,6 +15,7 @@ import ( "github.com/stretchr/testify/require" "github.com/thanhpk/randstr" "github.com/treeverse/lakefs/pkg/api/apigen" + "github.com/treeverse/lakefs/pkg/block" "github.com/treeverse/lakefs/pkg/logging" ) @@ 
-23,6 +25,13 @@ const ( ) func TestMultipartUpload(t *testing.T) { + // timeResolution is a duration greater than the timestamp resolution of the backing + // store. On S3 the mtime of a multipart object is the time of create-MPU, so waiting + // before completion ensures that lakeFS did not use the current time. For other + // blockstores MPU completion time is used, meaning it will be hard to detect if the + // underlying and lakeFS objects share the same time. + const timeResolution = time.Second + ctx, logger, repo := setupTest(t) defer tearDownTest(repo) file := "multipart_file" @@ -44,6 +53,13 @@ func TestMultipartUpload(t *testing.T) { } completedParts := uploadMultipartParts(t, ctx, logger, resp, parts, 0) + + if isBlockstoreType(block.BlockstoreTypeS3) == nil { + // Object should have Last-Modified time at around time of MPU creation. Sleep so + // that the test fails if lakeFS fakes that time by using the current time. + time.Sleep(2 * timeResolution) + } + completeResponse, err := uploadMultipartComplete(ctx, svc, resp, completedParts) require.NoError(t, err, "failed to complete multipart upload") @@ -55,6 +71,28 @@ func TestMultipartUpload(t *testing.T) { if !bytes.Equal(partsConcat, getResp.Body) { t.Fatalf("uploaded object did not match") } + + statResp, err := client.StatObjectWithResponse(ctx, repo, mainBranch, &apigen.StatObjectParams{Path: file, Presign: aws.Bool(true)}) + require.NoError(t, err, "failed to stat object") + require.Equal(t, http.StatusOK, statResp.StatusCode(), statResp.Status()) + + // Get last-modified from the underlying store. + + presignedGetURL := statResp.JSON200.PhysicalAddress + res, err := http.Get(presignedGetURL) + require.NoError(t, err, "GET underlying") + // The presigned URL is usable only for GET, but we don't actually care about its body.
+ _ = res.Body.Close() + require.Equal(t, http.StatusOK, res.StatusCode, "%s: %s", presignedGetURL, res.Status) + lastModifiedString := res.Header.Get("Last-Modified") + underlyingLastModified, err := time.Parse(time.RFC1123, lastModifiedString) + require.NoError(t, err, "Last-Modified %s", lastModifiedString) + // Last-Modified header includes a timezone, which is typically "GMT" on AWS. Now GMT + // _is equal to_ UTC! But Go is nothing if not cautious, and considers UTC and GMT to + // be different timezones. So we cannot compare with "==" and must use time.Time.Equal. + lakeFSMTime := time.Unix(statResp.JSON200.Mtime, 0) + require.True(t, lakeFSMTime.Equal(underlyingLastModified), + "lakeFS mtime %s should be same as on underlying object %s", lakeFSMTime, underlyingLastModified) } func TestMultipartUploadAbort(t *testing.T) { diff --git a/esti/system_test.go b/esti/system_test.go index 8ceb57c855f..acb79a9a5bd 100644 --- a/esti/system_test.go +++ b/esti/system_test.go @@ -337,11 +337,20 @@ func listRepositories(t *testing.T, ctx context.Context) []apigen.Repository { return listedRepos } +// isBlockstoreType returns nil if the blockstore type is one of requiredTypes; otherwise it +// returns a pointer to the actual blockstore type.
+func isBlockstoreType(requiredTypes ...string) *string { + blockstoreType := viper.GetString(config.BlockstoreTypeKey) + if slices.Contains(requiredTypes, blockstoreType) { + return nil + } + return &blockstoreType +} + // requireBlockstoreType Skips test if blockstore type doesn't match the required type func requireBlockstoreType(t testing.TB, requiredTypes ...string) { - blockstoreType := viper.GetString(config.BlockstoreTypeKey) - if !slices.Contains(requiredTypes, blockstoreType) { - t.Skipf("Required blockstore types: %v, got: %s", requiredTypes, blockstoreType) + if blockstoreType := isBlockstoreType(requiredTypes...); blockstoreType != nil { + t.Skipf("Required blockstore types: %v, got: %s", requiredTypes, *blockstoreType) } } diff --git a/pkg/api/controller.go b/pkg/api/controller.go index c39b304e07a..9e29817418f 100644 --- a/pkg/api/controller.go +++ b/pkg/api/controller.go @@ -384,6 +384,12 @@ func (c *Controller) CompletePresignMultipartUpload(w http.ResponseWriter, r *ht writeTime := time.Now() checksum := httputil.StripQuotesAndSpaces(mpuResp.ETag) + // Anything else can be _really_ wrong when the storage layer assigns the time of MPU + // creation. For instance, the S3 block adapter makes sure to return an MTime from + // headObject to ensure that we do have a time here. + if mpuResp.MTime != nil { + writeTime = *mpuResp.MTime + } entryBuilder := catalog.NewDBEntryBuilder(). CommonLevel(false). Path(params.Path). diff --git a/pkg/block/adapter.go b/pkg/block/adapter.go index 684c24d9773..4945c8ac517 100644 --- a/pkg/block/adapter.go +++ b/pkg/block/adapter.go @@ -96,10 +96,13 @@ type CreateMultiPartUploadResponse struct { ServerSideHeader http.Header } -// CompleteMultiPartUploadResponse complete multipart etag, content length and additional headers (implementation specific) currently it targets s3. 
-// The ETag is a hex string value of the content checksum +// CompleteMultiPartUploadResponse complete multipart etag, content length and additional headers (implementation specific). type CompleteMultiPartUploadResponse struct { - ETag string + // ETag is a hex string value of the content checksum + ETag string + // MTime, if non-nil, is the creation time of the resulting object. Typically the + // object store returns it on a Last-Modified header from some operations. + MTime *time.Time ContentLength int64 ServerSideHeader http.Header } diff --git a/pkg/block/azure/multipart_block_writer.go b/pkg/block/azure/multipart_block_writer.go index e82b1f0dec4..72248c0a4a6 100644 --- a/pkg/block/azure/multipart_block_writer.go +++ b/pkg/block/azure/multipart_block_writer.go @@ -106,6 +106,7 @@ func completeMultipart(ctx context.Context, parts []block.MultipartPart, contain etag := string(*res.ETag) return &block.CompleteMultiPartUploadResponse{ ETag: etag, + MTime: res.LastModified, ContentLength: size, }, nil } diff --git a/pkg/block/gs/adapter.go b/pkg/block/gs/adapter.go index 43328cddeb4..704e8922658 100644 --- a/pkg/block/gs/adapter.go +++ b/pkg/block/gs/adapter.go @@ -524,6 +524,7 @@ func (a *Adapter) CompleteMultiPartUpload(ctx context.Context, obj block.ObjectP lg.Debug("completed multipart upload") return &block.CompleteMultiPartUploadResponse{ ETag: targetAttrs.Etag, + MTime: &targetAttrs.Created, ContentLength: targetAttrs.Size, }, nil } diff --git a/pkg/block/s3/adapter.go b/pkg/block/s3/adapter.go index 7704d6a33ff..d9c12bbd24f 100644 --- a/pkg/block/s3/adapter.go +++ b/pkg/block/s3/adapter.go @@ -779,6 +779,7 @@ func (a *Adapter) CompleteMultiPartUpload(ctx context.Context, obj block.ObjectP etag := strings.Trim(aws.ToString(resp.ETag), `"`) return &block.CompleteMultiPartUploadResponse{ ETag: etag, + MTime: headResp.LastModified, ContentLength: aws.ToInt64(headResp.ContentLength), ServerSideHeader: extractSSHeaderCompleteMultipartUpload(resp), }, nil diff 
--git a/pkg/gateway/operations/operation_utils.go b/pkg/gateway/operations/operation_utils.go index 94fccb116ed..c9ae38c3fc9 100644 --- a/pkg/gateway/operations/operation_utils.go +++ b/pkg/gateway/operations/operation_utils.go @@ -40,9 +40,14 @@ func shouldReplaceMetadata(req *http.Request) bool { return req.Header.Get(amzMetadataDirectiveHeaderPrefix) == "REPLACE" } -func (o *PathOperation) finishUpload(req *http.Request, checksum, physicalAddress string, size int64, relative bool, metadata map[string]string, contentType string) error { +func (o *PathOperation) finishUpload(req *http.Request, mTime *time.Time, checksum, physicalAddress string, size int64, relative bool, metadata map[string]string, contentType string) error { + var writeTime time.Time + if mTime == nil { + writeTime = time.Now() + } else { + writeTime = *mTime + } // write metadata - writeTime := time.Now() entry := catalog.NewDBEntryBuilder(). Path(o.Path). RelativeAddress(relative). diff --git a/pkg/gateway/operations/postobject.go b/pkg/gateway/operations/postobject.go index 3cdda78258a..984512ca551 100644 --- a/pkg/gateway/operations/postobject.go +++ b/pkg/gateway/operations/postobject.go @@ -124,7 +124,7 @@ func (controller *PostObject) HandleCompleteMultipartUpload(w http.ResponseWrite return } checksum := strings.Split(resp.ETag, "-")[0] - err = o.finishUpload(req, checksum, objName, resp.ContentLength, true, multiPart.Metadata, multiPart.ContentType) + err = o.finishUpload(req, resp.MTime, checksum, objName, resp.ContentLength, true, multiPart.Metadata, multiPart.ContentType) if errors.Is(err, graveler.ErrWriteToProtectedBranch) { _ = o.EncodeError(w, req, err, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrWriteToProtectedBranch)) return diff --git a/pkg/gateway/operations/putobject.go b/pkg/gateway/operations/putobject.go index 4fe5be16002..f54f601ead2 100644 --- a/pkg/gateway/operations/putobject.go +++ b/pkg/gateway/operations/putobject.go @@ -309,7 +309,7 @@ func handlePut(w 
http.ResponseWriter, req *http.Request, o *PathOperation) { // write metadata metadata := amzMetaAsMetadata(req) contentType := req.Header.Get("Content-Type") - err = o.finishUpload(req, blob.Checksum, blob.PhysicalAddress, blob.Size, true, metadata, contentType) + err = o.finishUpload(req, nil, blob.Checksum, blob.PhysicalAddress, blob.Size, true, metadata, contentType) if errors.Is(err, graveler.ErrWriteToProtectedBranch) { _ = o.EncodeError(w, req, err, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrWriteToProtectedBranch)) return